Kafka

Port's Kafka integration allows you to model Kafka resources in your software catalog and ingest data into them.

Setup

Choose one of the following installation methods. Not sure which method is right for your use case? Check the available installation methods.

Advanced integration configuration

For advanced configuration such as proxies or self-signed certificates, click here.

Cluster config mapping examples

The clusterConfMapping parameter defines how the integration connects to your Kafka clusters. It should be a JSON object whose keys are cluster names and whose values are Kafka client configurations, each following the standard Kafka client properties format.

Below are some examples of how to configure the clusterConfMapping parameter for different Kafka cluster configurations.

Basic SASL_SSL authentication

For clusters with SASL authentication over SSL:

integration:
  secrets:
    clusterConfMapping: |
      {
        "production-cluster": {
          "bootstrap.servers": "broker1:9092,broker2:9092,broker3:9092",
          "security.protocol": "SASL_SSL",
          "sasl.mechanism": "SCRAM-SHA-256",
          "sasl.username": "your-username",
          "sasl.password": "your-password"
        },
        "staging-cluster": {
          "bootstrap.servers": "staging-broker1:9092,staging-broker2:9092",
          "security.protocol": "SASL_SSL",
          "sasl.mechanism": "SCRAM-SHA-256",
          "sasl.username": "staging-username",
          "sasl.password": "staging-password"
        }
      }

Plain text authentication

For clusters without SSL encryption:

integration:
  secrets:
    clusterConfMapping: |
      {
        "internal-cluster": {
          "bootstrap.servers": "internal-broker1:9092,internal-broker2:9092",
          "security.protocol": "SASL_PLAINTEXT",
          "sasl.mechanism": "PLAIN",
          "sasl.username": "internal-username",
          "sasl.password": "internal-password"
        }
      }

mTLS authentication

For clusters with mutual TLS authentication:

integration:
  secrets:
    clusterConfMapping: |
      {
        "secure-cluster": {
          "bootstrap.servers": "secure-broker1:9092,secure-broker2:9092",
          "security.protocol": "SSL",
          "ssl.ca.location": "/path/to/ca-cert.pem",
          "ssl.certificate.location": "/path/to/client-cert.pem",
          "ssl.key.location": "/path/to/client-key.pem",
          "ssl.key.password": "key-password"
        }
      }

Confluent Cloud configuration

For Confluent Cloud clusters:

integration:
  secrets:
    clusterConfMapping: |
      {
        "confluent-cloud-cluster": {
          "bootstrap.servers": "pkc-abcd85.us-west-2.aws.confluent.cloud:9092",
          "security.protocol": "SASL_SSL",
          "sasl.mechanism": "PLAIN",
          "sasl.username": "your-api-key",
          "sasl.password": "your-api-secret"
        }
      }

Advanced configuration with custom properties

For clusters requiring additional custom properties:

integration:
  secrets:
    clusterConfMapping: |
      {
        "advanced-cluster": {
          "bootstrap.servers": "broker1:9092,broker2:9092,broker3:9092",
          "security.protocol": "SASL_SSL",
          "sasl.mechanism": "SCRAM-SHA-512",
          "sasl.username": "advanced-username",
          "sasl.password": "advanced-password",
          "ssl.ca.location": "/path/to/ca-cert.pem",
          "ssl.endpoint.identification.algorithm": "https",
          "request.timeout.ms": 30000,
          "session.timeout.ms": 10000,
          "enable.auto.commit": false
        }
      }

You can configure multiple clusters in the same mapping by adding additional cluster configurations as shown in the examples above. Each cluster will be processed independently by the integration.
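To illustrate the shape the integration consumes, here is a Python sketch that parses a multi-cluster mapping and walks each cluster entry separately. The payload is invented and the iteration is only an emulation of the "processed independently" behavior described above:

```python
import json

# clusterConfMapping arrives as a JSON string: each top-level key is a
# cluster name, each value is a set of Kafka client properties.
cluster_conf_mapping = json.dumps({
    "production-cluster": {"bootstrap.servers": "broker1:9092"},
    "staging-cluster": {"bootstrap.servers": "staging-broker1:9092"},
})

clusters = json.loads(cluster_conf_mapping)
for name, conf in clusters.items():
    # Each cluster gets its own client configuration, independent of the rest.
    print(name, conf["bootstrap.servers"])
```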

Security considerations
  • Store sensitive information like passwords and API keys securely using your platform's secret management system.
  • For Kubernetes deployments, use Kubernetes secrets to store the clusterConfMapping value.
  • For CI/CD pipelines, use environment variable encryption features provided by your CI/CD platform.
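As a sketch of the Kubernetes approach, the mapping could be stored in a Secret like the one below. The Secret name and namespace wiring are assumptions for illustration; check your deployment's chart values for the exact key the integration expects:

```yaml
# Hypothetical Secret holding the cluster configuration mapping.
# The name "kafka-integration-secrets" is an assumption, not a required value.
apiVersion: v1
kind: Secret
metadata:
  name: kafka-integration-secrets
type: Opaque
stringData:
  clusterConfMapping: |
    {
      "production-cluster": {
        "bootstrap.servers": "broker1:9092",
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "SCRAM-SHA-256",
        "sasl.username": "your-username",
        "sasl.password": "your-password"
      }
    }
```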

Configuration

Port integrations use a YAML mapping block to ingest data from the third-party API into Port.

The mapping makes use of the JQ JSON processor to select, modify, concatenate, transform, and perform other operations on existing fields and values returned by the integration API.
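To get a feel for how these JQ expressions behave, here is a Python sketch that emulates two expressions used by the broker mapping against a hypothetical payload (the field names mirror the mapping, but the payload values are invented):

```python
# Hypothetical broker payload, shaped like the fields the mapping references.
broker = {
    "id": 1,
    "cluster_name": "production-cluster",
    "address": "broker1:9092",
    "config": {"broker.rack": "us-east-1a"},
}

# JQ: .cluster_name + "_" + (.id | tostring)
identifier = broker["cluster_name"] + "_" + str(broker["id"])

# JQ: .config."broker.rack"  (quoted key access)
region = broker["config"]["broker.rack"]

print(identifier)  # production-cluster_1
print(region)      # us-east-1a
```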

Default mapping configuration

This is the default mapping configuration for this integration:

resources:
  - kind: cluster
    selector:
      query: 'true'
    port:
      entity:
        mappings:
          identifier: .name
          title: .name
          blueprint: '"kafkaCluster"'
          properties:
            controllerId: .controller_id
  - kind: broker
    selector:
      query: 'true'
    port:
      entity:
        mappings:
          identifier: .cluster_name + "_" + (.id | tostring)
          title: .cluster_name + " " + (.id | tostring)
          blueprint: '"kafkaBroker"'
          properties:
            address: .address
            region: .config."broker.rack"
            version: .config."inter.broker.protocol.version"
            config: .config
          relations:
            cluster: .cluster_name
  - kind: topic
    selector:
      query: 'true'
    port:
      entity:
        mappings:
          identifier: .cluster_name + "_" + .name
          title: .cluster_name + " " + .name
          blueprint: '"kafkaTopic"'
          properties:
            replicas: .partitions[0].replicas | length
            partitions: .partitions | length
            compaction: .config."cleanup.policy" | contains("compact")
            retention: .config."cleanup.policy" | contains("delete")
            deleteRetentionTime: .config."delete.retention.ms"
            partitionsMetadata: .partitions
            config: .config
          relations:
            cluster: .cluster_name
            brokers: '[.cluster_name + "_" + (.partitions[].replicas[] | tostring)] | unique'
  - kind: consumer_group
    selector:
      query: 'true'
    port:
      entity:
        mappings:
          identifier: .cluster_name + "_" + .group_id
          title: .group_id
          blueprint: '"kafkaConsumerGroup"'
          properties:
            state: .state
            members: '[.members[].client_id]'
            coordinator: .coordinator.id
            partition_assignor: .partition_assignor
            is_simple_consumer_group: .is_simple_consumer_group
            authorized_operations: .authorized_operations
          relations:
            cluster: .cluster_name
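The brokers relation on the topic kind is the least obvious expression in the default mapping, so here is a Python sketch emulating it against an invented topic payload. Note that JQ's `unique` both deduplicates and sorts, hence the sorted set:

```python
# Hypothetical topic payload; two partitions share broker 2.
topic = {
    "cluster_name": "production-cluster",
    "partitions": [
        {"replicas": [1, 2]},
        {"replicas": [2, 3]},
    ],
}

# JQ: [.cluster_name + "_" + (.partitions[].replicas[] | tostring)] | unique
brokers = sorted({
    topic["cluster_name"] + "_" + str(replica)
    for partition in topic["partitions"]
    for replica in partition["replicas"]
})

print(brokers)
# ['production-cluster_1', 'production-cluster_2', 'production-cluster_3']
```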

Mapping & examples per resource

Examples of blueprints and the relevant integration configurations:

Monitoring and sync status

To learn more about how to monitor and check the sync status of your integration, see the relevant documentation.