Skip to content

Commit

Permalink
Create kafka_producer.py
Browse files Browse the repository at this point in the history
  • Loading branch information
KOSASIH authored Jun 1, 2024
1 parent 632fa72 commit 2044b36
Showing 1 changed file with 22 additions and 0 deletions.
22 changes: 22 additions & 0 deletions data_ingestion/kafka_producer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import time
from kafka import KafkaProducer

class KafkaDataProducer:
def __init__(self, topic, bootstrap_servers="localhost:9092"):
self.producer = KafkaProducer(bootstrap_servers=bootstrap_servers,
value_serializer=lambda v: json.dumps(v).encode("utf-8"))
self.topic = topic

def send_message(self, message):
self.producer.send(self.topic, message)
self.producer.flush()

def close(self):
self.producer.close()

if __name__ == "__main__":
producer = KafkaDataProducer(KAFKA_TOPIC)
while True:
data = {"sensor_id": 1, "temperature": 25.5, "timestamp": int(time.time())}
producer.send_message(data)
time.sleep(1)

0 comments on commit 2044b36

Please sign in to comment.