Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adjust worker batched send interval to cluster size #5036

Closed
wants to merge 2 commits into from

Conversation

gjoseph92
Copy link
Collaborator

Dynamically adjust how often workers send updates to the scheduler based on the size of the cluster.

See #4881 (comment) for context.

Initially, I thought about using the scheduler's current CPU utilization as the scaling metric (rather than purely cluster size), since that's the resource this is actually trying to conserve. However, CPU is quite spiky, making it both hard to test and possibly a poor metric without some more complex code to dampen that spikiness.

I currently have the values of 5ms-100ms range, targeting 5k scheduler messages per second. This is roughly based on the fact that a 20ms interval seemed decent for a 100 worker cluster in #4881 and on @mrocklin's comment here #4881 (comment). These numbers are provisional, and I want to do more testing before merging this.

  • Tests added / passed
  • Passes black distributed / flake8 distributed / isort distributed

gjoseph92 added 2 commits July 3, 2021 21:28
- Both size and CPU based metrics in here. CPU kinda makes more sense to me, but should be weighted avg'd or something.
- CPU test is way too unstable
- Test is a little slow because of scale up
@gjoseph92
Copy link
Collaborator Author

gjoseph92 commented Jul 13, 2021

Update: these results are completely invalid and should be ignored; see my next comment.

I'm sad to say that initial tests show this doesn't seem to affect performance. But I'll say that's a maybe, because I'm not yet sure my testing methodology is sound.

facets

To try to pick good values for the new interval, I ran a shuffle workload (imperfectly scaled by cluster size) on Coiled clusters, doing a parameter sweep over cluster size and batched-send interval. The values I tried are here.

Because of the increasing-duration-with-reruns problem noted in #4987 (comment), I'm only running each computation once on a cluster, then throwing it away and starting a new one. I have a feeling that's contributing to the variation here. In fact, instance types, etc. could even be changing between runs (since Coiled will over-provision if instances matching your request aren't available). Also, I'm not sure that linearly rescaling the number of tasks in the workload to cluster size makes sense, since the communication costs of the shuffle are O(n^2) on the size of the cluster, right?

Nonetheless, grouping by cluster size, the batched send interval seems to have very little effect on performance? Or maybe there's too much noise. Full data is here.

More plots

Looking at the batched send interval categorically (constant tick size on the x-axis), there's maybe some sort of pattern around the 15-30ms range, but it seems dubious.

catplot

violin

@fjetter
Copy link
Member

fjetter commented Jul 13, 2021

Also, I'm not sure that linearly rescaling the number of tasks in the workload to cluster size makes sense, since the communication costs of the shuffle are O(n^2) on the size of the cluster, right?

Well, the number of edges in the graph should scale with O(N log N) where N is the number of input partitions. That typically breaks up the graph into 2-5 stages and you can expect a full data copy per stage. I believe the number of workers should not have an impact here (assuming there is more than one) but I'm not 100% sure. I doubt it is quadratic.


I would expect a performance improvement to appear on the limit of scheduler CPU saturation. Looking at the numbers in your table, I wouldn't expect the scheduler to be actually saturated for a long enough time for this to make a big difference. The largest graph you're working with is "only" 120k tasks and runtimes are in the minute range. Maybe I'm wrong here but I would try to push this harder, i.e. one big graph, medium to large cluster as a setup. On this setup I would do a parameter scan over the send interval

I'd also be interested in two more things in this area but I don't want to blow up the scope of your investigation so feel free to ignore this:

  • How large is the typical buffer on a worker once we send (bytes and num messages)
  • Do different compression algorithms make a difference? Are you using any? (idk what we install by default tbh)

@gjoseph92
Copy link
Collaborator Author

Update: as expected, there was a bug in my benchmarking code (df.persist() vs client.persist(df); it was actually running the entire thing on the client!). Ignore the results from my last comment.

Here are some actual results. I would have much more, but accidentally lost the results from a bigger run. I can rerun this and get more data to smooth out the noise if we'd find that helpful.

results-regression-runtime

Lengthening the batched send interval decreases overall runtime, significantly so for larger clusters. Unsurprisingly, it slightly increases runtime for small clusters, which are likely not scheduler-bound, so we should be cautious of this.

I don't have enough data (it's too noisy) to confidently say:

  • how much it improves runtime
  • what a good target interval would actually be

I also think this is interesting and haven't decided what to make of it yet:

image

Looking at tasks per second per worker, larger clusters are just... bad. A 10-worker cluster can have 8x higher throughput per worker than a 100-worker cluster.

I'm not sure if this measurement is fair though, because I'm still scaling the workload by the number of workers. Maybe it makes sense that the difficulty of a shuffle will scale non-linearly?


How large is the typical buffer on a worker once we send (bytes and num messages)

Here are histograms of the buffer lengths (number of messages) at every comm send:

batched-sends

And nbytes sent:

image

The largest graph you're working with is "only" 120k tasks and runtimes are in the minute range

I've increased the graph sizes across the board; the largest is now 762k tasks.

Full results:

@fjetter
Copy link
Member

fjetter commented Jul 23, 2021

These results look great. Even though you cannot tell for sure what the exact results will look like, taking the 100 cluster size as an example it looks like one can conservatively state that you manage to reduce runtime by at least 10-20%. I guess this is only as dramatic because shuffle puts a lot of stress on the network. I doubt this will be as dramatic for less network intensive, but high task throughput graphs. You could test this on some non-fused embarrassingly parallel workload. High throughput, (every task transition is still orchestrated by the scheduler) but low network load.

I love the histograms, by the way. Is this some fragile code or something we could contribute? I have to admit these distributions are much wider than I anticipated. What's nice is that even for larger intervals, we're still in the ~10-100KB range for one batch which I would assume is a nice package size. We can also see how the default configuration causes us to have barely any batching since in about 30% of all cases we have a single message.

Do you have any particular tests in mind you'd like to run before we commit to a scaling function or what would you suggest as next steps? I don't think we'll ever find the perfect setting, just a slightly less bad one :)


I also think this is interesting and haven't decided what to make of it yet:

I'm not too surprised, see https://coiled.io/computers-are-not-as-parallel-as-we-think/ ;)

@gjoseph92
Copy link
Collaborator Author

I've collected some more data and am feeling more unsure about this. The data is very noisy. There seems to be more variation within a particular batched send interval than between them.

I've gone all the way to a 1,000ms batched send interval hoping to find a clear point where performance drops off.

Full results (note y-axis is different for each subplot):
results-all
Same data, with linear trendline (note this changed the y-scales):
results-full

Maybe in fact the performance drop-off happens quickly, and beyond 100ms or so the difference is irrelevant, so we want to just look at the shorter intervals?

Here's just 2ms - 100ms:
results-0:100

Intuitively, I still feel like the batched send interval is too short, and that it makes sense to scale it dynamically. However, I'm not seeing strong evidence for this in the data.

If we're looking for the inflection point, maybe we should just look at the buffer size histograms. The pattern changes most significantly from 2ms to 25ms, from "very little batching" to "normal-ish distribution of batch sizes centered around 10". Beyond that, it's the same normal distribution, just marched further to the right.

batched-sends

I'm still not sure what number to put here in this PR (target number of messages received by the scheduler per second). Anyone have any ideas, or ideas for how to decide? Should we set this PR aside and maybe just bump the interval to 5ms or 10ms?

A last thing: here are plots converted into that metric (scheduler_messages_per_sec = cluster_size / batched_send_interval_s).

Full results (note x is log-scale):
results-mps-full
scheduler_messages_per_sec < 10000:
results-mps-sub10k

I do realize I my data doesn't cover this metric's range very well. Here's the distribution I've recorded:

cce18b5a-3a05-464d-802b-268d90e85422

Perhaps it would be worth running benchmarks under this PR, and try a few scheduler_messages_per_sec numbers in range(0, 10_000, step=1_000)? That would be pretty quick to do.

@fjetter
Copy link
Member

fjetter commented Jul 29, 2021

I'm not sure if I have a good enough intuition about scheduler_messages_per_sec = cluster_size / batched_send_interval_s to read much off these plots. If anything it looks inconclusive as well. For lower numbers we do not see a change and for higher numbers the error is too large.

It might just be that our premise is wrong. While I second the intuition that for a large number of workers the interval should be larger I'm not 100% convinced anymore. The scheduler is still able to accept incoming network traffic to some extend even if it is busy, that's the entire point of using asyncio. Maybe this is why we do not see a lot of difference because the event loop buffers all of this. Just because the interval is set to 2ms that doesn't mean that it actually is able to submit every 2ms since the loop might be busy doing something else (see tick_duration).

Another error source is the network itself. I would assume that in a busy datacenter latencies and bandwidth vary over time. We do measure both but factoring these into this analysis is not trivial. However, it may be necessary to actually scale this interval based on network health instead of cluster size, idk.

Ultimately, do your examples provoke any work starvation? As long as the worker ready queue is full, I assume the communication between scheduler and worker is not a hard bottleneck. After all, the workers will do some work all the time even thought it might not be the optimal order of work. We might observe different memory usage if that's the case but probably only very minor variations.

I'm not sure how to proceed here. There is likely something underneath this data but I'm starting to think that it will require an insane amount of data collection and non trivial error estimations in this huge parameter space to get to the bottom of it. Factoring all of this in is worthy a PhD thesis and I'm inclined to put this experiment to rest until we have a more solid foundation for the experiment itself, e.g. a reproducer we are very confident to be throttled by scheduler<->worker latency

Thoughts?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants