From 14d9714ff4566e1fc65e6082ae7ca5b8be6fd677 Mon Sep 17 00:00:00 2001 From: Bouwe Andela Date: Mon, 28 Oct 2024 12:51:29 +0100 Subject: [PATCH] Specify meta in dask.array.map_blocks (#5989) * Specify meta in da.map_blocks * Improve tests * Add more tests * Add whatsnew * Specify dtype in map_complete_blocks and undo change to lazy_elementwise --------- Co-authored-by: Patrick Peglar --- docs/src/whatsnew/latest.rst | 3 + lib/iris/_lazy_data.py | 12 +++- lib/iris/analysis/__init__.py | 7 +- lib/iris/analysis/_area_weighted.py | 13 +++- lib/iris/analysis/_regrid.py | 13 +++- lib/iris/mesh/utils.py | 5 +- .../regrid/test_RectilinearRegridder.py | 13 ++++ .../tests/unit/analysis/test_PERCENTILE.py | 10 +-- .../lazy_data/test_map_complete_blocks.py | 64 +++++++++++++++++-- 9 files changed, 116 insertions(+), 24 deletions(-) diff --git a/docs/src/whatsnew/latest.rst b/docs/src/whatsnew/latest.rst index c862ef192e..35b101ab3d 100644 --- a/docs/src/whatsnew/latest.rst +++ b/docs/src/whatsnew/latest.rst @@ -62,6 +62,9 @@ This document explains the changes made to Iris for this release #. `@rcomer`_ enabled partial collapse of multi-dimensional string coordinates, fixing :issue:`3653`. (:pull:`5955`) +#. `@bouweandela`_ made further updates to the ``chunktype`` of Dask arrays, + so it corresponds better with the array content. (:pull:`5989`) + #. `@ukmo-ccbunney`_ improved error handling for malformed `cell_method` attribute. Also made cell_method string parsing more lenient w.r.t. whitespace. (:pull:`6083`) diff --git a/lib/iris/_lazy_data.py b/lib/iris/_lazy_data.py index cd093b315c..a3dfa1edb4 100644 --- a/lib/iris/_lazy_data.py +++ b/lib/iris/_lazy_data.py @@ -537,11 +537,12 @@ def lazy_elementwise(lazy_array, elementwise_op): # call may cast to float, or not, depending on unit equality : Thus, it's # much safer to get udunits to decide that for us. dtype = elementwise_op(np.zeros(1, lazy_array.dtype)).dtype + meta = da.utils.meta_from_array(lazy_array).astype(dtype) - return da.map_blocks(elementwise_op, lazy_array, dtype=dtype) + return da.map_blocks(elementwise_op, lazy_array, dtype=dtype, meta=meta) -def map_complete_blocks(src, func, dims, out_sizes, *args, **kwargs): +def map_complete_blocks(src, func, dims, out_sizes, dtype, *args, **kwargs): """Apply a function to complete blocks. Complete means that the data is not chunked along the chosen dimensions. @@ -557,6 +558,8 @@ def map_complete_blocks(src, func, dims, out_sizes, *args, **kwargs): Dimensions that cannot be chunked. out_sizes : tuple of int Output size of dimensions that cannot be chunked. + dtype : + Output dtype. *args : tuple Additional arguments to pass to `func`. **kwargs : dict @@ -596,8 +599,11 @@ def map_complete_blocks(src, func, dims, out_sizes, *args, **kwargs): for dim, size in zip(dims, out_sizes): out_chunks[dim] = size + # Assume operation preserves mask. + meta = da.utils.meta_from_array(data).astype(dtype) + result = data.map_blocks( - func, *args, chunks=out_chunks, dtype=src.dtype, **kwargs + func, *args, chunks=out_chunks, meta=meta, dtype=dtype, **kwargs ) return result diff --git a/lib/iris/analysis/__init__.py b/lib/iris/analysis/__init__.py index 215d6dff0a..2c890ef8cc 100644 --- a/lib/iris/analysis/__init__.py +++ b/lib/iris/analysis/__init__.py @@ -1390,9 +1390,10 @@ def _percentile(data, percent, fast_percentile_method=False, **kwargs): result = iris._lazy_data.map_complete_blocks( data, - _calc_percentile, - (-1,), - percent.shape, + func=_calc_percentile, + dims=(-1,), + out_sizes=percent.shape, + dtype=np.float64, percent=percent, fast_percentile_method=fast_percentile_method, **kwargs, diff --git a/lib/iris/analysis/_area_weighted.py b/lib/iris/analysis/_area_weighted.py index a25a21bb47..3ed4f8aa33 100644 --- a/lib/iris/analysis/_area_weighted.py +++ b/lib/iris/analysis/_area_weighted.py @@ -392,11 +392,18 @@ def _regrid_area_weighted_rectilinear_src_and_grid__perform( tgt_shape = (len(grid_y.points), len(grid_x.points)) + # Specify the output dtype + if np.issubdtype(src_cube.dtype, np.integer): + out_dtype = np.float64 + else: + out_dtype = src_cube.dtype + new_data = map_complete_blocks( src_cube, - _regrid_along_dims, - (src_y_dim, src_x_dim), - meshgrid_x.shape, + func=_regrid_along_dims, + dims=(src_y_dim, src_x_dim), + out_sizes=meshgrid_x.shape, + dtype=out_dtype, x_dim=src_x_dim, y_dim=src_y_dim, weights=weights, diff --git a/lib/iris/analysis/_regrid.py b/lib/iris/analysis/_regrid.py index 0f375e69f4..fd56eb04a1 100644 --- a/lib/iris/analysis/_regrid.py +++ b/lib/iris/analysis/_regrid.py @@ -943,11 +943,18 @@ def __call__(self, src): x_dim = src.coord_dims(src_x_coord)[0] y_dim = src.coord_dims(src_y_coord)[0] + # Specify the output dtype + if self._method == "linear" and np.issubdtype(src.dtype, np.integer): + out_dtype = np.float64 + else: + out_dtype = src.dtype + data = map_complete_blocks( src, - self._regrid, - (y_dim, x_dim), - sample_grid_x.shape, + func=self._regrid, + dims=(y_dim, x_dim), + out_sizes=sample_grid_x.shape, + dtype=out_dtype, x_dim=x_dim, y_dim=y_dim, src_x_coord=src_x_coord, diff --git a/lib/iris/mesh/utils.py b/lib/iris/mesh/utils.py index d054cfec4f..3930fa3f1b 100644 --- a/lib/iris/mesh/utils.py +++ b/lib/iris/mesh/utils.py @@ -277,6 +277,9 @@ def fill_region(target, regiondata, regioninds): # Notes on resultant calculation properties: # 1. map_blocks is chunk-mapped, so it is parallelisable and space-saving # 2. However, fetching less than a whole chunk is not efficient + meta = np.ma.array( + np.empty((0,) * result_array.ndim, dtype=result_array.dtype), mask=True + ) for cube in submesh_cubes: # Lazy data array from the region cube sub_data = cube.lazy_data() @@ -300,7 +303,7 @@ def fill_region(target, regiondata, regioninds): sub_data, indarr, dtype=result_array.dtype, - meta=np.ndarray, + meta=meta, ) # Construct the result cube diff --git a/lib/iris/tests/unit/analysis/regrid/test_RectilinearRegridder.py b/lib/iris/tests/unit/analysis/regrid/test_RectilinearRegridder.py index 284d52d3f9..3f841b938a 100644 --- a/lib/iris/tests/unit/analysis/regrid/test_RectilinearRegridder.py +++ b/lib/iris/tests/unit/analysis/regrid/test_RectilinearRegridder.py @@ -474,12 +474,25 @@ def setUp(self): self.args = ("linear", "mask") self.regridder = Regridder(self.cube, self.cube, *self.args) self.lazy_cube = self.cube.copy(da.asarray(self.cube.data)) + self.lazy_masked_cube = self.lazy_cube.copy(da.ma.masked_array(self.cube.data)) self.lazy_regridder = Regridder(self.lazy_cube, self.lazy_cube, *self.args) def test_lazy_regrid(self): result = self.lazy_regridder(self.lazy_cube) self.assertTrue(result.has_lazy_data()) + meta = da.utils.meta_from_array(result.core_data()) + self.assertTrue(meta.__class__ is np.ndarray) expected = self.regridder(self.cube) + self.assertEqual(result.dtype, expected.dtype) + self.assertTrue(result == expected) + + def test_lazy_masked_regrid(self): + result = self.lazy_regridder(self.lazy_masked_cube) + self.assertTrue(result.has_lazy_data()) + meta = da.utils.meta_from_array(result.core_data()) + self.assertTrue(isinstance(meta, np.ma.MaskedArray)) + expected = self.regridder(self.cube) + self.assertEqual(result.dtype, expected.dtype) self.assertTrue(result == expected) diff --git a/lib/iris/tests/unit/analysis/test_PERCENTILE.py b/lib/iris/tests/unit/analysis/test_PERCENTILE.py index 0d759c621f..72218af830 100644 --- a/lib/iris/tests/unit/analysis/test_PERCENTILE.py +++ b/lib/iris/tests/unit/analysis/test_PERCENTILE.py @@ -155,10 +155,10 @@ def test_default_kwargs_passed(self, mocked_mquantiles): if self.lazy: data = as_lazy_data(data) - self.agg_method(data, axis=axis, percent=percent) + result = self.agg_method(data, axis=axis, percent=percent) # Trigger calculation for lazy case. - as_concrete_data(data) + as_concrete_data(result) for key in ["alphap", "betap"]: self.assertEqual(mocked_mquantiles.call_args.kwargs[key], 1) @@ -170,10 +170,12 @@ def test_chosen_kwargs_passed(self, mocked_mquantiles): if self.lazy: data = as_lazy_data(data) - self.agg_method(data, axis=axis, percent=percent, alphap=0.6, betap=0.5) + result = self.agg_method( + data, axis=axis, percent=percent, alphap=0.6, betap=0.5 + ) # Trigger calculation for lazy case. - as_concrete_data(data) + as_concrete_data(result) for key, val in zip(["alphap", "betap"], [0.6, 0.5]): self.assertEqual(mocked_mquantiles.call_args.kwargs[key], val) diff --git a/lib/iris/tests/unit/lazy_data/test_map_complete_blocks.py b/lib/iris/tests/unit/lazy_data/test_map_complete_blocks.py index 7403a5611e..7d619353ed 100644 --- a/lib/iris/tests/unit/lazy_data/test_map_complete_blocks.py +++ b/lib/iris/tests/unit/lazy_data/test_map_complete_blocks.py @@ -32,13 +32,27 @@ def create_mock_cube(array): class Test_map_complete_blocks(tests.IrisTest): def setUp(self): self.array = np.arange(8).reshape(2, 4) - self.func = lambda chunk: chunk + 1 + + def func(chunk): + """Use a function that cannot be 'sampled'. + + To make sure the call to map_blocks is correct for any function, + we define this function that cannot be called with size 0 arrays + to infer the output meta. + """ + if chunk.size == 0: + raise ValueError + return chunk + 1 + + self.func = func self.func_result = self.array + 1 def test_non_lazy_input(self): # Check that a non-lazy input doesn't trip up the functionality. cube, cube_data = create_mock_cube(self.array) - result = map_complete_blocks(cube, self.func, dims=(1,), out_sizes=(4,)) + result = map_complete_blocks( + cube, self.func, dims=(1,), out_sizes=(4,), dtype=self.array.dtype + ) self.assertFalse(is_lazy_data(result)) self.assertArrayEqual(result, self.func_result) # check correct data was accessed @@ -48,7 +62,9 @@ def test_non_lazy_input(self): def test_lazy_input(self): lazy_array = da.asarray(self.array, chunks=((1, 1), (4,))) cube, cube_data = create_mock_cube(lazy_array) - result = map_complete_blocks(cube, self.func, dims=(1,), out_sizes=(4,)) + result = map_complete_blocks( + cube, self.func, dims=(1,), out_sizes=(4,), dtype=lazy_array.dtype + ) self.assertTrue(is_lazy_data(result)) self.assertArrayEqual(result.compute(), self.func_result) # check correct data was accessed @@ -57,14 +73,44 @@ def test_lazy_input(self): def test_dask_array_input(self): lazy_array = da.asarray(self.array, chunks=((1, 1), (4,))) - result = map_complete_blocks(lazy_array, self.func, dims=(1,), out_sizes=(4,)) + result = map_complete_blocks( + lazy_array, self.func, dims=(1,), out_sizes=(4,), dtype=lazy_array.dtype + ) + self.assertTrue(is_lazy_data(result)) + self.assertArrayEqual(result.compute(), self.func_result) + + def test_dask_masked_array_input(self): + array = da.ma.masked_array(np.arange(2), mask=np.arange(2)) + result = map_complete_blocks( + array, self.func, dims=tuple(), out_sizes=tuple(), dtype=array.dtype + ) self.assertTrue(is_lazy_data(result)) + self.assertTrue(isinstance(da.utils.meta_from_array(result), np.ma.MaskedArray)) + self.assertArrayEqual(result.compute(), np.ma.masked_array([1, 2], mask=[0, 1])) + + def test_dask_array_input_with_different_output_dtype(self): + lazy_array = da.ma.masked_array(self.array, chunks=((1, 1), (4,))) + dtype = np.float32 + + def func(chunk): + if chunk.size == 0: + raise ValueError + return (chunk + 1).astype(np.float32) + + result = map_complete_blocks( + lazy_array, func, dims=(1,), out_sizes=(4,), dtype=dtype + ) + self.assertTrue(isinstance(da.utils.meta_from_array(result), np.ma.MaskedArray)) + self.assertTrue(result.dtype == dtype) + self.assertTrue(result.compute().dtype == dtype) self.assertArrayEqual(result.compute(), self.func_result) def test_rechunk(self): lazy_array = da.asarray(self.array, chunks=((1, 1), (2, 2))) cube, _ = create_mock_cube(lazy_array) - result = map_complete_blocks(cube, self.func, dims=(1,), out_sizes=(4,)) + result = map_complete_blocks( + cube, self.func, dims=(1,), out_sizes=(4,), dtype=lazy_array.dtype + ) self.assertTrue(is_lazy_data(result)) self.assertArrayEqual(result.compute(), self.func_result) @@ -76,7 +122,9 @@ def func(_): return np.arange(2).reshape(1, 2) func_result = [[0, 1], [0, 1]] - result = map_complete_blocks(cube, func, dims=(1,), out_sizes=(2,)) + result = map_complete_blocks( + cube, func, dims=(1,), out_sizes=(2,), dtype=lazy_array.dtype + ) self.assertTrue(is_lazy_data(result)) self.assertArrayEqual(result.compute(), func_result) @@ -84,7 +132,9 @@ def test_multidimensional_input(self): array = np.arange(2 * 3 * 4).reshape(2, 3, 4) lazy_array = da.asarray(array, chunks=((1, 1), (1, 2), (4,))) cube, _ = create_mock_cube(lazy_array) - result = map_complete_blocks(cube, self.func, dims=(1, 2), out_sizes=(3, 4)) + result = map_complete_blocks( + cube, self.func, dims=(1, 2), out_sizes=(3, 4), dtype=lazy_array.dtype + ) self.assertTrue(is_lazy_data(result)) self.assertArrayEqual(result.compute(), array + 1)