Root-ish tasks logic updates
The goal is to identify a specific situation: fan-outs where the group is larger than the cluster, but the dependencies are (much) smaller than the cluster. When this is the case, scheduling near the dependencies is pointless, since you know those workers will get filled up and the dependencies will have to get copied everywhere anyway. So you want to instead schedule in a methodical order which ends up keeping neighbors together.

But the key is really crossing that boundary of cluster size. Hence these changes:
* `len(group) > total_nthreads * 2` -> `len(group) >= total_nthreads`: so long as every thread will be saturated by this group, we know every worker will need all its dependencies anyway. The 2x requirement was too restrictive.
* Switch magic 5 to `min(5, len(self.workers))`: if you have 3 workers, and your group has 3 dependencies, you actually _should_ try to schedule near those dependencies. Then each worker only needs 1 dependency, instead of copying all 3 dependencies to all 3 workers. If you had 20 workers, duplicating the dependencies would be unavoidable (without leaving most of the workers idle). But here, it is avoidable while maintaining parallelism, so avoid it.

  I'm actually wondering if we should get rid of magic 5 entirely, and just use a cluster-size metric, like `len(self.workers) / 4` or something. If you have 1,000 workers, and a multi-thousand-task group has 20 dependencies, maybe you do want to copy those 20 dependencies to all workers up front. But if you only had 30 workers, you'd be much better off considering locality. (A standalone sketch of the new rule follows below.)
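
To restate the rule outside the scheduler internals, here is a minimal sketch of the updated condition. The function name and parameters are illustrative only, not scheduler API; `deps_total_keys` stands in for `sum(map(len, group._dependencies))` from the diff below.

```python
# Minimal sketch of the updated root-ish fan-out condition.
# Illustrative names only; the real code lives in Scheduler.decide_worker.
def should_fan_out(
    group_size: int,       # len(group)
    ndeps: int,            # len(group._dependencies)
    deps_total_keys: int,  # sum(map(len, group._dependencies))
    total_nthreads: int,
    n_workers: int,
) -> bool:
    """True -> ignore locality and spread the group across workers in order."""
    ndeps_cutoff = min(5, n_workers)  # was the bare magic constant 5
    return (
        group_size >= total_nthreads      # was: group_size > total_nthreads * 2
        and ndeps < ndeps_cutoff
        and deps_total_keys < ndeps_cutoff
    )

# The speculative cluster-size metric from the note above might look like:
#     ndeps_cutoff = max(1, n_workers // 4)
```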
gjoseph92 committed Sep 17, 2021
1 parent e1fd58b commit e5175ce
Showing 2 changed files with 8 additions and 4 deletions.
9 changes: 5 additions & 4 deletions distributed/scheduler.py
```diff
@@ -2485,12 +2485,13 @@ def decide_worker(self, ts: TaskState) -> WorkerState:
             ts.state = "no-worker"
             return ws
 
-        # Group is larger than cluster with few dependencies? Minimize future data transfers.
+        # Group fills the cluster and dependencies are much smaller than cluster? Minimize future data transfers.
+        ndeps_cutoff: Py_ssize_t = min(5, len(self._workers_dv))
         if (
             valid_workers is None
-            and len(group) > self._total_nthreads * 2
-            and len(group._dependencies) < 5
-            and sum(map(len, group._dependencies)) < 5
+            and len(group) >= self._total_nthreads
+            and len(group._dependencies) < ndeps_cutoff
+            and sum(map(len, group._dependencies)) < ndeps_cutoff
         ):
             ws: WorkerState = group._last_worker
```
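
A quick check (with made-up numbers) of how each change flips the outcome, inlining the old and new conditions from the diff above:

```python
# Case from the first bullet: 3 workers x 2 threads, an 8-task group
# with 2 single-key dependencies. The group saturates the cluster,
# but the old 2x requirement kept it on the locality path.
group_size, total_nthreads, n_workers, ndeps, deps_keys = 8, 6, 3, 2, 2
print(group_size > total_nthreads * 2 and ndeps < 5 and deps_keys < 5)  # False (old)
cutoff = min(5, n_workers)
print(group_size >= total_nthreads and ndeps < cutoff and deps_keys < cutoff)  # True (new)

# Case from the second bullet: 3 workers x 1 thread, a 100-task group
# with 3 single-key dependencies. The old magic 5 forced a fan-out that
# copies all 3 deps to all 3 workers; the new cutoff keeps locality.
group_size, total_nthreads, n_workers, ndeps, deps_keys = 100, 3, 3, 3, 3
print(group_size > total_nthreads * 2 and ndeps < 5 and deps_keys < 5)  # True (old)
cutoff = min(5, n_workers)
print(group_size >= total_nthreads and ndeps < cutoff and deps_keys < cutoff)  # False (new)
```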
3 changes: 3 additions & 0 deletions distributed/tests/test_scheduler.py
```diff
@@ -138,6 +138,9 @@ async def test_decide_worker_with_restrictions(client, s, a, b, c):
     ],
 )
 def test_decide_worker_coschedule_order_neighbors(ndeps, nthreads):
+    if ndeps >= len(nthreads):
+        pytest.skip()
+
     @gen_cluster(
         client=True,
         nthreads=nthreads,
```
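
The skip presumably exists because `len(nthreads)` is the worker count in `gen_cluster`, and once `ndeps` reaches that count the new `ndeps_cutoff = min(5, n_workers)` makes the fan-out branch unreachable, so the test's co-scheduling expectations no longer apply. A sketch with an assumed parameter combination (not taken from the diff):

```python
# Assumed combination: 5 dependencies, 4 single-threaded workers.
ndeps = 5
nthreads = [("127.0.0.1", 1)] * 4  # gen_cluster style: one entry per worker

ndeps_cutoff = min(5, len(nthreads))  # 4
print(ndeps < ndeps_cutoff)  # False -> fan-out branch can't be hit; test skipped
```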
