Speed up Bucket get_min and get_max #368
Conversation
The method sounds reasonable to me, but the result is wrong. It seems that …
@pnuu I fixed the bug and the result looks good now. However, the speed is slow again ... I created a simple example below to test what leads to the slow speed. To reproduce it, copy the cells into Jupyter notebook cells and execute them.

Cell 1: Create sample data
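The original cell is collapsed in the page capture, so here is a minimal sketch of what such a cell might contain; the sizes are made up, loosely based on the ABI CONUS grid mentioned later in the thread:

```python
import numpy as np

rng = np.random.default_rng(42)

out_size = 1500 * 2500   # number of target bins (ABI CONUS 2 km, see below)
n_points = 1500 * 2500   # number of source values

data = rng.random(n_points)                  # source data values
idxs = rng.integers(0, out_size, n_points)   # target bin of each source value
```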
Cell 2: Calculate bin_min using pure numpy
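Again the actual code is collapsed; a plausible pure-numpy version, reusing the Cell 1 variables:

```python
# Sort by bin index with the data values as tie-breaker, then the first
# value of each sorted run is that bin's minimum.
order = np.lexsort((data, idxs))              # primary key: idxs, then data
bins, first = np.unique(idxs[order], return_index=True)
bin_min = np.full(out_size, np.nan)
bin_min[bins] = data[order][first]            # first value per bin is its min
```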
Cell 3: Calculate bin_min using dask and xarray
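A sketch of what the dask/xarray variant could look like, wrapping the same numpy logic in `dask.delayed` much as the PR itself does (the function name is hypothetical; `data`, `idxs`, and `out_size` come from Cell 1):

```python
import dask
import dask.array as da
import xarray as xr

@dask.delayed(pure=True)
def _bin_min_delayed(data, idxs, out_size):
    # the cross-chunk sort forces this to be a single task
    order = np.lexsort((data, idxs))
    bins, first = np.unique(idxs[order], return_index=True)
    out = np.full(out_size, np.nan)
    out[bins] = data[order][first]
    return out

data_dask = da.from_array(data, chunks=1_000_000)
bin_min = xr.DataArray(
    da.from_delayed(_bin_min_delayed(data_dask, idxs, out_size),
                    shape=(out_size,), dtype=np.float64),
    dims="bins",
)
bin_min.compute()
```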
Cell 4: Calculate bin_min using pandas
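A plausible pandas version of the same reduction (the comment at the end of the thread reports this approach as the slowest):

```python
import pandas as pd

series = pd.Series(data, index=idxs)
grouped = series.groupby(level=0).min()       # one group per occupied bin
bin_min = np.full(out_size, np.nan)
bin_min[grouped.index.to_numpy()] = grouped.to_numpy()
```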
Codecov Report

```
@@            Coverage Diff             @@
##             main     #368      +/-   ##
==========================================
+ Coverage   93.89%   93.95%   +0.06%
==========================================
  Files          65       65
  Lines       11130    11269     +139
==========================================
+ Hits        10450    10588     +138
- Misses        680      681       +1
```

Flags with carried forward coverage won't be shown. Continue to review the full report at Codecov.
The algorithm needs to be broken up into different parts. It seems there are parts that are chunk-friendly and parts that aren't. It also seems like you've tried converting the SO answer into dask by switching some `np.` calls to `da.` ones. In some cases this may be unnecessary, as numpy will dispatch to dask's methods properly. In other cases it has no effect or is actually worse. For example, `da.empty` will create an empty dask array, but then you need to do assignments on it, which will generally not perform well with dask.
You may also want to look at dask's `blockwise` if your algorithm ends up going from one chunked array shape to another chunked array shape, which is typical for resampling. For example, if an algorithm had input arrays of shape (bins, y, x) that needed to go to (bins, y2, x2), then `blockwise` would be a good fit. https://docs.dask.org/en/latest/array-api.html#dask.array.blockwise
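For illustration, a minimal `da.blockwise` sketch (not pyresample code; the function and arrays here are made up) that applies a plain-numpy function to matching chunk pairs:

```python
import dask.array as da
import numpy as np

def _per_chunk(values, indices):
    # plain numpy per chunk pair; dask never materializes the full arrays
    return np.where(indices >= 0, values, np.nan)

values = da.random.random((1500, 2500), chunks=(500, 500))
indices = da.random.randint(-1, 10, size=(1500, 2500), chunks=(500, 500))

# "yx" on both inputs and the output keeps chunks aligned one-to-one;
# new_axes / adjust_chunks would handle outputs with different shapes.
result = da.blockwise(_per_chunk, "yx", values, "yx", indices, "yx",
                      dtype=values.dtype)
```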
If you can separate the part of the algorithm that deals with multiple chunks at a time (sorting, assigning to random areas of the output array, etc.) into its own function and then wrap that in `delayed`, you can do whatever you want with numpy functions inside that function.

The algorithm you were provided may be the fastest numpy algorithm, and it will still perform well inside a delayed function, but there is probably another algorithm (or a version of this one) that would use dask chunks in a better way. Even then it might not perform as well as the pure numpy version, but it would at least be able to operate per chunk and never load more data than it needs. How this dask-friendly algorithm would be coded is not obvious, and it would probably not be worth the effort unless you want to use the opportunity to learn how dask graphs work (really not that bad).
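As a rough sketch of that delayed pattern (the function below and its inputs are made up), the multi-chunk work (sorting and scattered assignment) happens in one delayed function on plain numpy arrays, and `da.from_delayed` wraps the result back into a dask array:

```python
import dask
import dask.array as da
import numpy as np

@dask.delayed(pure=True)
def _whole_array_part(data, idxs, out_size):
    # sorting and scattered assignment need all values at once,
    # so they live together in one delayed task using plain numpy
    order = np.argsort(idxs)
    out = np.full(out_size, np.nan)
    out[idxs[order]] = data[order]
    return out

out_size = 10
result = da.from_delayed(
    _whole_array_part(np.arange(5.0), np.array([3, 1, 4, 1, 5]), out_size),
    shape=(out_size,), dtype=float)
```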
This would be much easier to talk about in person with a whiteboard in front of us.
Thank you very much, @djhoese. The explanation is quite clear and I will try my best to improve this PR for dask tomorrow ;)
@djhoese Following your suggestions, I modified the creation of the empty array and switched to …
@djhoese The purpose of iterating over the blocks was to deal with the memory error. I have now moved that into a delayed function, and the memory usage looks like this now:
What size data are you running this with (source size, target area size)? If the algorithm really is trying to take this much memory then it needs to be rewritten for full dask chunking. I'm sorry I don't have the time to help with this more. It sounds interesting and there is likely something that can be done... I just don't have the time.
@djhoese The source and target areas have the same size (ABI CONUS at 2 km): 1500 × 2500 = 3,750,000 pixels.
Hmm ... it seems that the test error …
@gerritholl @zxdawn What is the current state of this PR as far as performance? Are there still some things you can't explain, or that you expect or want to be faster? Or do you think you've caught all the edge cases and it is as fast as it can reasonably be?
I can't promise we've caught all edge cases and that it's as fast as it can possibly be, but as far as I'm concerned we've made very good progress. I am satisfied with this PR.
Looks pretty nice. Good job.
I had a couple of small suggestions, and then I have one big one that I would like tested out either in this PR or with a promise that it will be done in a future PR. Right now the general flow of the dask arrays goes like this:

data -> delayed(_sort_weights) -> indexing and linspace -> delayed(_get_bin_statistics) -> reshape

To me there isn't much benefit to having that middle indexing-and-linspace section of code. The only benefit I see is if the indexing is much faster when done in parallel with dask, but the indexes come from a delayed function (no real chunking) and are then fed straight into the second delayed function. I think all of this logic should be one giant delayed function. It would be nice if it could be done with `map_blocks`, but I don't see a way to do that. The only reason not to combine everything would be if doing so resulted in large memory usage from all of the temporary arrays (`del idxs_sorted` might help with that).
Lastly, while looking at how `self.idxs` is generated, I think that entire function (`_get_indices`) should be turned into a `map_blocks` call. There are a lot of little intermediate operations there that just add more overhead to the dask graph. However, now that I say that, I see that `y_idxs`, `x_idxs`, and `idxs` are all assigned to `self` in this method, so maybe it isn't as pretty as I had hoped. Wait! No, `y_idxs` and `x_idxs` are never used outside this method, right? Having a `map_blocks` function that took `lons` and `lats` as inputs and returned the equivalent of `self.idxs` should clean up the dask graph a bit, and I would hope improve memory usage and maybe execution time. At a bare minimum the dask graph would look better.
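A rough sketch of that suggestion; the actual `_get_indices` math lives in pyresample, so the lon/lat-to-bin arithmetic below is only a stand-in:

```python
import dask.array as da
import numpy as np

def _get_idxs_block(lons, lats, x_res=0.1, y_res=0.1, width=3600, height=1800):
    # placeholder geolocation -> flat-bin mapping, applied chunk by chunk
    x_idxs = np.clip(((lons + 180.0) / x_res).astype(np.int64), 0, width - 1)
    y_idxs = np.clip(((90.0 - lats) / y_res).astype(np.int64), 0, height - 1)
    return y_idxs * width + x_idxs

lons = da.random.uniform(-180, 180, size=(1500, 2500), chunks=(500, 500))
lats = da.random.uniform(-90, 90, size=(1500, 2500), chunks=(500, 500))
idxs = da.map_blocks(_get_idxs_block, lons, lats, dtype=np.int64)
```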
Co-authored-by: David Hoese <david.hoese@ssec.wisc.edu>
In the bucket resampler, use the numpy dtype directly rather than taking the detour via a string
They're not used within this class, but they're exposed as public API, so removing them would affect backward compatibility. This behaviour is currently tested in …
No, I don't think …
In the bucket resampler, simplify the implementation of get_max to use only one rather than two dask.delayed functions.
```python
    return weights_sorted[weight_idx]  # last value of weights_sorted is always nan

@dask.delayed(pure=True)
```
E302 expected 2 blank lines, found 1
pyresample/bucket/__init__.py (outdated)
```python
# set bins without data to the fill value
statistics = da.where(counts == 0, fill_value, statistics)
statistics = da.from_delayed(
        _get_statistics(statistic_method, data, self.idxs, out_size),
```
E126 continuation line over-indented for hanging indent
I tried whether bringing the …
For the bucket resampler get_max/get_min, another (slight) speedup by reshaping inside rather than outside the dask delayed function. My test runs about 0.8 seconds faster on my system, but the dask graph is still dominated by a single task taking 60% of the time.
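A minimal sketch of the placement change this commit describes (the function name and stand-in body are hypothetical): returning the already-reshaped array from the delayed function lets `da.from_delayed` yield the 2-D result directly, instead of adding a separate reshape task to the graph.

```python
import dask
import dask.array as da
import numpy as np

@dask.delayed(pure=True)
def _compute_statistics(shape):
    flat = np.zeros(shape[0] * shape[1])  # stand-in for the real reduction
    return flat.reshape(shape)            # reshape inside the delayed task

shape = (1500, 2500)
statistics = da.from_delayed(_compute_statistics(shape),
                             shape=shape, dtype=np.float64)
```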
So the entire top portion (or most of it) of that dask graph is the …
...and yeah, until we come up with a better algorithm I'm not sure we can get away from the single task doing most of the work. That said, there is that …
In the spirit of incremental PRs that do one thing, I'm a bit hesitant to break open the …
I'm fine with that.
`git diff origin/main **/*py | flake8 --diff`
The pandas method is quite slow. I came up with a faster method by combining `numpy.ufunc.reduceat()` and `xr.apply_ufunc()`.
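A sketch of how those two could be combined; the exact arrangement used in the PR is not shown here, so the helper below is an assumption:

```python
import numpy as np
import xarray as xr

def _bin_min_reduceat(arr, idxs, out_size):
    order = np.argsort(idxs)
    bins, starts = np.unique(idxs[order], return_index=True)
    out = np.full(out_size, np.nan)
    out[bins] = np.minimum.reduceat(arr[order], starts)  # per-segment minima
    return out

data = xr.DataArray(np.random.random(10_000), dims="points")
idxs = np.random.randint(0, 100, 10_000)

bin_min = xr.apply_ufunc(
    _bin_min_reduceat, data,
    kwargs={"idxs": idxs, "out_size": 100},
    input_core_dims=[["points"]],
    output_core_dims=[["bins"]],
)
```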