From 026382811029e7601a954687edad681f5a1a724f Mon Sep 17 00:00:00 2001 From: crusaderky Date: Fri, 20 May 2022 01:26:47 +0100 Subject: [PATCH] Refactor busy_worker reinsertion (#6379) --- distributed/worker.py | 27 +++++++++++++++++++++------ distributed/worker_state_machine.py | 14 ++++++++++++-- 2 files changed, 33 insertions(+), 8 deletions(-) diff --git a/distributed/worker.py b/distributed/worker.py index 23f43a337fd..82f8a361f93 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -129,6 +129,8 @@ RequestRefreshWhoHasMsg, RescheduleEvent, RescheduleMsg, + RetryBusyWorkerEvent, + RetryBusyWorkerLater, SendMessageToScheduler, SerializedTask, StateMachineEvent, @@ -2852,6 +2854,7 @@ def transitions(self, recommendations: Recs, *, stimulus_id: str) -> None: else: self._handle_instructions(instructions) + @fail_hard @log_errors def handle_stimulus(self, stim: StateMachineEvent) -> None: if not isinstance(stim, FindMissingEvent): @@ -2917,6 +2920,12 @@ def _handle_instructions(self, instructions: Instructions) -> None: name=f"execute({inst.key})", ) + elif isinstance(inst, RetryBusyWorkerLater): + task = asyncio.create_task( + self.retry_busy_worker_later(inst.worker), + name=f"retry_busy_worker_later({inst.worker})", + ) + else: raise TypeError(inst) # pragma: nocover @@ -3349,7 +3358,9 @@ def done_event(): # Avoid hammering the worker. If there are multiple replicas # available, immediately try fetching from a different worker. self.busy_workers.add(worker) - self.io_loop.call_later(0.15, self._readd_busy_worker, worker) + instructions.append( + RetryBusyWorkerLater(worker=worker, stimulus_id=stimulus_id) + ) refresh_who_has = set() @@ -3388,11 +3399,10 @@ def done_event(): self.transitions(recommendations, stimulus_id=stimulus_id) self._handle_instructions(instructions) - @log_errors - def _readd_busy_worker(self, worker: str) -> None: - self.busy_workers.remove(worker) - self.handle_stimulus( - GatherDepDoneEvent(stimulus_id=f"readd-busy-worker-{time()}") + async def retry_busy_worker_later(self, worker: str) -> StateMachineEvent | None: + await asyncio.sleep(0.15) + return RetryBusyWorkerEvent( + worker=worker, stimulus_id=f"retry-busy-worker-{time()}" ) @log_errors @@ -3935,6 +3945,11 @@ def _(self, ev: GatherDepDoneEvent) -> RecsInstrs: """Temporary hack - to be removed""" return self._ensure_communicating(stimulus_id=ev.stimulus_id) + @handle_event.register + def _(self, ev: RetryBusyWorkerEvent) -> RecsInstrs: + self.busy_workers.discard(ev.worker) + return self._ensure_communicating(stimulus_id=ev.stimulus_id) + @handle_event.register def _(self, ev: CancelComputeEvent) -> RecsInstrs: """Scheduler requested to cancel a task""" diff --git a/distributed/worker_state_machine.py b/distributed/worker_state_machine.py index ec8a54fe6bc..888671cb49a 100644 --- a/distributed/worker_state_machine.py +++ b/distributed/worker_state_machine.py @@ -279,6 +279,12 @@ class Execute(Instruction): key: str +@dataclass +class RetryBusyWorkerLater(Instruction): + __slots__ = ("worker",) + worker: str + + @dataclass class EnsureCommunicatingAfterTransitions(Instruction): __slots__ = () @@ -463,6 +469,12 @@ class UnpauseEvent(StateMachineEvent): __slots__ = () +@dataclass +class RetryBusyWorkerEvent(StateMachineEvent): + __slots__ = ("worker",) + worker: str + + @dataclass class GatherDepDoneEvent(StateMachineEvent): """Temporary hack - to be removed""" @@ -478,7 +490,6 @@ class ExecuteSuccessEvent(StateMachineEvent): stop: float nbytes: int type: type | None - stimulus_id: str __slots__ = tuple(__annotations__) # type: ignore def to_loggable(self, *, handled: float) -> StateMachineEvent: @@ -501,7 +512,6 @@ class ExecuteFailureEvent(StateMachineEvent): traceback: Serialize | None exception_text: str traceback_text: str - stimulus_id: str __slots__ = tuple(__annotations__) # type: ignore def _after_from_dict(self) -> None: