Skip to content

Commit

Permalink
Refactor busy_worker reinsertion (dask#6379)
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky committed May 20, 2022
1 parent b660eac commit 0263828
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 8 deletions.
27 changes: 21 additions & 6 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@
RequestRefreshWhoHasMsg,
RescheduleEvent,
RescheduleMsg,
RetryBusyWorkerEvent,
RetryBusyWorkerLater,
SendMessageToScheduler,
SerializedTask,
StateMachineEvent,
Expand Down Expand Up @@ -2852,6 +2854,7 @@ def transitions(self, recommendations: Recs, *, stimulus_id: str) -> None:
else:
self._handle_instructions(instructions)

@fail_hard
@log_errors
def handle_stimulus(self, stim: StateMachineEvent) -> None:
if not isinstance(stim, FindMissingEvent):
Expand Down Expand Up @@ -2917,6 +2920,12 @@ def _handle_instructions(self, instructions: Instructions) -> None:
name=f"execute({inst.key})",
)

elif isinstance(inst, RetryBusyWorkerLater):
task = asyncio.create_task(
self.retry_busy_worker_later(inst.worker),
name=f"retry_busy_worker_later({inst.worker})",
)

else:
raise TypeError(inst) # pragma: nocover

Expand Down Expand Up @@ -3349,7 +3358,9 @@ def done_event():
# Avoid hammering the worker. If there are multiple replicas
# available, immediately try fetching from a different worker.
self.busy_workers.add(worker)
self.io_loop.call_later(0.15, self._readd_busy_worker, worker)
instructions.append(
RetryBusyWorkerLater(worker=worker, stimulus_id=stimulus_id)
)

refresh_who_has = set()

Expand Down Expand Up @@ -3388,11 +3399,10 @@ def done_event():
self.transitions(recommendations, stimulus_id=stimulus_id)
self._handle_instructions(instructions)

@log_errors
def _readd_busy_worker(self, worker: str) -> None:
self.busy_workers.remove(worker)
self.handle_stimulus(
GatherDepDoneEvent(stimulus_id=f"readd-busy-worker-{time()}")
async def retry_busy_worker_later(self, worker: str) -> StateMachineEvent | None:
await asyncio.sleep(0.15)
return RetryBusyWorkerEvent(
worker=worker, stimulus_id=f"retry-busy-worker-{time()}"
)

@log_errors
Expand Down Expand Up @@ -3935,6 +3945,11 @@ def _(self, ev: GatherDepDoneEvent) -> RecsInstrs:
"""Temporary hack - to be removed"""
return self._ensure_communicating(stimulus_id=ev.stimulus_id)

@handle_event.register
def _(self, ev: RetryBusyWorkerEvent) -> RecsInstrs:
self.busy_workers.discard(ev.worker)
return self._ensure_communicating(stimulus_id=ev.stimulus_id)

@handle_event.register
def _(self, ev: CancelComputeEvent) -> RecsInstrs:
"""Scheduler requested to cancel a task"""
Expand Down
14 changes: 12 additions & 2 deletions distributed/worker_state_machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,12 @@ class Execute(Instruction):
key: str


@dataclass
class RetryBusyWorkerLater(Instruction):
__slots__ = ("worker",)
worker: str


@dataclass
class EnsureCommunicatingAfterTransitions(Instruction):
__slots__ = ()
Expand Down Expand Up @@ -463,6 +469,12 @@ class UnpauseEvent(StateMachineEvent):
__slots__ = ()


@dataclass
class RetryBusyWorkerEvent(StateMachineEvent):
__slots__ = ("worker",)
worker: str


@dataclass
class GatherDepDoneEvent(StateMachineEvent):
"""Temporary hack - to be removed"""
Expand All @@ -478,7 +490,6 @@ class ExecuteSuccessEvent(StateMachineEvent):
stop: float
nbytes: int
type: type | None
stimulus_id: str
__slots__ = tuple(__annotations__) # type: ignore

def to_loggable(self, *, handled: float) -> StateMachineEvent:
Expand All @@ -501,7 +512,6 @@ class ExecuteFailureEvent(StateMachineEvent):
traceback: Serialize | None
exception_text: str
traceback_text: str
stimulus_id: str
__slots__ = tuple(__annotations__) # type: ignore

def _after_from_dict(self) -> None:
Expand Down

0 comments on commit 0263828

Please sign in to comment.