From ca2b419ac3cd4d68638d804c371eeb5b411b5d0c Mon Sep 17 00:00:00 2001 From: Bouwe Andela Date: Mon, 4 Mar 2024 17:21:12 +0100 Subject: [PATCH 1/9] Autodetect meta when creating lazy data --- lib/iris/_lazy_data.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lib/iris/_lazy_data.py b/lib/iris/_lazy_data.py index 40984248d1..e1b2b07506 100644 --- a/lib/iris/_lazy_data.py +++ b/lib/iris/_lazy_data.py @@ -263,7 +263,7 @@ def as_lazy_data( raise ValueError( f"Dask chunking chosen, but chunks already assigned value {chunks}" ) - lazy_params = {"asarray": asarray, "meta": np.ndarray} + lazy_params = {"asarray": asarray} else: if chunks is None: # No existing chunks : Make a chunk the shape of the entire input array @@ -284,7 +284,6 @@ def as_lazy_data( lazy_params = { "chunks": chunks, "asarray": asarray, - "meta": np.ndarray, } if isinstance(data, ma.core.MaskedConstant): data = ma.masked_array(data.data, mask=data.mask) From 049c1feb58e8fb8d050429fb523a06c8e50489cd Mon Sep 17 00:00:00 2001 From: Bouwe Andela Date: Tue, 16 Apr 2024 13:06:11 +0200 Subject: [PATCH 2/9] Use correct meta --- lib/iris/_lazy_data.py | 48 ++++++++++++++------- lib/iris/fileformats/netcdf/loader.py | 7 ++- lib/iris/fileformats/pp.py | 9 +++- lib/iris/tests/unit/lazy_data/test_stack.py | 22 ++++++++++ 4 files changed, 66 insertions(+), 20 deletions(-) create mode 100644 lib/iris/tests/unit/lazy_data/test_stack.py diff --git a/lib/iris/_lazy_data.py b/lib/iris/_lazy_data.py index e1b2b07506..8d7e221545 100644 --- a/lib/iris/_lazy_data.py +++ b/lib/iris/_lazy_data.py @@ -9,6 +9,7 @@ """ from functools import lru_cache, wraps +from typing import Sequence import dask import dask.array as da @@ -221,7 +222,7 @@ def _optimum_chunksize( def as_lazy_data( - data, chunks=None, asarray=False, dims_fixed=None, dask_chunking=False + data, chunks=None, asarray=False, meta=None, dims_fixed=None, dask_chunking=False ): """Convert the input array `data` to a :class:`dask.array.Array`. @@ -235,6 +236,9 @@ def as_lazy_data( asarray : bool, default=False If True, then chunks will be converted to instances of `ndarray`. Set to False (default) to pass passed chunks through unchanged. + meta : numpy.ndarray, optional + Empty ndarray created with same NumPy backend, ndim and dtype as the + Dask Array being created. dims_fixed : list of bool, optional If set, a list of values equal in length to 'chunks' or data.ndim. 'True' values indicate a dimension which can not be changed, i.e. the @@ -258,12 +262,14 @@ def as_lazy_data( but reduced by a factor if that exceeds the dask default chunksize. """ + if isinstance(data, ma.core.MaskedConstant): + data = ma.masked_array(data.data, mask=data.mask) + if dask_chunking: if chunks is not None: raise ValueError( f"Dask chunking chosen, but chunks already assigned value {chunks}" ) - lazy_params = {"asarray": asarray} else: if chunks is None: # No existing chunks : Make a chunk the shape of the entire input array @@ -281,14 +287,9 @@ def as_lazy_data( dtype=data.dtype, dims_fixed=dims_fixed, ) - lazy_params = { - "chunks": chunks, - "asarray": asarray, - } - if isinstance(data, ma.core.MaskedConstant): - data = ma.masked_array(data.data, mask=data.mask) + if not is_lazy_data(data): - data = da.from_array(data, **lazy_params) + data = da.from_array(data, chunks=chunks, asarray=asarray, meta=meta) return data @@ -350,7 +351,22 @@ def as_concrete_data(data): return data -def multidim_lazy_stack(stack): +def stack(seq: Sequence[da.Array | np.ndarray]) -> da.Array: + """Stack arrays along a new axis. + + This version of :func:`da.stack` ensures all slices of the resulting array + are masked arrays if any of the input arrays is masked. + """ + + def is_masked(a): + return isinstance(da.utils.meta_from_array(a), np.ma.MaskedArray) + + if any(is_masked(a) for a in seq): + seq = [a if is_masked(a) else da.ma.masked_array(a) for a in seq] + return da.stack(seq) + + +def multidim_lazy_stack(arr): """Recursively build a multidimensional stacked dask array. This is needed because :meth:`dask.array.Array.stack` only accepts a @@ -358,7 +374,7 @@ def multidim_lazy_stack(stack): Parameters ---------- - stack : + arr : An ndarray of :class:`dask.array.Array`. Returns @@ -366,15 +382,15 @@ def multidim_lazy_stack(stack): The input array converted to a lazy :class:`dask.array.Array`. """ - if stack.ndim == 0: + if arr.ndim == 0: # A 0-d array cannot be stacked. - result = stack.item() - elif stack.ndim == 1: + result = arr.item() + elif arr.ndim == 1: # Another base case : simple 1-d goes direct in dask. - result = da.stack(list(stack)) + result = stack(list(arr)) else: # Recurse because dask.stack does not do multi-dimensional. - result = da.stack([multidim_lazy_stack(subarray) for subarray in stack]) + result = stack([multidim_lazy_stack(subarray) for subarray in arr]) return result diff --git a/lib/iris/fileformats/netcdf/loader.py b/lib/iris/fileformats/netcdf/loader.py index 16fe567c9b..32e363a247 100644 --- a/lib/iris/fileformats/netcdf/loader.py +++ b/lib/iris/fileformats/netcdf/loader.py @@ -239,8 +239,11 @@ def _get_cf_var_data(cf_var, filename): ) # Get the chunking specified for the variable : this is either a shape, or # maybe the string "contiguous". + meta = np.ma.array( + np.empty((0,) * proxy.ndim, dtype=proxy.dtype), mask=True + ) if CHUNK_CONTROL.mode is ChunkControl.Modes.AS_DASK: - result = as_lazy_data(proxy, chunks=None, dask_chunking=True) + result = as_lazy_data(proxy, meta=meta, chunks=None, dask_chunking=True) else: chunks = cf_var.cf_data.chunking() if chunks is None: @@ -285,7 +288,7 @@ def _get_cf_var_data(cf_var, filename): if dims_fixed is None: dims_fixed = [dims_fixed] result = as_lazy_data( - proxy, chunks=chunks, dims_fixed=tuple(dims_fixed) + proxy, meta=meta, chunks=chunks, dims_fixed=tuple(dims_fixed) ) return result diff --git a/lib/iris/fileformats/pp.py b/lib/iris/fileformats/pp.py index 0a159b46b6..9acd2e9104 100644 --- a/lib/iris/fileformats/pp.py +++ b/lib/iris/fileformats/pp.py @@ -1756,7 +1756,8 @@ def _create_field_data(field, data_shape, land_mask_field=None): if land_mask_field is None: # For a "normal" (non-landsea-masked) field, the proxy can be # wrapped directly as a deferred array. - field.data = as_lazy_data(proxy, chunks=block_shape) + meta = np.empty((0,) * proxy.ndim, dtype=proxy.dtype) + field.data = as_lazy_data(proxy, meta=meta, chunks=block_shape) else: # This is a landsea-masked field, and its data must be handled in # a different way : Because data shape/size is not known in @@ -1807,8 +1808,12 @@ def calc_array(mask, values): return result delayed_result = calc_array(mask_field_array, delayed_valid_values) + meta = np.ma.array(np.empty((0,) * proxy.ndim, dtype=dtype), mask=True) lazy_result_array = da.from_delayed( - delayed_result, shape=block_shape, dtype=dtype + delayed_result, + shape=block_shape, + dtype=dtype, + meta=meta, ) field.data = lazy_result_array diff --git a/lib/iris/tests/unit/lazy_data/test_stack.py b/lib/iris/tests/unit/lazy_data/test_stack.py new file mode 100644 index 0000000000..880bad7df1 --- /dev/null +++ b/lib/iris/tests/unit/lazy_data/test_stack.py @@ -0,0 +1,22 @@ +# Copyright Iris contributors +# +# This file is part of Iris and is released under the BSD license. +# See LICENSE in the root of the repository for full licensing details. +"""Test function :func:`iris._lazy data.stack`.""" + +import dask.array as da +import numpy as np + +from iris._lazy_data import stack + + +def test_stack(): + seq = [ + da.arange(2), + da.ma.masked_array(da.arange(2)), + ] + result = stack(seq) + + assert isinstance(result[0].compute(), np.ma.MaskedArray) + assert isinstance(result[1].compute(), np.ma.MaskedArray) + np.testing.assert_array_equal(stack(seq).compute(), da.stack(seq).compute()) From 6d37db9421996f5b3f771486d0abe977d6910a0f Mon Sep 17 00:00:00 2001 From: Bouwe Andela Date: Tue, 16 Apr 2024 16:37:29 +0200 Subject: [PATCH 3/9] Use meta for data proxies --- lib/iris/_lazy_data.py | 23 +++++++--------- .../fileformats/netcdf/_thread_safe_nc.py | 4 +++ lib/iris/fileformats/netcdf/loader.py | 7 ++--- lib/iris/fileformats/pp.py | 7 +++-- .../netcdf/loader/test__chunk_control.py | 2 +- .../fileformats/pp/test__create_field_data.py | 1 + .../unit/lazy_data/test_as_concrete_data.py | 6 +++-- .../tests/unit/lazy_data/test_as_lazy_data.py | 26 ++++++++++--------- .../unit/lazy_data/test_co_realise_cubes.py | 3 ++- 9 files changed, 43 insertions(+), 36 deletions(-) diff --git a/lib/iris/_lazy_data.py b/lib/iris/_lazy_data.py index 8d7e221545..1a2047cbd7 100644 --- a/lib/iris/_lazy_data.py +++ b/lib/iris/_lazy_data.py @@ -221,9 +221,7 @@ def _optimum_chunksize( ) -def as_lazy_data( - data, chunks=None, asarray=False, meta=None, dims_fixed=None, dask_chunking=False -): +def as_lazy_data(data, chunks=None, asarray=False, meta=None, dims_fixed=None): """Convert the input array `data` to a :class:`dask.array.Array`. Parameters @@ -233,6 +231,8 @@ def as_lazy_data( This will be converted to a :class:`dask.array.Array`. chunks : list of int, optional If present, a source chunk shape, e.g. for a chunked netcdf variable. + If set to "auto", Iris chunking optimisation will be bypassed, and dask's + default chunking will be used instead. asarray : bool, default=False If True, then chunks will be converted to instances of `ndarray`. Set to False (default) to pass passed chunks through unchanged. @@ -243,10 +243,6 @@ def as_lazy_data( If set, a list of values equal in length to 'chunks' or data.ndim. 'True' values indicate a dimension which can not be changed, i.e. the result for that index must equal the value in 'chunks' or data.shape. - dask_chunking : bool, default=False - If True, Iris chunking optimisation will be bypassed, and dask's default - chunking will be used instead. Including a value for chunks while dask_chunking - is set to True will result in a failure. Returns ------- @@ -265,12 +261,13 @@ def as_lazy_data( if isinstance(data, ma.core.MaskedConstant): data = ma.masked_array(data.data, mask=data.mask) - if dask_chunking: - if chunks is not None: - raise ValueError( - f"Dask chunking chosen, but chunks already assigned value {chunks}" - ) - else: + if meta is None and not isinstance(data, (np.ndarray, da.Array)): + raise ValueError( + "For performance reasons, `meta` cannot be `None` if `data` is " + "anything other than a Numpy or Dask array." + ) + + if chunks != "auto": if chunks is None: # No existing chunks : Make a chunk the shape of the entire input array # (but we will subdivide it if too big). diff --git a/lib/iris/fileformats/netcdf/_thread_safe_nc.py b/lib/iris/fileformats/netcdf/_thread_safe_nc.py index 675a151868..f62f138191 100644 --- a/lib/iris/fileformats/netcdf/_thread_safe_nc.py +++ b/lib/iris/fileformats/netcdf/_thread_safe_nc.py @@ -323,6 +323,10 @@ def ndim(self): # noqa: D102 return len(self.shape) + @property + def meta(self): + return np.ma.array(np.empty((0,) * self.ndim, dtype=self.dtype), mask=True) + def __getitem__(self, keys): # Using a DatasetWrapper causes problems with invalid ID's and the # netCDF4 library, presumably because __getitem__ gets called so many diff --git a/lib/iris/fileformats/netcdf/loader.py b/lib/iris/fileformats/netcdf/loader.py index 32e363a247..ab517cfc43 100644 --- a/lib/iris/fileformats/netcdf/loader.py +++ b/lib/iris/fileformats/netcdf/loader.py @@ -239,11 +239,8 @@ def _get_cf_var_data(cf_var, filename): ) # Get the chunking specified for the variable : this is either a shape, or # maybe the string "contiguous". - meta = np.ma.array( - np.empty((0,) * proxy.ndim, dtype=proxy.dtype), mask=True - ) if CHUNK_CONTROL.mode is ChunkControl.Modes.AS_DASK: - result = as_lazy_data(proxy, meta=meta, chunks=None, dask_chunking=True) + result = as_lazy_data(proxy, meta=proxy.meta, chunks="auto") else: chunks = cf_var.cf_data.chunking() if chunks is None: @@ -288,7 +285,7 @@ def _get_cf_var_data(cf_var, filename): if dims_fixed is None: dims_fixed = [dims_fixed] result = as_lazy_data( - proxy, meta=meta, chunks=chunks, dims_fixed=tuple(dims_fixed) + proxy, meta=proxy.meta, chunks=chunks, dims_fixed=tuple(dims_fixed) ) return result diff --git a/lib/iris/fileformats/pp.py b/lib/iris/fileformats/pp.py index 9acd2e9104..f82a1bff30 100644 --- a/lib/iris/fileformats/pp.py +++ b/lib/iris/fileformats/pp.py @@ -606,6 +606,10 @@ def fill_value(self): def ndim(self): return len(self.shape) + @property + def meta(self): + return np.empty((0,) * self.ndim, dtype=self.dtype) + def __getitem__(self, keys): with open(self.path, "rb") as pp_file: pp_file.seek(self.offset, os.SEEK_SET) @@ -1756,8 +1760,7 @@ def _create_field_data(field, data_shape, land_mask_field=None): if land_mask_field is None: # For a "normal" (non-landsea-masked) field, the proxy can be # wrapped directly as a deferred array. - meta = np.empty((0,) * proxy.ndim, dtype=proxy.dtype) - field.data = as_lazy_data(proxy, meta=meta, chunks=block_shape) + field.data = as_lazy_data(proxy, meta=proxy.meta, chunks=block_shape) else: # This is a landsea-masked field, and its data must be handled in # a different way : Because data shape/size is not known in diff --git a/lib/iris/tests/unit/fileformats/netcdf/loader/test__chunk_control.py b/lib/iris/tests/unit/fileformats/netcdf/loader/test__chunk_control.py index d4c813502f..3051754423 100644 --- a/lib/iris/tests/unit/fileformats/netcdf/loader/test__chunk_control.py +++ b/lib/iris/tests/unit/fileformats/netcdf/loader/test__chunk_control.py @@ -203,7 +203,7 @@ def test_as_dask(tmp_filepath, save_cubelist_with_sigma): except RuntimeError as e: if str(e) != message: raise e - as_lazy_data.assert_called_with(ANY, chunks=None, dask_chunking=True) + as_lazy_data.assert_called_with(ANY, meta=ANY, chunks="auto") def test_pinned_optimisation(tmp_filepath, save_cubelist_with_sigma): diff --git a/lib/iris/tests/unit/fileformats/pp/test__create_field_data.py b/lib/iris/tests/unit/fileformats/pp/test__create_field_data.py index ab80332186..2467b4243b 100644 --- a/lib/iris/tests/unit/fileformats/pp/test__create_field_data.py +++ b/lib/iris/tests/unit/fileformats/pp/test__create_field_data.py @@ -55,6 +55,7 @@ def test_deferred_bytes(self): data_shape = (100, 120) proxy = mock.Mock( dtype=np.dtype("f4"), + meta=np.empty((0,) * len(data_shape), dtype=np.dtype("f4")), shape=data_shape, spec=pp.PPDataProxy, ndim=len(data_shape), diff --git a/lib/iris/tests/unit/lazy_data/test_as_concrete_data.py b/lib/iris/tests/unit/lazy_data/test_as_concrete_data.py index 91b22a3c0e..9b9929a156 100644 --- a/lib/iris/tests/unit/lazy_data/test_as_concrete_data.py +++ b/lib/iris/tests/unit/lazy_data/test_as_concrete_data.py @@ -60,7 +60,8 @@ def test_lazy_mask_data(self): def test_lazy_scalar_proxy(self): a = np.array(5) proxy = MyProxy(a) - lazy_array = as_lazy_data(proxy) + meta = np.empty((0,) * proxy.ndim, dtype=proxy.dtype) + lazy_array = as_lazy_data(proxy, meta=meta) self.assertTrue(is_lazy_data(lazy_array)) result = as_concrete_data(lazy_array) self.assertFalse(is_lazy_data(result)) @@ -69,7 +70,8 @@ def test_lazy_scalar_proxy(self): def test_lazy_scalar_proxy_masked(self): a = np.ma.masked_array(5, True) proxy = MyProxy(a) - lazy_array = as_lazy_data(proxy) + meta = np.ma.array(np.empty((0,) * proxy.ndim, dtype=proxy.dtype), mask=True) + lazy_array = as_lazy_data(proxy, meta=meta) self.assertTrue(is_lazy_data(lazy_array)) result = as_concrete_data(lazy_array) self.assertFalse(is_lazy_data(result)) diff --git a/lib/iris/tests/unit/lazy_data/test_as_lazy_data.py b/lib/iris/tests/unit/lazy_data/test_as_lazy_data.py index 0f741b10a3..90166b5e78 100644 --- a/lib/iris/tests/unit/lazy_data/test_as_lazy_data.py +++ b/lib/iris/tests/unit/lazy_data/test_as_lazy_data.py @@ -46,26 +46,28 @@ def test_dask_chunking(self): chunks = (12,) optimum = self.patch("iris._lazy_data._optimum_chunksize") optimum.return_value = chunks - _ = as_lazy_data(data, chunks=None, dask_chunking=True) + _ = as_lazy_data(data, chunks="auto") self.assertFalse(optimum.called) - def test_dask_chunking_error(self): - data = np.arange(24) - chunks = (12,) - optimum = self.patch("iris._lazy_data._optimum_chunksize") - optimum.return_value = chunks - with self.assertRaisesRegex( - ValueError, - r"Dask chunking chosen, but chunks already assigned value", - ): - as_lazy_data(data, chunks=chunks, dask_chunking=True) - def test_with_masked_constant(self): masked_data = ma.masked_array([8], mask=True) masked_constant = masked_data[0] result = as_lazy_data(masked_constant) self.assertIsInstance(result, da.core.Array) + def test_missing_meta(self): + class MyProxy: + pass + + data = MyProxy() + + with self.assertRaisesRegex( + ValueError, + r"`meta` cannot be `None` if `data` is anything other than a Numpy " + r"or Dask array.", + ): + as_lazy_data(data) + class Test__optimised_chunks(tests.IrisTest): # Stable, known chunksize for testing. diff --git a/lib/iris/tests/unit/lazy_data/test_co_realise_cubes.py b/lib/iris/tests/unit/lazy_data/test_co_realise_cubes.py index 3b265d615d..4e304d4910 100644 --- a/lib/iris/tests/unit/lazy_data/test_co_realise_cubes.py +++ b/lib/iris/tests/unit/lazy_data/test_co_realise_cubes.py @@ -21,6 +21,7 @@ def __init__(self, array): self.ndim = array.ndim self._array = array self.access_count = 0 + self.meta = np.empty((0,) * array.ndim, dtype=array.dtype) def __getitem__(self, keys): self.access_count += 1 @@ -55,7 +56,7 @@ def test_multi(self): def test_combined_access(self): wrapped_array = ArrayAccessCounter(np.arange(3.0)) - lazy_array = as_lazy_data(wrapped_array) + lazy_array = as_lazy_data(wrapped_array, meta=wrapped_array.meta) derived_a = lazy_array + 1 derived_b = lazy_array + 2 derived_c = lazy_array + 3 From d3593c109505a6cb3cd67d62184978ec59a68d8d Mon Sep 17 00:00:00 2001 From: Bouwe Andela Date: Wed, 17 Apr 2024 09:37:53 +0200 Subject: [PATCH 4/9] Add test --- .../fileformats/netcdf/loader/test__get_cf_var_data.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/lib/iris/tests/unit/fileformats/netcdf/loader/test__get_cf_var_data.py b/lib/iris/tests/unit/fileformats/netcdf/loader/test__get_cf_var_data.py index 92cb93496e..efa291a0b4 100644 --- a/lib/iris/tests/unit/fileformats/netcdf/loader/test__get_cf_var_data.py +++ b/lib/iris/tests/unit/fileformats/netcdf/loader/test__get_cf_var_data.py @@ -10,7 +10,7 @@ from unittest import mock -from dask.array import Array as dask_array +import dask.array as da import numpy as np from iris._lazy_data import _optimum_chunksize @@ -50,7 +50,8 @@ def test_cf_data_type(self): chunks = [1, 12, 100] cf_var = self._make(chunks) lazy_data = _get_cf_var_data(cf_var, self.filename) - self.assertIsInstance(lazy_data, dask_array) + self.assertIsInstance(lazy_data, da.Array) + self.assertIsInstance(da.utils.meta_from_array(lazy_data), np.ma.MaskedArray) def test_cf_data_chunks(self): chunks = [2500, 240, 200] @@ -90,7 +91,7 @@ def test_cf_data_contiguous(self): def test_type__1kf8_is_lazy(self): cf_var = self._make(shape=(1000,), dtype="f8") var_data = _get_cf_var_data(cf_var, self.filename) - self.assertIsInstance(var_data, dask_array) + self.assertIsInstance(var_data, da.Array) def test_arraytype__1ki2_is_real(self): cf_var = self._make(shape=(1000,), dtype="i2") From be91f697a62ca3ae4023b7b939bf576cad456821 Mon Sep 17 00:00:00 2001 From: Bouwe Andela Date: Wed, 17 Apr 2024 09:42:09 +0200 Subject: [PATCH 5/9] Add whatsnew --- docs/src/whatsnew/latest.rst | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/src/whatsnew/latest.rst b/docs/src/whatsnew/latest.rst index 29818ded77..2b98e897f2 100644 --- a/docs/src/whatsnew/latest.rst +++ b/docs/src/whatsnew/latest.rst @@ -36,7 +36,8 @@ This document explains the changes made to Iris for this release 🐛 Bugs Fixed ============= -#. N/A +#. `@bouweandela`_ updated the ``chunktype`` of Dask arrays, so it corresponds + to the array content. (:pull:`5801`) 💣 Incompatible Changes From 423533a31cf9efd46243108fdf1a22f4dee1fddc Mon Sep 17 00:00:00 2001 From: Bouwe Andela Date: Thu, 25 Apr 2024 12:38:58 +0200 Subject: [PATCH 6/9] Use unified stack and concatenate functions --- lib/iris/_concatenate.py | 8 ++-- lib/iris/_lazy_data.py | 89 +++++++++++++++++++++++++++++++++++----- lib/iris/aux_factory.py | 3 +- lib/iris/cube.py | 15 ++----- 4 files changed, 88 insertions(+), 27 deletions(-) diff --git a/lib/iris/_concatenate.py b/lib/iris/_concatenate.py index 8d929c2af2..b25698da89 100644 --- a/lib/iris/_concatenate.py +++ b/lib/iris/_concatenate.py @@ -7,9 +7,9 @@ from collections import defaultdict, namedtuple import warnings -import dask.array as da import numpy as np +from iris._lazy_data import concatenate as concatenate_arrays import iris.coords import iris.cube import iris.exceptions @@ -1112,7 +1112,7 @@ def _build_cell_measures(self): skton.signature.cell_measures_and_dims[i].coord.data for skton in skeletons ] - data = np.concatenate(tuple(data), axis=dim) + data = concatenate_arrays(tuple(data), axis=dim) # Generate the associated metadata. kwargs = cube_signature.cm_metadata[i].defn._asdict() @@ -1152,7 +1152,7 @@ def _build_ancillary_variables(self): skton.signature.ancillary_variables_and_dims[i].coord.data for skton in skeletons ] - data = np.concatenate(tuple(data), axis=dim) + data = concatenate_arrays(tuple(data), axis=dim) # Generate the associated metadata. kwargs = cube_signature.av_metadata[i].defn._asdict() @@ -1245,7 +1245,7 @@ def _build_data(self): skeletons = self._skeletons data = [skeleton.data for skeleton in skeletons] - data = da.concatenate(data, self.axis) + data = concatenate_arrays(data, self.axis) return data diff --git a/lib/iris/_lazy_data.py b/lib/iris/_lazy_data.py index 1a2047cbd7..9bb5293d25 100644 --- a/lib/iris/_lazy_data.py +++ b/lib/iris/_lazy_data.py @@ -42,6 +42,11 @@ def is_lazy_data(data): return result +def is_masked_data(data: np.ndarray | da.Array) -> bool: + """Return whether the argument is a masked array.""" + return isinstance(da.utils.meta_from_array(data), np.ma.MaskedArray) + + def is_lazy_masked_data(data): """Determine whether managed data is lazy and masked. @@ -49,7 +54,7 @@ def is_lazy_masked_data(data): underlying array is of masked type. Otherwise return False. """ - return is_lazy_data(data) and ma.isMA(da.utils.meta_from_array(data)) + return is_lazy_data(data) and is_masked_data(data) @lru_cache @@ -348,19 +353,83 @@ def as_concrete_data(data): return data -def stack(seq: Sequence[da.Array | np.ndarray]) -> da.Array: - """Stack arrays along a new axis. +def _combine( + arrays: Sequence[da.Array | np.ndarray], + operation: str, + **kwargs, +) -> da.Array | np.ndarray: + """Combine multiple arrays into a single array. + + Parameters + ---------- + arrays : + The arrays to combine. + operation : + The combination operation to apply. + **kwargs : + Any keyword arguments to pass to the combination operation. + + """ + lazy = any(is_lazy_data(a) for a in arrays) + masked = any(is_masked_data(a) for a in arrays) + + array_module = np + if masked: + if lazy: + # Avoid inconsistent array type when slicing resulting array + arrays = tuple( + a if is_lazy_masked_data(a) else da.ma.masked_array(a) for a in arrays + ) + else: + # Avoid dropping the masks + array_module = np.ma + + func = getattr(array_module, operation) + return func(arrays, **kwargs) + + +def concatenate( + arrays: Sequence[da.Array | np.ndarray], + axis: int = 0, +) -> da.Array | np.ndarray: + """Concatenate a sequence of arrays along a new axis. + + Parameters + ---------- + arrays : + The arrays must have the same shape, except in the dimension + corresponding to `axis` (the first, by default). + axis : + Dimension along which to align all of the arrays. If axis is None, + arrays are flattened before use. + + Returns + ------- + The concatenated array. - This version of :func:`da.stack` ensures all slices of the resulting array - are masked arrays if any of the input arrays is masked. """ + return _combine(arrays, operation="concatenate", axis=axis) + - def is_masked(a): - return isinstance(da.utils.meta_from_array(a), np.ma.MaskedArray) +def stack( + arrays: Sequence[da.Array | np.ndarray], + axis: int = 0, +) -> da.Array | np.ndarray: + """Stack a sequence of arrays along a new axis. - if any(is_masked(a) for a in seq): - seq = [a if is_masked(a) else da.ma.masked_array(a) for a in seq] - return da.stack(seq) + Parameters + ---------- + arrays : + The arrays must have the same shape. + axis : + Dimension along which to align all of the arrays. + + Returns + ------- + The stacked array. + + """ + return _combine(arrays, operation="stack", axis=axis) def multidim_lazy_stack(arr): diff --git a/lib/iris/aux_factory.py b/lib/iris/aux_factory.py index cd59575f93..41e1e9f573 100644 --- a/lib/iris/aux_factory.py +++ b/lib/iris/aux_factory.py @@ -11,6 +11,7 @@ import dask.array as da import numpy as np +from iris._lazy_data import concatenate from iris.common import CFVariableMixin, CoordMetadata, metadata_manager_factory import iris.coords from iris.warnings import IrisIgnoringBoundsWarning @@ -1076,7 +1077,7 @@ def _derive(self, sigma, eta, depth, depth_c, zlev, nsigma, coord_dims_func): result_rest_levs = zlev[z_slices_rest] * ones_full_result[z_slices_rest] # Combine nsigma and 'rest' levels for the final result. - result = da.concatenate([result_nsigma_levs, result_rest_levs], axis=z_dim) + result = concatenate([result_nsigma_levs, result_rest_levs], axis=z_dim) return result def make_coord(self, coord_dims_func): diff --git a/lib/iris/cube.py b/lib/iris/cube.py index fecbcaf0da..8418a630b5 100644 --- a/lib/iris/cube.py +++ b/lib/iris/cube.py @@ -25,7 +25,6 @@ import zlib from cf_units import Unit -import dask.array as da import numpy as np import numpy.ma as ma @@ -3134,12 +3133,7 @@ def make_chunk(key): result = chunks[0] else: chunk_data = [chunk.core_data() for chunk in chunks] - if self.has_lazy_data(): - func = da.concatenate - else: - module = ma if ma.isMaskedArray(self.data) else np - func = module.concatenate - data = func(chunk_data, dim) + data = _lazy.concatenate(chunk_data, axis=dim) result = iris.cube.Cube(data) result.metadata = deepcopy(self.metadata) @@ -4432,13 +4426,10 @@ def aggregated_by(self, coords, aggregator, climatological=False, **kwargs): # Choose appropriate data and functions for data aggregation. if aggregator.lazy_func is not None and self.has_lazy_data(): - stack = da.stack input_data = self.lazy_data() agg_method = aggregator.lazy_aggregate else: input_data = self.data - # Note numpy.stack does not preserve masks. - stack = ma.stack if ma.isMaskedArray(input_data) else np.stack agg_method = aggregator.aggregate # Create data and weights slices. @@ -4475,11 +4466,11 @@ def aggregated_by(self, coords, aggregator, climatological=False, **kwargs): # before combining the different slices. if return_weights: result, weights_result = list(zip(*result)) - aggregateby_weights = stack(weights_result, axis=dimension_to_groupby) + aggregateby_weights = _lazy.stack(weights_result, axis=dimension_to_groupby) else: aggregateby_weights = None - aggregateby_data = stack(result, axis=dimension_to_groupby) + aggregateby_data = _lazy.stack(result, axis=dimension_to_groupby) # Ensure plain ndarray is output if plain ndarray was input. if ma.isMaskedArray(aggregateby_data) and not ma.isMaskedArray(input_data): aggregateby_data = ma.getdata(aggregateby_data) From 3bb6de6ebd3d146d587f15dfeb5dfb929277484f Mon Sep 17 00:00:00 2001 From: Bouwe Andela Date: Thu, 25 Apr 2024 20:54:03 +0200 Subject: [PATCH 7/9] Improve docstrings Co-authored-by: Patrick Peglar --- lib/iris/_lazy_data.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/lib/iris/_lazy_data.py b/lib/iris/_lazy_data.py index 9bb5293d25..50d1b19a02 100644 --- a/lib/iris/_lazy_data.py +++ b/lib/iris/_lazy_data.py @@ -360,6 +360,9 @@ def _combine( ) -> da.Array | np.ndarray: """Combine multiple arrays into a single array. + Provides enhanced versions of :func:`~dask.array.concatenate` or :func:`~dask.array.stack`, + which ensure that all computed results are masked-array, if the combined .meta is masked. + Parameters ---------- arrays : @@ -394,6 +397,9 @@ def concatenate( ) -> da.Array | np.ndarray: """Concatenate a sequence of arrays along a new axis. + Improves on the regular :func:`dask.array.concatenate` by always respecting a masked + ``.meta``, as described for :func:`_combine`. + Parameters ---------- arrays : @@ -417,6 +423,9 @@ def stack( ) -> da.Array | np.ndarray: """Stack a sequence of arrays along a new axis. +Improves on the regular :func:`dask.array.stack` by always respecting a masked + ``.meta``, as described for :func:`_combine`. + Parameters ---------- arrays : From 0a8fbc4c7f3b3af1685e18ea98a92445ff3450a0 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 25 Apr 2024 18:55:08 +0000 Subject: [PATCH 8/9] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- lib/iris/_lazy_data.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/iris/_lazy_data.py b/lib/iris/_lazy_data.py index 50d1b19a02..b1f8e9aa85 100644 --- a/lib/iris/_lazy_data.py +++ b/lib/iris/_lazy_data.py @@ -423,7 +423,7 @@ def stack( ) -> da.Array | np.ndarray: """Stack a sequence of arrays along a new axis. -Improves on the regular :func:`dask.array.stack` by always respecting a masked + Improves on the regular :func:`dask.array.stack` by always respecting a masked ``.meta``, as described for :func:`_combine`. Parameters From 00af67eed9fce33a901d646d14034e7135d6bec9 Mon Sep 17 00:00:00 2001 From: Bouwe Andela Date: Thu, 25 Apr 2024 21:15:47 +0200 Subject: [PATCH 9/9] Rename proxy.meta to proxy.dask_meta --- lib/iris/fileformats/netcdf/_thread_safe_nc.py | 2 +- lib/iris/fileformats/netcdf/loader.py | 7 +++++-- lib/iris/fileformats/pp.py | 4 ++-- .../tests/unit/fileformats/pp/test__create_field_data.py | 2 +- 4 files changed, 9 insertions(+), 6 deletions(-) diff --git a/lib/iris/fileformats/netcdf/_thread_safe_nc.py b/lib/iris/fileformats/netcdf/_thread_safe_nc.py index f62f138191..3a556f5447 100644 --- a/lib/iris/fileformats/netcdf/_thread_safe_nc.py +++ b/lib/iris/fileformats/netcdf/_thread_safe_nc.py @@ -324,7 +324,7 @@ def ndim(self): return len(self.shape) @property - def meta(self): + def dask_meta(self): return np.ma.array(np.empty((0,) * self.ndim, dtype=self.dtype), mask=True) def __getitem__(self, keys): diff --git a/lib/iris/fileformats/netcdf/loader.py b/lib/iris/fileformats/netcdf/loader.py index ab517cfc43..95cbc120f5 100644 --- a/lib/iris/fileformats/netcdf/loader.py +++ b/lib/iris/fileformats/netcdf/loader.py @@ -240,7 +240,7 @@ def _get_cf_var_data(cf_var, filename): # Get the chunking specified for the variable : this is either a shape, or # maybe the string "contiguous". if CHUNK_CONTROL.mode is ChunkControl.Modes.AS_DASK: - result = as_lazy_data(proxy, meta=proxy.meta, chunks="auto") + result = as_lazy_data(proxy, meta=proxy.dask_meta, chunks="auto") else: chunks = cf_var.cf_data.chunking() if chunks is None: @@ -285,7 +285,10 @@ def _get_cf_var_data(cf_var, filename): if dims_fixed is None: dims_fixed = [dims_fixed] result = as_lazy_data( - proxy, meta=proxy.meta, chunks=chunks, dims_fixed=tuple(dims_fixed) + proxy, + meta=proxy.dask_meta, + chunks=chunks, + dims_fixed=tuple(dims_fixed), ) return result diff --git a/lib/iris/fileformats/pp.py b/lib/iris/fileformats/pp.py index f82a1bff30..c2660d022c 100644 --- a/lib/iris/fileformats/pp.py +++ b/lib/iris/fileformats/pp.py @@ -607,7 +607,7 @@ def ndim(self): return len(self.shape) @property - def meta(self): + def dask_meta(self): return np.empty((0,) * self.ndim, dtype=self.dtype) def __getitem__(self, keys): @@ -1760,7 +1760,7 @@ def _create_field_data(field, data_shape, land_mask_field=None): if land_mask_field is None: # For a "normal" (non-landsea-masked) field, the proxy can be # wrapped directly as a deferred array. - field.data = as_lazy_data(proxy, meta=proxy.meta, chunks=block_shape) + field.data = as_lazy_data(proxy, meta=proxy.dask_meta, chunks=block_shape) else: # This is a landsea-masked field, and its data must be handled in # a different way : Because data shape/size is not known in diff --git a/lib/iris/tests/unit/fileformats/pp/test__create_field_data.py b/lib/iris/tests/unit/fileformats/pp/test__create_field_data.py index 2467b4243b..266502253a 100644 --- a/lib/iris/tests/unit/fileformats/pp/test__create_field_data.py +++ b/lib/iris/tests/unit/fileformats/pp/test__create_field_data.py @@ -55,7 +55,7 @@ def test_deferred_bytes(self): data_shape = (100, 120) proxy = mock.Mock( dtype=np.dtype("f4"), - meta=np.empty((0,) * len(data_shape), dtype=np.dtype("f4")), + dask_meta=np.empty((0,) * len(data_shape), dtype=np.dtype("f4")), shape=data_shape, spec=pp.PPDataProxy, ndim=len(data_shape),