-
Notifications
You must be signed in to change notification settings - Fork 8
/
progressReporter.py
57 lines (49 loc) · 1.9 KB
/
progressReporter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import threading
import configparser
import requests
import queue
import json
import consts
import time
import typing
from config import get_endpoint
from logger import log
class ProgressReporter(threading.Thread):
def __init__(self, queue_size: int, conf: configparser.ConfigParser) -> None:
super(ProgressReporter, self).__init__()
self.http = requests.Session()
self.conf = conf
self.queue: queue.Queue = queue.Queue(maxsize=queue_size)
self._poison_pill = object()
def send(self, job: typing.Any, result: typing.Any) -> None:
path = "analysis/%s" % job["work"]["id"]
data = json.dumps(result).encode("utf-8")
try:
self.queue.put_nowait((path, data))
except queue.Full:
log.debug(
"Could not keep up with progress reports. Dropping one.")
def stop(self) -> None:
while not self.queue.empty():
self.queue.get_nowait()
self.queue.put(self._poison_pill)
def run(self) -> None:
while True:
item = self.queue.get()
if item == self._poison_pill:
return
path, data = item
try:
response = self.http.post(get_endpoint(self.conf, path),
data=data,
timeout=consts.HTTP_TIMEOUT)
if response.status_code == 429:
log.error(
"Too many requests. Suspending progress reports for 60s ...")
time.sleep(60.0)
elif response.status_code != 204:
log.error(
"Expected status 204 for progress report, got %d", response.status_code)
except requests.RequestException as err:
log.warning(
"Could not send progress report (%s). Continuing.", err)