diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 69ee585dfb4..cb5318a8231 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -7060,20 +7060,23 @@ def adaptive_target(self, target_duration=None): to_close = self.workers_to_close() return len(self.workers) - len(to_close) - def request_acquire_replicas( - self, addr: str, keys: Iterable[str], *, stimulus_id: str - ) -> None: + def request_acquire_replicas(self, addr: str, keys: list, *, stimulus_id: str): """Asynchronously ask a worker to acquire a replica of the listed keys from other workers. This is a fire-and-forget operation which offers no feedback for success or failure, and is intended for housekeeping and not for computation. """ - who_has = {key: {ws.address for ws in self.tasks[key].who_has} for key in keys} + who_has = {} + for key in keys: + ts = self.tasks[key] + who_has[key] = {ws.address for ws in ts.who_has} + if self.validate: assert all(who_has.values()) self.stream_comms[addr].send( { "op": "acquire-replicas", + "keys": keys, "who_has": who_has, "stimulus_id": stimulus_id, }, diff --git a/distributed/worker.py b/distributed/worker.py index 9e3ea84cad2..1accfa5f2dd 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -1864,15 +1864,17 @@ def handle_cancel_compute(self, key: str, stimulus_id: str) -> None: def handle_acquire_replicas( self, - who_has: dict[str, Collection[str]], *, + keys: Collection[str], + who_has: dict[str, Collection[str]], stimulus_id: str, ) -> None: if self.validate: + assert set(keys) == who_has.keys() assert all(who_has.values()) recommendations: Recs = {} - for key in who_has: + for key in keys: ts = self.ensure_task_exists( key=key, # Transfer this data after all dependency tasks of computations with @@ -1893,7 +1895,7 @@ def handle_acquire_replicas( self._handle_instructions(instructions) if self.validate: - for key in who_has: + for key in keys: assert self.tasks[key].state != "released", self.story(key) def ensure_task_exists(