Adjust transfer costs in worker_objective
#5326
base: main
Conversation
I'd like to incorporate measured latency somehow too instead of a magic 10ms, but it's a start.
As discussed in dask#5325. The idea is that if a key we need has many dependents, we should amortize the cost of transferring it to a new worker, since those other dependencies could then run on the new worker more cheaply. "We'll probably have to move this at some point anyway, might as well do it now." This isn't actually intended to encourage transfers though. It's more meant to discourage transferring keys that could have just stayed in one place. The goal is that if A and B are on different workers, and we're the only task that will ever need A, but plenty of other tasks will need B, we should schedule alongside A even if B is a bit larger to move. But this is all a theory and needs some tests.
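As a toy illustration of the A-and-B example (the sizes and waiter counts here are made up, not from the actual scheduler):

```python
# Hypothetical numbers for the A-vs-B example: A is needed only by this
# task, while B is larger but has many other waiters.
size_a, waiters_a = 50e6, 1
size_b, waiters_b = 80e6, 10

# Amortized cost of the transfer each placement would require:
cost_near_a = size_b / waiters_b  # schedule alongside A -> must move B
cost_near_b = size_a / waiters_a  # schedule alongside B -> must move A

# B is larger, yet its amortized cost is lower, so A's worker wins.
assert cost_near_a < cost_near_b
```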
distributed/scheduler.py
Outdated
# amortize transfer cost over all waiters
comm_bytes += nbytes / len(dts._waiters)
Can you add an in-code comment explaining how this division amortizes cost? I assume this is again a "local topology" argument related to the fan-out tasks (#5325 (comment)) where we try to "ignore" tasks which will likely end up everywhere anyhow?
Will do. It's related to that, but it's actually a simpler idea. Basically, if we transfer to this worker now, that opens up the potential for N other tasks to run on this worker without transferring the data. So you could look at it as: rather than this task paying the whole cost up front while the others get the benefit for free, all the sibling tasks split the cost of the transfer evenly between them. (That's an analogy, of course: once transferred, the other tasks don't actually pay anything!)
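A small numeric sketch of that cost-splitting analogy (the numbers are hypothetical):

```python
# One transfer of `nbytes` unlocks `n_siblings` waiters on the new
# worker, so each task is charged an equal share of the cost.
nbytes = 10_000_000   # made-up dependency size in bytes
n_siblings = 5        # waiters that could reuse the transferred key
charge_per_task = nbytes / n_siblings
assert charge_per_task * n_siblings == nbytes  # shares sum to the full cost
```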
Unit Test Results: 16 files ±0, 16 suites ±0, 7h 38m 11s ⏱️ +22m 54s. For more details on these failures and errors, see this check. Results for commit e3d62f6. ± Comparison against base commit baf05c0. ♻️ This comment has been updated with latest results.
# amortize transfer cost over all waiters
comm_bytes += nbytes / len(dts.waiters)
xfers += 1
Suggested change:
-    # amortize transfer cost over all waiters
-    comm_bytes += nbytes / len(dts.waiters)
-    xfers += 1
+    nwaiters = len(dts.waiters)
+    # amortize transfer cost over all waiters
+    comm_bytes += nbytes / nwaiters
+    xfers += 1 / nwaiters
@gjoseph92 do you agree?
However, this would not be replicable in get_comm_cost above.
This should maybe be two PRs, since there are two different things happening:

1. Add a fixed (currently 10ms) penalty per transfer, as discussed in "Scheduler underestimates data transfer cost for small transfers" #5324 (comment). This should help discourage small transfers. I'd prefer if this cost weren't just a magic 0.01 number, though.
2. Amortize the transfer cost by the number of waiters. This is related to "Ignore widely-shared dependencies in decide_worker" #5325. See the commit message b4ebbee for more description.

I haven't tested this at all yet; it's just a theory right now. Just looking for thoughts.
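Putting the two changes together, the cost term being discussed could be sketched roughly as below. This is a hypothetical standalone illustration, not the scheduler's actual code; `BANDWIDTH` and the 10 ms penalty are placeholder constants, and `deps` is an assumed input shape.

```python
# Placeholder constants (the real scheduler derives these differently):
LATENCY_PENALTY = 0.010   # seconds charged per transfer ("magic 10ms")
BANDWIDTH = 100e6         # assumed bytes/second

def transfer_cost(deps):
    """Estimated transfer time for `deps`: (nbytes, n_waiters) pairs
    for dependencies not already on the candidate worker."""
    # Byte cost and transfer count are both amortized over waiters.
    comm_bytes = sum(nbytes / n_waiters for nbytes, n_waiters in deps)
    xfers = sum(1 / n_waiters for _, n_waiters in deps)
    return comm_bytes / BANDWIDTH + xfers * LATENCY_PENALTY
```

For example, a 100 MB key with a single waiter costs the full second of transfer plus the 10 ms penalty, while the same key with ten waiters charges this task only a tenth of each.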
black distributed
/flake8 distributed
/isort distributed
cc @fjetter @crusaderky @mrocklin