Kafka
Port's Kafka integration allows you to model Kafka resources in your software catalog and ingest data into them.
Setup
Choose one of the following installation methods: Not sure which method is right for your use case? Check the available installation methods.
For advanced configuration such as proxies or self-signed certificates, click here.
Cluster config mapping examples
The clusterConfMapping parameter is crucial for connecting to your Kafka clusters.
The cluster configuration mapping should be a JSON object with cluster names as keys and Kafka client configurations as values. Each client configuration follows the standard Kafka client properties format.
Below are some examples of how to configure the clusterConfMapping parameter for different Kafka cluster configurations.
Basic SASL_SSL authentication
For clusters with SASL authentication over SSL:
Basic SASL_SSL authentication (click to expand)
- Helm/ArgoCD
- CI/CD Environment Variable
integration:
secrets:
clusterConfMapping: |
{
"production-cluster": {
"bootstrap.servers": "broker1:9092,broker2:9092,broker3:9092",
"security.protocol": "SASL_SSL",
"sasl.mechanism": "SCRAM-SHA-256",
"sasl.username": "your-username",
"sasl.password": "your-password"
},
"staging-cluster": {
"bootstrap.servers": "staging-broker1:9092,staging-broker2:9092",
"security.protocol": "SASL_SSL",
"sasl.mechanism": "SCRAM-SHA-256",
"sasl.username": "staging-username",
"sasl.password": "staging-password"
}
}
export OCEAN__INTEGRATION__CONFIG__CLUSTER_CONF_MAPPING='{
"production-cluster": {
"bootstrap.servers": "broker1:9092,broker2:9092,broker3:9092",
"security.protocol": "SASL_SSL",
"sasl.mechanism": "SCRAM-SHA-256",
"sasl.username": "your-username",
"sasl.password": "your-password"
},
"staging-cluster": {
"bootstrap.servers": "staging-broker1:9092,staging-broker2:9092",
"security.protocol": "SASL_SSL",
"sasl.mechanism": "SCRAM-SHA-256",
"sasl.username": "staging-username",
"sasl.password": "staging-password"
}
}'
Plain text authentication
For clusters without SSL encryption:
Plain text authentication (click to expand)
- Helm/ArgoCD
- CI/CD Environment Variable
integration:
secrets:
clusterConfMapping: |
{
"internal-cluster": {
"bootstrap.servers": "internal-broker1:9092,internal-broker2:9092",
"security.protocol": "SASL_PLAINTEXT",
"sasl.mechanism": "PLAIN",
"sasl.username": "internal-username",
"sasl.password": "internal-password"
}
}
export OCEAN__INTEGRATION__CONFIG__CLUSTER_CONF_MAPPING='{
"internal-cluster": {
"bootstrap.servers": "internal-broker1:9092,internal-broker2:9092",
"security.protocol": "SASL_PLAINTEXT",
"sasl.mechanism": "PLAIN",
"sasl.username": "internal-username",
"sasl.password": "internal-password"
}
}'
mTLS authentication
For clusters with mutual TLS authentication:
mTLS authentication (click to expand)
- Helm/ArgoCD
- CI/CD Environment Variable
integration:
secrets:
clusterConfMapping: |
{
"secure-cluster": {
"bootstrap.servers": "secure-broker1:9092,secure-broker2:9092",
"security.protocol": "SSL",
"ssl.ca.location": "/path/to/ca-cert.pem",
"ssl.certificate.location": "/path/to/client-cert.pem",
"ssl.key.location": "/path/to/client-key.pem",
"ssl.key.password": "key-password"
}
}
export OCEAN__INTEGRATION__CONFIG__CLUSTER_CONF_MAPPING='{
"secure-cluster": {
"bootstrap.servers": "secure-broker1:9092,secure-broker2:9092",
"security.protocol": "SSL",
"ssl.ca.location": "/path/to/ca-cert.pem",
"ssl.certificate.location": "/path/to/client-cert.pem",
"ssl.key.location": "/path/to/client-key.pem",
"ssl.key.password": "key-password"
}
}'
Confluent cloud configuration
For Confluent Cloud clusters:
Confluent cloud configuration (click to expand)
- Helm/ArgoCD
- CI/CD Environment Variable
integration:
secrets:
clusterConfMapping: |
{
"confluent-cloud-cluster": {
"bootstrap.servers": "pkc-abcd85.us-west-2.aws.confluent.cloud:9092",
"security.protocol": "SASL_SSL",
"sasl.mechanism": "PLAIN",
"sasl.username": "your-api-key",
"sasl.password": "your-api-secret"
}
}
export OCEAN__INTEGRATION__CONFIG__CLUSTER_CONF_MAPPING='{
"confluent-cloud-cluster": {
"bootstrap.servers": "pkc-abcd85.us-west-2.aws.confluent.cloud:9092",
"security.protocol": "SASL_SSL",
"sasl.mechanism": "PLAIN",
"sasl.username": "your-api-key",
"sasl.password": "your-api-secret"
}
}'
Advanced configuration with custom properties
For clusters requiring additional custom properties:
Advanced configuration with custom properties (click to expand)
- Helm/ArgoCD
- CI/CD Environment Variable
integration:
secrets:
clusterConfMapping: |
{
"advanced-cluster": {
"bootstrap.servers": "broker1:9092,broker2:9092,broker3:9092",
"security.protocol": "SASL_SSL",
"sasl.mechanism": "SCRAM-SHA-512",
"sasl.username": "advanced-username",
"sasl.password": "advanced-password",
"ssl.ca.location": "/path/to/ca-cert.pem",
"ssl.endpoint.identification.algorithm": "https",
"request.timeout.ms": 30000,
"session.timeout.ms": 10000,
"enable.auto.commit": false
}
}
export OCEAN__INTEGRATION__CONFIG__CLUSTER_CONF_MAPPING='{
"advanced-cluster": {
"bootstrap.servers": "broker1:9092,broker2:9092,broker3:9092",
"security.protocol": "SASL_SSL",
"sasl.mechanism": "SCRAM-SHA-512",
"sasl.username": "advanced-username",
"sasl.password": "advanced-password",
"ssl.ca.location": "/path/to/ca-cert.pem",
"ssl.endpoint.identification.algorithm": "https",
"request.timeout.ms": 30000,
"session.timeout.ms": 10000,
"enable.auto.commit": false
}
}'
You can configure multiple clusters in the same mapping by adding additional cluster configurations as shown in the examples above. Each cluster will be processed independently by the integration.
- Store sensitive information like passwords and API keys securely using your platform's secret management system.
- For Kubernetes deployments, use Kubernetes secrets to store the
clusterConfMappingvalue. - For CI/CD pipelines, use environment variable encryption features provided by your CI/CD platform.
Configuration
Port integrations use a YAML mapping block to ingest data from the third-party api into Port.
The mapping makes use of the JQ JSON processor to select, modify, concatenate, transform and perform other operations on existing fields and values from the integration API.
Default mapping configuration
This is the default mapping configuration for this integration:
Default mapping configuration (Click to expand)
resources:
- kind: cluster
selector:
query: 'true'
port:
entity:
mappings:
identifier: .name
title: .name
blueprint: '"kafkaCluster"'
properties:
controllerId: .controller_id
- kind: broker
selector:
query: 'true'
port:
entity:
mappings:
identifier: .cluster_name + "_" + (.id | tostring)
title: .cluster_name + " " + (.id | tostring)
blueprint: '"kafkaBroker"'
properties:
address: .address
region: .config."broker.rack"
version: .config."inter.broker.protocol.version"
config: .config
relations:
cluster: .cluster_name
- kind: topic
selector:
query: 'true'
port:
entity:
mappings:
identifier: .cluster_name + "_" + .name
title: .cluster_name + " " + .name
blueprint: '"kafkaTopic"'
properties:
replicas: .partitions[0].replicas | length
partitions: .partitions | length
compaction: .config."cleanup.policy" | contains("compact")
retention: .config."cleanup.policy" | contains("delete")
deleteRetentionTime: .config."delete.retention.ms"
partitionsMetadata: .partitions
config: .config
relations:
cluster: .cluster_name
brokers: '[.cluster_name + "_" + (.partitions[].replicas[] | tostring)] | unique'
- kind: consumer_group
selector:
query: 'true'
port:
entity:
mappings:
identifier: .cluster_name + "_" + .group_id
title: .group_id
blueprint: '"kafkaConsumerGroup"'
properties:
state: .state
members: '[.members[].client_id]'
coordinator: .coordinator.id
partition_assignor: .partition_assignor
is_simple_consumer_group: .is_simple_consumer_group
authorized_operations: .authorized_operations
relations:
cluster: .cluster_name
Mapping & examples per resource
Examples of blueprints and the relevant integration configurations:
Monitoring and sync status
To learn more about how to monitor and check the sync status of your integration, see the relevant documentation.