forked from dask/distributed
-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactor find_missing and refresh_who_has #3
Closed
Closed
Changes from all commits
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
2f96f32
Refactor find_missing and refresh_who_has
crusaderky 35bb0d2
Merge branch 'WSMR/update_who_has' into WSMR/refresh_who_has
crusaderky 263bf73
redesign post changes in update_who_has PR
crusaderky df488f0
Revert bits of update_who_has
crusaderky 47488ac
Merge branch 'WSMR/update_who_has' into WSMR/refresh_who_has
crusaderky e3e5fae
Merge branch 'WSMR/update_who_has' into WSMR/refresh_who_has
crusaderky 1bc0283
Don't use EnsureCommunicatingAfterTransitions
crusaderky File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -115,6 +115,7 @@ | |
Execute, | ||
ExecuteFailureEvent, | ||
ExecuteSuccessEvent, | ||
FindMissingEvent, | ||
GatherDep, | ||
GatherDepDoneEvent, | ||
Instructions, | ||
|
@@ -123,7 +124,9 @@ | |
MissingDataMsg, | ||
Recs, | ||
RecsInstrs, | ||
RefreshWhoHasEvent, | ||
ReleaseWorkerDataMsg, | ||
RequestRefreshWhoHasMsg, | ||
RescheduleEvent, | ||
RescheduleMsg, | ||
SendMessageToScheduler, | ||
|
@@ -813,6 +816,7 @@ def __init__( | |
"free-keys": self.handle_free_keys, | ||
"remove-replicas": self.handle_remove_replicas, | ||
"steal-request": self.handle_steal_request, | ||
"refresh-who-has": self.handle_refresh_who_has, | ||
"worker-status-change": self.handle_worker_status_change, | ||
} | ||
|
||
|
@@ -840,9 +844,7 @@ def __init__( | |
) | ||
self.periodic_callbacks["keep-alive"] = pc | ||
|
||
# FIXME annotations: https://github.com/tornadoweb/tornado/issues/3117 | ||
pc = PeriodicCallback(self.find_missing, 1000) # type: ignore | ||
self._find_missing_running = False | ||
pc = PeriodicCallback(self.find_missing, 1000) | ||
self.periodic_callbacks["find-missing"] = pc | ||
|
||
self._address = contact_address | ||
|
@@ -1839,6 +1841,13 @@ def handle_remove_replicas(self, keys: list[str], stimulus_id: str) -> str: | |
|
||
return "OK" | ||
|
||
def handle_refresh_who_has( | ||
self, who_has: dict[str, list[str]], stimulus_id: str | ||
) -> None: | ||
self.handle_stimulus( | ||
RefreshWhoHasEvent(who_has=who_has, stimulus_id=stimulus_id) | ||
) | ||
|
||
async def set_resources(self, **resources) -> None: | ||
for r, quantity in resources.items(): | ||
if r in self.total_resources: | ||
|
@@ -2849,7 +2858,8 @@ def transitions(self, recommendations: Recs, *, stimulus_id: str) -> None: | |
|
||
@log_errors | ||
def handle_stimulus(self, stim: StateMachineEvent) -> None: | ||
self.stimulus_log.append(stim.to_loggable(handled=time())) | ||
if not isinstance(stim, FindMissingEvent): | ||
self.stimulus_log.append(stim.to_loggable(handled=time())) | ||
recs, instructions = self.handle_event(stim) | ||
self.transitions(recs, stimulus_id=stim.stimulus_id) | ||
self._handle_instructions(instructions) | ||
|
@@ -2991,11 +3001,8 @@ def _ensure_communicating(self, *, stimulus_id: str) -> RecsInstrs: | |
if ts.state != "fetch" or ts.key in all_keys_to_gather: | ||
continue | ||
|
||
if not ts.who_has: | ||
recommendations[ts] = "missing" | ||
continue | ||
|
||
if self.validate: | ||
assert ts.who_has | ||
assert self.address not in ts.who_has | ||
|
||
workers = [ | ||
|
@@ -3348,7 +3355,7 @@ def done_event(): | |
self.busy_workers.add(worker) | ||
self.io_loop.call_later(0.15, self._readd_busy_worker, worker) | ||
|
||
refresh_who_has = set() | ||
refresh_who_has = [] | ||
|
||
for d in self.in_flight_workers.pop(worker): | ||
ts = self.tasks[d] | ||
|
@@ -3358,7 +3365,7 @@ def done_event(): | |
elif busy: | ||
recommendations[ts] = "fetch" | ||
if not ts.who_has - self.busy_workers: | ||
refresh_who_has.add(ts.key) | ||
refresh_who_has.append(d) | ||
elif ts not in recommendations: | ||
ts.who_has.discard(worker) | ||
self.has_what[worker].discard(ts.key) | ||
|
@@ -3371,17 +3378,19 @@ def done_event(): | |
) | ||
) | ||
recommendations[ts] = "fetch" | ||
del data, response | ||
self.transitions(recommendations, stimulus_id=stimulus_id) | ||
self._handle_instructions(instructions) | ||
|
||
if refresh_who_has: | ||
# All workers that hold known replicas of our tasks are busy. | ||
# Try querying the scheduler for unknown ones. | ||
who_has = await retry_operation( | ||
self.scheduler.who_has, keys=refresh_who_has | ||
instructions.append( | ||
RequestRefreshWhoHasMsg( | ||
keys=refresh_who_has, | ||
stimulus_id=f"gather-dep-busy-{time()}", | ||
) | ||
) | ||
self._update_who_has(who_has) | ||
|
||
self.transitions(recommendations, stimulus_id=stimulus_id) | ||
self._handle_instructions(instructions) | ||
|
||
@log_errors | ||
def _readd_busy_worker(self, worker: str) -> None: | ||
|
@@ -3391,33 +3400,13 @@ def _readd_busy_worker(self, worker: str) -> None: | |
) | ||
|
||
@log_errors | ||
async def find_missing(self) -> None: | ||
if self._find_missing_running or not self._missing_dep_flight: | ||
return | ||
try: | ||
self._find_missing_running = True | ||
if self.validate: | ||
for ts in self._missing_dep_flight: | ||
assert not ts.who_has | ||
def find_missing(self) -> None: | ||
self.handle_stimulus(FindMissingEvent(stimulus_id=f"find-missing-{time()}")) | ||
|
||
stimulus_id = f"find-missing-{time()}" | ||
who_has = await retry_operation( | ||
self.scheduler.who_has, | ||
keys=[ts.key for ts in self._missing_dep_flight], | ||
) | ||
self._update_who_has(who_has) | ||
recommendations: Recs = {} | ||
for ts in self._missing_dep_flight: | ||
if ts.who_has: | ||
recommendations[ts] = "fetch" | ||
self.transitions(recommendations, stimulus_id=stimulus_id) | ||
|
||
finally: | ||
self._find_missing_running = False | ||
# This is quite arbitrary but the heartbeat has scaling implemented | ||
self.periodic_callbacks[ | ||
"find-missing" | ||
].callback_time = self.periodic_callbacks["heartbeat"].callback_time | ||
# This is quite arbitrary but the heartbeat has scaling implemented | ||
self.periodic_callbacks["find-missing"].callback_time = self.periodic_callbacks[ | ||
"heartbeat" | ||
].callback_time | ||
|
||
def _update_who_has(self, who_has: Mapping[str, Collection[str]]) -> None: | ||
for key, workers in who_has.items(): | ||
|
@@ -3965,6 +3954,51 @@ def _(self, ev: RescheduleEvent) -> RecsInstrs: | |
assert ts, self.story(ev.key) | ||
return {ts: "rescheduled"}, [] | ||
|
||
@handle_event.register | ||
def _(self, ev: FindMissingEvent) -> RecsInstrs: | ||
if not self._missing_dep_flight: | ||
return {}, [] | ||
|
||
if self.validate: | ||
assert not any(ts.who_has for ts in self._missing_dep_flight) | ||
|
||
smsg = RequestRefreshWhoHasMsg( | ||
keys=[ts.key for ts in self._missing_dep_flight], | ||
stimulus_id=ev.stimulus_id, | ||
) | ||
return {}, [smsg] | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Again, the only reason for this event - which is always triggered by the worker itself - is to encapsulate this logic away from the Worker and into the WorkerState. |
||
|
||
@handle_event.register | ||
def _(self, ev: RefreshWhoHasEvent) -> RecsInstrs: | ||
self._update_who_has(ev.who_has) | ||
recommendations: Recs = {} | ||
instructions: Instructions = [] | ||
ensure_communicating = False | ||
|
||
for key in ev.who_has: | ||
ts = self.tasks.get(key) | ||
if not ts: | ||
continue | ||
|
||
if ts.who_has and ts.state == "missing": | ||
recommendations[ts] = "fetch" | ||
elif ts.who_has and ts.state == "fetch": | ||
# We potentially just acquired new replicas whereas all previously known | ||
# workers are in flight or busy. We're deliberately not testing the | ||
# minute use cases here for the sake of simplicity; instead we rely on | ||
# _ensure_communicating to be a no-op when there's nothing to do. | ||
ensure_communicating = True | ||
elif not ts.who_has and ts.state == "fetch": | ||
recommendations[ts] = "missing" | ||
|
||
if ensure_communicating: | ||
recommendations, instructions = merge_recs_instructions( | ||
(recommendations, instructions), | ||
self._ensure_communicating(stimulus_id=ev.stimulus_id), | ||
) | ||
|
||
return recommendations, instructions | ||
|
||
def _prepare_args_for_execution( | ||
self, ts: TaskState, args: tuple, kwargs: dict[str, Any] | ||
) -> tuple[tuple, dict[str, Any]]: | ||
|
@@ -4190,8 +4224,8 @@ def validate_task_fetch(self, ts): | |
assert self.address not in ts.who_has | ||
assert not ts.done | ||
assert ts in self.data_needed | ||
# Note: ts.who_has may have been emptied by _update_who_has, but the task | |
# won't transition to missing until it reaches the top of the data_needed heap. | ||
assert ts.who_has | ||
|
||
for w in ts.who_has: | ||
assert ts.key in self.has_what[w] | ||
assert ts in self.data_needed_per_worker[w] | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is ugly, but post-refactor the alternative is for the Worker to put its nose directly into the internal data structures of the WorkerState. That alternative would also require the Worker to autonomously realise that something is stuck in the state and query the scheduler accordingly; I don't think it should own this kind of intelligence.