-
Notifications
You must be signed in to change notification settings - Fork 0
/
file_queue.py
109 lines (90 loc) · 2.99 KB
/
file_queue.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
import time, random, marshal, os
MAX_AGE = 5  # Seconds without a heartbeat after which an entry is ignored


class FileQueue:
    """Cooperative run queue shared through a marshal file named ``queue``.

    The file (relative to the current working directory) holds a list of
    tuples ``(id, est_time, start, last_heartbeat)``, one per worker.
    Creating an instance appends an entry for it.  A worker may run once no
    live entry (heartbeat newer than ``MAX_AGE`` seconds) has an earlier
    ``start`` than its own.

    A worker's own entry is identified by ``(id, start)`` rather than by
    ``id`` alone, so two workers that happen to share an id never update or
    remove each other's entry.

    NOTE: load-modify-save cycles are not locked; updates made by different
    workers at the same moment can still overwrite one another.
    """

    def __init__(self, est_time=60, id=None):
        """Register a new worker in the queue file.

        Args:
            est_time: Caller-supplied estimate of how long this worker will
                run; only ever summed by ``get_queue_est_time``.
            id: Identifier stored in the entry.  A random integer in
                ``[0, 2**16]`` is used when omitted.
        """
        # Set before any I/O so __del__ is harmless if registration fails.
        self._registered = False
        queue: list = self.load()
        self.id = random.randint(0, 2**16) if id is None else id
        self.est_time = est_time
        self.start = time.time()
        queue.append((self.id, self.est_time, self.start, self.start))
        self.save(queue)
        self._registered = True

    def load(self) -> list:
        """Return the list stored in the queue file.

        A missing file is reported as an empty queue without creating it;
        writing an empty list here could truncate a queue that another
        worker saved in the meantime.  An empty file (seen while another
        worker is rewriting it) is retried after a random pause of up to one
        second, for as long as it takes.
        """
        while True:
            try:
                with open("queue", "rb") as f:
                    return marshal.load(f)
            except FileNotFoundError:
                return []
            except EOFError:
                time.sleep(random.random())

    def save(self, queue: list):
        """Overwrite the queue file with ``queue``.

        An ``OSError`` while opening or writing is retried after a random
        pause of up to one second, for as long as it takes.
        """
        while True:
            try:
                with open("queue", "wb") as f:
                    marshal.dump(queue, f)
                return
            except OSError:
                time.sleep(random.random())

    def _is_mine(self, entry) -> bool:
        """Return True if ``entry`` is the one this instance registered."""
        return entry[0] == self.id and entry[2] == self.start

    def _refresh(self):
        """Rewrite this worker's entry with the current estimate and time."""
        queue = self.load()
        for i, entry in enumerate(queue):
            if self._is_mine(entry):
                queue[i] = (self.id, self.est_time, self.start, time.time())
                self.save(queue)
                return

    def _ahead(self) -> list:
        """Return the live entries that started before this worker."""
        queue = self.load()
        cutoff = time.time() - MAX_AGE
        return [e for e in queue if e[3] > cutoff and e[2] < self.start]

    def heartbeat(self):
        """Stamp this worker's entry with the current time."""
        self._refresh()

    def should_run(self) -> bool:
        """Return True when no live entry started before this worker."""
        return not self._ahead()

    def update_est_time(self, est_time: float):
        """Change this worker's estimate and refresh its heartbeat.

        The value is also kept on the instance so that later heartbeats do
        not write the previous estimate back to the file.
        """
        self.est_time = est_time
        self._refresh()

    def get_queue_len(self) -> int:
        """Return how many live entries started before this worker."""
        return len(self._ahead())

    def get_queue_est_time(self) -> float:
        """Return the summed ``est_time`` of live entries ahead of this worker."""
        return sum(e[1] for e in self._ahead())

    def quit(self):
        """Remove this worker's entry from the queue file.

        Calling it again is harmless: nothing matches, so nothing is
        removed or rewritten.
        """
        queue = self.load()
        remaining = [e for e in queue if not self._is_mine(e)]
        if len(remaining) != len(queue):
            self.save(remaining)
        self._registered = False

    def __del__(self):
        """Leave the queue on garbage collection if still registered."""
        # Skip instances that never finished registering or already quit.
        if getattr(self, "_registered", False):
            self.quit()
if __name__ == '__main__':
    import threading

    def test(worker_id):
        """Demo worker: join the queue, wait for its turn, work, then leave."""
        fq = FileQueue()
        # Poll once per second until nobody is ahead, refreshing our entry
        while True:
            if fq.should_run():
                break
            time.sleep(1)
            fq.heartbeat()
        # Simulated workload: ten one-second steps with a heartbeat each
        print(f"Worker {worker_id} started")
        for i in range(10):
            time.sleep(1)
            fq.heartbeat()
            print(f"Worker {worker_id} progress: {i + 1}/10")
        # Done: give up our place in the queue
        print(f"Worker {worker_id} finished")
        fq.quit()

    # Launch five workers with slightly staggered start times
    for n in range(5):
        worker = threading.Thread(target=test, args=(n,))
        worker.start()
        time.sleep(0.123)