Skip to content

Commit

Permalink
alternative: resume if RetireWorkers stopped
Browse files Browse the repository at this point in the history
  • Loading branch information
gjoseph92 committed Apr 27, 2022
1 parent eae1bd4 commit 2fa193a
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 3 deletions.
2 changes: 0 additions & 2 deletions distributed/active_memory_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -661,8 +661,6 @@ def run(self) -> SuggestionGenerator:

def done(self) -> bool:
"""Return True if it is safe to close the worker down; False otherwise"""
if self not in self.manager.policies:
return True
ws = self.manager.scheduler.workers.get(self.address)
if ws is None:
return True
Expand Down
9 changes: 8 additions & 1 deletion distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -6071,8 +6071,15 @@ async def _track_retire_worker(
close_workers: bool,
remove: bool,
) -> tuple: # tuple[str | None, dict]
amm = self.extensions["amm"]
while not policy.done():
if policy.no_recipients:
if stopped := policy not in amm.policies:
logger.warning(
f"RetireWorkers for {policy.address} stopped, but not done. "
f"Setting worker back to {prev_status.name!r}. Keys on worker: "
f"{ {ts: ts.who_has for ts in ws.has_what} }"
)
if policy.no_recipients or stopped:
# Abort retirement. This time we don't need to worry about race
# conditions and we can wait for a scheduler->worker->scheduler
# round-trip.
Expand Down

0 comments on commit 2fa193a

Please sign in to comment.