-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathproducer_sub_entry_batch.py
44 lines (33 loc) · 1.32 KB
/
producer_sub_entry_batch.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import asyncio
import time
from rstream import AMQPMessage, CompressionType, Producer
STREAM = "my-test-stream"
LOOP = 5000
BATCH = 100
async def publish():
async with Producer("localhost", username="guest", password="guest") as producer:
await producer.create_stream(STREAM, exists_ok=True)
start_time = time.perf_counter()
print("Sending compressed and not compressed messages with sub_entry")
for j in range(LOOP):
messages = []
for i in range(BATCH):
amqp_message = AMQPMessage(
body=bytes("hello: {}".format(i), "utf-8"),
)
messages.append(amqp_message)
# sending with compression
await producer.send_sub_entry(
STREAM, compression_type=CompressionType.Gzip, sub_entry_messages=messages
)
# sending without compression
await producer.send_sub_entry(
STREAM, compression_type=CompressionType.No, sub_entry_messages=messages
)
end_time = time.perf_counter()
print(
f"Sent {LOOP * BATCH * 2} messages in {end_time - start_time:0.4f} seconds {LOOP * BATCH} "
f"compressed and {LOOP * BATCH} not compressed"
)
await producer.close()
asyncio.run(publish())