Confluent Platform 7.7 and CFK 2.9.0 released OAuth support. This repository sets up a basic CP cluster via CFK running on Azure Kubernetes Service (AKS) enforcing external clients to use OAuth for authentication with Azure AD (Entra ID) as the identity provider.
In technical detail it deploys:
- 1 KraftController
- 3 Kafka brokers
- 1 Producer application (client)
- 1 Control Center
General resources:
- Quickstart: Deploy an Azure Kubernetes Service (AKS) cluster using Azure portal
- Confluent for Kubernetes Quick Start
- CP OAuth documentation
This repositry only focusses on Confluent Platform, not Confluent Cloud. For an example for OAuth with Confluent Cloud check out
For the later configuration we need to set the token_endpoint
, the jwks_uri
, and the issuer
We can obtain all information via
curl<tenant-id>/v2.0/.well-known/openid-configuration | jq
Generally, those are
token_endpoint =<tenant-id>/oauth2/v2.0/token
jwks_uri =<tenant-id>/discovery/v2.0/keys
issuer =<tenant-id>/v2.0
To retrieve the JWT token, CP is using the client credentials grant flow. So, we need to register an application in Azure AD and create a secret. We can get a JWT token via:
curl -X POST -H "Content-Type: application/x-www-form-urlencoded" \
-d 'client_id=[client_id]&client_secret=[client_secret value]&grant_type=client_credentials' \[tenant_id]/oauth2/token
In this example, we only register one application in Azure AD. Consider different applications with its secret per CP component and client.
Store the clientId and secret in a file and deploy it as a k8s secret.
kubectl create -n confluent secret generic oauth-jass --from-file=oauth.txt=client-credentials.txt
Afterwards, we configure the Kafka CR
type: oauth
secretRef: oauth-jass
groupsClaimName: groups
subClaimName: sub
audience: <client-id>
expectedIssuer: see above
jwksEndpointUri: see above
tokenEndpointUri: see above
- required;
Once the confluent-operator
is running, we deploy the cluster with
kubectl apply -f ./cluster.yaml -n confluent
We develop a simple producer application producing events in a for loop. We set the configuration to:
settings.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT")
settings.setProperty(SaslConfigs.SASL_MECHANISM, "OAUTHBEARER")
settings.setProperty(SaslConfigs.SASL_LOGIN_CONNECT_TIMEOUT_MS, "15000")
settings.setProperty(SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL, "<tenant-id>/oauth2/v2.0/token")
settings.setProperty(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS, "")
settings.setProperty(SaslConfigs.SASL_JAAS_CONFIG, " required clientId='<client-id>' clientSecret='<client-secret' scope='<Azure client id of the broker application>/.default';")
Note, that we need to add the scope to the sasl.jaas.config
which is <Azure client id of the broker application>/.default
To deploy the Producer application in the AKS cluster, we need to build and push an image to the Azure Container Registry (ACR) and integrate it with the AKS cluster.
# Login to Container Registry
az login
az acr login --name <acr-name>
# Attach Container Registry to AKS cluster
az aks update --name <aks-name> --resource-group <rg-group> --attach-acr <aks-name>
# Check if the attachment was successful
az aks check-acr --resource-group <rg-group> --name <aks-name> --acr <aks-name>
We build the image via
# Build fatJar
./gradlew fatJar
# Build image (fatJar needs to be located in main folder)
docker build -t kafkaproducer .
# Tag the built image
docker tag kafkaproducer <aks-name>
# Push the image to the registry via
docker push <aks-name>
Deploy the Kafka Producer via
kubectl apply -f ./KafkaProducer/Deployment.yaml -n confluent
If everything runs successfully we should obtain similar logs
16:44:18.015 [Thread-0] INFO - Successfully logged in.
16:44:18.042 [kafka-expiring-relogin-thread-4113761a-d187-402f-a48a-8ae0e86f78dd] INFO - [Principal=:4113761a-d187-402f-a48a-8ae0e86f78dd]: Expiring credential re-login thread started.
16:44:18.042 [kafka-expiring-relogin-thread-4113761a-d187-402f-a48a-8ae0e86f78dd] INFO - [Principal=4113761a-d187-402f-a48a-8ae0e86f78dd]: Expiring credential valid from 2024-08-18T16:39:17.000+0000 to 2024-08-18T17:44:17.000+0000
16:44:18.042 [kafka-expiring-relogin-thread-4113761a-d187-402f-a48a-8ae0e86f78dd] INFO - [Principal=:4113761a-d187-402f-a48a-8ae0e86f78dd]: Expiring credential re-login sleeping until: 2024-08-18T17:34:15.583+0000
16:44:18.066 [Thread-0] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.7.0
16:44:18.067 [Thread-0] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 2ae524ed625438c5
16:44:18.067 [Thread-0] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1723999458064
16:44:18.326 [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: 3b658def-2b27-4765-b0a
16:44:18.440 [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-1] ProducerId set to 3000 with epoch 0
16:44:28.071 [Thread-0] INFO KafkaProducer - Kafka Producer started
16:44:28.115 [kafka-producer-network-thread | producer-1] INFO KafkaProducer - event produced to test-topic
Finally, we would like to see the produced events also in C3. We need to add OAuth configurations as well to authenticate to the Kafka brokers.
bootstrapEndpoint: broker.confluent.svc.cluster.local:9092
type: oauth
secretRef: oauth-jass
tokenEndpointUri: see above
scope: <Azure client id of the broker application>/.default>
# Update cluster
kubectl apply -f ./cluster.yaml -n confluent
# Port forward C3
kubectl port-forward controlcenter-0 9021:9021
We access C3 via localhost:9021
and see the produced events: