Skip to content

Commit

Permalink
Don't use EnsureCommunicatingAfterTransitions
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky committed May 30, 2022
1 parent e3e5fae commit 1bc0283
Showing 1 changed file with 8 additions and 3 deletions.
11 changes: 8 additions & 3 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3973,6 +3973,7 @@ def _(self, ev: RefreshWhoHasEvent) -> RecsInstrs:
self._update_who_has(ev.who_has)
recommendations: Recs = {}
instructions: Instructions = []
ensure_communicating = False

for key in ev.who_has:
ts = self.tasks.get(key)
Expand All @@ -3986,12 +3987,16 @@ def _(self, ev: RefreshWhoHasEvent) -> RecsInstrs:
# workers are in flight or busy. We're deliberately not testing the
# minute use cases here for the sake of simplicity; instead we rely on
# _ensure_communicating to be a no-op when there's nothing to do.
instructions.append(
EnsureCommunicatingAfterTransitions(stimulus_id=ev.stimulus_id)
)
ensure_communicating = True
elif not ts.who_has and ts.state == "fetch":
recommendations[ts] = "missing"

if ensure_communicating:
recommendations, instructions = merge_recs_instructions(
(recommendations, instructions),
self._ensure_communicating(stimulus_id=ev.stimulus_id),
)

return recommendations, instructions

def _prepare_args_for_execution(
Expand Down

0 comments on commit 1bc0283

Please sign in to comment.