-
Notifications
You must be signed in to change notification settings - Fork 0
/
Producer.py
40 lines (31 loc) · 1.03 KB
/
Producer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
from kafka import KafkaClient
from kafka import SimpleProducer
import json
class Producer:
    """Small wrapper that publishes JSON-encoded messages to a Kafka topic.

    The broker address and topic default to ``localhost:9092`` / ``test`` and
    can be changed through the setters before ``startConnection`` is called.
    ``startConnection`` must be called before ``sendMessage``.
    """

    def __init__(self, semaphore):
        """Initialise the producer with its default connection settings.

        Args:
            semaphore: Accepted for interface compatibility; this class does
                not use it.
        """
        # Setting producer variables.
        self.port = 9092
        self.IP = 'localhost'
        self.topic = 'test'
        # Created by startConnection(); None means "not connected yet".
        self.producer = None

    def setPort(self, port):
        """Set the broker port used by the next ``startConnection`` call."""
        self.port = port

    def setIP(self, ID):
        """Set the broker host used by the next ``startConnection`` call."""
        self.IP = ID

    def setTopic(self, topic):
        """Set the topic that ``sendMessage`` publishes to."""
        self.topic = topic

    def getPort(self):
        """Return the configured broker port."""
        return self.port

    def getID(self):
        """Return the configured broker host (the value stored by ``setIP``)."""
        return self.IP

    def getTopic(self):
        """Return the configured topic."""
        return self.topic

    def startConnection(self):
        """Connect to the broker at ``IP:port`` and create the producer."""
        address = "{}:{}".format(self.IP, self.port)
        client = KafkaClient(address)
        # ``async`` is a reserved keyword since Python 3.7, so writing
        # ``async=True`` literally is a SyntaxError there. Passing it through a
        # dict keeps the same option name for the library while remaining
        # valid syntax on every interpreter version.
        self.producer = SimpleProducer(client, **{"async": True})

    def sendMessage(self, msg):
        """Serialise ``msg`` to JSON and publish it to the configured topic.

        Objects that ``json`` cannot serialise natively are encoded through
        their ``__dict__``.

        Args:
            msg: The object to send.

        Raises:
            RuntimeError: If ``startConnection`` has not been called yet.
        """
        producer = getattr(self, "producer", None)
        if producer is None:
            raise RuntimeError(
                "Producer is not connected; call startConnection() first."
            )
        print("Sending messages.....")
        payload = json.dumps(msg, default=lambda o: o.__dict__)
        producer.send_messages(self.getTopic(), payload.encode("utf-8"))