From ee6acc137f6dd6ae6c624b935884ac1e0424380f Mon Sep 17 00:00:00 2001 From: Gerrit Holl Date: Fri, 4 Feb 2022 14:20:59 +0100 Subject: [PATCH 01/12] Add test for get_abs_max Add a (failing) unit test for the new (not yet implemented) get_abs_max and get_abs_min methods on the BucketResampler class. --- pyresample/test/test_bucket.py | 48 ++++++++++++++++++++++++++++++---- 1 file changed, 43 insertions(+), 5 deletions(-) diff --git a/pyresample/test/test_bucket.py b/pyresample/test/test_bucket.py index 18ebc6bd8..cf666c949 100644 --- a/pyresample/test/test_bucket.py +++ b/pyresample/test/test_bucket.py @@ -65,6 +65,8 @@ def test_init(self, get_indices, prj): self.assertTrue(hasattr(resampler, 'get_count')) self.assertTrue(hasattr(resampler, 'get_min')) self.assertTrue(hasattr(resampler, 'get_max')) + self.assertTrue(hasattr(resampler, 'get_abs_max')) + self.assertTrue(hasattr(resampler, 'get_abs_min')) self.assertTrue(hasattr(resampler, 'get_average')) self.assertTrue(hasattr(resampler, 'get_fractions')) self.assertIsNone(resampler.counts) @@ -198,15 +200,15 @@ def test_get_min(self): data = da.from_array(np.array([[2, 11], [5, np.nan]]), chunks=self.chunks) result = self._get_min_result(data) - # test multiple entries average + # test multiple entries minimum self.assertEqual(np.count_nonzero(result == 2), 1) - # test single entry average + # test single entry minimum self.assertEqual(np.count_nonzero(result == 5), 1) # test that minimum of bucket with only nan is nan, and empty buckets are nan self.assertEqual(np.count_nonzero(~np.isnan(result)), 2) def _get_max_result(self, data, **kwargs): - """Compute the bucket average with kwargs and check that no dask computation is performed.""" + """Compute the bucket max with kwargs and check that no dask computation is performed.""" with dask.config.set(scheduler=CustomScheduler(max_computes=1)): result = self.resampler.get_max(data, **kwargs) return result.compute() @@ -216,9 +218,45 @@ def test_get_max(self): data = 
da.from_array(np.array([[2, 11], [5, np.nan]]), chunks=self.chunks) result = self._get_max_result(data) - # test multiple entries average + # test multiple entries maximum self.assertEqual(np.count_nonzero(result == 11), 1) - # test single entry average + # test single entry maximum + self.assertEqual(np.count_nonzero(result == 5), 1) + # test that minimum of bucket with only nan is nan, and empty buckets are nan + self.assertEqual(np.count_nonzero(~np.isnan(result)), 2) + + def _get_abs_max_result(self, data, **kwargs): + """Compute the bucket abs max with kwargs and check that no dask computation is performed.""" + with dask.config.set(scheduler=CustomScheduler(max_computes=1)): + result = self.resampler.get_abs_max(data, **kwargs) + return result.compute() + + def test_get_abs_max(self): + """Test abs max bucket resampling.""" + data = da.from_array(np.array([[2, -11], [5, np.nan]]), + chunks=self.chunks) + result = self._get_abs_max_result(data) + # test multiple entries absolute maximum + self.assertEqual(np.count_nonzero(result == -11), 1) + # test single entry maximum + self.assertEqual(np.count_nonzero(result == 5), 1) + # test that minimum of bucket with only nan is nan, and empty buckets are nan + self.assertEqual(np.count_nonzero(~np.isnan(result)), 2) + + def _get_abs_min_result(self, data, **kwargs): + """Compute the bucket abs min with kwargs and check that no dask computation is performed.""" + with dask.config.set(scheduler=CustomScheduler(max_computes=1)): + result = self.resampler.get_abs_min(data, **kwargs) + return result.compute() + + def test_get_abs_min(self): + """Test abs min bucket resampling.""" + data = da.from_array(np.array([[-2, -11], [5, np.nan]]), + chunks=self.chunks) + result = self._get_abs_min_result(data) + # test multiple entries absolute minimum + self.assertEqual(np.count_nonzero(result == -2), 1) + # test single entry absolute minimum self.assertEqual(np.count_nonzero(result == 5), 1) # test that minimum of bucket with only 
nan is nan, and empty buckets are nan self.assertEqual(np.count_nonzero(~np.isnan(result)), 2) From 3b0a7dabe611d92dcc488491b3abf75fcaaae545 Mon Sep 17 00:00:00 2001 From: Gerrit Holl Date: Fri, 4 Feb 2022 15:11:29 +0100 Subject: [PATCH 02/12] First attempt to implement get_abs_max First attempt to implement get_abs_max for the bucket resampler. It's failing because ``da.where`` returns ``NotImplemented``. I don't understand the problem. --- pyresample/bucket/__init__.py | 70 +++++++++++++++++++++++++++++++++++ 1 file changed, 70 insertions(+) diff --git a/pyresample/bucket/__init__.py b/pyresample/bucket/__init__.py index 6fa73006a..344bd18c3 100644 --- a/pyresample/bucket/__init__.py +++ b/pyresample/bucket/__init__.py @@ -226,6 +226,18 @@ def _call_pandas_groupby_statistics(self, scipy_method, data, fill_value=None, s bins=np.linspace(0, out_size, out_size) ) )['values'].max()) + elif scipy_method == "absmin": + def get_abs_min(part): + gb = part.groupby(np.digitize(part.x, bins=np.linspace(0, out_size, out_size))) + return gb.apply(lambda p: p.loc[abs(p["values"]).argmin()]) + statistics = df.map_partitions(get_abs_min) + elif scipy_method == "absmax": + def get_abs_max(part): + gb = part.groupby(np.digitize(part.x, bins=np.linspace(0, out_size, out_size))) + return gb.apply(lambda p: p.loc[abs(p["values"]).argmax()]) + statistics = df.map_partitions(get_abs_max) + else: + raise ValueError(f"Invalid method: {scipy_method:s}") # fill missed index statistics = (statistics + pd.Series(np.zeros(out_size))).fillna(0) @@ -296,6 +308,64 @@ def get_max(self, data, fill_value=np.nan, skipna=True): LOG.info("Get max of values in each location") return self._call_pandas_groupby_statistics('max', data, fill_value, skipna) + def get_abs_max(self, data, fill_value=np.nan, skipna=True): + """Calculate absolute maximums for each bin with drop-in-a-bucket resampling. + + .. 
warning:: + + The slow :meth:`pandas.DataFrame.groupby` method is temporarily used here, + as the `dask_groupby `_ is still under development. + + Parameters + ---------- + data : Numpy or Dask array + Data to be binned. + skipna : boolean (optional) + If True, skips NaN values for the maximum calculation + (similarly to Numpy's `nanmax`). Buckets containing only NaN are set to zero. + If False, sets the bucket to NaN if one or more NaN values are present in the bucket + (similarly to Numpy's `max`). + In both cases, empty buckets are set to 0. + Default: True + + Returns + ------- + data : Numpy or Dask array + Bin-wise maximums in the target grid + """ + LOG.debug("Get abs max of values in each location") + return self._call_pandas_groupby_statistics('absmax', data, fill_value, skipna) + + + def get_abs_min(self, data, fill_value=np.nan, skipna=True): + """Calculate absolute minimums for each bin with drop-in-a-bucket resampling. + + .. warning:: + + The slow :meth:`pandas.DataFrame.groupby` method is temporarily used here, + as the `dask_groupby `_ is still under development. + + Parameters + ---------- + data : Numpy or Dask array + Data to be binned. + skipna : boolean (optional) + If True, skips NaN values for the maximum calculation + (similarly to Numpy's `nanmax`). Buckets containing only NaN are set to zero. + If False, sets the bucket to NaN if one or more NaN values are present in the bucket + (similarly to Numpy's `max`). + In both cases, empty buckets are set to 0. + Default: True + + Returns + ------- + data : Numpy or Dask array + Bin-wise maximums in the target grid + """ + LOG.debug("Get abs min of values in each location") + return self._call_pandas_groupby_statistics('absmin', data, fill_value, skipna) + + def get_count(self): """Count the number of occurrences for each bin using drop-in-a-bucket resampling. 
From 862f91be64a01a6c667f61acc916f5cd8ee69c3f Mon Sep 17 00:00:00 2001 From: Gerrit Holl Date: Fri, 4 Feb 2022 15:15:17 +0100 Subject: [PATCH 03/12] PEP8 errors --- pyresample/bucket/__init__.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/pyresample/bucket/__init__.py b/pyresample/bucket/__init__.py index 344bd18c3..93124e947 100644 --- a/pyresample/bucket/__init__.py +++ b/pyresample/bucket/__init__.py @@ -336,7 +336,6 @@ def get_abs_max(self, data, fill_value=np.nan, skipna=True): LOG.debug("Get abs max of values in each location") return self._call_pandas_groupby_statistics('absmax', data, fill_value, skipna) - def get_abs_min(self, data, fill_value=np.nan, skipna=True): """Calculate absolute minimums for each bin with drop-in-a-bucket resampling. @@ -365,7 +364,6 @@ def get_abs_min(self, data, fill_value=np.nan, skipna=True): LOG.debug("Get abs min of values in each location") return self._call_pandas_groupby_statistics('absmin', data, fill_value, skipna) - def get_count(self): """Count the number of occurrences for each bin using drop-in-a-bucket resampling. From 429e01eac62b9e8906a619abe57619a6ae602361 Mon Sep 17 00:00:00 2001 From: Gerrit Holl Date: Fri, 4 Feb 2022 16:29:04 +0100 Subject: [PATCH 04/12] Fix implementation for get_abs_max Fix the implementation for get_abs_max. Remove implementation and test for get_abs_min because it's harder to implement and I don't need it. 
--- pyresample/bucket/__init__.py | 43 +++------------------------------- pyresample/test/test_bucket.py | 31 ------------------------ 2 files changed, 3 insertions(+), 71 deletions(-) diff --git a/pyresample/bucket/__init__.py b/pyresample/bucket/__init__.py index 93124e947..d8e55b44b 100644 --- a/pyresample/bucket/__init__.py +++ b/pyresample/bucket/__init__.py @@ -226,16 +226,6 @@ def _call_pandas_groupby_statistics(self, scipy_method, data, fill_value=None, s bins=np.linspace(0, out_size, out_size) ) )['values'].max()) - elif scipy_method == "absmin": - def get_abs_min(part): - gb = part.groupby(np.digitize(part.x, bins=np.linspace(0, out_size, out_size))) - return gb.apply(lambda p: p.loc[abs(p["values"]).argmin()]) - statistics = df.map_partitions(get_abs_min) - elif scipy_method == "absmax": - def get_abs_max(part): - gb = part.groupby(np.digitize(part.x, bins=np.linspace(0, out_size, out_size))) - return gb.apply(lambda p: p.loc[abs(p["values"]).argmax()]) - statistics = df.map_partitions(get_abs_max) else: raise ValueError(f"Invalid method: {scipy_method:s}") @@ -333,36 +323,9 @@ def get_abs_max(self, data, fill_value=np.nan, skipna=True): data : Numpy or Dask array Bin-wise maximums in the target grid """ - LOG.debug("Get abs max of values in each location") - return self._call_pandas_groupby_statistics('absmax', data, fill_value, skipna) - - def get_abs_min(self, data, fill_value=np.nan, skipna=True): - """Calculate absolute minimums for each bin with drop-in-a-bucket resampling. - - .. warning:: - - The slow :meth:`pandas.DataFrame.groupby` method is temporarily used here, - as the `dask_groupby `_ is still under development. - - Parameters - ---------- - data : Numpy or Dask array - Data to be binned. - skipna : boolean (optional) - If True, skips NaN values for the maximum calculation - (similarly to Numpy's `nanmax`). Buckets containing only NaN are set to zero. 
- If False, sets the bucket to NaN if one or more NaN values are present in the bucket - (similarly to Numpy's `max`). - In both cases, empty buckets are set to 0. - Default: True - - Returns - ------- - data : Numpy or Dask array - Bin-wise maximums in the target grid - """ - LOG.debug("Get abs min of values in each location") - return self._call_pandas_groupby_statistics('absmin', data, fill_value, skipna) + max = self.get_max(data, fill_value=fill_value, skipna=skipna) + min = self.get_min(data, fill_value=fill_value, skipna=skipna) + return da.where(-min > max, min, max) def get_count(self): """Count the number of occurrences for each bin using drop-in-a-bucket resampling. diff --git a/pyresample/test/test_bucket.py b/pyresample/test/test_bucket.py index cf666c949..6ff5b68fd 100644 --- a/pyresample/test/test_bucket.py +++ b/pyresample/test/test_bucket.py @@ -66,7 +66,6 @@ def test_init(self, get_indices, prj): self.assertTrue(hasattr(resampler, 'get_min')) self.assertTrue(hasattr(resampler, 'get_max')) self.assertTrue(hasattr(resampler, 'get_abs_max')) - self.assertTrue(hasattr(resampler, 'get_abs_min')) self.assertTrue(hasattr(resampler, 'get_average')) self.assertTrue(hasattr(resampler, 'get_fractions')) self.assertIsNone(resampler.counts) @@ -231,36 +230,6 @@ def _get_abs_max_result(self, data, **kwargs): result = self.resampler.get_abs_max(data, **kwargs) return result.compute() - def test_get_abs_max(self): - """Test abs max bucket resampling.""" - data = da.from_array(np.array([[2, -11], [5, np.nan]]), - chunks=self.chunks) - result = self._get_abs_max_result(data) - # test multiple entries absolute maximum - self.assertEqual(np.count_nonzero(result == -11), 1) - # test single entry maximum - self.assertEqual(np.count_nonzero(result == 5), 1) - # test that minimum of bucket with only nan is nan, and empty buckets are nan - self.assertEqual(np.count_nonzero(~np.isnan(result)), 2) - - def _get_abs_min_result(self, data, **kwargs): - """Compute the bucket 
abs min with kwargs and check that no dask computation is performed.""" - with dask.config.set(scheduler=CustomScheduler(max_computes=1)): - result = self.resampler.get_abs_min(data, **kwargs) - return result.compute() - - def test_get_abs_min(self): - """Test abs min bucket resampling.""" - data = da.from_array(np.array([[-2, -11], [5, np.nan]]), - chunks=self.chunks) - result = self._get_abs_min_result(data) - # test multiple entries absolute minimum - self.assertEqual(np.count_nonzero(result == -2), 1) - # test single entry absolute minimum - self.assertEqual(np.count_nonzero(result == 5), 1) - # test that minimum of bucket with only nan is nan, and empty buckets are nan - self.assertEqual(np.count_nonzero(~np.isnan(result)), 2) - def _get_average_result(self, data, **kwargs): """Compute the bucket average with kwargs and check that no dask computation is performed.""" with dask.config.set(scheduler=CustomScheduler(max_computes=0)): From a742fd80ac8a43cad3f60ae71cf56669c794000f Mon Sep 17 00:00:00 2001 From: Gerrit Holl Date: Fri, 4 Feb 2022 16:41:17 +0100 Subject: [PATCH 05/12] Remove unreachable code Remove some code that is unreachable and untested --- pyresample/bucket/__init__.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/pyresample/bucket/__init__.py b/pyresample/bucket/__init__.py index d8e55b44b..1f772d28d 100644 --- a/pyresample/bucket/__init__.py +++ b/pyresample/bucket/__init__.py @@ -226,8 +226,6 @@ def _call_pandas_groupby_statistics(self, scipy_method, data, fill_value=None, s bins=np.linspace(0, out_size, out_size) ) )['values'].max()) - else: - raise ValueError(f"Invalid method: {scipy_method:s}") # fill missed index statistics = (statistics + pd.Series(np.zeros(out_size))).fillna(0) From c1239da531d42f10b6adffd7fab1051c64948d2a Mon Sep 17 00:00:00 2001 From: Gerrit Holl Date: Fri, 4 Feb 2022 18:41:11 +0100 Subject: [PATCH 06/12] Return accidentally removed unit test. Recover the unit test accidentally deleted in a previous commit. 
--- pyresample/test/test_bucket.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/pyresample/test/test_bucket.py b/pyresample/test/test_bucket.py index 6ff5b68fd..f04836123 100644 --- a/pyresample/test/test_bucket.py +++ b/pyresample/test/test_bucket.py @@ -224,6 +224,18 @@ def test_get_max(self): # test that minimum of bucket with only nan is nan, and empty buckets are nan self.assertEqual(np.count_nonzero(~np.isnan(result)), 2) + def test_get_abs_max(self): + """Test abs max bucket resampling.""" + data = da.from_array(np.array([[2, -11], [5, np.nan]]), + chunks=self.chunks) + result = self._get_abs_max_result(data) + # test multiple entries absolute maximum + self.assertEqual(np.count_nonzero(result == -11), 1) + # test single entry maximum + self.assertEqual(np.count_nonzero(result == 5), 1) + # test that minimum of bucket with only nan is nan, and empty buckets are nan + self.assertEqual(np.count_nonzero(~np.isnan(result)), 2) + def _get_abs_max_result(self, data, **kwargs): """Compute the bucket abs max with kwargs and check that no dask computation is performed.""" with dask.config.set(scheduler=CustomScheduler(max_computes=1)): result = self.resampler.get_abs_max(data, **kwargs) return result.compute() From 2ae033c19018729b277f2c7783e8e153c16fff7b Mon Sep 17 00:00:00 2001 From: Gerrit Holl Date: Thu, 10 Feb 2022 10:05:50 +0100 Subject: [PATCH 07/12] increase max_computes to make test pass If I increase max_computes in the unit test, the test passes --- pyresample/test/test_bucket.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyresample/test/test_bucket.py b/pyresample/test/test_bucket.py index f04836123..a5c06f6a8 100644 --- a/pyresample/test/test_bucket.py +++ b/pyresample/test/test_bucket.py @@ -238,7 +238,7 @@ def test_get_abs_max(self): def _get_abs_max_result(self, data, **kwargs): """Compute the bucket abs max with kwargs and check that no dask computation is performed.""" - with dask.config.set(scheduler=CustomScheduler(max_computes=1)): + with 
dask.config.set(scheduler=CustomScheduler(max_computes=3)): result = self.resampler.get_abs_max(data, **kwargs) return result.compute() From 9d9ece8bd8620c5ec5ce64a62872320a340dbba9 Mon Sep 17 00:00:00 2001 From: Gerrit Holl Date: Wed, 13 Apr 2022 09:23:29 +0200 Subject: [PATCH 08/12] Clarify get_abs_max docs. --- pyresample/bucket/__init__.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pyresample/bucket/__init__.py b/pyresample/bucket/__init__.py index 1f772d28d..e30147da3 100644 --- a/pyresample/bucket/__init__.py +++ b/pyresample/bucket/__init__.py @@ -299,6 +299,9 @@ def get_max(self, data, fill_value=np.nan, skipna=True): def get_abs_max(self, data, fill_value=np.nan, skipna=True): """Calculate absolute maximums for each bin with drop-in-a-bucket resampling. + Returns for each bin the original signed value which has the largest + absolute value. + .. warning:: The slow :meth:`pandas.DataFrame.groupby` method is temporarily used here, From 325c00cd156a3855f8d11c0e1220fb1ad5c606ea Mon Sep 17 00:00:00 2001 From: Gerrit Holl Date: Wed, 13 Apr 2022 10:07:43 +0200 Subject: [PATCH 09/12] don't shadow builtins rename min/max local variables as to not shadow builtins --- pyresample/bucket/__init__.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pyresample/bucket/__init__.py b/pyresample/bucket/__init__.py index e30147da3..07da991b3 100644 --- a/pyresample/bucket/__init__.py +++ b/pyresample/bucket/__init__.py @@ -324,9 +324,9 @@ def get_abs_max(self, data, fill_value=np.nan, skipna=True): data : Numpy or Dask array Bin-wise maximums in the target grid """ - max = self.get_max(data, fill_value=fill_value, skipna=skipna) - min = self.get_min(data, fill_value=fill_value, skipna=skipna) - return da.where(-min > max, min, max) + max_ = self.get_max(data, fill_value=fill_value, skipna=skipna) + min_ = self.get_min(data, fill_value=fill_value, skipna=skipna) + return da.where(-min_ > max_, min_, max_) def get_count(self): """Count the 
number of occurrences for each bin using drop-in-a-bucket resampling. From 933f4b956117d103d406c78d07d4ba0a7a252b8b Mon Sep 17 00:00:00 2001 From: Gerrit Holl Date: Mon, 2 May 2022 11:28:27 +0200 Subject: [PATCH 10/12] Ensure no computations Change max-computes from 3 to 0 in get_abs_max test --- pyresample/test/test_bucket.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyresample/test/test_bucket.py b/pyresample/test/test_bucket.py index a5c06f6a8..6e0260bda 100644 --- a/pyresample/test/test_bucket.py +++ b/pyresample/test/test_bucket.py @@ -238,7 +238,7 @@ def test_get_abs_max(self): def _get_abs_max_result(self, data, **kwargs): """Compute the bucket abs max with kwargs and check that no dask computation is performed.""" - with dask.config.set(scheduler=CustomScheduler(max_computes=3)): + with dask.config.set(scheduler=CustomScheduler(max_computes=0)): result = self.resampler.get_abs_max(data, **kwargs) return result.compute() From cc377d6fca85c25a447a5410fea24d0536cbb55d Mon Sep 17 00:00:00 2001 From: Gerrit Holl Date: Wed, 4 May 2022 16:55:25 +0200 Subject: [PATCH 11/12] Add doc for missing argument in get_abs_max In the bucket resampler get_abs_max method, add the missing fill_value to the method documentation. --- pyresample/bucket/__init__.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/pyresample/bucket/__init__.py b/pyresample/bucket/__init__.py index 07da991b3..94060d8dd 100644 --- a/pyresample/bucket/__init__.py +++ b/pyresample/bucket/__init__.py @@ -311,13 +311,16 @@ def get_abs_max(self, data, fill_value=np.nan, skipna=True): ---------- data : Numpy or Dask array Data to be binned. + fill_value : number (optional) + Value to use for empty buckets or all-NaN buckets. skipna : boolean (optional) - If True, skips NaN values for the maximum calculation - (similarly to Numpy's `nanmax`). Buckets containing only NaN are set to zero. 
- If False, sets the bucket to NaN if one or more NaN values are present in the bucket - (similarly to Numpy's `max`). - In both cases, empty buckets are set to 0. - Default: True + If True, skips NaN values for the maximum calculation + (similarly to Numpy's `nanmax`). Buckets containing only NaN are + set to fill value. + If False, sets the bucket to NaN if one or more NaN values are present in the bucket + (similarly to Numpy's `max`). + In both cases, empty buckets are set to fill value. + Default: True Returns ------- From 70b24c1db3041efa5008a2d56a3a321b7f9bc0e1 Mon Sep 17 00:00:00 2001 From: Gerrit Holl Date: Wed, 4 May 2022 17:03:17 +0200 Subject: [PATCH 12/12] Refactor get_abs_max Refactor the BucketResampler.get_abs_max method. Move the calculation of the abs_max from min and max to its own (pseudoprivate) method. --- pyresample/bucket/__init__.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pyresample/bucket/__init__.py b/pyresample/bucket/__init__.py index 94060d8dd..a7cf100e3 100644 --- a/pyresample/bucket/__init__.py +++ b/pyresample/bucket/__init__.py @@ -329,6 +329,11 @@ def get_abs_max(self, data, fill_value=np.nan, skipna=True): """ max_ = self.get_max(data, fill_value=fill_value, skipna=skipna) min_ = self.get_min(data, fill_value=fill_value, skipna=skipna) + return self._get_abs_max_from_min_max(min_, max_) + + @staticmethod + def _get_abs_max_from_min_max(min_, max_): + """From array of min and array of max, get array of abs max.""" return da.where(-min_ > max_, min_, max_) def get_count(self):