- Create a Storage account and get the connection string (under Access Keys).
- Under Containers, we create a `tieredstorage` container.
We mount a credentials file containing the connection string into the container and add the following to the broker configuration:
```yaml
KAFKA_CONFLUENT_TIER_FEATURE: "true"
KAFKA_CONFLUENT_TIER_METADATA_REPLICATION_FACTOR: 1
KAFKA_CONFLUENT_TIER_TOPIC_DELETE_CHECK_INTERVAL: 60000
## Azure Blob Storage
KAFKA_CONFLUENT_TIER_BACKEND: AzureBlockBlob
KAFKA_CONFLUENT_TIER_AZURE_BLOCK_BLOB_CONTAINER: "tieredstorage"
KAFKA_CONFLUENT_TIER_AZURE_BLOCK_BLOB_PREFIX: "ts-demo"
KAFKA_CONFLUENT_TIER_AZURE_BLOCK_BLOB_CRED_FILE_PATH: /home/appuser/credentials.txt
```
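In a Docker Compose setup, the credentials mount described above might look roughly like this (the service name, image tag, and local file name are assumptions):

```yaml
services:
  broker:
    image: confluentinc/cp-server:7.6.0
    volumes:
      ## mount the file with the Azure connection string read-only into the container
      - ./credentials.txt:/home/appuser/credentials.txt:ro
    environment:
      ## must match the mount target above
      KAFKA_CONFLUENT_TIER_AZURE_BLOCK_BLOB_CRED_FILE_PATH: /home/appuser/credentials.txt
```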
We create a topic with Tiered Storage enabled, a 30-second local hotset, and small segments so that tiering kicks in quickly:

```shell
kafka-topics --bootstrap-server localhost:9092 \
  --create --topic test-topic \
  --partitions 1 \
  --replication-factor 1 \
  --config confluent.tier.enable=true \
  --config confluent.tier.local.hotset.ms=30000 \
  --config segment.bytes=2000000
```
We produce an hour's worth of test data (3,600,000 records of 1 kB each at 1,000 records/s):

```shell
kafka-producer-perf-test \
  --producer-props bootstrap.servers=localhost:9092 \
  --topic test-topic \
  --record-size 1000 \
  --throughput 1000 \
  --num-records 3600000
```
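Putting the numbers above together (a rough back-of-envelope calculation that ignores batching and log overhead):

```python
record_size = 1000          # bytes per record (--record-size)
throughput = 1000           # records per second (--throughput)
num_records = 3_600_000     # total records (--num-records)
segment_bytes = 2_000_000   # segment.bytes on the topic
hotset_ms = 30_000          # confluent.tier.local.hotset.ms

run_seconds = num_records / throughput                   # the test runs for one hour
total_gb = num_records * record_size / 1e9               # ~3.6 GB of data produced
records_per_segment = segment_bytes // record_size       # ~2000 records per segment
segment_roll_seconds = records_per_segment / throughput  # a segment rolls every ~2 s

print(run_seconds, total_gb, records_per_segment, segment_roll_seconds)
```

So segments roll every couple of seconds, and once a segment falls out of the 30-second hotset it is only available from Blob Storage.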
We can consume the topic from the beginning; segments that are no longer in the local hotset are fetched transparently from Blob Storage:

```shell
kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic test-topic \
  --from-beginning
```
Finally, we delete the topic:

```shell
kafka-topics \
  --bootstrap-server localhost:9092 \
  --delete \
  --topic test-topic
```
The corresponding files in Blob Storage are deleted shortly afterwards as well (within `confluent.tier.topic.delete.check.interval.ms` = 1 min, as configured above).
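To watch the tiered segments appear and disappear, one can list the blobs with the Azure CLI (assuming it is installed; replace the placeholder with the connection string from Access Keys):

```shell
az storage blob list \
  --container-name tieredstorage \
  --prefix ts-demo \
  --connection-string "<connection-string>" \
  --output table
```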
Since CP 7.6, Tiered Storage also supports compacted topics (see the documentation). We add to the broker configuration:
```yaml
## Compacted Topic
KAFKA_CONFLUENT_TIER_CLEANER_FEATURE_ENABLE: "true"
KAFKA_CONFLUENT_TIER_CLEANER_ENABLE: "true"
```
and restart the broker.
We create a compacted topic with Tiered Storage enabled:

```shell
kafka-topics --bootstrap-server localhost:9092 \
  --create --topic test-topic-compacted \
  --partitions 1 \
  --replication-factor 1 \
  --config cleanup.policy="compact" \
  --config confluent.tier.enable=true \
  --config confluent.tier.local.hotset.ms=30000 \
  --config segment.bytes=2000000
```
`kafka-producer-perf-test` produces messages without a key, so it cannot be used here. Instead, we run a manual Kafka producer via `./gradlew run`.
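A key is required because compaction retains only the latest record per key. A minimal pure-Python simulation of what the cleaner conceptually does (an illustration only, not Confluent's implementation):

```python
def compact(log):
    """Simulate log compaction: keep only the latest record per key."""
    latest = {}
    for offset, (key, value) in enumerate(log):
        latest[key] = (offset, value)   # a newer offset overwrites the older one
    # the compacted log preserves the offset order of the surviving records
    return sorted(latest.values())

# three records, two of them sharing the key "k1"
log = [("k1", "v1"), ("k2", "v2"), ("k1", "v3")]
print(compact(log))  # [(1, 'v2'), (2, 'v3')] -- k1's old value v1 is gone
```

Without keys, every record would be its own "latest" record and the cleaner would have nothing to compact.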