[Fail case] Almost-blockwise weighted arithmetic vorticity calculation #1
Comments
We've been running a benchmark that's very similar to your use case, and when Gabe's worker saturation is enabled we've seen the benchmark run ~75% faster while using ~60% less memory. I'd be interested to see what impact this has on your real-world workload. Happy to help out here however I can @TomNicholas
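(For anyone wanting to try this: a minimal sketch of how the worker-saturation behaviour can be switched on, assuming a dask/distributed version recent enough to expose the `distributed.scheduler.worker-saturation` config key; the cluster size below is just for illustration.)

```python
import dask
from dask.distributed import Client

# Queue excess root tasks on the scheduler instead of eagerly assigning them
# to workers. Values slightly above 1.0 keep workers busy without flooding
# their memory; "inf" restores the old scheduling behaviour.
dask.config.set({"distributed.scheduler.worker-saturation": 1.1})

client = Client(n_workers=4)  # illustrative local cluster
```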
The Ramba team has spent some time looking into this example. We didn't previously have the pad function, but we implemented it and got this example working with Ramba. On a 4-node cluster with 36 cores per node (2 sockets) and 128GB of memory per node, with data size 5000x5000x300, we are seeing about 5s for ramba and about 65s for dask. Ramba seems to scale quite linearly, and we can go up to 5000x5000x600 on these machines in main memory without thrashing. So if your cluster is big enough, then as an in-memory solution ramba seems to be quite a bit faster than dask, even if we reduce the dask time by 75% as in the above comment.
Here's a statement of the full problem (including padding and rechunking) in xarray code, and how cubed performs on it (see the …).

@DrTodd13 sorry for not replying, that sounds cool! I would love to try and run the same problem with ramba inside xarray, and get a 3-way dask vs cubed vs ramba comparison going.
Motivation
@rabernat and I have a huge oceanographic dataset (LLC4320) with surface velocity components `u` and `v`. We want to calculate the vorticity from these using `u*dx - v*dy`, which takes into account the size of the grid cells in the x and y directions, `dx` and `dy`.

This is a specific example of a generic pattern in geoscience / fluid dynamics: apply a simple arithmetic operation weighted by some lower-dimensional constants. In our case we are approximating a differential operator, but you might see this pattern in other cases such as weighted averages.
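A minimal sketch of this pattern in xarray with dask-backed arrays (the shapes, chunking, and random data below are illustrative stand-ins, not the real LLC4320 variables):

```python
import numpy as np
import xarray as xr

# Toy stand-ins for the surface velocity components, chunked along time.
u = xr.DataArray(np.random.rand(10, 500, 500), dims=["time", "y", "x"]).chunk({"time": 1})
v = xr.DataArray(np.random.rand(10, 500, 500), dims=["time", "y", "x"]).chunk({"time": 1})

# Lower-dimensional grid-cell spacings, shared by every chunk of u and v.
dx = xr.DataArray(np.random.rand(500, 500), dims=["y", "x"])
dy = xr.DataArray(np.random.rand(500, 500), dims=["y", "x"])

# The weighted arithmetic operation: elementwise apart from the broadcasting
# of the 2D weights against the 3D velocity components.
vort = u * dx - v * dy
```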
Full problem statement
The full problem we were trying to solve has an additional feature - a rechunking step where each variable gets one more element appended to the end of the array, and then immediately rechunked to merge that length 1 chunk back into the final chunk of the array. This is done by xGCM to deal with boundary conditions, but the only part relevant for this issue is that it's practically a no-op but is nevertheless enough to mean the overall graph is technically no longer "blockwise".
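A rough sketch of that pad-then-rechunk step at the dask.array level (the pad mode and chunk sizes here are arbitrary choices for illustration, not necessarily what xGCM does):

```python
import dask.array as da

u = da.random.random((1000, 1000), chunks=(100, 100))

# Append one element along the last axis (a stand-in for xGCM's boundary handling)...
u_padded = da.pad(u, ((0, 0), (0, 1)), mode="wrap")

# ...then immediately merge the resulting length-1 chunk back into the final chunk.
# The result is practically a no-op, but the graph is no longer purely blockwise.
merged_last_axis = u.chunks[1][:-1] + (u.chunks[1][-1] + 1,)
u_rechunked = u_padded.rechunk({1: merged_last_axis})
```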
What specifically goes wrong
This computation displays a number of failure modes when executed with `dask.distributed` (a toy sketch illustrating the first and third of these follows the list):

- Too many tasks - Doing anything with this LLC4320 dataset immediately involves hundreds of thousands of tasks, simply because even one variable has 117390 chunks. This is not really going to scale much further.
- Memory blowup - This is the big one: the non-blockwiseness causes `distributed` to load chunks in a non-streaming fashion, so they consume memory faster than they are saved out, blowing up memory usage overall. This problem is referred to as "root task overproduction" by Gabe Joseph, who has implemented some potential fixes to `dask.distributed`. However, it is also arguably a symptom of a fundamental problem: dask does not know enough about what happens in each task, and therefore cannot make any guarantees about memory usage.
- Widely-shared dependencies - `dx * u` is an extremely one-to-many operation, where every chunk of `u` has to have access to `dx`. At large scale, scheduling this can become a bottleneck for `distributed` (in a way I don't fully understand).
- Root-task co-location - For an in-memory executor like dask, corresponding chunks of `u` and `v` should be co-located on the same processor, to improve performance and avoid a huge amount of unnecessary communication.
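A toy sketch that makes the "too many tasks" and "widely-shared dependencies" problems visible directly from the dask graph (chunk counts here are tiny compared to the 117390 chunks per variable in LLC4320; names and sizes are illustrative):

```python
import dask.array as da

# u, v chunked along a leading "time"-like axis; dx, dy are small 2D weights.
u = da.random.random((50, 200, 200), chunks=(1, 200, 200))   # 50 chunks each
v = da.random.random((50, 200, 200), chunks=(1, 200, 200))
dx = da.random.random((200, 200), chunks=(200, 200))          # a single chunk
dy = da.random.random((200, 200), chunks=(200, 200))

vort = u * dx - v * dy

# "Too many tasks": the task count scales with the number of chunks of u and v.
print(len(vort.__dask_graph__()))

# "Widely-shared dependencies": the single chunk of dx is an input to every
# one of the 50 chunks of (u * dx), and likewise dy for (v * dy).
```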
General considerations
I think this problem is a useful benchmark because it exposes multiple failure modes of the dask.distributed scheduler at once, whilst also allowing you to switch any of these problems off or on by slight changes to the problem definition.
- Starting from `(u - u)`: this would still be called via `apply_ufunc` in xarray, and will produce an O(N)-size task graph in dask. (It would also be nice if a high-level graph abstraction knew that this was a no-op...)
- `(u - v)` introduces the problem of "root task co-location", where you want the scheduler to know from the start that corresponding chunks of `u` and `v` should be co-located on the same node, in order to avoid a lot of unnecessary communication.
- `(u*dx - v*dy)` now has the "widely-shared dependencies" problem, where `dx` and `dy` are much smaller than `u` and `v`, but are required by all chunks of `u` and `v`.
- The same again, but with the rechunk step inserted as above, is basically the same graph and should be evaluated in the same way, yet differs just enough to no longer technically be blockwise. This non-blockwiseness is what causes distributed to fall back on the flawed algorithm that displays the "root task overproduction" problem. (Each of these escalating variants is sketched in code below.)
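The escalating variants, sketched with toy dask arrays (in xarray each of these would go through apply_ufunc; the shapes, chunking, and pad mode are illustrative):

```python
import dask.array as da

u = da.random.random((50, 200, 200), chunks=(1, 200, 200))
v = da.random.random((50, 200, 200), chunks=(1, 200, 200))
dx = da.random.random((200, 200))
dy = da.random.random((200, 200))

step1 = u - u             # a no-op, but still an O(N)-task graph
step2 = u - v             # adds the root-task co-location concern
step3 = u * dx - v * dy   # adds the widely-shared dependencies problem

# Finally, the practically-no-op pad + rechunk from above, which is enough to
# make the graph technically non-blockwise and trigger root task overproduction:
padded = da.pad(step3, ((0, 0), (0, 0), (0, 1)), mode="wrap")
step4 = padded.rechunk({2: step3.chunks[2][:-1] + (step3.chunks[2][-1] + 1,)})
```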
Links
Benchmarks:
- Benchmarks where `dask.distributed` performs very poorly.
- A `dask.delayed` version for the scipy talk (link). This involved pre-loading `dx` and `dy` into memory.
- Using `apply_gufunc` to express this in cubed? Maybe not... Supporting xarray.apply_ufunc cubed-dev/cubed#67
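For context, a hedged sketch of how the core operation can be phrased through xarray.apply_ufunc with a dask backend (whether cubed's apply_gufunc can express the padded version is exactly the open question in cubed-dev/cubed#67; the toy arrays below are illustrative):

```python
import numpy as np
import xarray as xr

# Small illustrative DataArrays playing the roles of u, v, dx, dy.
u = xr.DataArray(np.random.rand(10, 50, 50), dims=["time", "y", "x"]).chunk({"time": 1})
v = xr.DataArray(np.random.rand(10, 50, 50), dims=["time", "y", "x"]).chunk({"time": 1})
dx = xr.DataArray(np.random.rand(50, 50), dims=["y", "x"])
dy = xr.DataArray(np.random.rand(50, 50), dims=["y", "x"])

def weighted_diff(u, v, dx, dy):
    # Pure-numpy kernel, applied chunk-by-chunk by the backing array library.
    return u * dx - v * dy

vort = xr.apply_ufunc(
    weighted_diff,
    u, v, dx, dy,
    dask="parallelized",
    output_dtypes=[u.dtype],
)
```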