-
Notifications
You must be signed in to change notification settings - Fork 0
/
runproduce.py
35 lines (30 loc) · 1.47 KB
/
runproduce.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import time
import random
import argparse
from myqueue import MyProducer,readProduceLog
parser=argparse.ArgumentParser(description='example: python runproduce.py --id 1 --topics T1:P1 T2:P3 --broker localhost --log_loc ./test')
parser.add_argument('--id', type=int, help='id of producer/consumer', required=True)
parser.add_argument('--topics', type=lambda e: e.split(':'), nargs='+', help='topics to produce/consume', required=True)
parser.add_argument('--broker', type=str, help='broker address ip:port', required=True)
parser.add_argument('--log_loc', type=str, help='log folder', required=True)
args = parser.parse_args()
def produce(pid=1,topics=[['T1', 'P1'], ['T2','*']],host='localhost',log_loc='./test'):
producer = MyProducer(
topics=topics,
host=host)
try:
log_gen=readProduceLog(f'{log_loc}/producer_{pid}.txt')
while True:
for t,p in topics:
topic,part,msg=next(log_gen)
if p=='*':
if t!=topic:
raise Exception("log file:incompitable, make sure msg for topics are in order for a perticular time")
elif t!=topic and p!=part:
raise Exception("log file:incompitable, make sure msg for topics are in order for a perticular time")
producer.send(t,p,msg)
time.sleep(random.random())
except StopIteration:pass
finally:
producer.stop()
produce(args.id,args.topics,args.broker,args.log_loc)