Ignore widely-shared dependencies in decide_worker #5325

Open · wants to merge 6 commits into base: main · showing changes from 4 commits
18 changes: 13 additions & 5 deletions distributed/scheduler.py
@@ -2485,12 +2485,13 @@ def decide_worker(self, ts: TaskState) -> WorkerState:
ts.state = "no-worker"
return ws

# Group is larger than cluster with few dependencies? Minimize future data transfers.
# Group fills the cluster and dependencies are much smaller than cluster? Minimize future data transfers.
ndeps_cutoff: Py_ssize_t = min(5, len(self._workers_dv))
if (
valid_workers is None
and len(group) > self._total_nthreads * 2
and len(group._dependencies) < 5
and sum(map(len, group._dependencies)) < 5
and len(group) >= self._total_nthreads
and len(group._dependencies) < ndeps_cutoff
and sum(map(len, group._dependencies)) < ndeps_cutoff
):
ws: WorkerState = group._last_worker

@@ -7982,7 +7983,14 @@ def decide_worker(
     if ts._actor:
         candidates = set(all_workers)
     else:
-        candidates = {wws for dts in deps for wws in dts._who_has}
+        candidates = {
+            wws
+            for dts in deps
+            # Ignore dependencies that will need to be, or already are, copied to all workers
+            if max(len(dts._dependents) / len(dts._group), len(dts._who_has))
Member commented:

Is this intentionally the number of dependents of a task over the length of a group? Wouldn't the behaviour here be drastically different if I have 2 subtrees vs. 200 subtrees, even though the topology is otherwise identical?

Reusing your ASCII art:

    ........  ........ 
    \\\\////  \\\\////
       a         b

and

    ........  ........  ........  ........    ........  ........  ........  ........    ........  ........  ........  ........
    \\\\////  \\\\////  \\\\////  \\\\////    \\\\////  \\\\////  \\\\////  \\\\////    \\\\////  \\\\////  \\\\////  \\\\////
       a         b         c         d       a1         b1         c1         d1       a2         b2         c2         d2

should behave equally

Member (@fjetter) commented on Sep 17, 2021:

My intuition about groups might be off. In my mind, this graph contains two groups: all ... are one group, and all {a*, b*, c*} are another group. The length of that group is then the number of tasks in the group.

If that is correct, len(dependents) / len(group) would evaluate to:

1st graph: #dependents(8) / len(group)(2) == 4
2nd graph: #dependents(8) / len(group)(12) == 2/3
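
Spelled out as a quick sketch (a hypothetical helper and the same counts as above, just restating the arithmetic; this isn't scheduler code):

def dependents_per_group_member(n_dependents_of_task: int, group_size: int) -> float:
    # Ratio used in the diff: dependents of one root task over the size of its group.
    return n_dependents_of_task / group_size

print(dependents_per_group_member(8, group_size=2))   # 1st graph: 4.0
print(dependents_per_group_member(8, group_size=12))  # 2nd graph: ~0.67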

Collaborator (author) replied:

I spent about 1 minute coming up with this metric and then 1 hour trying to explain or figure out if it actually made sense (beyond that it works in this case). It may not. But here's what I'm thinking (this does not answer the question at all, just trying to communicate a lot of thoughts):

Your intuition is correct. The behavior would and should be different if you have 2 subtrees vs 200—depending on the number of workers. Imagine we have 5 workers:

  • In the first graph we have fewer sub-trees (groups) than workers. (This actually should fall in the root-ish task case. More on this later.) So unless we're going to leave 3 workers idle, copying a and b to more workers will improve parallelism. So we don't care where a and b live while scheduling ... because we expect a and b to get transferred around anyway.
  • In the second graph we have more sub-trees than workers. So transferring the a*,b*,c*s won't get things done any faster[^1]—the cluster is already saturated. Therefore, we do want to consider where the root tasks live when scheduling ..., because we can avoid them getting copied around unnecessarily.

So even though local topology is identical, we should schedule these differently depending on cluster size. If we had 100 workers, we'd schedule both graphs the same way (ignore the root task locations, because copying to new workers will give us more parallelism). And if we had 2 workers we'd also schedule both graphs the same way (consider root task locations because every worker already has a root task).

But I actually think all of these cases should be handled by the root task logic. I wrote more notes in e5175ce, but I think we should think of that as "fan-out logic" instead of "root-ish task logic", since it's really about this special case where we're crossing the boundary of cluster size. When we fan out from having fewer keys than workers to more keys than workers, the placement of those few pieces of input data is a poor guide for where the many pieces of output data should go. We have to copy the input data around anyway, so it's a good opportunity to reorganize things—that's what the current root-ish task logic is all about.
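
To make that concrete, here is roughly what the group-level fan-out check from the scheduler.py diff above boils down to (a sketch only: plain attribute names stand in for the Cython ones, and the "valid_workers is None" guard is omitted):

def looks_like_fanout(group, total_nthreads, n_workers, cutoff_max=5):
    # The group is big enough to fill the cluster, and its input data is tiny
    # relative to the cluster, so input locations shouldn't drive placement.
    ndeps_cutoff = min(cutoff_max, n_workers)
    return (
        len(group) >= total_nthreads
        and len(group.dependencies) < ndeps_cutoff
        and sum(map(len, group.dependencies)) < ndeps_cutoff
    )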

This PR is really about when some dependencies make a task seem root-ish, and others don't. Imagine overlaying the graphs

. . . . . . . .
| | | | | | | |
* * * * * * * *

and

. . . . . . . .
\ \ \ \ / / / /
       a    

so you have

. . . . . . . .   . . . . . . . .
|\|\|\|\|/|/|/|   |\|\|\|\|/|/|/|
| | | | a | | |   | | | | b | | |
* * * * * * * *   * * * * * * * *

The .s should never be considered root-ish tasks now—as linear chains, they should definitely take into account the location of the * tasks when scheduling. But if we have 5 workers, every worker will have a * task on it, but only 2 workers will have an a or b. In scheduling the first few .s, there's a tug-of-war between the a and the *—which do we want to schedule near? We want a way to disregard the a.

But—unlike in the simpler case above where we just have a tree—if we have 12 of these subtrees and only 5 workers, we still want to disregard where the as are, because they're much much less important than where the *s are.

And therein is the overall problem with this PR. In one case, we need to pay attention to the as (when they're the only dependency); in other cases, we want to ignore them (when there are other dependencies with few dependents and replicas). Dividing by the group length was an effective but kind of nonsensical way to do this. I think there are probably other better ways that are more logical, but might involve more change.
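
For reference, piecing the split diff hunks back together, the filter being debated is roughly this (a sketch using plain attribute names instead of the Cython underscore ones, not the exact code):

def candidate_workers(deps, all_workers, valid_workers=None):
    # Skip dependencies that are, or soon will be, replicated to (almost)
    # every worker, so they don't pull every worker in as a candidate.
    pool = valid_workers if valid_workers is not None else all_workers
    return {
        ws
        for dts in deps
        if max(len(dts.dependents) / len(dts.group), len(dts.who_has)) < len(pool)
        for ws in dts.who_has
    }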

I'll note that I think amortizing transfer cost (#5326) is a more sensible way to do this. That actually makes sense. The problem is, it doesn't do much here, because transfer costs (even adding the 10ms penalty) are minuscule relative to occupancy. Weighing those two things, occupancy is usually far more significant.

So I also tried a couple ideas to counteract that. What if we could amortize occupancy in some way too? Basically to account for the opportunity cost of taking a spot on a worker that lots of other tasks could possibly run on, that might be better suited.

diff --git a/distributed/scheduler.py b/distributed/scheduler.py
index 1c810f4a..4c8d97b9 100644
--- a/distributed/scheduler.py
+++ b/distributed/scheduler.py
@@ -3418,15 +3418,27 @@ class SchedulerState:
         nbytes: Py_ssize_t
         comm_bytes: double = 0
         xfers: Py_ssize_t = 0
+        alternative_tasks: Py_ssize_t = 0
         for dts in ts._dependencies:
-            if ws not in dts._who_has:
+            if ws in dts._who_has:
+                alternative_tasks += len(dts._waiters) - 1
+            else:
                 nbytes = dts.get_nbytes()
                 # amortize transfer cost over all waiters
                 comm_bytes += nbytes / len(dts._waiters)
                 xfers += 1
 
+        # If there are many other tasks that could run on this worker,
+        # consider how much we are displacing a better task that could run soon
+        # (unless we are that "better task").
+        # TODO wrong duration kinda
+        opportunity_cost: double = (
+            (alternative_tasks * self.get_task_duration(ts) / 2) if xfers else 0.0
+        )
         stack_time: double = ws._occupancy / ws._nthreads
-        start_time: double = stack_time + comm_bytes / self._bandwidth + xfers * 0.01
+        start_time: double = (
+            stack_time + opportunity_cost + comm_bytes / self._bandwidth + xfers * 0.01
+        )
 
         if ts._actor:
             return (len(ws._actors), start_time, ws._nbytes)

This didn't help much. It worked at the beginning, but once the a task was in lots of places, the opportunity_cost showed up on every worker.

I also tried something similar with stack_time *= alternative_tasks / len(ws.processing) to let tasks that could run in fewer other places cut in line more—this is basically another form of #5253. Didn't work either.
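
That variant amounted to something like this (a sketch of the idea only; the parameter names are made up, and the real change would live inside worker_objective):

def objective_with_line_cutting(stack_time, alternative_tasks, n_processing,
                                comm_bytes, bandwidth, xfers):
    # Scale queue time by how many alternative tasks could also run on this
    # worker, so a task with few viable workers is penalized less for
    # choosing a busy one.
    if n_processing:
        stack_time *= alternative_tasks / n_processing
    return stack_time + comm_bytes / bandwidth + xfers * 0.01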

The more I've worked on this, the more I find myself fighting against occupancy. It's hard to find enough incentives or penalties to make a worker with 0.1s occupancy ever beat an idle one.

For fun, I ignored occupancy and just removed stack_time from worker_objective entirely. With only that, plus #5326 and gjoseph92@e5175ce, it passes all the tests in this PR.
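
In other words, something along these lines (a sketch with the occupancy term dropped; attribute names drop the Cython underscore prefixes, and the amortization is the one from the diff earlier in this comment):

def worker_objective_no_occupancy(ts, ws, bandwidth):
    # Rank workers only by (amortized) transfer cost plus a per-transfer
    # penalty; no stack_time / occupancy term.
    comm_bytes = 0.0
    xfers = 0
    for dts in ts.dependencies:
        if ws not in dts.who_has:
            # amortize transfer cost over all waiters, as in #5326
            comm_bytes += dts.get_nbytes() / len(dts.waiters)
            xfers += 1
    start_time = comm_bytes / bandwidth + xfers * 0.01
    if ts.actor:
        return (len(ws.actors), start_time, ws.nbytes)
    return (start_time, ws.nbytes)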

As I think about it more, I feel more skeptical of occupancy as a metric to use in the worker_objective. The greedy goal of "get this task running as soon as possible" is sometimes counterproductive to running the whole graph smoothly, and therefore quickly.

Especially as we're thinking about STA, it feels like maybe we should be more simple, dumb, and optimistic in initial task placement. Just follow graph structure. Then the load-balancing decisions to minimize occupancy, like worker_objective is trying to do eagerly right now, become more the responsibility of stealing. We also may be able to make these decisions better once things are already running and we've collected more information (runtime, memory, etc.).

None of this is really an answer, these are just all the things I've thought about trying to work on this.

[^1]: Well, some workers will have 3 of the a*, b*, c* roots and others will only have 2. So once the ones with only 2 finish all their ...s, they could maybe take some load from the ones with 3—but that's a work-stealing question.

Member replied:

Thank you for your thorough problem description. I will try to address some of the points above but it will not be an exhaustive review.

> So even though local topology is identical, we should schedule these differently depending on cluster size. If we had 100 workers, we'd schedule both graphs the same way (ignore the root task locations, because copying to new workers will give us more parallelism). And if we had 2 workers we'd also schedule both graphs the same way (consider root task locations because every worker already has a root task).

I agree. I want to point out, though, that parallelism is mostly not one of our concerns. In my mind, memory-efficient scheduling will trump parallelism if we ever need to make that call. However, I think the case where we are not parallelizing enough happens less frequently.

In particular, there is also the case where transfer costs would kill us and just executing things sequentially on a few workers would be faster. This case can only be handled by including runtime metrics, of course.

> But I actually think all of these cases should be handled by the root task logic. I wrote more notes in e5175ce, but I think we should think of that as "fan-out logic" instead of "root-ish task logic"

I would love it if we had a relatively high-level description documented somewhere of our strategic goals for scheduling. That would also include a description of what we mean when talking about root-ish task scheduling, since I'm still struggling to classify those tasks. I imagine something similar to what we do with dask.order; see https://github.com/dask/dask/blob/9fc5777f3d83f1084360adf982da301ed4afe13b/dask/order.py#L1-L77

I believe https://distributed.dask.org/en/latest/journey.html#step-3-select-a-worker goes in this direction, but it is not thorough enough and likely out of date.

> I'll note that I think amortizing transfer cost (#5326) is a more sensible way to do this.

I agree that some kind of amortization or weighting might be a good approach to solving this. I'm still not convinced where to apply weights and what the correct weights are, though.

I also see another component factoring in: if we knew which dependencies are intended to be fetched on a worker, that could influence our decision as well. Maybe, instead of tracking combined occupancy, we should track "comm occupancy" and try to minimize this instead of total occupancy? Can we detect a "too small parallelism" scenario and switch metrics based on that?
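
As a purely illustrative sketch (none of these names exist on the scheduler today), per-worker "comm occupancy" could be as simple as tracking pending incoming bytes and preferring the least-loaded candidate:

from dataclasses import dataclass, field


@dataclass
class CommOccupancy:
    # Bytes of pending incoming transfers, keyed by worker address.
    pending_bytes: dict = field(default_factory=dict)

    def add_transfer(self, addr: str, nbytes: int) -> None:
        self.pending_bytes[addr] = self.pending_bytes.get(addr, 0) + nbytes

    def finish_transfer(self, addr: str, nbytes: int) -> None:
        self.pending_bytes[addr] = max(0, self.pending_bytes.get(addr, 0) - nbytes)

    def best_worker(self, candidates):
        # Prefer the candidate with the least pending incoming data.
        return min(candidates, key=lambda addr: self.pending_bytes.get(addr, 0))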

To make things even messier, I also want to point out that I believe we're double-counting comm cost, which might blow up occupancy unnecessarily; see:

for tts in s:
    if tts._processing_on is not None:
        wws = tts._processing_on
        comm: double = self.get_comm_cost(tts, wws)
        old: double = wws._processing[tts]
        new: double = avg_duration + comm
        diff: double = new - old
        wws._processing[tts] = new
        wws._occupancy += diff
        self._total_occupancy += diff
I might be wrong, though.

> Basically to account for the opportunity cost of taking a spot on a worker that lots of other tasks could possibly run on, that […]

I don't dislike opportunity costs, but at this point it is much too unclear what that cost factor would even look like. I would suggest delaying this for now, since I don't think it will be strong enough to counteract occupancy and it will make things even less comprehensible. If we had a good handle on what the opportunity cost would actually be, that might be a different story.

> As I think about it more, I feel more skeptical of occupancy as a metric to use in the worker_objective. The greedy goal of "get this task running as soon as possible" is sometimes counterproductive to running the whole graph smoothly, and therefore quickly.

I'm open to reconsidering this metric for initial task placement, although this feels like a long-term thing since it may be a highly disruptive change. FWIW we will likely need to reconsider the entire worker_objective for STA since it is based 100% on runtime information.

You mentioned that it would work well enough if we left out stack_time from the worker_objective. This sounds quite similar to what is proposed in #5253, since disabling it entirely without a replacement would probably introduce many other problems.

Collaborator (author) replied:

> I want to point out, though, that parallelism is mostly not one of our concerns.

Totally agree. But taken to the extreme (never consider parallelism), we'd do things like assign every single task to the same worker in a 100-worker cluster, leaving 99 others idle, because that worker holds one root dependency. I know you're not proposing that at all—just saying that since we do sometimes consider parallelism, we have to have some mechanism to decide where that "sometimes" line is. Agreed that having documentation of this would help.

As you said, it's hard to do without runtime metrics. I think what I'm proposing is that, when we only have graph metrics, a reasonable threshold for adding parallelism is when we have more tasks to schedule than total threads, yet purely following dependencies would leave some workers totally idle.
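
As a sketch, that threshold could look something like the following (a hypothetical helper, not existing scheduler code):

def should_add_parallelism(n_ready_tasks, total_nthreads, workers_with_deps, all_workers):
    # Only spread work beyond where the dependencies live when (a) there is
    # more ready work than threads in the cluster, and (b) following
    # dependencies alone would leave some workers idle.
    return n_ready_tasks > total_nthreads and len(workers_with_deps) < len(all_workers)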

> Maybe, instead of tracking combined occupancy, we should track "comm occupancy" and try to minimize this instead of total occupancy?

I've often wanted this metric. (Adds a bit of communication for the workers to tell the scheduler every time they fetch a key from a peer though, right?) And I think it's a good thing to minimize, though what we really want to minimize is memory. However, tracking this data would allow us to more preemptively avoid transferring to a worker which already has a lot of (bytes of) pending incoming transfers.

> Can we detect a "too small parallelism" scenario and switch metrics based on that?

Probably not? Because tasks will prefer to be scheduled on the worker with no transfers, that worker's comm occupancy won't ever go up, so we'll keep scheduling there. The graph-structural approach would hopefully still avoid this, though.

> You mentioned that it would work well enough if we left out stack_time from the worker_objective. This sounds quite similar to what is proposed in #5253, since disabling it entirely without a replacement would probably introduce many other problems.

With #5253 we do still consider occupancy, just more accurately. But as I showed in #5253 (comment) that causes a different sort of bad behavior. That's a good example of why I think earliest start time isn't quite the right metric. With #5253 we correctly identify that we can actually start the task sooner by transferring to a different worker and cutting in the priority line, because the transfer is so cheap. We do successfully minimize task start time. The problem is that it disrupts most of the subsequent tasks by shoving in line like that. The short-term gains come with much bigger long-term costs.

> FWIW we will likely need to reconsider the entire worker_objective for STA since it is based 100% on runtime information.

Agreed. STA still feels like the place to start. I'm still intrigued by the idea of eagerly and optimistically assigning tasks by graph structure (since that's probably reasonable in the common case) and then load-balancing with runtime metrics.

Collaborator (author) added:

@fjetter I tried slightly rewording this metric in d0f0955, and it at least vaguely makes sense, though I'm still not sure we should be doing this PR at all vs. just trying STA.

+            < len(valid_workers if valid_workers is not None else all_workers)
Member commented:

Maybe add a / 2 here? "almost everywhere" might be enough of a condition here.

+            for wws in dts._who_has
+        }
     if valid_workers is None:
         if not candidates:
             candidates = set(all_workers)
150 changes: 150 additions & 0 deletions distributed/tests/test_scheduler.py
@@ -138,6 +138,9 @@ async def test_decide_worker_with_restrictions(client, s, a, b, c):
     ],
 )
 def test_decide_worker_coschedule_order_neighbors(ndeps, nthreads):
+    if ndeps >= len(nthreads):
+        pytest.skip()
+
     @gen_cluster(
         client=True,
         nthreads=nthreads,
@@ -237,6 +240,153 @@ def random(**kwargs):
    test()


@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 4)
async def test_decide_worker_common_dep_ignored(client, s, *workers):
    r"""
    When we have basic linear chains, but all the downstream tasks also share a common dependency, ignore that dependency.

    i j k l m n o p
    \__\__\__\___/__/__/__/
    | | | | | | | | |
    | | | | X | | | |
    a b c d e f g h

    ^ Ignore the location of X when picking a worker for i..p.
    It will end up being copied to all workers anyway.

    If a dependency will end up on every worker regardless, because many things depend on it,
    we should ignore it when selecting our candidate workers. Otherwise, we'll end up considering
    every worker as a candidate, which is 1) slow and 2) often leads to poor choices.
    """
    roots = [
        delayed(slowinc)(1, 0.1 / (i + 1), dask_key_name=f"root-{i}") for i in range(16)
Member commented:

Any reason for choosing delayed over futures?

Collaborator (author) replied:

Not really. It made it easy to visualize the graph though. Would we prefer futures?

Member replied:

Makes sense. Typically I would prefer futures since, if something goes wrong and we need to debug, futures contain fewer layers. However, the visualization argument is strong. I would recommend leaving an in-code comment to avoid overly eager refactoring down the line.

    ]
    # This shared dependency will get copied to all workers, eventually making all workers valid candidates for each dep
    everywhere = delayed(None, name="everywhere")
    deps = [
        delayed(lambda x, y: None)(r, everywhere, dask_key_name=f"dep-{i}")
        for i, r in enumerate(roots)
    ]

    rs, ds = dask.persist(roots, deps)
    await wait(ds)

    keys = {
        worker.name: dict(
            root_keys=sorted(
                [int(k.split("-")[1]) for k in worker.data if k.startswith("root")]
            ),
            deps_of_root=sorted(
                [int(k.split("-")[1]) for k in worker.data if k.startswith("dep")]
            ),
        )
        for worker in workers
    }

    for k in keys.values():
        assert k["root_keys"] == k["deps_of_root"]

    for worker in workers:
        log = worker.incoming_transfer_log
        if log:
            assert len(log) == 1
            assert list(log[0]["keys"]) == ["everywhere"]


@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 4)
async def test_decide_worker_large_subtrees_colocated(client, s, *workers):
r"""
Ensure that the above "ignore common dependencies" logic doesn't affect wide (but isolated) subtrees.

........ ........ ........ ........
\\\\//// \\\\//// \\\\//// \\\\////
a b c d
Member commented (on the ASCII art above):

I haven't put much thought into this question, so a short answer is more than enough.

Would your logic be impacted if the subtrees flow together again, i.e. if they have a common dependent or set of dependents, as in a tree reduction?

If the answer is "no, this logic doesn't go that deep into the graph" (which is what I'm currently guessing), that's fine.

Collaborator (author) replied:

No, it doesn't go any further into the graph.


    Each one of a, b, etc. has more dependents than there are workers. But just because a has
    lots of dependents doesn't necessarily mean it will end up copied to every worker.
    Because a also has a few siblings, a's dependents shouldn't spread out over the whole cluster.
    """
    roots = [delayed(inc)(i, dask_key_name=f"root-{i}") for i in range(len(workers))]
    deps = [
        delayed(inc)(r, dask_key_name=f"dep-{i}-{j}")
        for i, r in enumerate(roots)
        for j in range(len(workers) * 2)
    ]

    rs, ds = dask.persist(roots, deps)
    await wait(ds)

    keys = {
        worker.name: dict(
            root_keys=set(
                int(k.split("-")[1]) for k in worker.data if k.startswith("root")
            ),
            deps_of_root=set(
                int(k.split("-")[1]) for k in worker.data if k.startswith("dep")
            ),
        )
        for worker in workers
    }

    for k in keys.values():
        assert k["root_keys"] == k["deps_of_root"]
        assert len(k["root_keys"]) == len(roots) / len(workers)

    for worker in workers:
        assert not worker.incoming_transfer_log


@gen_cluster(
    client=True,
    nthreads=[("127.0.0.1", 1)] * 4,
    config={"distributed.scheduler.work-stealing": False},
)
async def test_decide_worker_large_multiroot_subtrees_colocated(client, s, *workers):
    r"""
    Same as the above test, but also check isolated trees with multiple roots.

    ........  ........  ........  ........
    \\\\////  \\\\////  \\\\////  \\\\////
      a b       c d       e f       g h
    """
    roots = [
        delayed(inc)(i, dask_key_name=f"root-{i}") for i in range(len(workers) * 2)
    ]
    deps = [
        delayed(lambda x, y: None)(
            r, roots[i * 2 + 1], dask_key_name=f"dep-{i * 2}-{j}"
        )
        for i, r in enumerate(roots[::2])
        for j in range(len(workers) * 2)
    ]

    rs, ds = dask.persist(roots, deps)
    await wait(ds)

    keys = {
        worker.name: dict(
            root_keys=set(
                int(k.split("-")[1]) for k in worker.data if k.startswith("root")
            ),
            deps_of_root=set().union(
                *(
                    (int(k.split("-")[1]), int(k.split("-")[1]) + 1)
                    for k in worker.data
                    if k.startswith("dep")
                )
            ),
        )
        for worker in workers
    }

    for k in keys.values():
        assert k["root_keys"] == k["deps_of_root"]
        assert len(k["root_keys"]) == len(roots) / len(workers)

    for worker in workers:
        assert not worker.incoming_transfer_log


@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 3)
async def test_move_data_over_break_restrictions(client, s, a, b, c):
    [x] = await client.scatter([1], workers=b.address)