Move CPU Bound Tasks off Tokio Threadpool #13692
Comments
I considered something like this when looking into how to solve the problem and I think the sticking point is:
It's relatively easy to, at a low level, know where IO calls are happening. In fact Tokio lets you configure the runtime to panic if you do IO at all (which makes it easy to be almost completely certain you didn't miss a spot).
Yeah, there are tools like tokio-console and the runtime metrics that could help to find problematic codepaths, but I'm not going to pretend it is trivial. I do think it may end up being simpler, and potentially a better long-term solution, than dispatching the async work.
Couple of thoughts:
I'm wondering if we should rather write our own async scheduler for the async compute graph that can better deal with the DF workload. For IO, we still can (and should) use tokio, but I question if shoehorning tokio into the CPU-bound workload is really worth it. And FWIW: such a scheduler can -- at least to some extent -- be pull OR push-based (requires some wiring, but not impossible).
The performance implications definitely concern me; I have a nagging suspicion block_in_place spawns a thread... An arguably better solution would be to instead spawn the CPU bound work to a threadpool like rayon, but this would require any arguments to be `'static`.
This sounds a lot like #2199; I am not entirely sure such an approach is feasible at this point.
Yes, broadly speaking I agree with @adriangb. Almost all of DataFusion's code is CPU bound and, as @tustvold and others have observed before, the locations where IO is done are (relatively) isolated in comparison. I can try to take another shot at #13690 to more fully annotate IO in DataFusion.
IMO the key challenge is that async is an abstraction designed for IO concurrency, not CPU parallelism. This is what ultimately makes it so difficult to disentangle in a coherent way. Whilst we can dispatch the "IO" off, the boundary inevitably ends up fuzzy; an async task doesn't have a single start and end, inevitably leading to complex shenanigans around lifetimes, buffering, etc...

I'm increasingly of the opinion that having CPU-bound futures fundamentally breaks their abstraction, and rather than fighting this, DF would be better served by avoiding such things in the first place. Instead it can use async / tokio for what it is designed for, and move the CPU bound work somewhere else.

I don't actually think this would be that bad; most kernels are synchronous and could be offloaded without too much ceremony. It would also be largely mechanical, and something that could easily be actioned by a broad contributor base. I'd be happy to write up an example of what this might look like if there is interest in taking such a direction.

Edit: It is also potentially worth highlighting that this would naturally open the door to parallelism beyond that afforded by the plan, i.e. morsel-driven parallelism.
LOL, I feel like this is not a new opinion (e.g. #2199); I agreed to disagree with you on this 😆. I realize the current use of async tasks is non-ideal, but it seems to be working just fine for performance. So, attempting to be pragmatic, I am going to spend my effort improving the current situation rather than trying to undertake a larger refactoring, unless there is something that is not possible (however non-ideal) in the current design.
Yes, the current model works perfectly fine when there is little to no IO and sufficient plan parallelism to exploit, I don't dispute this. There is likely more going on here, but I can't help noticing DF doesn't perform nearly as well on "cold" data.
Sure, I have made no secret of disliking DF's use of async, and that we have disagreed on this in the past; however, I guess my hope is that we might have reached a critical mass of pain to actually do something about it. This is no longer me just moaning about a theoretical issue. Ultimately moving the CPU bound work off tokio isn't that complex, and the DF community has successfully pulled off far more complex initiatives (e.g. StringView). Doing so would simplify integrating DF into existing codebases, eliminate whole classes of footgun, and allow for greater parallelism than is currently possible.

Edit: I don't think there is a good story for intermixing IO with DF, and there is plenty of empirical evidence to this effect. We at Influx were aware of the need to run separate threadpools, and yet we still lost months to this class of issue. Blessing an approach we empirically know caused significant pain, in spite of significant investment to avoid the issue, seems unfortunate.
It works in an artificial benchmark that likely runs on an oversized machine. So like most benchmarks, it's a rather biased PoV.
💯
This is the core challenge in my mind. I would love to believe this project isn't that complex, but I have yet to see someone show how this would work practically with DataFusion (and I have tried myself). I am more than willing to coordinate this work once we have a pattern to apply.
I agree with the first two points (more parallelism I am not sure about). I would very much like this to happen and am willing to invest a significant amount of effort to do so. I just don't know how to do it.
I think moving to a push-based execution model as #2199 attempted is likely intractable at this stage. However, I think there is a compromise where just the kernels are dispatched; I can probably try to find some time to write up something more concrete in the next few days if there is interest.
For my part, I'm going to spend the next few hours seeing if I can boil down an initial reproducer per the ask here. Hopefully it will help to show concretely where the current architecture falls over, and give us something to test prototype approaches against.
That'd be very useful. Influx also had a reproducer, but it uses customer data and I no longer have access to it having left; Marco or Andrew can probably run it, though. Adding blocking sleeps at random points can be one way to make the problem more obvious.
This would be super helpful. There are many open source datasets on https://huggingface.co/datasets -- maybe we can find some suitably large ones to run queries against, and throttle the network bandwidth somehow (blocking IO sleeps in an ObjectStore wrapper, as @tustvold suggests).
I managed to find my toy experiment from when I was investigating this at Influx, which I think quite nicely demonstrates the issue. I've pushed it up here: https://github.com/tustvold/io_stall

The key thing is that even with extremely low concurrency, well below the number of threads available to tokio, the presence of blocking tasks on the same runtime starts to negatively impact IO throughput.
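For anyone who wants a self-contained starting point, a minimal sketch of this kind of experiment might look like the following. This is not the actual io_stall code; the task counts and durations are arbitrary illustrative values, and how dramatic the delays look will vary by tokio version and machine.

```rust
use std::time::{Duration, Instant};

#[tokio::main(flavor = "multi_thread", worker_threads = 4)]
async fn main() {
    // "CPU" tasks: fewer than the number of worker threads, but each one
    // holds its worker for long stretches without awaiting.
    for _ in 0..2 {
        tokio::spawn(async {
            loop {
                let start = Instant::now();
                let mut x: u64 = 0;
                while start.elapsed() < Duration::from_millis(200) {
                    x = x.wrapping_add(1); // burn CPU without yielding
                }
                std::hint::black_box(x);
                tokio::task::yield_now().await;
            }
        });
    }

    // "IO" task: measures how late its wakeups are relative to the requested
    // sleep; large overshoots indicate the scheduler is being starved.
    let io = tokio::spawn(async {
        for _ in 0..50 {
            let start = Instant::now();
            tokio::time::sleep(Duration::from_millis(10)).await;
            let overshoot = start.elapsed().saturating_sub(Duration::from_millis(10));
            println!("wakeup overshoot: {overshoot:?}");
        }
    });

    io.await.unwrap();
}
```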
I've pushed a simple example to io_stall that glues together rayon and async_task to yield an async scheduler that is able to accommodate CPU bound tasks whilst not starving IO.
(benchmark output comparing the rayon + async_task scheduler against the same workload on the tokio threadpool is not reproduced here)
I am sure there are ways to improve this, but I think this has some quite interesting properties, in particular:
However, it is important to highlight that with this approach IO will still degrade once CPU resources are saturated. Where there is a clear IO boundary, e.g. AsyncFileReader::get_bytes, it may still be worthwhile to spawn that as a dedicated tokio task so that it can run to completion without needing to "find time" on the rayon pool. However, this can be done as an optimisation if people run into such issues.

TLDR: this avoids the major issue where concurrency nose-dives long before CPU resources are saturated, without requiring complex shenanigans, and it opens the door to greater intra-operator parallelism (e.g. using par_sort).

Edit: The first version of this had incorrect numbers for the tokio threadpool because the benchmark was incorrect; this has now been fixed.
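For readers who want to see the shape of this glue, here is a minimal sketch assuming the published `rayon` and `async-task` crates. It is not the code in the io_stall repo, and `spawn_on_rayon` is just an illustrative name: each time a future is woken, its poll is pushed onto the rayon pool, so CPU-bound polls compete with other rayon work instead of pinning tokio worker threads.

```rust
use async_task::Runnable;
use std::future::Future;

fn spawn_on_rayon<F>(future: F) -> async_task::Task<F::Output>
where
    F: Future + Send + 'static,
    F::Output: Send + 'static,
{
    // The schedule callback is invoked whenever the future is woken;
    // it dispatches the next poll onto the global rayon pool.
    let schedule = |runnable: Runnable| {
        rayon::spawn(move || {
            runnable.run();
        });
    };
    let (runnable, task) = async_task::spawn(future, schedule);
    runnable.schedule(); // schedule the first poll
    task
}
```

The returned `Task` is itself a future, so IO-oriented code running on tokio can simply `.await` work dispatched onto the rayon pool, which gives a natural boundary between the two pools.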
I hope to review this over the next day or two. Thank you
@tustvold, I gave this a thorough read today and plan to test out the approach soon. The only thing I'm a little flaky on is the use of the call to
This is a very interesting issue.
Honestly, I don't get why there would be a significant difference between the two, as both apps seem to work the same way. Where I do see a difference is
But that's because of the aforementioned

Edit: Maybe a different advantage of rayon over tokio is that it offers structured concurrency via scoped spawning, and so we can avoid the
It was a quick demo cobbled together in a couple of hours, I wouldn't read too much into what it is doing 😅. In this case I probably just did what was easiest
Once the concurrency exceeds the pool size, starvation is inevitable, and that isn't the issue people have been running into. The issue is tokio starving IO at low concurrency, which I believe has to do with the way it polls its reactor. Unfortunately the reproducer had a bug, which I fixed, but now the effect isn't as obvious; perhaps it never was a good reproducer. What probably needs to happen is for someone to try this on one of the real workloads that had this issue and see if it helps. Unfortunately I have no more time to dedicate to this.
1000% this; it also has a load of parallel operations like sorts out of the box.
(take with a grain of salt because I haven't worked with
But it is possible that using `block_in_place` […]. Additionally, as mentioned in its docstring, if you write something like:

```rust
let (res1, res2) = join!(
    async {
        task::block_in_place(...);
    },
    async {
        // something async
        ...
    },
);
```

... then the two async blocks may end up executing sequentially, because the entire task running the `join!` is blocked for the duration of the `block_in_place` call.
RE #13692 (comment): My concern here is that this thread switch (i.e. the tokio worker gets a new thread from the pool) is potentially expensive, esp. in a hot loop. It's also a potential synchronization point, because the thread pool is shared across all workers. So while this may be OK if you do it occasionally, I don't think this is gonna yield good results if you do it e.g. for every record batch.
Sorry, but to be 1000% clear: the reason that additional threads might be spawned is not for the task which is running the line of code that hit a `block_in_place`; rather, a new thread may be brought in so that the runtime can keep driving its other tasks while that worker thread is blocked.
Here is another PR with an example showing how to move CPU work to another threadpool: |
Is your feature request related to a problem or challenge?
DataFusion currently performs CPU-bound work on the tokio runtime, and this can lead to issues where IO tasks are starved and unable to adequately service the IO. Crucially, this occurs long before the actual resources of the threadpool are exhausted: it is expected that IO will be starved if there is no CPU available; what we want to avoid is IO getting throttled while resources are still available.
The accepted wisdom has been to run DataFusion on one threadpool and spawn IO off onto a separate one, as described here. #12393 seeks to document this, and by extension #13424 and #13690 to provide examples of how to achieve this.
Unfortunately this approach comes with a lot of both explicit and implicit complexity, and makes it hard to integrate DataFusion into existing codebases making use of tokio.
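For context, the pattern being referred to looks roughly like the following sketch. This is an assumption about the shape of the approach rather than the actual code from #13424 or #13690; `io_runtime`, `spawn_io`, and the worker-thread count are illustrative.

```rust
use std::future::Future;
use tokio::runtime::{Builder, Handle, Runtime};

// Build a dedicated runtime for IO. In a real application this would be kept
// alive for the lifetime of the process (e.g. in a long-lived struct or static).
fn io_runtime() -> Runtime {
    Builder::new_multi_thread()
        .worker_threads(2) // arbitrary; sized for IO, not CPU
        .enable_all()
        .build()
        .expect("failed to build IO runtime")
}

// Ship an IO future onto the dedicated runtime and await its result from the
// caller's (CPU) runtime, so slow or blocking CPU work cannot starve the IO.
async fn spawn_io<F>(io_handle: &Handle, fut: F) -> F::Output
where
    F: Future + Send + 'static,
    F::Output: Send + 'static,
{
    io_handle.spawn(fut).await.expect("IO task panicked")
}
```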
Describe the solution you'd like
The basic idea would be, rather than moving the IO off to a separate runtime away from the CPU-bound tasks, to instead wrap the CPU-bound tasks so that they can't starve the runtime.
Ultimately DF has three broad classes of task:

1. Fully synchronous, CPU-bound code (e.g. kernels)
2. Operators that interleave IO with CPU-bound work
3. Isolated IO (e.g. fetching bytes from object storage)
The vast majority of code in DF falls into the fully synchronous bucket (1.), and so it is a natural response to want to special case the less common IO (2. / 3.) instead of the more common CPU-bound code. However, I'd like to posit that conceptually and practically it is much simpler to dispatch the synchronous code than the asynchronous code, and that the items falling into the second bucket are the most complex operators in DataFusion.
This would yield some quite compelling advantages:

- It would simplify integrating DataFusion into existing codebases that make use of tokio
- It would eliminate whole classes of footgun around accidentally starving IO
- It would open the door to parallelism beyond that afforded by the plan (e.g. morsel-driven, intra-operator parallelism)
This could be achieved using tokio's native primitives.
One way to achieve this would be to use `block_in_place`, as unlike `spawn_blocking` it does not require a `'static` lifetime and can therefore borrow data from the invoking context. That being said, this does come with some risks: it may cause tokio to spawn additional threads, and it blocks the entire calling task, so sibling futures on the same task (e.g. in a `join!`) stop making progress.
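As a rough illustration of the trade-off, a minimal sketch of the `block_in_place` variant might look like this; `expensive_kernel` is a hypothetical stand-in for any synchronous, CPU-bound kernel, not an actual DataFusion API.

```rust
fn expensive_kernel(input: &[u64]) -> u64 {
    input.iter().copied().sum()
}

async fn process(batch: Vec<u64>) -> u64 {
    // block_in_place tells tokio this worker thread is about to block, so the
    // runtime can move its other queued tasks to another worker. The closure
    // can borrow `batch` because no 'static bound is required, but this only
    // works on the multi-threaded runtime and may force tokio to add threads.
    tokio::task::block_in_place(|| expensive_kernel(&batch))
}
```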
An arguably better solution would be to dispatch the work using an actual CPU-threadpool such as rayon.
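A minimal sketch of that alternative, using a oneshot channel to hand the result back to the async caller; `expensive_kernel` is again a hypothetical placeholder, and note that `rayon::spawn` requires `'static`, so the input must be moved or cloned into the closure.

```rust
use tokio::sync::oneshot;

fn expensive_kernel(input: &[u64]) -> u64 {
    input.iter().copied().sum()
}

async fn process_on_rayon(batch: Vec<u64>) -> u64 {
    let (tx, rx) = oneshot::channel();
    rayon::spawn(move || {
        // Runs on the rayon pool; the tokio worker is free to poll other tasks.
        let _ = tx.send(expensive_kernel(&batch));
    });
    rx.await.expect("rayon task panicked or was dropped")
}
```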
Describe alternatives you've considered
We could instead move IO off to a separate runtime
Additional context
No response