This session will guide you through setting up and running a Kafka project using sbt and Python. You will also perform exercises to explore key Kafka concepts.
Use the Giter8 template to bootstrap your Kafka project:
sbt new osekoo/kafka-py.g8
Follow the appropriate guide for your operating system:
- Windows: Install sbt on Windows
- macOS: Use Homebrew:
brew install sbt
- Linux: Use your package manager:
- Ubuntu/Debian:
echo "deb https://repo.scala-sbt.org/scalasbt/debian all main" | sudo tee /etc/apt/sources.list.d/sbt.list curl -sL "https://keyserver.ubuntu.com/pks/lookup?op=get&search=0x99E82A75642AC823" | sudo apt-key add sudo apt update sudo apt install sbt
From the project folder, run:
./kafka-start
Install required Python packages:
python -m pip install -r requirements.txt
Run the consumer to start listening for messages:
python consumer.py
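For reference, a minimal kafka-python consumer loop looks roughly like the sketch below; the template's generated consumer.py may differ in detail. TOPIC_NAME, BROKER, and data_deserializer mirror the configuration shown later in this guide, and the concrete values are placeholders:
# Minimal sketch of a kafka-python consumer loop; names and values are assumptions.
import json
from kafka import KafkaConsumer

TOPIC_NAME = 'my-topic'    # placeholder topic name
BROKER = 'localhost:9092'  # placeholder broker address

def data_deserializer(data):
    # Assume messages are JSON-encoded bytes
    return json.loads(data.decode('utf-8'))

consumer = KafkaConsumer(
    TOPIC_NAME,
    bootstrap_servers=BROKER,
    value_deserializer=data_deserializer,
    auto_offset_reset='earliest'
)

for record in consumer:
    # Each record exposes its topic, partition, offset, and deserialized value
    print(f'{record.topic}[{record.partition}]@{record.offset}: {record.value}')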
Send messages to the Kafka topic using:
python producer.py
Observe the messages processed by the consumer in the terminal.
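The producer side is built symmetrically. A minimal sketch, with the value_serializer mirroring the consumer's deserializer and the same placeholder names as above:
# Minimal sketch of a kafka-python producer; names and values are assumptions.
import json
from kafka import KafkaProducer

TOPIC_NAME = 'my-topic'    # placeholder topic name
BROKER = 'localhost:9092'  # placeholder broker address

producer = KafkaProducer(
    bootstrap_servers=BROKER,
    value_serializer=lambda v: json.dumps(v).encode('utf-8')  # JSON-encode values
)

producer.send(TOPIC_NAME, value={'message': 0})
producer.flush()  # block until buffered messages are delivered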
Open your browser and navigate to:
http://localhost:9094
Use the Kafka dashboard to inspect:
- Topics
- Offsets
- Partitions
- Message content
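If you prefer inspecting from code instead of the browser, kafka-python exposes the same metadata. A small sketch, where BROKER and the topic name are placeholders:
# Sketch: list topics and partitions programmatically; names are assumptions.
from kafka import KafkaConsumer

BROKER = 'localhost:9092'  # placeholder broker address

consumer = KafkaConsumer(bootstrap_servers=BROKER)
print('Topics:', consumer.topics())                              # set of topic names
print('Partitions:', consumer.partitions_for_topic('my-topic'))  # set of partition ids
consumer.close()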
Modify the consumer code to specify a consumer group. Update the consumer.py file with a group_id in the KafkaConsumer configuration (a completed sketch follows the questions below):
consumer = KafkaConsumer(
    TOPIC_NAME,
    bootstrap_servers=BROKER,
    value_deserializer=data_deserializer,
    auto_offset_reset='earliest',
    # Add group_id here
)
- Save the updated code.
- Re-run the consumer:
python consumer.py
- What happens to the consumer output when re-run?
- If multiple consumers are started with the same group ID, how are messages distributed?
- Kafka distributes the topic's partitions among the consumers in the same group: each partition is assigned to exactly one consumer in the group at a time.
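A possible completed configuration, assuming kafka-python; my-consumer-group is a placeholder name, and any string works (consumers sharing it form one group):
consumer = KafkaConsumer(
    TOPIC_NAME,
    bootstrap_servers=BROKER,
    value_deserializer=data_deserializer,
    auto_offset_reset='earliest',
    group_id='my-consumer-group'  # consumers sharing this id split the topic's partitions
)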
Update the producer to publish numbers with a key, where the key is n % m (m being the number of partitions). Modify producer.py as follows (a completed sketch follows the questions below):
def send(producer, size, partitions):
    for i in range(size):
        key = str(i % partitions).encode('utf-8')  # Create key
        message = {'message': i}
        print(f'Sending: {message} with key: {key}')
        producer.send(TOPIC_NAME,
                      value=message
                      # specify the key
                      )  # Send with key
        time.sleep(1)
    producer.flush()
- Define the number of partitions (m) based on your topic configuration.
- Save the updated code.
- Re-run the producer:
python producer.py
- How do messages appear in the Kafka dashboard?
- Are messages evenly distributed across partitions?
- Messages are partitioned based on the hash of the key.
- For a given key (e.g., n % m), all messages will go to the same partition.
- Check the Kafka dashboard to verify message repartitioning.
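One way to complete the send call, using the key variable computed above; with kafka-python's default partitioner, equal keys always hash to the same partition:
def send(producer, size, partitions):
    for i in range(size):
        key = str(i % partitions).encode('utf-8')  # key = n % m, as bytes
        message = {'message': i}
        print(f'Sending: {message} with key: {key}')
        producer.send(TOPIC_NAME, value=message, key=key)  # key selects the partition
        time.sleep(1)
    producer.flush()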