Speed up Bucket get_min and get_max #368

Merged · 28 commits · May 6, 2022

Changes from 3 commits. Full commit list (28):
c9f90a1
speed up get_min/get_max using ufunc.reduceat
zxdawn Jun 5, 2021
e467a33
fix name bug
zxdawn Jun 5, 2021
569951f
fix wrong index and bins
zxdawn Jun 6, 2021
7b5cf79
use argmax and manual chunksize
zxdawn Jun 15, 2021
ad75b93
change chunk_size
zxdawn Jun 15, 2021
4c39f6b
use dask from_delayed
zxdawn Jun 16, 2021
fa2de01
move into delay
zxdawn Jun 18, 2021
2273dda
delete useless package
zxdawn Jun 18, 2021
815ef39
use numpy function: np.digitize and np.unique
zxdawn Jun 21, 2021
c9dbd55
resolve conflict
zxdawn Feb 10, 2022
e727919
resolve stickler-ci
zxdawn Feb 10, 2022
b98efa1
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Feb 10, 2022
81259de
resolve flake8
zxdawn Feb 10, 2022
56b8cdd
Test that get_min, get_max have no computes
gerritholl May 2, 2022
b85f4e2
Move delayed function outside of class
zxdawn May 2, 2022
5640a15
Simply missing data handling in get-max/min
gerritholl May 2, 2022
a3e9126
Remove outdated note in bucket resampler
gerritholl May 4, 2022
f7cfc5e
Apply suggestions from code review to adapt _sort_weights
gerritholl May 5, 2022
1aa2779
Small refactoring in bucket resampler
gerritholl May 5, 2022
c2b4596
Use dtype int64 directly in bucket resampler
gerritholl May 6, 2022
1b21364
Use direct dtype in bucket resampler
gerritholl May 6, 2022
57fa302
Simplify get_max from two to one dask.delayed call
gerritholl May 6, 2022
fb9aa56
PEP8 fixes
gerritholl May 6, 2022
4ee4ece
Slight speedup by reshaping inside dask.delayed
gerritholl May 6, 2022
a0bd7f5
PEP8 fixes
gerritholl May 6, 2022
c102262
Remove redundant reshape, remove breakpoint
gerritholl May 6, 2022
eb0227e
isort fixes
gerritholl May 6, 2022
8106676
Remove dead code
gerritholl May 6, 2022
82 changes: 51 additions & 31 deletions pyresample/bucket/__init__.py
@@ -190,11 +190,8 @@ def _mask_bins_with_nan_if_not_skipna(self, skipna, data, out_size, statistic):
         statistic = da.where(nan_bins > 0, np.nan, statistic)
         return statistic

-    def _call_pandas_groupby_statistics(self, scipy_method, data, fill_value=None, skipna=None):
+    def _call_bin_statistic(self, statistic_method, data, fill_value=None, skipna=None):
         """Calculate statistics (min/max) for each bin with drop-in-a-bucket resampling."""
-        import dask.dataframe as dd
-        import pandas as pd
-
         if isinstance(data, xr.DataArray):
             data = data.data
         data = data.ravel()
@@ -209,27 +206,50 @@ def _call_pandas_groupby_statistics(self, scipy_method, data, fill_value=None, skipna=None):
         # Calculate the min of the data falling to each bin
         out_size = self.target_area.size

-        # merge into one Dataframe
-        df = dd.concat([dd.from_dask_array(self.idxs), dd.from_dask_array(weights)],
-                       axis=1)
-        df.columns = ['x', 'values']
-
-        if scipy_method == 'min':
-            statistics = df.map_partitions(lambda part: part.groupby(
-                np.digitize(part.x,
-                            bins=np.linspace(0, out_size, out_size)
-                            )
-            )['values'].min())
-
-        elif scipy_method == 'max':
-            statistics = df.map_partitions(lambda part: part.groupby(
-                np.digitize(part.x,
-                            bins=np.linspace(0, out_size, out_size)
-                            )
-            )['values'].max())
-
-        # fill missed index
-        statistics = (statistics + pd.Series(np.zeros(out_size))).fillna(0)
+        def numpy_reduceat(data, bins, statistic_method):
+            '''Calculate the bin_statistic using numpy.ufunc.reduceat'''
+            if statistic_method == 'min':
+                return np.minimum.reduceat(data, bins)
+            elif statistic_method == 'max':
+                return np.maximum.reduceat(data, bins)
+
+        # create the output bins
+        bins = da.linspace(0, out_size-1, out_size).astype('int')
+
+        # get the indices of the bins to which each value in self.idxs belongs
+        slices = da.digitize(self.idxs, bins)
+
+        # convert to DataArray using idxs as coords
+        weights = xr.DataArray(weights, dims=['x'])
+        slices = xr.DataArray(slices, dims=['x'])
+
+        # set out of range value to nan
+        mask = xr.DataArray((self.idxs >= bins.min()) & (self.idxs <= bins.max()), dims=['x'])
+        weights = weights.where(mask, drop=True)
+        slices = slices.where(mask, drop=True)
+
+        # sort the slices
+        sort_index = da.map_blocks(np.argsort, slices.data)
+        slices = slices[sort_index]
+        weights = weights[sort_index]
+
+        # get the unique slices (for assignment later) and bins (for numpy_reduceat)
+        unique_slices, unique_bins = da.unique(slices.data, return_index=True)
+        statistics_sub = xr.apply_ufunc(numpy_reduceat,
+                                        weights,
+                                        unique_bins.compute_chunk_sizes(),
+                                        kwargs={'statistic_method': statistic_method},
+                                        input_core_dims=[['x'], ['new_x']],
+                                        exclude_dims=set(('x',)),
+                                        output_core_dims=[['new_x'], ],
+                                        dask="parallelized",
+                                        output_dtypes=[weights.dtype],
+                                        dask_gufunc_kwargs={'allow_rechunk': True},
+                                        )
+
+        # initialize the output DataArray with np.nan
+        statistics = xr.DataArray(da.from_array(np.full((out_size), np.nan)), dims=['x'])
+        # assign the binned statistics
+        statistics.loc[unique_slices.astype('int')-1] = statistics_sub

         counts = self.get_sum(np.logical_not(np.isnan(data)).astype(int)).ravel()

@@ -244,9 +264,9 @@ def _call_pandas_groupby_statistics(self, scipy_method, data, fill_value=None, skipna=None):
     def get_min(self, data, fill_value=np.nan, skipna=True):
        """Calculate minimums for each bin with drop-in-a-bucket resampling.

-        .. warning::
+        .. note::

-            The slow :meth:`pandas.DataFrame.groupby` method is temporarily used here,
+            The :meth:`numpy.ufunc.reduceat` method is used here,
             as the `dask_groupby <https://github.com/dcherian/dask_groupby>`_ is still under development.

         Parameters

@@ -267,14 +287,14 @@ def get_min(self, data, fill_value=np.nan, skipna=True):
             Bin-wise minimums in the target grid
         """
         LOG.info("Get min of values in each location")
-        return self._call_pandas_groupby_statistics('min', data, fill_value, skipna)
+        return self._call_bin_statistic('min', data, fill_value, skipna)

     def get_max(self, data, fill_value=np.nan, skipna=True):
         """Calculate maximums for each bin with drop-in-a-bucket resampling.

-        .. warning::
+        .. note::

-            The slow :meth:`pandas.DataFrame.groupby` method is temporarily used here,
+            The :meth:`numpy.ufunc.reduceat` method is temporarily used here,
             as the `dask_groupby <https://github.com/dcherian/dask_groupby>`_ is still under development.

         Parameters

@@ -295,7 +315,7 @@ def get_max(self, data, fill_value=np.nan, skipna=True):
             Bin-wise maximums in the target grid
         """
         LOG.info("Get max of values in each location")
-        return self._call_pandas_groupby_statistics('max', data, fill_value, skipna)
+        return self._call_bin_statistic('max', data, fill_value, skipna)

     def get_count(self):
         """Count the number of occurrences for each bin using drop-in-a-bucket
Expand Down