Skip to content

Commit

Permalink
WIP slightly different metric
Browse files Browse the repository at this point in the history
Here we're assuming that all tasks in the group have a similar number of dependents (degree of fan-out). Then, if this dependency is widely used enough to fill the cluster, but there are not nearly enough tasks like it to fill the cluster themselves, we should be okay with copying it around to enable parallelism — i.e., ignoring it when deciding where to place the task, since the task's other dependencies are likely more important.
  • Loading branch information
gjoseph92 committed Sep 20, 2021
1 parent e5175ce commit d0f0955
Showing 1 changed file with 15 additions and 2 deletions.
17 changes: 15 additions & 2 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -7979,6 +7979,9 @@ def decide_worker(
dts: TaskState
deps: set = ts._dependencies
candidates: set
n_workers: Py_ssize_t = len(
valid_workers if valid_workers is not None else all_workers
)
assert all([dts._who_has for dts in deps])
if ts._actor:
candidates = set(all_workers)
Expand All @@ -7987,8 +7990,18 @@ def decide_worker(
wws
for dts in deps
# Ignore dependencies that will need to be, or already are, copied to all workers
if max(len(dts._dependents) / len(dts._group), len(dts._who_has))
< len(valid_workers if valid_workers is not None else all_workers)
if len(dts._who_has) < n_workers
and not (
len(dts._dependents) >= n_workers
and len(dts._group) < n_workers // 2
# Really want something like:
# map(len, dts._group._dependents) >= nthreads and len(dts._group) < n_workers // 2
# Or at least
# len(dts._dependents) * len(dts._group) >= nthreads and len(dts._group) < n_workers // 2
# But `nthreads` is O(k) to calculate if given `valid_workers`.
# and the `map(len, dts._group._dependents)` could be extremely expensive since we can't put
# much of an upper bound on it.
)
for wws in dts._who_has
}
if valid_workers is None:
Expand Down

0 comments on commit d0f0955

Please sign in to comment.