diff --git a/docs/src/whatsnew/latest.rst b/docs/src/whatsnew/latest.rst index 76745f8667..9150568316 100644 --- a/docs/src/whatsnew/latest.rst +++ b/docs/src/whatsnew/latest.rst @@ -51,6 +51,10 @@ This document explains the changes made to Iris for this release #. `@bouweandela`_ made :func:`iris.util.rolling_window` work with lazy arrays. (:pull:`5775`) +#. `@stephenworsley`_ fixed a potential memory leak for Iris uses of + :func:`dask.array.map_blocks`; known specifically to be a problem in the + :class:`iris.analysis.AreaWeighted` regridder. (:pull:`5767`) + 🔥 Deprecations =============== diff --git a/lib/iris/_lazy_data.py b/lib/iris/_lazy_data.py index 7791b654a1..40984248d1 100644 --- a/lib/iris/_lazy_data.py +++ b/lib/iris/_lazy_data.py @@ -450,10 +450,11 @@ def lazy_elementwise(lazy_array, elementwise_op): return da.map_blocks(elementwise_op, lazy_array, dtype=dtype) -def map_complete_blocks(src, func, dims, out_sizes): +def map_complete_blocks(src, func, dims, out_sizes, *args, **kwargs): """Apply a function to complete blocks. Complete means that the data is not chunked along the chosen dimensions. + Uses :func:`dask.array.map_blocks` to implement the mapping. Parameters ---------- @@ -465,27 +466,47 @@ def map_complete_blocks(src, func, dims, out_sizes): Dimensions that cannot be chunked. out_sizes : tuple of int Output size of dimensions that cannot be chunked. + *args : tuple + Additional arguments to pass to `func`. + **kwargs : dict + Additional keyword arguments to pass to `func`. + + Returns + ------- + Array-like + + See Also + -------- + :func:`dask.array.map_blocks` : The function used for the mapping. """ + data = None + result = None + if is_lazy_data(src): data = src elif not hasattr(src, "has_lazy_data"): # Not a lazy array and not a cube. So treat as ordinary numpy array. - return func(src) + result = func(src, *args, **kwargs) elif not src.has_lazy_data(): - return func(src.data) + result = func(src.data, *args, **kwargs) else: data = src.lazy_data() - # Ensure dims are not chunked - in_chunks = list(data.chunks) - for dim in dims: - in_chunks[dim] = src.shape[dim] - data = data.rechunk(in_chunks) + if result is None and data is not None: + # Ensure dims are not chunked + in_chunks = list(data.chunks) + for dim in dims: + in_chunks[dim] = src.shape[dim] + data = data.rechunk(in_chunks) - # Determine output chunks - out_chunks = list(data.chunks) - for dim, size in zip(dims, out_sizes): - out_chunks[dim] = size + # Determine output chunks + out_chunks = list(data.chunks) + for dim, size in zip(dims, out_sizes): + out_chunks[dim] = size - return data.map_blocks(func, chunks=out_chunks, dtype=src.dtype) + result = data.map_blocks( + func, *args, chunks=out_chunks, dtype=src.dtype, **kwargs + ) + + return result diff --git a/lib/iris/analysis/__init__.py b/lib/iris/analysis/__init__.py index 7972282201..6678237c1c 100644 --- a/lib/iris/analysis/__init__.py +++ b/lib/iris/analysis/__init__.py @@ -1388,18 +1388,16 @@ def _percentile(data, percent, fast_percentile_method=False, **kwargs): percent = [percent] percent = np.array(percent) - # Perform the percentile calculation. - _partial_percentile = functools.partial( + result = iris._lazy_data.map_complete_blocks( + data, _calc_percentile, + (-1,), + percent.shape, percent=percent, fast_percentile_method=fast_percentile_method, **kwargs, ) - result = iris._lazy_data.map_complete_blocks( - data, _partial_percentile, (-1,), percent.shape - ) - # Check whether to reduce to a scalar result, as per the behaviour # of other aggregators. if result.shape == (1,): diff --git a/lib/iris/analysis/_area_weighted.py b/lib/iris/analysis/_area_weighted.py index 9a63a49457..a25a21bb47 100644 --- a/lib/iris/analysis/_area_weighted.py +++ b/lib/iris/analysis/_area_weighted.py @@ -392,9 +392,11 @@ def _regrid_area_weighted_rectilinear_src_and_grid__perform( tgt_shape = (len(grid_y.points), len(grid_x.points)) - # Calculate new data array for regridded cube. - regrid = functools.partial( + new_data = map_complete_blocks( + src_cube, _regrid_along_dims, + (src_y_dim, src_x_dim), + meshgrid_x.shape, x_dim=src_x_dim, y_dim=src_y_dim, weights=weights, @@ -402,10 +404,6 @@ def _regrid_area_weighted_rectilinear_src_and_grid__perform( mdtol=mdtol, ) - new_data = map_complete_blocks( - src_cube, regrid, (src_y_dim, src_x_dim), meshgrid_x.shape - ) - # Wrap up the data as a Cube. _regrid_callback = functools.partial( diff --git a/lib/iris/analysis/_regrid.py b/lib/iris/analysis/_regrid.py index 42c6bad499..6c10b8c404 100644 --- a/lib/iris/analysis/_regrid.py +++ b/lib/iris/analysis/_regrid.py @@ -935,9 +935,11 @@ def __call__(self, src): x_dim = src.coord_dims(src_x_coord)[0] y_dim = src.coord_dims(src_y_coord)[0] - # Define regrid function - regrid = functools.partial( + data = map_complete_blocks( + src, self._regrid, + (y_dim, x_dim), + sample_grid_x.shape, x_dim=x_dim, y_dim=y_dim, src_x_coord=src_x_coord, @@ -948,8 +950,6 @@ def __call__(self, src): extrapolation_mode=self._extrapolation_mode, ) - data = map_complete_blocks(src, regrid, (y_dim, x_dim), sample_grid_x.shape) - # Wrap up the data as a Cube. _regrid_callback = functools.partial( self._regrid,