Refactor find_missing and refresh_who_has #3
Conversation
Unit Test Results: 15 files ±0, 15 suites ±0, 6h 27m 31s ⏱️ +5m 38s. For more details on these failures, see this check. Results for commit 1bc0283. ± Comparison against base commit e8f5ef5. ♻️ This comment has been updated with latest results.
Force-pushed 37c15b2 to 45dc795
Force-pushed 933c9cf to 9479d54
Force-pushed 45dc795 to 2f96f32
@@ -2854,7 +2863,8 @@ def transitions(self, recommendations: Recs, *, stimulus_id: str) -> None:

     @log_errors
     def handle_stimulus(self, stim: StateMachineEvent) -> None:
-        self.stimulus_log.append(stim.to_loggable(handled=time()))
+        if not isinstance(stim, FindMissingEvent):
+            self.stimulus_log.append(stim.to_loggable(handled=time()))
This is ugly, but post-refactor the alternative is for the Worker to put its nose directly into the internal data structures of the WorkerState. That alternative would also require the Worker to autonomously realise that something's stuck on the state and query the scheduler accordingly; I don't think it should own this kind of intelligence.
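For context, here is a minimal self-contained sketch (the class names mirror the diff above, but this is not the actual distributed code) of why the periodic FindMissingEvent is filtered out of stimulus_log: the poll fires on a timer regardless of whether anything is actually missing, so logging it would drown out the interesting stimuli.

```python
from dataclasses import dataclass, field
from time import time


@dataclass
class StateMachineEvent:
    stimulus_id: str

    def to_loggable(self, *, handled: float) -> "StateMachineEvent":
        # The real method returns a copy annotated with the handled timestamp;
        # returning self keeps this sketch short.
        return self


@dataclass
class FindMissingEvent(StateMachineEvent):
    pass


@dataclass
class ToyWorkerState:
    stimulus_log: list = field(default_factory=list)

    def handle_stimulus(self, stim: StateMachineEvent) -> None:
        # Log every stimulus except the high-frequency periodic poll
        if not isinstance(stim, FindMissingEvent):
            self.stimulus_log.append(stim.to_loggable(handled=time()))
        # ...apply transitions here...


state = ToyWorkerState()
state.handle_stimulus(FindMissingEvent(stimulus_id="find-missing-1"))   # not logged
state.handle_stimulus(StateMachineEvent(stimulus_id="compute-task-x"))  # logged
assert len(state.stimulus_log) == 1
```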
distributed/worker.py
Outdated
# any other state -> eventually, possibly, the task may transition to fetch
# or missing, at which point the relevant transitions will test who_has that
# we just updated. e.g. see the various transitions to fetch, which
# instead recommend transitioning to missing if who_has is empty.
if not workers and ts.state == "fetch":
    recs[ts] = "missing"
elif workers and ts.state == "missing":
    recs[ts] = "fetch"
Note that this could be triggered by any of the four events that invoke update_who_has (a short sketch of the resulting recommendations follows the list):
- handle_compute_task
- handle_acquire_replicas
- RefreshWhoHasEvent, instigated by find_missing
- RefreshWhoHasEvent, instigated by refresh_who_has
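To make the rule quoted in the diff above concrete, here is a self-contained sketch; ToyTask and recommend_after_who_has_update are illustrative names, not the distributed API:

```python
from dataclasses import dataclass


@dataclass(eq=False)  # identity-based hashing, so tasks can be used as dict keys
class ToyTask:
    key: str
    state: str  # e.g. "fetch", "missing", "flight", "memory", ...


def recommend_after_who_has_update(ts: ToyTask, workers: set[str]) -> dict[ToyTask, str]:
    """Recommendations after who_has for ``ts`` was refreshed to ``workers``.

    A "fetch" task whose replicas all disappeared becomes "missing";
    a "missing" task whose replicas reappeared becomes "fetch".
    Any other state is left alone: later transitions re-read who_has anyway.
    """
    recs: dict[ToyTask, str] = {}
    if not workers and ts.state == "fetch":
        recs[ts] = "missing"
    elif workers and ts.state == "missing":
        recs[ts] = "fetch"
    return recs


# e.g. a RefreshWhoHasEvent reports that no worker holds the key any more:
ts = ToyTask(key="x", state="fetch")
assert recommend_after_who_has_update(ts, workers=set()) == {ts: "missing"}
```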
    keys=[ts.key for ts in self._missing_dep_flight],
    stimulus_id=ev.stimulus_id,
)
return {}, [smsg]
Again, the only reason for this event - which is always triggered by the worker itself - is to encapsulate this logic away from the Worker and into the WorkerState.
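A hypothetical, self-contained sketch of that encapsulation (ToyWorkerState and handle_find_missing are illustrative names; the real handler in distributed differs): the state machine inspects its own _missing_dep_flight and turns it into an outgoing instruction, so the Worker never touches that structure directly.

```python
from dataclasses import dataclass


@dataclass
class RequestRefreshWhoHasMsg:
    keys: list[str]
    stimulus_id: str


class ToyWorkerState:
    def __init__(self) -> None:
        # keys whose dependencies currently cannot be found on any worker
        self._missing_dep_flight: set[str] = set()

    def handle_find_missing(self, stimulus_id: str) -> tuple[dict, list]:
        """Return (recommendations, instructions) for a FindMissingEvent."""
        if not self._missing_dep_flight:
            return {}, []  # nothing is stuck; no message to the scheduler
        smsg = RequestRefreshWhoHasMsg(
            keys=sorted(self._missing_dep_flight),
            stimulus_id=stimulus_id,
        )
        # No transitions yet: just ask the scheduler where these keys live now;
        # the reply arrives later as a RefreshWhoHasEvent.
        return {}, [smsg]


state = ToyWorkerState()
state._missing_dep_flight.add("x")
recs, instructions = state.handle_find_missing("find-missing-1")
assert instructions[0].keys == ["x"]
```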
distributed/worker.py
Outdated
# _ensure_communicating to be a no-op when there's nothing to do.
instructions.append(
    EnsureCommunicatingAfterTransitions(stimulus_id=ev.stimulus_id)
)
Note that this is triggered specifically for find_missing and refresh_who_has and does not fix dask#6446
Temporary diff shown for dask#6348 vs dask#6342.