-
-
Notifications
You must be signed in to change notification settings - Fork 719
Commit
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -115,6 +115,7 @@ | |
Execute, | ||
ExecuteFailureEvent, | ||
ExecuteSuccessEvent, | ||
FindMissingEvent, | ||
GatherDep, | ||
GatherDepDoneEvent, | ||
Instructions, | ||
|
@@ -123,6 +124,8 @@ | |
MissingDataMsg, | ||
Recs, | ||
RecsInstrs, | ||
RefreshWhoHasEvent, | ||
RefreshWhoHasMsg, | ||
ReleaseWorkerDataMsg, | ||
RescheduleEvent, | ||
RescheduleMsg, | ||
|
@@ -798,6 +801,7 @@ def __init__( | |
"compute-task": self.handle_compute_task, | ||
"free-keys": self.handle_free_keys, | ||
"remove-replicas": self.handle_remove_replicas, | ||
"refresh-who-has": self.handle_refresh_who_has, | ||
"steal-request": self.handle_steal_request, | ||
"worker-status-change": self.handle_worker_status_change, | ||
} | ||
|
@@ -1822,6 +1826,13 @@ def handle_remove_replicas(self, keys: list[str], stimulus_id: str) -> str: | |
|
||
return "OK" | ||
|
||
def handle_refresh_who_has( | ||
self, who_has: dict[str, list[str]], stimulus_id: str | ||
) -> None: | ||
self.handle_stimulus( | ||
RefreshWhoHasEvent(who_has=who_has, stimulus_id=stimulus_id) | ||
) | ||
|
||
async def set_resources(self, **resources) -> None: | ||
for r, quantity in resources.items(): | ||
if r in self.total_resources: | ||
|
@@ -3423,22 +3434,19 @@ def done_event(): | |
) | ||
) | ||
recommendations[ts] = "fetch" | ||
del data, response | ||
This comment has been minimized.
Sorry, something went wrong.
This comment has been minimized.
Sorry, something went wrong.
crusaderky
Author
Collaborator
|
||
self.transitions(recommendations, stimulus_id=stimulus_id) | ||
self._handle_instructions(instructions) | ||
|
||
if refresh_who_has: | ||
# All workers that hold known replicas of our tasks are busy. | ||
# Try querying the scheduler for unknown ones. | ||
who_has = await retry_operation( | ||
self.scheduler.who_has, keys=refresh_who_has | ||
) | ||
refresh_stimulus_id = f"refresh-who-has-{time()}" | ||
recommendations, instructions = self._update_who_has( | ||
who_has, stimulus_id=refresh_stimulus_id | ||
instructions.append( | ||
RefreshWhoHasMsg( | ||
keys=list(refresh_who_has), | ||
stimulus_id=f"gather-dep-busy-{time()}", | ||
) | ||
) | ||
self.transitions(recommendations, stimulus_id=refresh_stimulus_id) | ||
self._handle_instructions(instructions) | ||
|
||
self.transitions(recommendations, stimulus_id=stimulus_id) | ||
self._handle_instructions(instructions) | ||
|
||
@log_errors | ||
def _readd_busy_worker(self, worker: str) -> None: | ||
|
@@ -3448,40 +3456,20 @@ def _readd_busy_worker(self, worker: str) -> None: | |
) | ||
|
||
@log_errors | ||
async def find_missing(self) -> None: | ||
if not self._missing_dep_flight: | ||
return | ||
try: | ||
if self.validate: | ||
for ts in self._missing_dep_flight: | ||
assert not ts.who_has | ||
|
||
stimulus_id = f"find-missing-{time()}" | ||
who_has = await retry_operation( | ||
self.scheduler.who_has, | ||
keys=[ts.key for ts in self._missing_dep_flight], | ||
) | ||
recommendations, instructions = self._update_who_has( | ||
who_has, stimulus_id=stimulus_id | ||
) | ||
for ts in self._missing_dep_flight: | ||
if ts.who_has: | ||
assert ts not in recommendations | ||
recommendations[ts] = "fetch" | ||
self.transitions(recommendations, stimulus_id=stimulus_id) | ||
self._handle_instructions(instructions) | ||
def find_missing(self) -> None: | ||
self.handle_stimulus(FindMissingEvent(stimulus_id=f"find-missing-{time()}")) | ||
|
||
finally: | ||
# This is quite arbitrary but the heartbeat has scaling implemented | ||
self.periodic_callbacks[ | ||
"find-missing" | ||
].callback_time = self.periodic_callbacks["heartbeat"].callback_time | ||
# This is quite arbitrary but the heartbeat has scaling implemented | ||
self.periodic_callbacks["find-missing"].callback_time = self.periodic_callbacks[ | ||
"heartbeat" | ||
].callback_time | ||
|
||
def _update_who_has( | ||
self, who_has: dict[str, Collection[str]], *, stimulus_id: str | ||
self, who_has: Mapping[str, Collection[str]], *, stimulus_id: str | ||
) -> RecsInstrs: | ||
recs: Recs = {} | ||
instructions: Instructions = [] | ||
ensure_communicating = False | ||
|
||
for key, workers in who_has.items(): | ||
ts = self.tasks.get(key) | ||
|
@@ -3527,11 +3515,19 @@ def _update_who_has( | |
self.has_what[worker].add(key) | ||
if ts.state == "fetch": | ||
self.data_needed_per_worker[worker].push(ts) | ||
ensure_communicating = True | ||
This comment has been minimized.
Sorry, something went wrong.
gjoseph92
Collaborator
|
||
|
||
ts.who_has = workers | ||
if not workers and ts.state == "fetch": | ||
recs[ts] = "missing" | ||
elif workers and ts.state == "missing": | ||
recs[ts] = "fetch" | ||
|
||
if ensure_communicating: | ||
recs, instructions = merge_recs_instructions( | ||
(recs, instructions), | ||
self._ensure_communicating(stimulus_id=stimulus_id), | ||
This comment has been minimized.
Sorry, something went wrong.
gjoseph92
Collaborator
|
||
) | ||
return recs, instructions | ||
|
||
def handle_steal_request(self, key: str, stimulus_id: str) -> None: | ||
|
@@ -4056,6 +4052,25 @@ def _(self, ev: RescheduleEvent) -> RecsInstrs: | |
assert ts, self.story(ev.key) | ||
return {ts: "rescheduled"}, [] | ||
|
||
@handle_event.register | ||
def _(self, ev: FindMissingEvent) -> RecsInstrs: | ||
if not self._missing_dep_flight: | ||
return {}, [] | ||
|
||
if self.validate: | ||
for ts in self._missing_dep_flight: | ||
assert not ts.who_has | ||
|
||
smsg = RefreshWhoHasMsg( | ||
keys=[ts.key for ts in self._missing_dep_flight], | ||
stimulus_id=ev.stimulus_id, | ||
) | ||
return {}, [smsg] | ||
|
||
@handle_event.register | ||
def _(self, ev: RefreshWhoHasEvent) -> RecsInstrs: | ||
return self._update_who_has(ev.who_has, stimulus_id=ev.stimulus_id) | ||
|
||
def _prepare_args_for_execution( | ||
self, ts: TaskState, args: tuple, kwargs: dict[str, Any] | ||
) -> tuple[tuple, dict[str, Any]]: | ||
|
It might help to make the names of the scheduler and worker side more different. Like
RequestRefreshWhoHasMsg
->request-refresh-who-has
->refresh-who-has
->RefreshWhoHasEvent
.