
Implement get_abs_max on BucketResampler #418

Merged: 13 commits, May 6, 2022
40 changes: 40 additions & 0 deletions pyresample/bucket/__init__.py
@@ -317,6 +317,46 @@ def get_max(self, data, fill_value=np.nan, skipna=True):
LOG.info("Get max of values in each location")
return self._call_bin_statistic('max', data, fill_value, skipna)

def get_abs_max(self, data, fill_value=np.nan, skipna=True):
"""Calculate absolute maximums for each bin with drop-in-a-bucket resampling.

Returns for each bin the original signed value which has the largest
absolute value.

.. warning::

The slow :meth:`pandas.DataFrame.groupby` method is temporarily used here,
as the `dask_groupby <https://github.com/dcherian/dask_groupby>`_ is still under development.

Parameters
----------
data : Numpy or Dask array
Data to be binned.
fill_value : number (optional)
Value to use for empty buckets or all-NaN buckets.
skipna : boolean (optional)
If True, skips NaN values for the maximum calculation
(similarly to Numpy's `nanmax`). Buckets containing only NaN are
set to fill value.
If False, sets the bucket to NaN if one or more NaN values are present in the bucket
(similarly to Numpy's `max`).
In both cases, empty buckets are set to fill value.
Default: True

Returns
-------
data : Numpy or Dask array
Bin-wise absolute maximums (with the original sign retained) in the target grid
"""
max_ = self.get_max(data, fill_value=fill_value, skipna=skipna)
min_ = self.get_min(data, fill_value=fill_value, skipna=skipna)
return self._get_abs_max_from_min_max(min_, max_)

@staticmethod
def _get_abs_max_from_min_max(min_, max_):
"""From array of min and array of max, get array of abs max."""
return da.where(-min_ > max_, min_, max_)
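
For reference, a minimal sketch (not part of this diff) of what the min/max trick in _get_abs_max_from_min_max computes: each bucket keeps the signed value with the largest magnitude, so a bucket whose minimum is -11 and whose maximum is 3 yields -11. The bucket values below are illustrative only.

import dask.array as da
import numpy as np

# Hypothetical per-bucket minima and maxima, as get_min/get_max would return them.
min_ = da.from_array(np.array([2.0, -11.0, 5.0, np.nan]))
max_ = da.from_array(np.array([7.0, 3.0, 5.0, np.nan]))

# Keep the signed value with the larger absolute value in each bucket.
abs_max = da.where(-min_ > max_, min_, max_)
print(abs_max.compute())  # -> [7., -11., 5., nan]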

def get_count(self):
"""Count the number of occurrences for each bin using drop-in-a-bucket resampling.

29 changes: 24 additions & 5 deletions pyresample/test/test_bucket.py
@@ -65,6 +65,7 @@ def test_init(self, get_indices, prj):
self.assertTrue(hasattr(resampler, 'get_count'))
self.assertTrue(hasattr(resampler, 'get_min'))
self.assertTrue(hasattr(resampler, 'get_max'))
self.assertTrue(hasattr(resampler, 'get_abs_max'))
self.assertTrue(hasattr(resampler, 'get_average'))
self.assertTrue(hasattr(resampler, 'get_fractions'))
self.assertIsNone(resampler.counts)
@@ -198,15 +199,15 @@ def test_get_min(self):
data = da.from_array(np.array([[2, 11], [5, np.nan]]),
chunks=self.chunks)
result = self._get_min_result(data)
# test multiple entries average
# test multiple entries minimum
self.assertEqual(np.count_nonzero(result == 2), 1)
# test single entry average
# test single entry minimum
self.assertEqual(np.count_nonzero(result == 5), 1)
# test that minimum of bucket with only nan is nan, and empty buckets are nan
self.assertEqual(np.count_nonzero(~np.isnan(result)), 2)

def _get_max_result(self, data, **kwargs):
"""Compute the bucket average with kwargs and check that no dask computation is performed."""
"""Compute the bucket max with kwargs and check that no dask computation is performed."""
with dask.config.set(scheduler=CustomScheduler(max_computes=0)):
result = self.resampler.get_max(data, **kwargs)
return result.compute()
@@ -216,13 +217,31 @@ def test_get_max(self):
data = da.from_array(np.array([[2, 11], [5, np.nan]]),
chunks=self.chunks)
result = self._get_max_result(data)
# test multiple entries average
# test multiple entries maximum
self.assertEqual(np.count_nonzero(result == 11), 1)
# test single entry average
# test single entry maximum
self.assertEqual(np.count_nonzero(result == 5), 1)
# test that maximum of bucket with only nan is nan, and empty buckets are nan
self.assertEqual(np.count_nonzero(~np.isnan(result)), 2)

def test_get_abs_max(self):
"""Test abs max bucket resampling."""
data = da.from_array(np.array([[2, -11], [5, np.nan]]),
chunks=self.chunks)
result = self._get_abs_max_result(data)
# test multiple entries absolute maximum
self.assertEqual(np.count_nonzero(result == -11), 1)
# test single entry maximum
self.assertEqual(np.count_nonzero(result == 5), 1)
# test that abs max of bucket with only nan is nan, and empty buckets are nan
self.assertEqual(np.count_nonzero(~np.isnan(result)), 2)

def _get_abs_max_result(self, data, **kwargs):
"""Compute the bucket abs max with kwargs and check that no dask computation is performed."""
with dask.config.set(scheduler=CustomScheduler(max_computes=0)):
result = self.resampler.get_abs_max(data, **kwargs)
return result.compute()
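
As a usage sketch (the area definition and input arrays below are illustrative, not taken from this PR), the new method is called like the other bucket statistics:

import dask.array as da
import numpy as np
from pyresample.bucket import BucketResampler
from pyresample.geometry import AreaDefinition

# A small illustrative 4x4 lat/lon target grid (parameters are arbitrary).
target_area = AreaDefinition('grid', 'example grid', 'grid',
                             {'proj': 'longlat', 'ellps': 'WGS84'},
                             4, 4, (0.0, 70.0, 4.0, 74.0))

lons = da.from_array(np.array([[0.5, 0.6], [2.5, 3.5]]))
lats = da.from_array(np.array([[70.5, 70.6], [72.5, 73.5]]))
data = da.from_array(np.array([[2.0, -11.0], [5.0, np.nan]]))

resampler = BucketResampler(target_area, lons, lats)
# Signed value with the largest magnitude in each target grid cell;
# the first two samples share a 1-degree cell, so that cell gets -11.
abs_max = resampler.get_abs_max(data).compute()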

def _get_average_result(self, data, **kwargs):
"""Compute the bucket average with kwargs and check that no dask computation is performed."""
with dask.config.set(scheduler=CustomScheduler(max_computes=0)):