From b195fa296e6fbae95027c5bf14e4a4b99276c3ac Mon Sep 17 00:00:00 2001 From: KOSASIH Date: Sat, 1 Jun 2024 12:31:08 +0700 Subject: [PATCH] Create kafka_consumer.py --- data_ingestion/kafka_consumer.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) create mode 100644 data_ingestion/kafka_consumer.py diff --git a/data_ingestion/kafka_consumer.py b/data_ingestion/kafka_consumer.py new file mode 100644 index 0000000..9a9438a --- /dev/null +++ b/data_ingestion/kafka_consumer.py @@ -0,0 +1,17 @@ +from kafka import KafkaConsumer + +KAFKA_TOPIC = "data_ingestion" + +class KafkaDataConsumer: + def __init__(self, topic, bootstrap_servers="localhost:9092"): + self.consumer = KafkaConsumer(topic, + bootstrap_servers=bootstrap_servers, + value_deserializer=lambda m: json.loads(m.decode("utf-8"))) + + def consume(self): + for message in self.consumer: + print(message.value) + +if __name__ == "__main__": + consumer = KafkaDataConsumer(KAFKA_TOPIC) + consumer.consume()