Performance regressions after queuing PR #295
Root task overproduction is an observation but not a cause. As you are telling us already, …

I was under the impression that the queuing PR was supposed to have no impact at all if you leave …
First of all, a bit of magic. After running the computation once:

```python
def get_stim_logs(dask_worker):
    # Collect the stimulus log from the worker's state machine
    return list(dask_worker.state.stimulus_log)

stim_logs = client.run(get_stim_logs)
```

Then reconstruct the worker state at a given point in time and have fun!

```python
from distributed.worker_state_machine import WorkerState

addr = 'tls://10.0.1.197:37555'

def reconstruct_state(addr, ix_max=0):
    """
    Reconstruct the worker state at a given time ``ix_max``
    """
    ws = WorkerState(threads={}, address=addr, nthreads=2)
    ws.validate = False
    instructions = []
    try:
        log = stim_logs[addr]
        # print(f"Available stimuli: {len(log)}")
        for ix, stim in enumerate(log):
            if ix_max and ix > ix_max:
                break
            instructions.append(ws.handle_stimulus(stim))
    except AssertionError:
        # There is some assertion error during forgetting.
        # The story looks as if we tried to forget twice.
        pass
    return ws

ws = reconstruct_state(addr)
```

I reconstructed the worker state for two of the workers and ran the following. For every `sub` `ComputeTaskEvent` in a worker's stimulus log, it reconstructs the state immediately after that event and returns the top-priority task on the worker's ready heap, together with the states of the `sub` task's dependencies:

```python
from collections import Counter

def get_top_ready_heap(addr):
    from distributed.worker_state_machine import ComputeTaskEvent

    log = stim_logs[addr]
    # Indices of all ComputeTaskEvents for ``sub`` tasks
    sub_stimuli = []
    for ix, stim in enumerate(log):
        if isinstance(stim, ComputeTaskEvent):
            if "sub" in stim.key:
                sub_stimuli.append(ix)

    top_ready = []
    for sub_stim_ix in sub_stimuli:
        ws = reconstruct_state(addr, ix_max=sub_stim_ix)
        if ws.ready:
            stimulus = log[sub_stim_ix]
            sub_ts = ws.tasks[stimulus.key]
            # Task at the top of the ready heap right after the sub arrived
            ts = ws.ready.peek()
            # Count the states of the sub task's dependencies
            # (iterate directly, not over a set, so duplicates are counted)
            missing_deps = Counter(dep.state for dep in sub_ts.dependencies)
            top_ready.append((ts, missing_deps))
    return top_ready
```

I'm inspecting two workers. The first one is basically the "doesn't use any memory" worker of the above gif. The other one is just a random other worker.

Worker without significant memory: At the very least, the first couple of tasks are scheduled perfectly. There are a couple of bad apples, but overall this worker is getting great task assignments.
Worker with significant memory: Right from the start, this worker is having a bad time. Every …
Edit: There was a bug in my script showing only a dependency count of one. I accidentally used a set on the states. It's corrected now.
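As a minimal usage sketch of the two helpers above (assuming a connected `client`; the second worker address below is just a placeholder for whichever other worker you want to compare against):

```python
# Placeholder addresses: substitute the two workers being compared.
for worker_addr in ["tls://10.0.1.197:37555", "tls://10.0.1.2:40000"]:
    print(f"--- {worker_addr} ---")
    for ts, dep_states in get_top_ready_heap(worker_addr):
        # `ts` is the task at the top of the ready heap right after a sub arrived;
        # `dep_states` counts the states of that sub task's dependencies.
        print(ts.key, dict(dep_states))
```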
If that's not interesting enough, the worker that is doing really well has the "smallest" IP address if we sort them all lexicographically. The workers dict on the scheduler is a sorted mapping, so somewhere in our decision-making logic we prefer the "first" worker. We are apparently doing a great job of initial task placement for that worker, but this quickly breaks down for the other workers.
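A quick way to see which worker comes "first" in that ordering (a sketch assuming a connected `client`; it only shows the lexicographic sort of addresses, not the scheduler's actual iteration):

```python
# Sort worker addresses lexicographically to see which one sorts first.
worker_addrs = sorted(client.scheduler_info()["workers"])
print("First worker by address sort order:", worker_addrs[0])
print("All workers in sort order:", worker_addrs)
```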
This is the main thing I'm curious about. Here's a screenshot from 2s before the side-by-side I posted. Both versions are at ~740 …

[Before / After dashboard screenshots]

Notice how in 'before', 200 …
Replaying worker state like this is amazing. We should do this for the commit before and compare, though. The difference in … I'm not sure it's only that …
dask/distributed#6614 causes a significant performance improvement to `test_vorticity`:

[benchmark chart]

And a significant performance regression to `test_dataframe_align`:

[benchmark chart]

I think the key change is fixing dask/distributed#6597.

`test_vorticity` comes from dask/distributed#6571. This workload is the one where we discovered the co-assignment bug that's now fixed. So it's encouraging (and not surprising) that improving co-assignment significantly reduces transfers and improves performance.

`test_dataframe_align` is a bit surprising. You'd think that assigning matching partitions of the two input dataframes to the same worker would reduce downstream transfers—which it indeed does.

Worth noting: in my original benchmarks, `test_dataframe_align` was probably the most affected by root task overproduction out of everything I tested.

I ran it manually a few times both before and after the queuing PR. I also tried turning work stealing off, but it didn't make a difference.
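For context, a rough sketch of the shape of a dataframe-align style workload (an illustrative reconstruction only, assuming the benchmark aligns two differently partitioned timeseries and reduces their difference; the date range, partition frequencies, and columns below are made up). This is where the `make-timeseries`, `repartition-split`/`repartition-merge`, `sub`, and `dataframe-count`/`dataframe-sum` task prefixes on the dashboards come from:

```python
# Illustrative only: the parameters are made up, not the benchmark's actual values.
import dask

df1 = dask.datasets.timeseries(
    start="2000-01-01", end="2000-02-01", partition_freq="1d",
    dtypes={"x": float, "y": float},
)
df2 = dask.datasets.timeseries(
    start="2000-01-01", end="2000-02-01", partition_freq="7d",
    dtypes={"x": float, "y": float},
)

# Subtracting frames with different divisions forces an alignment
# (repartition-split / repartition-merge); the reductions below are the
# data-consuming dataframe-count / dataframe-sum tasks.
diff = df1 - df2
print(diff.count().compute())
print(diff.sum().compute())
```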
[Before GIF]

[After GIF]

If you stare at these GIFs for a while, you can notice that: …
Here's a comparison between the two dashboards at the same point through the graph (785 `make-timeseries` tasks complete):

[dashboard screenshots]

This is also a critical point, because it's when we start spilling to disk in the "after" case. The "before" case never spills to disk.

You can see that:

- There are many more `repartition-merge` tasks in memory (top progress bar) in the "after" case. Nearly every executed task is in memory. Compare that to the before case, where half have already been released from memory.
- Few `sub`, `dataframe-count`, and `dataframe-sum` tasks have executed. These are the data-consuming tasks.
- The `sub`s have just been scheduled. A moment before, none of those were in processing. So sadly, they arrive just a moment too late to prevent the workers from spilling to disk.
The simplest hypothesis is that the `repartition-merge`s are completing way faster now, since they don't have to transfer data. Maybe that increase in speed gives them the chance to run further ahead of the scheduler before it can submit the `sub` tasks? This pushes the workers over the spill threshold, so then everything slows down.

I know I tend to blame things on root task overproduction, so I'm trying hard to come up with some other explanation here. But it does feel a bit like, because there's no data transfer holding the tasks back anymore, they are able to complete faster than the scheduler is able to schedule data-consuming tasks.
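For reference, the "spill threshold" here is governed by the worker memory fractions in Dask's configuration; a minimal sketch to print them (the values in the comments are the usual defaults, but deployments may override them):

```python
# Worker memory thresholds that govern spilling and pausing.
import dask

print(dask.config.get("distributed.worker.memory.target"))  # start spilling oldest data (default 0.6)
print(dask.config.get("distributed.worker.memory.spill"))   # spill based on process memory (default 0.7)
print(dask.config.get("distributed.worker.memory.pause"))   # pause new task execution (default 0.8)
```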
What's confusing is just that `repartition-merge` isn't a root task—and we see relatively few `make-timeseries` or `repartition-split`s hanging around in memory. So why is it that scheduling `sub`s lags behind more?