Skip to content

Commit

Permalink
Specify meta in dask.array.map_blocks (#5989)
Browse files Browse the repository at this point in the history
* Specify meta in da.map_blocks

* Improve tests

* Add more tests

* Add whatsnew

* Specify dtype in map_complete_blocks and undo change to lazy_elementwise

---------

Co-authored-by: Patrick Peglar <patrick.peglar@metoffice.gov.uk>
  • Loading branch information
bouweandela and pp-mo authored Oct 28, 2024
1 parent 9c29117 commit 14d9714
Show file tree
Hide file tree
Showing 9 changed files with 116 additions and 24 deletions.
3 changes: 3 additions & 0 deletions docs/src/whatsnew/latest.rst
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ This document explains the changes made to Iris for this release
#. `@rcomer`_ enabled partial collapse of multi-dimensional string coordinates,
fixing :issue:`3653`. (:pull:`5955`)

#. `@bouweandela`_ made further updates to the ``chunktype`` of Dask arrays,
   so that it better corresponds to the array content. (:pull:`5989`)

#. `@ukmo-ccbunney`_ improved error handling for malformed ``cell_method``
   attributes. Also made ``cell_method`` string parsing more lenient with
   respect to whitespace. (:pull:`6083`)
Expand Down
12 changes: 9 additions & 3 deletions lib/iris/_lazy_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -537,11 +537,12 @@ def lazy_elementwise(lazy_array, elementwise_op):
# call may cast to float, or not, depending on unit equality : Thus, it's
# much safer to get udunits to decide that for us.
dtype = elementwise_op(np.zeros(1, lazy_array.dtype)).dtype
meta = da.utils.meta_from_array(lazy_array).astype(dtype)

return da.map_blocks(elementwise_op, lazy_array, dtype=dtype)
return da.map_blocks(elementwise_op, lazy_array, dtype=dtype, meta=meta)


def map_complete_blocks(src, func, dims, out_sizes, *args, **kwargs):
def map_complete_blocks(src, func, dims, out_sizes, dtype, *args, **kwargs):
"""Apply a function to complete blocks.
Complete means that the data is not chunked along the chosen dimensions.
Expand All @@ -557,6 +558,8 @@ def map_complete_blocks(src, func, dims, out_sizes, *args, **kwargs):
Dimensions that cannot be chunked.
out_sizes : tuple of int
Output size of dimensions that cannot be chunked.
dtype :
Output dtype.
*args : tuple
Additional arguments to pass to `func`.
**kwargs : dict
Expand Down Expand Up @@ -596,8 +599,11 @@ def map_complete_blocks(src, func, dims, out_sizes, *args, **kwargs):
for dim, size in zip(dims, out_sizes):
out_chunks[dim] = size

# Assume operation preserves mask.
meta = da.utils.meta_from_array(data).astype(dtype)

result = data.map_blocks(
func, *args, chunks=out_chunks, dtype=src.dtype, **kwargs
func, *args, chunks=out_chunks, meta=meta, dtype=dtype, **kwargs
)

return result
7 changes: 4 additions & 3 deletions lib/iris/analysis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1390,9 +1390,10 @@ def _percentile(data, percent, fast_percentile_method=False, **kwargs):

result = iris._lazy_data.map_complete_blocks(
data,
_calc_percentile,
(-1,),
percent.shape,
func=_calc_percentile,
dims=(-1,),
out_sizes=percent.shape,
dtype=np.float64,
percent=percent,
fast_percentile_method=fast_percentile_method,
**kwargs,
Expand Down
13 changes: 10 additions & 3 deletions lib/iris/analysis/_area_weighted.py
Original file line number Diff line number Diff line change
Expand Up @@ -392,11 +392,18 @@ def _regrid_area_weighted_rectilinear_src_and_grid__perform(

tgt_shape = (len(grid_y.points), len(grid_x.points))

# Specify the output dtype
if np.issubdtype(src_cube.dtype, np.integer):
out_dtype = np.float64
else:
out_dtype = src_cube.dtype

new_data = map_complete_blocks(
src_cube,
_regrid_along_dims,
(src_y_dim, src_x_dim),
meshgrid_x.shape,
func=_regrid_along_dims,
dims=(src_y_dim, src_x_dim),
out_sizes=meshgrid_x.shape,
dtype=out_dtype,
x_dim=src_x_dim,
y_dim=src_y_dim,
weights=weights,
Expand Down
13 changes: 10 additions & 3 deletions lib/iris/analysis/_regrid.py
Original file line number Diff line number Diff line change
Expand Up @@ -943,11 +943,18 @@ def __call__(self, src):
x_dim = src.coord_dims(src_x_coord)[0]
y_dim = src.coord_dims(src_y_coord)[0]

# Specify the output dtype
if self._method == "linear" and np.issubdtype(src.dtype, np.integer):
out_dtype = np.float64
else:
out_dtype = src.dtype

data = map_complete_blocks(
src,
self._regrid,
(y_dim, x_dim),
sample_grid_x.shape,
func=self._regrid,
dims=(y_dim, x_dim),
out_sizes=sample_grid_x.shape,
dtype=out_dtype,
x_dim=x_dim,
y_dim=y_dim,
src_x_coord=src_x_coord,
Expand Down
5 changes: 4 additions & 1 deletion lib/iris/mesh/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,9 @@ def fill_region(target, regiondata, regioninds):
# Notes on resultant calculation properties:
# 1. map_blocks is chunk-mapped, so it is parallelisable and space-saving
# 2. However, fetching less than a whole chunk is not efficient
meta = np.ma.array(
np.empty((0,) * result_array.ndim, dtype=result_array.dtype), mask=True
)
for cube in submesh_cubes:
# Lazy data array from the region cube
sub_data = cube.lazy_data()
Expand All @@ -300,7 +303,7 @@ def fill_region(target, regiondata, regioninds):
sub_data,
indarr,
dtype=result_array.dtype,
meta=np.ndarray,
meta=meta,
)

# Construct the result cube
Expand Down
13 changes: 13 additions & 0 deletions lib/iris/tests/unit/analysis/regrid/test_RectilinearRegridder.py
Original file line number Diff line number Diff line change
Expand Up @@ -474,12 +474,25 @@ def setUp(self):
self.args = ("linear", "mask")
self.regridder = Regridder(self.cube, self.cube, *self.args)
self.lazy_cube = self.cube.copy(da.asarray(self.cube.data))
self.lazy_masked_cube = self.lazy_cube.copy(da.ma.masked_array(self.cube.data))
self.lazy_regridder = Regridder(self.lazy_cube, self.lazy_cube, *self.args)

def test_lazy_regrid(self):
result = self.lazy_regridder(self.lazy_cube)
self.assertTrue(result.has_lazy_data())
meta = da.utils.meta_from_array(result.core_data())
self.assertTrue(meta.__class__ is np.ndarray)
expected = self.regridder(self.cube)
self.assertEqual(result.dtype, expected.dtype)
self.assertTrue(result == expected)

def test_lazy_masked_regrid(self):
result = self.lazy_regridder(self.lazy_masked_cube)
self.assertTrue(result.has_lazy_data())
meta = da.utils.meta_from_array(result.core_data())
self.assertTrue(isinstance(meta, np.ma.MaskedArray))
expected = self.regridder(self.cube)
self.assertEqual(result.dtype, expected.dtype)
self.assertTrue(result == expected)


Expand Down
10 changes: 6 additions & 4 deletions lib/iris/tests/unit/analysis/test_PERCENTILE.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,10 @@ def test_default_kwargs_passed(self, mocked_mquantiles):
if self.lazy:
data = as_lazy_data(data)

self.agg_method(data, axis=axis, percent=percent)
result = self.agg_method(data, axis=axis, percent=percent)

# Trigger calculation for lazy case.
as_concrete_data(data)
as_concrete_data(result)
for key in ["alphap", "betap"]:
self.assertEqual(mocked_mquantiles.call_args.kwargs[key], 1)

Expand All @@ -170,10 +170,12 @@ def test_chosen_kwargs_passed(self, mocked_mquantiles):
if self.lazy:
data = as_lazy_data(data)

self.agg_method(data, axis=axis, percent=percent, alphap=0.6, betap=0.5)
result = self.agg_method(
data, axis=axis, percent=percent, alphap=0.6, betap=0.5
)

# Trigger calculation for lazy case.
as_concrete_data(data)
as_concrete_data(result)
for key, val in zip(["alphap", "betap"], [0.6, 0.5]):
self.assertEqual(mocked_mquantiles.call_args.kwargs[key], val)

Expand Down
64 changes: 57 additions & 7 deletions lib/iris/tests/unit/lazy_data/test_map_complete_blocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,27 @@ def create_mock_cube(array):
class Test_map_complete_blocks(tests.IrisTest):
def setUp(self):
self.array = np.arange(8).reshape(2, 4)
self.func = lambda chunk: chunk + 1

def func(chunk):
"""Use a function that cannot be 'sampled'.
To make sure the call to map_blocks is correct for any function,
we define this function that cannot be called with size 0 arrays
to infer the output meta.
"""
if chunk.size == 0:
raise ValueError
return chunk + 1

self.func = func
self.func_result = self.array + 1

def test_non_lazy_input(self):
# Check that a non-lazy input doesn't trip up the functionality.
cube, cube_data = create_mock_cube(self.array)
result = map_complete_blocks(cube, self.func, dims=(1,), out_sizes=(4,))
result = map_complete_blocks(
cube, self.func, dims=(1,), out_sizes=(4,), dtype=self.array.dtype
)
self.assertFalse(is_lazy_data(result))
self.assertArrayEqual(result, self.func_result)
# check correct data was accessed
Expand All @@ -48,7 +62,9 @@ def test_non_lazy_input(self):
def test_lazy_input(self):
lazy_array = da.asarray(self.array, chunks=((1, 1), (4,)))
cube, cube_data = create_mock_cube(lazy_array)
result = map_complete_blocks(cube, self.func, dims=(1,), out_sizes=(4,))
result = map_complete_blocks(
cube, self.func, dims=(1,), out_sizes=(4,), dtype=lazy_array.dtype
)
self.assertTrue(is_lazy_data(result))
self.assertArrayEqual(result.compute(), self.func_result)
# check correct data was accessed
Expand All @@ -57,14 +73,44 @@ def test_lazy_input(self):

def test_dask_array_input(self):
lazy_array = da.asarray(self.array, chunks=((1, 1), (4,)))
result = map_complete_blocks(lazy_array, self.func, dims=(1,), out_sizes=(4,))
result = map_complete_blocks(
lazy_array, self.func, dims=(1,), out_sizes=(4,), dtype=lazy_array.dtype
)
self.assertTrue(is_lazy_data(result))
self.assertArrayEqual(result.compute(), self.func_result)

def test_dask_masked_array_input(self):
array = da.ma.masked_array(np.arange(2), mask=np.arange(2))
result = map_complete_blocks(
array, self.func, dims=tuple(), out_sizes=tuple(), dtype=array.dtype
)
self.assertTrue(is_lazy_data(result))
self.assertTrue(isinstance(da.utils.meta_from_array(result), np.ma.MaskedArray))
self.assertArrayEqual(result.compute(), np.ma.masked_array([1, 2], mask=[0, 1]))

def test_dask_array_input_with_different_output_dtype(self):
lazy_array = da.ma.masked_array(self.array, chunks=((1, 1), (4,)))
dtype = np.float32

def func(chunk):
if chunk.size == 0:
raise ValueError
return (chunk + 1).astype(np.float32)

result = map_complete_blocks(
lazy_array, func, dims=(1,), out_sizes=(4,), dtype=dtype
)
self.assertTrue(isinstance(da.utils.meta_from_array(result), np.ma.MaskedArray))
self.assertTrue(result.dtype == dtype)
self.assertTrue(result.compute().dtype == dtype)
self.assertArrayEqual(result.compute(), self.func_result)

def test_rechunk(self):
lazy_array = da.asarray(self.array, chunks=((1, 1), (2, 2)))
cube, _ = create_mock_cube(lazy_array)
result = map_complete_blocks(cube, self.func, dims=(1,), out_sizes=(4,))
result = map_complete_blocks(
cube, self.func, dims=(1,), out_sizes=(4,), dtype=lazy_array.dtype
)
self.assertTrue(is_lazy_data(result))
self.assertArrayEqual(result.compute(), self.func_result)

Expand All @@ -76,15 +122,19 @@ def func(_):
return np.arange(2).reshape(1, 2)

func_result = [[0, 1], [0, 1]]
result = map_complete_blocks(cube, func, dims=(1,), out_sizes=(2,))
result = map_complete_blocks(
cube, func, dims=(1,), out_sizes=(2,), dtype=lazy_array.dtype
)
self.assertTrue(is_lazy_data(result))
self.assertArrayEqual(result.compute(), func_result)

def test_multidimensional_input(self):
array = np.arange(2 * 3 * 4).reshape(2, 3, 4)
lazy_array = da.asarray(array, chunks=((1, 1), (1, 2), (4,)))
cube, _ = create_mock_cube(lazy_array)
result = map_complete_blocks(cube, self.func, dims=(1, 2), out_sizes=(3, 4))
result = map_complete_blocks(
cube, self.func, dims=(1, 2), out_sizes=(3, 4), dtype=lazy_array.dtype
)
self.assertTrue(is_lazy_data(result))
self.assertArrayEqual(result.compute(), array + 1)

Expand Down

0 comments on commit 14d9714

Please sign in to comment.