Skip to content

Commit

Permalink
Create kafka_consumer.py
Browse files Browse the repository at this point in the history
  • Loading branch information
KOSASIH authored Jun 1, 2024
1 parent 2044b36 commit b195fa2
Showing 1 changed file with 17 additions and 0 deletions.
17 changes: 17 additions & 0 deletions data_ingestion/kafka_consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from kafka import KafkaConsumer

KAFKA_TOPIC = "data_ingestion"

class KafkaDataConsumer:
def __init__(self, topic, bootstrap_servers="localhost:9092"):
self.consumer = KafkaConsumer(topic,
bootstrap_servers=bootstrap_servers,
value_deserializer=lambda m: json.loads(m.decode("utf-8")))

def consume(self):
for message in self.consumer:
print(message.value)

if __name__ == "__main__":
consumer = KafkaDataConsumer(KAFKA_TOPIC)
consumer.consume()

0 comments on commit b195fa2

Please sign in to comment.