Skip to content

Commit

Permalink
Batch all messages from gather
Browse files Browse the repository at this point in the history
  • Loading branch information
jakirkham committed Feb 23, 2021
1 parent 55f5440 commit 93ad1ed
Showing 1 changed file with 7 additions and 1 deletion.
8 changes: 7 additions & 1 deletion distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4893,6 +4893,10 @@ async def gather(self, comm=None, keys=None, serializers=None):
for worker in missing_workers
]
)

recommendations: dict
client_msgs: dict = {}
worker_msgs: dict = {}
for key, workers in missing_keys.items():
# Task may already be gone if it was held by a
# `missing_worker`
Expand All @@ -4905,13 +4909,15 @@ async def gather(self, comm=None, keys=None, serializers=None):
if not workers or ts is None:
continue
ts_nbytes: Py_ssize_t = ts.get_nbytes()
recommendations: dict = {key: "released"}
for worker in workers:
ws = parent._workers_dv.get(worker)
if ws is not None and ts in ws._has_what:
ws._has_what.remove(ts)
ts._who_has.remove(ws)
ws._nbytes -= ts_nbytes
self.transitions({key: "released"})
self._transitions(recommendations, client_msgs, worker_msgs)
self.send_all(client_msgs, worker_msgs)

self.log_event("all", {"action": "gather", "count": len(keys)})
return result
Expand Down

0 comments on commit 93ad1ed

Please sign in to comment.