-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathworkers_asyncio.py
93 lines (69 loc) · 2.31 KB
/
workers_asyncio.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import asyncio
from random import random
import sys
from easy_timing import timer
class Factory:
    """Demo work pool: a fixed set of asyncio workers draining a shared queue.

    Every task sleeps for a short random time and then tries to enqueue two
    child tasks (``<id>.1`` and ``<id>.2``) until ``max_tasks`` tasks have
    been queued in total. Progress is reported with ``print``.

    Arguments:
        max_workers {int} -- number of concurrent worker coroutines
        max_tasks {int} -- upper bound on the number of tasks spawned by tasks

    Keyword Arguments:
        base_delay {float} -- minimum simulated task duration in seconds
            (default: {0.5})
        jitter {float} -- multiplier applied to ``random()`` and added to
            ``base_delay`` (default: {1.0})
    """

    def __init__(self, max_workers, max_tasks, *, base_delay=0.5, jitter=1.0):
        self.task_queued = 0  # tasks put on the queue so far
        self.task_completed = 0  # tasks whose body ran to the end
        self.max_workers = max_workers
        self.max_tasks = max_tasks
        self.base_delay = base_delay
        self.jitter = jitter
        self.queue = asyncio.Queue()

    def print_ident(self, string, task_id):
        """Print ``string`` indented by one space per level of ``task_id``.

        The level is the number of dots in ``task_id`` plus one, so task
        "1" gets one leading space and task "1.2.1" gets three.
        """
        print((task_id.count(".") + 1) * " " + string)

    async def _enqueue_child(self, task_id):
        """Queue ``task_id`` if the task budget allows; return whether it did."""
        if self.task_queued >= self.max_tasks:
            return False
        # Reserve the slot before awaiting so the budget check and the
        # increment cannot be separated by a suspension point.
        self.task_queued += 1
        await self.queue.put(task_id)
        return True

    async def task(self, task_id):
        """Simulate one unit of work and spawn up to two child tasks.

        Arguments:
            task_id {string} -- dotted identifier of the task, e.g. "1.2.1"
        """
        sleep_time = self.base_delay + self.jitter * random()
        self.print_ident("Begin task #{}".format(task_id), task_id)
        await asyncio.sleep(sleep_time)
        for suffix in (".1", ".2"):
            await self._enqueue_child(task_id + suffix)
        self.task_completed += 1
        self.print_ident(
            "End task #{} ({} in queue, {} completed)".format(
                task_id, self.queue.qsize(), self.task_completed
            ),
            task_id,
        )

    async def worker(self, worker_id):
        """Pull task ids from the queue forever and run them one at a time.

        The loop only ends when the worker is cancelled.

        Arguments:
            worker_id {int} -- number used to label this worker's output
        """
        while True:
            task_id = await self.queue.get()
            print("Worker #{} takes charge of task {}".format(worker_id, task_id))
            try:
                await self.task(task_id)
            except asyncio.CancelledError:
                # Never swallow cancellation (it is an Exception subclass
                # on Python 3.7).
                raise
            except Exception as exc:
                # A failing task must not kill the worker, otherwise the
                # remaining queue items may never be processed.
                print(
                    "Worker #{} failed task {}: {!r}".format(
                        worker_id, task_id, exc
                    )
                )
            finally:
                # Always balance get() with task_done(), or queue.join()
                # would wait forever after a failure.
                self.queue.task_done()

    async def organize_work(self):
        """Seed the queue, run the workers until it drains, then stop them.

        Raises:
            ValueError -- if ``max_workers`` is less than 1, since nothing
                would ever consume the queue and ``join()`` would hang
        """
        if self.max_workers < 1:
            raise ValueError("max_workers must be at least 1")
        print("Begin work \n")
        await self.queue.put("1")  # We add one task to the queue to start
        self.task_queued += 1
        workers = [
            asyncio.create_task(self.worker(worker_id + 1))
            for worker_id in range(self.max_workers)
        ]
        await self.queue.join()
        print("Queue is empty, {} tasks completed".format(self.task_completed))
        for w in workers:
            w.cancel()
        # Wait for the cancellations to be delivered so no worker task is
        # left pending when this coroutine returns.
        await asyncio.gather(*workers, return_exceptions=True)
        print("\nEnd work")
async def main():
    """Build the demo factory and run it until its queue is drained.

    The factory is created inside the coroutine so that its asyncio.Queue
    is constructed while the event loop that will drive it is running.
    """
    factory = Factory(max_workers=10, max_tasks=100)
    await factory.organize_work()


if __name__ == "__main__":
    try:
        # timer() comes from easy_timing; presumably it reports the elapsed
        # time of the enclosed block -- confirm against that package.
        with timer():
            # asyncio.run() creates a fresh event loop and closes it on exit,
            # replacing the deprecated get_event_loop()/run_until_complete()
            # pairing.
            asyncio.run(main())
    except KeyboardInterrupt:
        print("\nBye bye")
        sys.exit(0)