The kafka-transport component of Hub-of-Hubs.
This repo holds the common Kafka producer/consumer logic shared by the Hub-of-Hubs components that use Kafka as their transport.
Go to the Contributing guide to learn how to get involved.
To use the producer/consumer wrappers provided, a kafka.ConfigMap must be passed to the initialization functions to configure the clients.
If an SSL connection is required, call SetCertificate with a Base64-encoded, PEM-formatted certificate and set the following keys in the ConfigMap mentioned above:
security.protocol: ssl
ssl.ca.location: the string returned from SetCertificate() call
The certificate is written to /opt/kafka.
Helpful links:
- Confluent - Consumer Configurations
- Confluent - Producer Configurations
- librdkafka - Configuration
To set up a Kafka cluster we use the Red Hat Integration - AMQ Streams operator (v1.7.2) to install Kafka v2.7.0 on the ACM cluster.
- Create the kafka namespace:
  kubectl create namespace kafka
- Deploy the AMQ Streams operator to your cluster (the subscription watches the kafka namespace):
  kubectl apply -f deploy/amq_streams_operator.yaml
- Deploy the Kafka cluster CR:
  kubectl apply -f deploy/kafka_cluster.yaml
Result:
- AMQ Streams operator running and watching kafka namespace
- Kafka instance "kafka-brokers-cluster" deployed:
  - 3 broker pods
  - 3 zookeeper pods
  - an internal plaintext listener (port 9092)
  - an external TLS secure listener (port 9093)
- status Topic deployed
- spec Topic deployed
Run the following command to fetch the required information (if nothing is returned, the cluster is still being created; retry after a few seconds):
kubectl -n kafka get Kafka kafka-brokers-cluster -o json | jq -r '.status.listeners[] | {bootstrapServers, certificates}' | sed 's/\\n/\n/g'
Output:
{
"bootstrapServers": "kafka-brokers-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092",
"certificates": null
}
{
"bootstrapServers": "kafka-brokers-cluster-kafka-external-bootstrap-kafka.apps.veisenbe-hoh2.dev10.red-chesterfield.com:443",
"certificates": [
"-----BEGIN CERTIFICATE-----
...
-----END CERTIFICATE-----
"
]
}
The first entry should be used for clients deployed inside the cluster (plaintext, unencrypted connection).
The second entry should be used for clients deployed outside the cluster (TLS-protected connection).