forked from nhat416/skystore
-
Notifications
You must be signed in to change notification settings - Fork 0
/
send.py
66 lines (54 loc) · 1.93 KB
/
send.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
from fastapi import FastAPI
import httpx
import json
import asyncio
import argparse
# FastAPI application object; send_message is registered on it as a POST route.
app = FastAPI()
"""
This script runs as a separate process: it periodically reads the metrics.json file
and sends the metrics it finds there to the store server. When you set up the clients,
you can include this script in your client script.
"""
@app.post("/send_message/")
async def send_message(message, host):
    """Forward one metrics record to the store server and return its reply.

    POSTs ``message`` as a JSON body to ``/update_metrics`` on port 3000 of
    ``host`` and returns the decoded JSON body of the response.
    """
    endpoint = f"http://{host}:3000/update_metrics"
    async with httpx.AsyncClient() as session:
        reply = await session.post(endpoint, json=message)
        return reply.json()
def read_metrics():
    """Load every metrics record from ``metrics.json`` in the working directory.

    The file is read as one JSON document per line. Blank or whitespace-only
    lines (for example an extra empty line at the end of the file) are skipped
    rather than aborting the whole read with a decode error.

    Returns:
        A list with one decoded JSON value per non-blank line, in file order.

    Raises:
        FileNotFoundError: if ``metrics.json`` does not exist.
        json.JSONDecodeError: if a non-blank line is not valid JSON.
    """
    with open("metrics.json", "r", encoding="utf-8") as f:
        # line.strip() is empty for blank lines, which json.loads would reject.
        return [json.loads(line) for line in f if line.strip()]
async def main(server_addr, poll_interval=30):
    """Poll the metrics file forever and forward newly appended records.

    Each cycle sleeps for ``poll_interval`` seconds, re-reads the whole file
    via ``read_metrics()``, and sends every record past the ones already
    counted to the store server with ``send_message()``, one request per
    record and in file order.

    Args:
        server_addr: Host of the store server, passed through to
            ``send_message()``.
        poll_interval: Seconds to sleep before each read. Defaults to 30.

    This coroutine never returns; any exception raised while reading or
    sending propagates to the caller.
    """
    # Only these fields of each record are forwarded, in this order.
    fields = (
        "timestamp",
        "latency",
        "request_region",
        "destination_region",
        "key",
        "size",
        "op",
    )
    sent_count = 0  # number of records seen as of the previous cycle
    while True:
        await asyncio.sleep(poll_interval)
        metrics = read_metrics()
        if len(metrics) == sent_count:
            # Nothing was appended since the last cycle.
            continue
        pending = metrics[sent_count:]
        print("new metrics", pending)
        for metric in pending:
            await send_message(
                {name: metric[name] for name in fields},
                server_addr,
            )
        # If the file shrank, `pending` was empty and this simply resets the
        # counter to the new length.
        sent_count = len(metrics)
if __name__ == "__main__":
    # Parse the command line and start the polling loop only when executed as
    # a script, so that importing this module neither consumes sys.argv nor
    # blocks in the endless loop.
    parser = argparse.ArgumentParser()
    # The address is interpolated into the request URL, so it must be supplied;
    # without it the host would be the literal string "None".
    parser.add_argument(
        "--server_addr", required=True, help="ip address of the server"
    )
    args = parser.parse_args()
    asyncio.run(main(args.server_addr))