diff --git a/distributed/worker_state_machine.py b/distributed/worker_state_machine.py index fca6219e3b6..03a74164994 100644 --- a/distributed/worker_state_machine.py +++ b/distributed/worker_state_machine.py @@ -3021,10 +3021,12 @@ def validate_state(self) -> None: assert worker in self.tasks[k].who_has for worker, tss in self.data_needed.items(): + # Test that there aren't multiple TaskState objects with the same key in the + # set + assert len({ts.key for ts in tss}) == len(tss) for ts in tss: assert ts.state == "fetch" assert self.tasks[ts.key] is ts - assert ts in self.data_needed assert worker in ts.who_has for ts in self.tasks.values(): @@ -3033,10 +3035,6 @@ def validate_state(self) -> None: if self.transition_counter_max: assert self.transition_counter < self.transition_counter_max - # Test that there aren't multiple TaskState objects with the same key in sets - for tss in self.data_needed.values(): - assert len({ts.key for ts in tss}) == len(tss) - class BaseWorker(abc.ABC): """Wrapper around the :class:`WorkerState` that implements instructions handling.