Aggregate Kafka traces produced by the trace interceptor in order to:
- Enrich each trace with topic, partition, offset and correlationId
- Compute the latency between the time a message is produced and the time it is consumed
- Compute the over-consumption (records consumed more than once)
All aggregated traces are Kafka messages sent to the topic _aggregatedTrace.
The trace interceptor produces a trace in the topic _tracing for each record that is produced and consumed. The kafka-tracing-aggregator Kafka Streams application enriches these traces and sends them to the topic _aggregatedTrace. A Kafka Connect connector consumes _aggregatedTrace and inserts the documents into Elasticsearch. A Kibana dashboard provides visualizations.
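The aggregation itself is a Kafka Streams topology. As a rough, minimal sketch of that principle only (the topic names _tracing and _aggregatedTrace come from this README, but the class name and the enrich step are hypothetical placeholders, not the real implementation):

```java
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

public class TraceAggregationSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "_aggregator-trace-stream");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        StreamsBuilder builder = new StreamsBuilder();
        // Read the raw traces (JSON strings) produced by the interceptor,
        // enrich them, and republish them to the aggregated topic.
        builder.stream("_tracing", Consumed.with(Serdes.String(), Serdes.String()))
               .mapValues(TraceAggregationSketch::enrich)
               .to("_aggregatedTrace", Produced.with(Serdes.String(), Serdes.String()));

        new KafkaStreams(builder.build(), props).start();
    }

    // Placeholder for the real enrichment: adding topic/partition/offset/correlationId
    // and computing the produce-to-consume latency and the over-consumption.
    private static String enrich(String rawTrace) {
        return rawTrace;
    }
}
```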
./bin/tracing-aggregator.sh ./config/config.properties
The configuration file corresponds to the standard Kafka Streams configuration.
Example configuration file:
bootstrap.servers=localhost:9092
application.id=_aggregator-trace-stream
state.dir=/tmp
Export metrics with Prometheus
The JMX exporter can be used to export metrics for Prometheus.
export java_option=-javaagent:/jmx_prometheus_javaagent-0.11.0.jar=8080:/config/prometheus-exporter.yml
./bin/tracing-aggregator.sh ./config/config.properties
docker run -e KAFKATRACE_BOOTSTRAP_SERVERS=kafkaip:9092 -p 8080:8080 ianitrix/kafka-tracing-aggregator:latest
Environment variables
All Kafka configuration is done through environment variables prefixed with KAFKATRACE_
Each dot is replaced by an underscore and the variable name must be in upper case; for example, bootstrap.servers becomes KAFKATRACE_BOOTSTRAP_SERVERS.
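As an illustration of this naming convention, here is a minimal sketch of how such prefixed variables can be mapped back to Kafka configuration keys (this is not the actual entrypoint of the image; the class and method names are hypothetical):

```java
import java.util.Map;
import java.util.Properties;

public class EnvConfigSketch {

    // KAFKATRACE_BOOTSTRAP_SERVERS=kafkaip:9092 -> bootstrap.servers=kafkaip:9092
    public static Properties fromEnv(Map<String, String> env) {
        Properties props = new Properties();
        env.forEach((name, value) -> {
            if (name.startsWith("KAFKATRACE_")) {
                String key = name.substring("KAFKATRACE_".length())
                                 .toLowerCase()
                                 .replace('_', '.');
                props.put(key, value);
            }
        });
        return props;
    }

    public static void main(String[] args) {
        System.out.println(fromEnv(System.getenv()));
    }
}
```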
All traces are Kafka messages sent to the topic _aggregatedTrace.
key:
{
"topic": "test",
"partition": 0,
"offset": 0
}
value:
{
"date": "2020-03-25T15:41:01.365287Z",
"type": "SEND",
"topic": "test",
"partition": 0,
"offset": 0
"correlationId": "af8074bc-a042-46ef-8064-203fa26cd9b3",
"clientId": "myProducer"
}
key:
{
"topic": "test",
"partition": 0,
"offset": 0
}
value:
{
"date": "2020-03-25T15:41:01.966082Z",
"type": "ACK",
"topic": "test",
"partition": 0,
"offset": 0,
"correlationId": "af8074bc-a042-46ef-8064-203fa26cd9b3",
"clientId": "myProducer",
"durationMs": 600
}
key:
{
"topic": "test",
"partition": 0,
"offset": 0
}
value:
{
"date": "2020-03-25T15:41:37.844039Z",
"type": "CONSUME",
"topic": "test",
"partition": 0,
"offset": 0,
"correlationId": "af8074bc-a042-46ef-8064-203fa26cd9b3",
"clientId": "myConsumer",
"groupId": "myGroup",
"durationMs": 36478
}
key:
{
"topic": "test",
"partition": 0,
"offset": 0
}
value:
{
"date": "2020-03-25T15:41:37.851339Z",
"type": "COMMIT",
"topic": "test",
"partition": 0,
"offset": 0,
"correlationId": "af8074bc-a042-46ef-8064-203fa26cd9b3",
"clientId": "myConsumer",
"groupId": "myGroup",
"durationMs": 7
}
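Any Kafka consumer can read these aggregated traces from _aggregatedTrace and parse the JSON values, for example to watch the produce-to-consume latency. Below is a small sketch using the plain Java client and Jackson (the topic name and fields come from the formats above; the group id and class name are made-up examples):

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AggregatedTraceReader {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "aggregated-trace-reader");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        ObjectMapper mapper = new ObjectMapper();
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("_aggregatedTrace"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    JsonNode trace = mapper.readTree(record.value());
                    // For CONSUME traces, durationMs is the latency between production and consumption.
                    System.out.printf("%s %s-%d@%d durationMs=%s%n",
                            trace.path("type").asText(),
                            trace.path("topic").asText(),
                            trace.path("partition").asInt(),
                            trace.path("offset").asLong(),
                            trace.path("durationMs").asText("n/a"));
                }
            }
        }
    }
}
```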
Docker-compose files provide a full environment with Kafka, Elasticsearch and Kibana, plus a simple producer/consumer.
There are two docker-compose files:
- Kafka environment and raw trace
First, start the docker-compose described in the Test section of the trace interceptor Git project.
- Elasticsearch environment and aggregated trace
cd src/test/resources
docker-compose up -d
This docker-compose file starts Kafka Connect, Elasticsearch and Kibana. It then starts a connector that indexes the aggregated traces into Elasticsearch, and loads the Kibana dashboard.
Then open http://localhost:5601/ or http://localhost:8080/