From 0fcc724431ae5767e2f958b6c6fdee7a3be203a3 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Mon, 13 Jun 2022 11:16:17 +0100 Subject: [PATCH] Make worker state machine methods private (#6564) --- distributed/tests/test_failed_workers.py | 32 +-- distributed/worker.py | 276 ++++++++++++----------- 2 files changed, 158 insertions(+), 150 deletions(-) diff --git a/distributed/tests/test_failed_workers.py b/distributed/tests/test_failed_workers.py index 1e074bb5e7c..75624ceaacb 100644 --- a/distributed/tests/test_failed_workers.py +++ b/distributed/tests/test_failed_workers.py @@ -16,6 +16,7 @@ from distributed.metrics import time from distributed.utils import CancelledError, sync from distributed.utils_test import ( + BlockedGatherDep, captured_logger, cluster, div, @@ -24,7 +25,7 @@ slowadd, slowinc, ) -from distributed.worker_state_machine import FreeKeysEvent, TaskState +from distributed.worker_state_machine import FreeKeysEvent pytestmark = pytest.mark.ci1 @@ -469,22 +470,27 @@ async def test_worker_time_to_live(c, s, a, b): assert time() - start < interval + 2.0 -@gen_cluster() -async def test_forget_data_not_supposed_to_have(s, a, b): +@gen_cluster(client=True, nthreads=[("", 1)]) +async def test_forget_data_not_supposed_to_have(c, s, a): """If a dependency fetch finishes on a worker after the scheduler already released everything, the worker might be stuck with a redundant replica which is never cleaned up. """ - # FIXME: Replace with "blackbox test" which shows an actual example where - # this situation is provoked if this is even possible. - ts = TaskState("key", state="flight") - a.tasks["key"] = ts - recommendations = {ts: ("memory", 123)} - a.transitions(recommendations, stimulus_id="test") - - assert a.data - while a.data: - await asyncio.sleep(0.001) + async with BlockedGatherDep(s.address) as b: + x = c.submit(inc, 1, key="x", workers=[a.address]) + y = c.submit(inc, x, key="y", workers=[b.address]) + + await b.in_gather_dep.wait() + assert b.tasks["x"].state == "flight" + + x.release() + y.release() + while s.tasks: + await asyncio.sleep(0.01) + + b.block_gather_dep.set() + while b.tasks: + await asyncio.sleep(0.01) @gen_cluster( diff --git a/distributed/worker.py b/distributed/worker.py index 2c590befdc4..6a09d56cae5 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -1940,7 +1940,7 @@ def _handle_acquire_replicas(self, ev: AcquireReplicasEvent) -> RecsInstrs: recommendations: Recs = {} for key in ev.who_has: - ts = self.ensure_task_exists( + ts = self._ensure_task_exists( key=key, # Transfer this data after all dependency tasks of computations with # default or explicitly high (>0) user priority and before all @@ -1955,7 +1955,7 @@ def _handle_acquire_replicas(self, ev: AcquireReplicasEvent) -> RecsInstrs: self._update_who_has(ev.who_has) return recommendations, [] - def ensure_task_exists( + def _ensure_task_exists( self, key: str, *, priority: tuple[int, ...], stimulus_id: str ) -> TaskState: try: @@ -2029,7 +2029,7 @@ def _handle_compute_task(self, ev: ComputeTaskEvent) -> RecsInstrs: assert all(ev.who_has.values()) for dep_key, dep_workers in ev.who_has.items(): - dep_ts = self.ensure_task_exists( + dep_ts = self._ensure_task_exists( key=dep_key, priority=priority, stimulus_id=ev.stimulus_id, @@ -2054,7 +2054,7 @@ def _handle_compute_task(self, ev: ComputeTaskEvent) -> RecsInstrs: # Worker State Machine # ######################## - def transition_generic_fetch(self, ts: TaskState, stimulus_id: str) -> RecsInstrs: + def _transition_generic_fetch(self, ts: TaskState, stimulus_id: str) -> RecsInstrs: if not ts.who_has: return {ts: "missing"}, [] @@ -2072,14 +2072,14 @@ def transition_generic_fetch(self, ts: TaskState, stimulus_id: str) -> RecsInstr # _select_keys_for_gather(). return {}, [EnsureCommunicatingAfterTransitions(stimulus_id=stimulus_id)] - def transition_missing_waiting( + def _transition_missing_waiting( self, ts: TaskState, *, stimulus_id: str ) -> RecsInstrs: self._missing_dep_flight.discard(ts) self._purge_state(ts) - return self.transition_released_waiting(ts, stimulus_id=stimulus_id) + return self._transition_released_waiting(ts, stimulus_id=stimulus_id) - def transition_missing_fetch( + def _transition_missing_fetch( self, ts: TaskState, *, stimulus_id: str ) -> RecsInstrs: if self.validate: @@ -2089,25 +2089,25 @@ def transition_missing_fetch( return {}, [] self._missing_dep_flight.discard(ts) - return self.transition_generic_fetch(ts, stimulus_id=stimulus_id) + return self._transition_generic_fetch(ts, stimulus_id=stimulus_id) - def transition_missing_released( + def _transition_missing_released( self, ts: TaskState, *, stimulus_id: str ) -> RecsInstrs: self._missing_dep_flight.discard(ts) - recs, instructions = self.transition_generic_released( + recs, instructions = self._transition_generic_released( ts, stimulus_id=stimulus_id ) assert ts.key in self.tasks return recs, instructions - def transition_flight_missing( + def _transition_flight_missing( self, ts: TaskState, *, stimulus_id: str ) -> RecsInstrs: assert ts.done - return self.transition_generic_missing(ts, stimulus_id=stimulus_id) + return self._transition_generic_missing(ts, stimulus_id=stimulus_id) - def transition_generic_missing( + def _transition_generic_missing( self, ts: TaskState, *, stimulus_id: str ) -> RecsInstrs: if self.validate: @@ -2118,14 +2118,14 @@ def transition_generic_missing( ts.done = False return {}, [] - def transition_released_fetch( + def _transition_released_fetch( self, ts: TaskState, *, stimulus_id: str ) -> RecsInstrs: if self.validate: assert ts.state == "released" - return self.transition_generic_fetch(ts, stimulus_id=stimulus_id) + return self._transition_generic_fetch(ts, stimulus_id=stimulus_id) - def transition_generic_released( + def _transition_generic_released( self, ts: TaskState, *, stimulus_id: str ) -> RecsInstrs: self._purge_state(ts) @@ -2146,7 +2146,7 @@ def transition_generic_released( self._ensure_computing(), ) - def transition_released_waiting( + def _transition_released_waiting( self, ts: TaskState, *, stimulus_id: str ) -> RecsInstrs: if self.validate: @@ -2170,7 +2170,7 @@ def transition_released_waiting( ts.state = "waiting" return recommendations, [] - def transition_fetch_flight( + def _transition_fetch_flight( self, ts: TaskState, worker: str, *, stimulus_id: str ) -> RecsInstrs: if self.validate: @@ -2187,23 +2187,23 @@ def transition_fetch_flight( self._in_flight_tasks.add(ts) return {}, [] - def transition_fetch_missing( + def _transition_fetch_missing( self, ts: TaskState, *, stimulus_id: str ) -> RecsInstrs: # _ensure_communicating could have just popped this task out of data_needed self.data_needed.discard(ts) - return self.transition_generic_missing(ts, stimulus_id=stimulus_id) + return self._transition_generic_missing(ts, stimulus_id=stimulus_id) - def transition_memory_released( + def _transition_memory_released( self, ts: TaskState, *, stimulus_id: str ) -> RecsInstrs: - recs, instructions = self.transition_generic_released( + recs, instructions = self._transition_generic_released( ts, stimulus_id=stimulus_id ) instructions.append(ReleaseWorkerDataMsg(key=ts.key, stimulus_id=stimulus_id)) return recs, instructions - def transition_waiting_constrained( + def _transition_waiting_constrained( self, ts: TaskState, *, stimulus_id: str ) -> RecsInstrs: if self.validate: @@ -2219,14 +2219,14 @@ def transition_waiting_constrained( self.constrained.append(ts.key) return self._ensure_computing() - def transition_long_running_rescheduled( + def _transition_long_running_rescheduled( self, ts: TaskState, *, stimulus_id: str ) -> RecsInstrs: recs: Recs = {ts: "released"} smsg = RescheduleMsg(key=ts.key, stimulus_id=stimulus_id) return recs, [smsg] - def transition_executing_rescheduled( + def _transition_executing_rescheduled( self, ts: TaskState, *, stimulus_id: str ) -> RecsInstrs: for resource, quantity in ts.resource_restrictions.items(): @@ -2241,7 +2241,7 @@ def transition_executing_rescheduled( self._ensure_computing(), ) - def transition_waiting_ready( + def _transition_waiting_ready( self, ts: TaskState, *, stimulus_id: str ) -> RecsInstrs: if self.validate: @@ -2258,7 +2258,7 @@ def transition_waiting_ready( return self._ensure_computing() - def transition_cancelled_error( + def _transition_cancelled_error( self, ts: TaskState, exception: Serialize, @@ -2269,7 +2269,7 @@ def transition_cancelled_error( stimulus_id: str, ) -> RecsInstrs: assert ts._previous == "executing" or ts.key in self.long_running - recs, instructions = self.transition_executing_error( + recs, instructions = self._transition_executing_error( ts, exception, traceback, @@ -2292,7 +2292,7 @@ def transition_cancelled_error( recs[ts] = "released" return recs, instructions - def transition_generic_error( + def _transition_generic_error( self, ts: TaskState, exception: Serialize, @@ -2315,7 +2315,7 @@ def transition_generic_error( return {}, [smsg] - def transition_executing_error( + def _transition_executing_error( self, ts: TaskState, exception: Serialize, @@ -2330,7 +2330,7 @@ def transition_executing_error( self._executing.discard(ts) return merge_recs_instructions( - self.transition_generic_error( + self._transition_generic_error( ts, exception, traceback, @@ -2361,7 +2361,7 @@ def _transition_from_resumed( depending on the origin. Equally, only `fetch`, `waiting`, or `released` are allowed output states. - See also `transition_resumed_waiting` + See also `_transition_resumed_waiting` """ recs: Recs = {} instructions: Instructions = [] @@ -2383,14 +2383,14 @@ def _transition_from_resumed( assert ts._previous in {"executing", "flight"}, ts._previous if next_state != finish: - recs, instructions = self.transition_generic_released( + recs, instructions = self._transition_generic_released( ts, stimulus_id=stimulus_id ) recs[ts] = next_state return recs, instructions - def transition_resumed_fetch( + def _transition_resumed_fetch( self, ts: TaskState, *, stimulus_id: str ) -> RecsInstrs: """See Worker._transition_from_resumed""" @@ -2405,13 +2405,13 @@ def transition_resumed_fetch( assert ts.state != "fetch" return recs, instructions - def transition_resumed_missing( + def _transition_resumed_missing( self, ts: TaskState, *, stimulus_id: str ) -> RecsInstrs: """See Worker._transition_from_resumed""" return self._transition_from_resumed(ts, "missing", stimulus_id=stimulus_id) - def transition_resumed_released( + def _transition_resumed_released( self, ts: TaskState, *, stimulus_id: str ) -> RecsInstrs: if not ts.done: @@ -2419,15 +2419,15 @@ def transition_resumed_released( ts._next = None return {}, [] else: - return self.transition_generic_released(ts, stimulus_id=stimulus_id) + return self._transition_generic_released(ts, stimulus_id=stimulus_id) - def transition_resumed_waiting( + def _transition_resumed_waiting( self, ts: TaskState, *, stimulus_id: str ) -> RecsInstrs: """See Worker._transition_from_resumed""" return self._transition_from_resumed(ts, "waiting", stimulus_id=stimulus_id) - def transition_cancelled_fetch( + def _transition_cancelled_fetch( self, ts: TaskState, *, stimulus_id: str ) -> RecsInstrs: if ts.done: @@ -2441,7 +2441,7 @@ def transition_cancelled_fetch( ts._next = "fetch" return {}, [] - def transition_cancelled_waiting( + def _transition_cancelled_waiting( self, ts: TaskState, *, stimulus_id: str ) -> RecsInstrs: if ts.done: @@ -2455,7 +2455,7 @@ def transition_cancelled_waiting( ts._next = "waiting" return {}, [] - def transition_cancelled_forgotten( + def _transition_cancelled_forgotten( self, ts: TaskState, *, stimulus_id: str ) -> RecsInstrs: ts._next = "forgotten" @@ -2463,7 +2463,7 @@ def transition_cancelled_forgotten( return {}, [] return {ts: "released"}, [] - def transition_cancelled_released( + def _transition_cancelled_released( self, ts: TaskState, *, stimulus_id: str ) -> RecsInstrs: if not ts.done: @@ -2474,9 +2474,9 @@ def transition_cancelled_released( for resource, quantity in ts.resource_restrictions.items(): self.available_resources[resource] += quantity - return self.transition_generic_released(ts, stimulus_id=stimulus_id) + return self._transition_generic_released(ts, stimulus_id=stimulus_id) - def transition_executing_released( + def _transition_executing_released( self, ts: TaskState, *, stimulus_id: str ) -> RecsInstrs: ts._previous = ts.state @@ -2486,13 +2486,13 @@ def transition_executing_released( ts.done = False return self._ensure_computing() - def transition_long_running_memory( + def _transition_long_running_memory( self, ts: TaskState, value=NO_VALUE, *, stimulus_id: str ) -> RecsInstrs: self.executed_count += 1 - return self.transition_generic_memory(ts, value=value, stimulus_id=stimulus_id) + return self._transition_generic_memory(ts, value=value, stimulus_id=stimulus_id) - def transition_generic_memory( + def _transition_generic_memory( self, ts: TaskState, value=NO_VALUE, *, stimulus_id: str ) -> RecsInstrs: if value is NO_VALUE and ts.key not in self.data: @@ -2523,7 +2523,7 @@ def transition_generic_memory( return recs, instructions - def transition_executing_memory( + def _transition_executing_memory( self, ts: TaskState, value=NO_VALUE, *, stimulus_id: str ) -> RecsInstrs: if self.validate: @@ -2534,11 +2534,11 @@ def transition_executing_memory( self._executing.discard(ts) self.executed_count += 1 return merge_recs_instructions( - self.transition_generic_memory(ts, value=value, stimulus_id=stimulus_id), + self._transition_generic_memory(ts, value=value, stimulus_id=stimulus_id), self._ensure_computing(), ) - def transition_constrained_executing( + def _transition_constrained_executing( self, ts: TaskState, *, stimulus_id: str ) -> RecsInstrs: if self.validate: @@ -2553,7 +2553,7 @@ def transition_constrained_executing( instr = Execute(key=ts.key, stimulus_id=stimulus_id) return {}, [instr] - def transition_ready_executing( + def _transition_ready_executing( self, ts: TaskState, *, stimulus_id: str ) -> RecsInstrs: if self.validate: @@ -2570,7 +2570,9 @@ def transition_ready_executing( instr = Execute(key=ts.key, stimulus_id=stimulus_id) return {}, [instr] - def transition_flight_fetch(self, ts: TaskState, *, stimulus_id: str) -> RecsInstrs: + def _transition_flight_fetch( + self, ts: TaskState, *, stimulus_id: str + ) -> RecsInstrs: # If this transition is called after the flight coroutine has finished, # we can reset the task and transition to fetch again. If it is not yet # finished, this should be a no-op @@ -2578,9 +2580,9 @@ def transition_flight_fetch(self, ts: TaskState, *, stimulus_id: str) -> RecsIns return {}, [] ts.coming_from = None - return self.transition_generic_fetch(ts, stimulus_id=stimulus_id) + return self._transition_generic_fetch(ts, stimulus_id=stimulus_id) - def transition_flight_error( + def _transition_flight_error( self, ts: TaskState, exception: Serialize, @@ -2592,7 +2594,7 @@ def transition_flight_error( ) -> RecsInstrs: self._in_flight_tasks.discard(ts) ts.coming_from = None - return self.transition_generic_error( + return self._transition_generic_error( ts, exception, traceback, @@ -2601,13 +2603,13 @@ def transition_flight_error( stimulus_id=stimulus_id, ) - def transition_flight_released( + def _transition_flight_released( self, ts: TaskState, *, stimulus_id: str ) -> RecsInstrs: if ts.done: # FIXME: Is this even possible? Would an assert instead be more # sensible? - return self.transition_generic_released(ts, stimulus_id=stimulus_id) + return self._transition_generic_released(ts, stimulus_id=stimulus_id) else: ts._previous = "flight" ts._next = None @@ -2615,15 +2617,15 @@ def transition_flight_released( ts.state = "cancelled" return {}, [] - def transition_cancelled_memory(self, ts, value, *, stimulus_id): + def _transition_cancelled_memory(self, ts, value, *, stimulus_id): # We only need this because the to-memory signatures require a value but # we do not want to store a cancelled result and want to release # immediately assert ts.done - return self.transition_cancelled_released(ts, stimulus_id=stimulus_id) + return self._transition_cancelled_released(ts, stimulus_id=stimulus_id) - def transition_executing_long_running( + def _transition_executing_long_running( self, ts: TaskState, compute_duration: float, *, stimulus_id: str ) -> RecsInstrs: ts.state = "long-running" @@ -2638,7 +2640,7 @@ def transition_executing_long_running( self._ensure_computing(), ) - def transition_released_memory( + def _transition_released_memory( self, ts: TaskState, value, *, stimulus_id: str ) -> RecsInstrs: try: @@ -2650,7 +2652,7 @@ def transition_released_memory( smsg = AddKeysMsg(keys=[ts.key], stimulus_id=stimulus_id) return recs, [smsg] - def transition_flight_memory( + def _transition_flight_memory( self, ts: TaskState, value, *, stimulus_id: str ) -> RecsInstrs: self._in_flight_tasks.discard(ts) @@ -2664,7 +2666,7 @@ def transition_flight_memory( smsg = AddKeysMsg(keys=[ts.key], stimulus_id=stimulus_id) return recs, [smsg] - def transition_released_forgotten( + def _transition_released_forgotten( self, ts: TaskState, *, stimulus_id: str ) -> RecsInstrs: recommendations: Recs = {} @@ -2690,56 +2692,56 @@ def transition_released_forgotten( _TRANSITIONS_TABLE: ClassVar[ Mapping[tuple[TaskStateState, TaskStateState], Callable[..., RecsInstrs]] ] = { - ("cancelled", "fetch"): transition_cancelled_fetch, - ("cancelled", "released"): transition_cancelled_released, - ("cancelled", "missing"): transition_cancelled_released, - ("cancelled", "waiting"): transition_cancelled_waiting, - ("cancelled", "forgotten"): transition_cancelled_forgotten, - ("cancelled", "memory"): transition_cancelled_memory, - ("cancelled", "error"): transition_cancelled_error, - ("resumed", "memory"): transition_generic_memory, - ("resumed", "error"): transition_generic_error, - ("resumed", "released"): transition_resumed_released, - ("resumed", "waiting"): transition_resumed_waiting, - ("resumed", "fetch"): transition_resumed_fetch, - ("resumed", "missing"): transition_resumed_missing, - ("constrained", "executing"): transition_constrained_executing, - ("constrained", "released"): transition_generic_released, - ("error", "released"): transition_generic_released, - ("executing", "error"): transition_executing_error, - ("executing", "long-running"): transition_executing_long_running, - ("executing", "memory"): transition_executing_memory, - ("executing", "released"): transition_executing_released, - ("executing", "rescheduled"): transition_executing_rescheduled, - ("fetch", "flight"): transition_fetch_flight, - ("fetch", "missing"): transition_fetch_missing, - ("fetch", "released"): transition_generic_released, - ("flight", "error"): transition_flight_error, - ("flight", "fetch"): transition_flight_fetch, - ("flight", "memory"): transition_flight_memory, - ("flight", "missing"): transition_flight_missing, - ("flight", "released"): transition_flight_released, - ("long-running", "error"): transition_generic_error, - ("long-running", "memory"): transition_long_running_memory, - ("long-running", "rescheduled"): transition_executing_rescheduled, - ("long-running", "released"): transition_executing_released, - ("memory", "released"): transition_memory_released, - ("missing", "fetch"): transition_missing_fetch, - ("missing", "released"): transition_missing_released, - ("missing", "error"): transition_generic_error, - ("missing", "waiting"): transition_missing_waiting, - ("ready", "error"): transition_generic_error, - ("ready", "executing"): transition_ready_executing, - ("ready", "released"): transition_generic_released, - ("released", "error"): transition_generic_error, - ("released", "fetch"): transition_released_fetch, - ("released", "missing"): transition_generic_missing, - ("released", "forgotten"): transition_released_forgotten, - ("released", "memory"): transition_released_memory, - ("released", "waiting"): transition_released_waiting, - ("waiting", "constrained"): transition_waiting_constrained, - ("waiting", "ready"): transition_waiting_ready, - ("waiting", "released"): transition_generic_released, + ("cancelled", "fetch"): _transition_cancelled_fetch, + ("cancelled", "released"): _transition_cancelled_released, + ("cancelled", "missing"): _transition_cancelled_released, + ("cancelled", "waiting"): _transition_cancelled_waiting, + ("cancelled", "forgotten"): _transition_cancelled_forgotten, + ("cancelled", "memory"): _transition_cancelled_memory, + ("cancelled", "error"): _transition_cancelled_error, + ("resumed", "memory"): _transition_generic_memory, + ("resumed", "error"): _transition_generic_error, + ("resumed", "released"): _transition_resumed_released, + ("resumed", "waiting"): _transition_resumed_waiting, + ("resumed", "fetch"): _transition_resumed_fetch, + ("resumed", "missing"): _transition_resumed_missing, + ("constrained", "executing"): _transition_constrained_executing, + ("constrained", "released"): _transition_generic_released, + ("error", "released"): _transition_generic_released, + ("executing", "error"): _transition_executing_error, + ("executing", "long-running"): _transition_executing_long_running, + ("executing", "memory"): _transition_executing_memory, + ("executing", "released"): _transition_executing_released, + ("executing", "rescheduled"): _transition_executing_rescheduled, + ("fetch", "flight"): _transition_fetch_flight, + ("fetch", "missing"): _transition_fetch_missing, + ("fetch", "released"): _transition_generic_released, + ("flight", "error"): _transition_flight_error, + ("flight", "fetch"): _transition_flight_fetch, + ("flight", "memory"): _transition_flight_memory, + ("flight", "missing"): _transition_flight_missing, + ("flight", "released"): _transition_flight_released, + ("long-running", "error"): _transition_generic_error, + ("long-running", "memory"): _transition_long_running_memory, + ("long-running", "rescheduled"): _transition_executing_rescheduled, + ("long-running", "released"): _transition_executing_released, + ("memory", "released"): _transition_memory_released, + ("missing", "fetch"): _transition_missing_fetch, + ("missing", "released"): _transition_missing_released, + ("missing", "error"): _transition_generic_error, + ("missing", "waiting"): _transition_missing_waiting, + ("ready", "error"): _transition_generic_error, + ("ready", "executing"): _transition_ready_executing, + ("ready", "released"): _transition_generic_released, + ("released", "error"): _transition_generic_error, + ("released", "fetch"): _transition_released_fetch, + ("released", "missing"): _transition_generic_missing, + ("released", "forgotten"): _transition_released_forgotten, + ("released", "memory"): _transition_released_memory, + ("released", "waiting"): _transition_released_waiting, + ("waiting", "constrained"): _transition_waiting_constrained, + ("waiting", "ready"): _transition_waiting_ready, + ("waiting", "released"): _transition_generic_released, } def _transition( @@ -2834,7 +2836,7 @@ def _transition( ) return recs, instructions - def transitions(self, recommendations: Recs, *, stimulus_id: str) -> None: + def _transitions(self, recommendations: Recs, *, stimulus_id: str) -> None: """Process transitions until none are left This includes feedback from previous transitions and continues until we @@ -2868,7 +2870,7 @@ def handle_stimulus(self, stim: StateMachineEvent) -> None: self.stimulus_log.append(stim.to_loggable(handled=time())) try: recs, instructions = self._handle_event(stim) - self.transitions(recs, stimulus_id=stim.stimulus_id) + self._transitions(recs, stimulus_id=stim.stimulus_id) self._handle_instructions(instructions) except Exception as e: if hasattr(e, "to_event"): @@ -2952,7 +2954,7 @@ def _handle_instructions(self, instructions: Instructions) -> None: recs, instructions = self._ensure_communicating( stimulus_id=ensure_communicating.stimulus_id ) - self.transitions(recs, stimulus_id=ensure_communicating.stimulus_id) + self._transitions(recs, stimulus_id=ensure_communicating.stimulus_id) else: instructions = [] @@ -3536,7 +3538,7 @@ def _handle_steal_request(self, ev: StealRequestEvent) -> RecsInstrs: if state in READY | {"waiting"}: # If task is marked as "constrained" we haven't yet assigned it an # `available_resources` to run on, that happens in - # `transition_constrained_executing` + # `_transition_constrained_executing` assert ts return {ts: "released"}, [smsg] else: @@ -4194,14 +4196,14 @@ async def benchmark_network(self, address: str) -> dict[str, float]: # Validation # ############## - def validate_task_memory(self, ts): + def _validate_task_memory(self, ts): assert ts.key in self.data or ts.key in self.actors assert isinstance(ts.nbytes, int) assert not ts.waiting_for_data assert ts.key not in self.ready assert ts.state == "memory" - def validate_task_executing(self, ts): + def _validate_task_executing(self, ts): assert ts.state == "executing" assert ts.run_spec is not None assert ts.key not in self.data @@ -4210,7 +4212,7 @@ def validate_task_executing(self, ts): assert dep.state == "memory", self.story(dep) assert dep.key in self.data or dep.key in self.actors - def validate_task_ready(self, ts): + def _validate_task_ready(self, ts): assert ts.key in pluck(1, self.ready) assert ts.key not in self.data assert ts.state != "executing" @@ -4220,14 +4222,14 @@ def validate_task_ready(self, ts): dep.key in self.data or dep.key in self.actors for dep in ts.dependencies ) - def validate_task_waiting(self, ts): + def _validate_task_waiting(self, ts): assert ts.key not in self.data assert ts.state == "waiting" assert not ts.done if ts.dependencies and ts.run_spec: assert not all(dep.key in self.data for dep in ts.dependencies) - def validate_task_flight(self, ts): + def _validate_task_flight(self, ts): assert ts.key not in self.data assert ts in self._in_flight_tasks assert not any(dep.key in self.ready for dep in ts.dependents) @@ -4235,7 +4237,7 @@ def validate_task_flight(self, ts): assert ts.coming_from in self.in_flight_workers assert ts.key in self.in_flight_workers[ts.coming_from] - def validate_task_fetch(self, ts): + def _validate_task_fetch(self, ts): assert ts.key not in self.data assert self.address not in ts.who_has assert not ts.done @@ -4245,25 +4247,25 @@ def validate_task_fetch(self, ts): assert ts.key in self.has_what[w] assert ts in self.data_needed_per_worker[w] - def validate_task_missing(self, ts): + def _validate_task_missing(self, ts): assert ts.key not in self.data assert not ts.who_has assert not ts.done assert not any(ts.key in has_what for has_what in self.has_what.values()) assert ts in self._missing_dep_flight - def validate_task_cancelled(self, ts): + def _validate_task_cancelled(self, ts): assert ts.key not in self.data assert ts._previous in {"long-running", "executing", "flight"} # We'll always transition to released after it is done assert ts._next is None, (ts.key, ts._next, self.story(ts)) - def validate_task_resumed(self, ts): + def _validate_task_resumed(self, ts): assert ts.key not in self.data assert ts._next assert ts._previous in {"long-running", "executing", "flight"} - def validate_task_released(self, ts): + def _validate_task_released(self, ts): assert ts.key not in self.data assert not ts._next assert not ts._previous @@ -4291,25 +4293,25 @@ def validate_task(self, ts): if ts.key in self.tasks: assert self.tasks[ts.key] == ts if ts.state == "memory": - self.validate_task_memory(ts) + self._validate_task_memory(ts) elif ts.state == "waiting": - self.validate_task_waiting(ts) + self._validate_task_waiting(ts) elif ts.state == "missing": - self.validate_task_missing(ts) + self._validate_task_missing(ts) elif ts.state == "cancelled": - self.validate_task_cancelled(ts) + self._validate_task_cancelled(ts) elif ts.state == "resumed": - self.validate_task_resumed(ts) + self._validate_task_resumed(ts) elif ts.state == "ready": - self.validate_task_ready(ts) + self._validate_task_ready(ts) elif ts.state == "executing": - self.validate_task_executing(ts) + self._validate_task_executing(ts) elif ts.state == "flight": - self.validate_task_flight(ts) + self._validate_task_flight(ts) elif ts.state == "fetch": - self.validate_task_fetch(ts) + self._validate_task_fetch(ts) elif ts.state == "released": - self.validate_task_released(ts) + self._validate_task_released(ts) except Exception as e: logger.exception(e) if LOG_PDB: