tracker.py

import asyncio
import collections
import logging
from typing import DefaultDict, Iterable, Set

import aioredis

from util import grouper


class ItemTracker:
    _logger: logging.Logger
    crawl_manager: 'CrawlManager'

    def __init__(self):
        self._logger = logging.getLogger(self.__class__.__name__)
        # self.crawl_manager is set by the CrawlManager itself when it is initialized

    async def async_init(self):
        """
        Perform any late initialization asynchronously.
        """
        raise NotImplementedError()

    async def add_items(self, items: Iterable[str]):
        """
        Add `items` to the set of all known items, and add any items
        not previously known to the set of unexplored items.
        """
        raise NotImplementedError()

    async def mark_explored(self, items: Iterable[str]):
        """
        Mark `items` as explored.
        """
        raise NotImplementedError()

    async def get_worker_id(self) -> int:
        """
        Create a new, unique worker id used for tracking what each
        worker is currently assigned.
        """
        raise NotImplementedError()

    async def crawl_done(self) -> bool:
        """
        Determine whether the crawl is entirely finished.
        """
        raise NotImplementedError()

    async def checkout_work(self, worker_id: int, n: int) -> Set[str]:
        """
        Check out approximately `n` items to the given `worker_id`,
        returning a set of all work the worker should do.
        """
        raise NotImplementedError()

    async def mark_work_finished(self, worker_id: int, work: Set[str]):
        """
        Mark that the given `worker_id` has finished the items in `work`.
        """
        raise NotImplementedError()

    async def shutdown(self):
        """
        Called when the tracker is shutting down.
        Useful for closing sockets, files, etc.
        """
        raise NotImplementedError()
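

# A minimal sketch of the worker loop the interface above implies: check
# out a batch of items, process each one, then report the batch finished.
# This loop is illustrative only, and `process_item` is a hypothetical
# coroutine standing in for whatever the real crawler does per item.
async def example_worker_loop(tracker: ItemTracker, process_item) -> None:
    worker_id = await tracker.get_worker_id()
    while not await tracker.crawl_done():
        work = await tracker.checkout_work(worker_id, 10)
        if not work:
            # Nothing available yet; another worker may still add items.
            await asyncio.sleep(1)
            continue
        for item in work:
            # A real crawler would fetch `item` here and feed anything it
            # discovers back to the tracker via add_items().
            await process_item(item)
        await tracker.mark_work_finished(worker_id, work)
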
class InMemoryTracker(ItemTracker):
    all_items: Set[str]
    unexplored_items: Set[str]
    last_worker_id: int
    last_id_lock: asyncio.Lock
    assigned_work: DefaultDict[int, Set[str]]

    def __init__(self):
        super().__init__()
        self.all_items = set()
        self.unexplored_items = set()
        self.last_worker_id = 0
        self.last_id_lock = asyncio.Lock()
        self.assigned_work = collections.defaultdict(set)

    async def async_init(self):
        pass

    async def add_items(self, items):
        # `items` may be any iterable, so materialize it as a set before
        # diffing it against the already-known items.
        new_items = set(items) - self.all_items
        self.all_items.update(new_items)
        self.unexplored_items.update(new_items)

    async def mark_explored(self, items):
        self.unexplored_items.difference_update(items)

    async def get_worker_id(self):
        async with self.last_id_lock:
            self.last_worker_id += 1
            return self.last_worker_id

    async def crawl_done(self):
        return (len(self.unexplored_items) == 0
                and all(len(assigned) == 0 for assigned in self.assigned_work.values()))

    async def checkout_work(self, worker_id, n):
        for _ in range(n):
            if len(self.unexplored_items) == 0:
                break
            self.assigned_work[worker_id].add(self.unexplored_items.pop())
        return self.assigned_work[worker_id]

    async def mark_work_finished(self, worker_id, work):
        assert work.issubset(self.assigned_work[worker_id])
        self.assigned_work[worker_id].difference_update(work)

    async def shutdown(self):
        pass
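

# A Redis-backed tracker, letting several crawler processes share one set
# of crawl state.  A hypothetical wiring sketch (the real CrawlManager
# constructor is not shown in this file; per the __init__ comment above,
# it is assumed to set `tracker.crawl_manager` itself):
#
#     tracker = RedisTracker('redis://localhost:6379')
#     manager = CrawlManager(..., tracker=tracker)  # sets tracker.crawl_manager
#     await tracker.async_init()                    # connects to Redis, clears old state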
class RedisTracker(ItemTracker):
    _redis_address: str
    crawl_manager: 'CrawlManager'

    def __init__(self, redis_address: str):
        super().__init__()
        self._redis_address = redis_address

    async def async_init(self):
        self._redis = await aioredis.create_redis_pool(self._redis_address, minsize=1, maxsize=4)
        await self.clear()

    async def clear(self):
        self._logger.info(f"Clearing all items in {self._items_key}")
        await self._redis.delete(self._items_key)
        self._logger.info(f"Clearing unexplored items in {self._unexplored_key}")
        await self._redis.delete(self._unexplored_key)
        self._logger.info(f"Resetting worker id counter in {self._worker_id_key}")
        await self._redis.set(self._worker_id_key, 0)
        self._logger.info(f"Resetting temporary id counter in {self._temp_id_key}")
        await self._redis.set(self._temp_id_key, 0)
        for worker_k in await self._redis.keys(self._checked_out_work_key('*')):
            self._logger.info(f"Clearing items checked out by worker in {worker_k}")
            await self._redis.delete(worker_k)

    def _keyname(self, elem: str):
        """
        Return a key name for `elem` unique to this crawl.
        """
        return f"{self.crawl_manager.name}_{elem}"

    @property
    def _items_key(self):
        """
        The set key used to store all known items.
        """
        return self._keyname('all_items')

    @property
    def _unexplored_key(self):
        """
        The set key used to store all unexplored items.
        """
        return self._keyname('unexplored')

    @property
    def _worker_id_key(self):
        """
        The counter key used to generate worker IDs.
        """
        return self._keyname('worker_id')

    @property
    def _temp_id_key(self):
        """
        The counter key used to generate temporary set names.
        """
        return self._keyname('temp_id')

    def _checked_out_work_key(self, worker_id):
        """
        The set key used to store all items checked out to the given
        `worker_id`.
        """
        return self._keyname(f'checked_out_{worker_id}')

    async def add_items(self, items):
        temp_id = await self._redis.incr(self._temp_id_key)
        temp_key = self._keyname(f'temp_{temp_id}')
        # Load all items we're adding into a temporary set
        for some_items in grouper(1000, items):
            await self._redis.sadd(temp_key, *some_items)
        # Find the new items (i.e. items not already in all_items)
        await self._redis.sdiffstore(temp_key, temp_key, self._items_key)
        # Save the new items into all_items and unexplored
        await self._redis.sunionstore(self._items_key, self._items_key, temp_key)
        await self._redis.sunionstore(self._unexplored_key, self._unexplored_key, temp_key)
        # And clean up after ourselves
        await self._redis.delete(temp_key)

    async def mark_explored(self, items):
        for some_items in grouper(1000, items):
            await self._redis.srem(self._unexplored_key, *some_items)

    async def get_worker_id(self):
        return await self._redis.incr(self._worker_id_key)

    async def crawl_done(self):
        unexplored_len = await self._redis.scard(self._unexplored_key)
        if unexplored_len > 0:
            return False
        for worker_k in await self._redis.keys(self._checked_out_work_key('*')):
            checked_out = await self._redis.scard(worker_k)
            if checked_out > 0:
                return False
        return True

    async def checkout_work(self, worker_id, n):
        worker_work_set = self._checked_out_work_key(worker_id)
        items = await self._redis.srandmember(self._unexplored_key, n)
        for item in items:
            await self._redis.smove(self._unexplored_key, worker_work_set, item)
        return {item.decode('utf-8') for item in await self._redis.smembers(worker_work_set)}

    async def mark_work_finished(self, worker_id, work):
        worker_work_set = self._checked_out_work_key(worker_id)
        checked_out_work = {item.decode('utf-8') for item in await self._redis.smembers(worker_work_set)}
        assert work.issubset(checked_out_work)
        # Remove only the items actually reported finished; any other
        # checked-out work stays assigned to this worker.
        await self._redis.srem(worker_work_set, *work)

    async def shutdown(self):
        self._redis.close()
        await self._redis.wait_closed()
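

# A small self-contained demo of InMemoryTracker.  This block is an
# illustrative addition, not part of the crawler itself.
if __name__ == '__main__':
    async def _demo():
        tracker = InMemoryTracker()
        await tracker.async_init()
        await tracker.add_items({'a', 'b', 'c'})
        worker_id = await tracker.get_worker_id()
        work = await tracker.checkout_work(worker_id, 2)
        print('checked out:', work)
        # Copy `work` since InMemoryTracker returns its live assignment set.
        await tracker.mark_work_finished(worker_id, set(work))
        print('crawl done?', await tracker.crawl_done())  # False: one item is still unexplored
        await tracker.shutdown()

    asyncio.run(_demo())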