Skip to content

Commit

Permalink
Improve performance of volume_statistics (#1545)
Browse files Browse the repository at this point in the history
* Use dask and iris

* Fix flake

* Add mask

* Fix

* Improve test coverage

* Fix flake

* Fix typo

* Add test

* Remove duplicated test

* Add docstring

* Add back accidentally removed test

* Collapse all axes at once
  • Loading branch information
sloosvel authored Jun 17, 2022
1 parent d7ce1d2 commit acc5377
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 171 deletions.
158 changes: 12 additions & 146 deletions esmvalcore/preprocessor/_volume.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
depth or height regions; constructing volumetric averages;
"""
import logging
from copy import deepcopy

import dask.array as da
import iris
Expand Down Expand Up @@ -52,90 +51,6 @@ def extract_volume(cube, z_min, z_max):
return cube.extract(z_constraint)


def _create_cube_time(src_cube, data, times):
    """Generate a new cube holding a time series of volume-averaged data.

    The resultant cube is seeded with `src_cube` metadata and coordinates,
    excluding any source coordinates that span the time dimension. The
    `times` array is used along with the source cube's time coordinate
    metadata to add a new time coordinate to the resultant cube.

    Based on the _create_cube method from _regrid.py.

    Parameters
    ----------
    src_cube : cube
        The source cube whose metadata and non-time coordinates seed the
        result.
    data : array
        The payload for the new cube; its size along the time dimension
        must match ``len(times)``.
    times : array
        The array of time points for the new time coordinate.

    Returns
    -------
    cube
        A new cube carrying `data`, the non-time coordinates of
        `src_cube`, and a rebuilt time coordinate.

    Raises
    ------
    ValueError
        If `data`'s extent along the time dimension differs from
        ``len(times)``.
    """
    # Get the source cube time coordinate and the dimension it spans.
    src_times = src_cube.coord('time')
    t_dim, = src_cube.coord_dims(src_times)

    # Validate before building anything: the payload must line up with
    # the requested time points along the time dimension.
    if data.shape[t_dim] != len(times):
        emsg = ('Mismatch between data and times for data dimension {!r}, '
                'got data shape {!r} with times shape {!r}.')
        raise ValueError(emsg.format(t_dim, data.shape, times.shape))

    # Construct the resultant cube with the new data and the source
    # cube metadata (deep-copied so the result owns its metadata).
    kwargs = deepcopy(src_cube.metadata)._asdict()
    result = iris.cube.Cube(data, **kwargs)

    # Copy across every coordinate that does NOT span the time
    # dimension; the time coordinate is rebuilt from `times` below.
    for coord in src_cube.dim_coords:
        [dim] = src_cube.coord_dims(coord)
        if dim != t_dim:
            result.add_dim_coord(coord.copy(), dim)

    for coord in src_cube.aux_coords:
        dims = src_cube.coord_dims(coord)
        if t_dim not in dims:
            result.add_aux_coord(coord.copy(), dims)

    for coord in src_cube.derived_coords:
        dims = src_cube.coord_dims(coord)
        if t_dim not in dims:
            # Derived coordinates can only be re-attached as aux coords.
            result.add_aux_coord(coord.copy(), dims)

    # Build the new time coordinate from `times`, reusing the source
    # time coordinate's metadata (names, units, attributes, ...).
    metadata = src_times.metadata

    kwargs = {
        'standard_name': metadata.standard_name,
        'long_name': metadata.long_name,
        'var_name': metadata.var_name,
        'units': metadata.units,
        'attributes': metadata.attributes,
        'coord_system': metadata.coord_system,
        'climatological': metadata.climatological,
    }

    # Prefer a dimension coordinate; fall back to an auxiliary one when
    # the points are unsuitable (DimCoord raises ValueError, e.g. for
    # non-monotonic points).
    try:
        coord = iris.coords.DimCoord(times, **kwargs)
        result.add_dim_coord(coord, t_dim)
    except ValueError:
        coord = iris.coords.AuxCoord(times, **kwargs)
        result.add_aux_coord(coord, t_dim)

    return result


def calculate_volume(cube):
"""Calculate volume from a cube.
Expand All @@ -162,7 +77,7 @@ def calculate_volume(cube):

# ####
# Calculate grid volume:
area = iris.analysis.cartography.area_weights(cube)
area = da.array(iris.analysis.cartography.area_weights(cube))
if thickness.ndim == 1 and z_dim == 1:
grid_volume = area * thickness[None, :, None, None]
if thickness.ndim == 4 and z_dim == 1:
Expand Down Expand Up @@ -198,9 +113,8 @@ def volume_statistics(cube, operator):
# TODO: Test sigma coordinates.
# TODO: Add other operations.

# ####
# Load z coordinate field and figure out which dim is which.
t_dim = cube.coord_dims('time')[0]
if operator != 'mean':
raise ValueError(f'Volume operator {operator} not recognised.')

try:
grid_volume = cube.cell_measure('ocean_volume').core_data()
Expand All @@ -214,65 +128,17 @@ def volume_statistics(cube, operator):

if cube.data.shape != grid_volume.shape:
raise ValueError('Cube shape ({}) doesn`t match grid volume shape '
'({})'.format(cube.data.shape, grid_volume.shape))

# #####
# Calculate global volume weighted average
result = []
# #####
# iterate over time and z-coordinate dimensions.
for time_itr in range(cube.shape[t_dim]):
# ####
# create empty output arrays
column = []
depth_volume = []
f'({cube.shape, grid_volume.shape})')

# ####
# iterate over time and z-coordinate dimensions.
for z_itr in range(cube.shape[1]):
# ####
# Calculate weighted mean for this time and layer
if operator == 'mean':
total = cube[time_itr, z_itr].collapsed(
[cube.coord(axis='z'), 'longitude', 'latitude'],
iris.analysis.MEAN,
weights=grid_volume[time_itr, z_itr]).data
else:
raise ValueError('Volume operator ({}) not '
'recognised.'.format(operator))
column.append(total)

try:
layer_vol = np.ma.masked_where(cube[time_itr, z_itr].data.mask,
grid_volume[time_itr,
z_itr]).sum()

except AttributeError:
# ####
# No mask in the cube data.
layer_vol = grid_volume.sum()
depth_volume.append(layer_vol)
# ####
# Calculate weighted mean over the water volumn
column = np.ma.array(column)
depth_volume = np.ma.array(depth_volume)
result.append(np.ma.average(column, weights=depth_volume))
masked_volume = da.ma.masked_where(
da.ma.getmaskarray(cube.lazy_data()),
grid_volume)
result = cube.collapsed(
[cube.coord(axis='Z'), cube.coord(axis='Y'), cube.coord(axis='X')],
iris.analysis.MEAN,
weights=masked_volume)

# ####
# Send time series and dummy cube to cube creating tool.
times = np.array(cube.coord('time').points.astype(float))
result = np.ma.array(result)

# #####
# Create a small dummy output array for the output cube
if operator == 'mean':
src_cube = cube[:2, :2].collapsed(
[cube.coord(axis='z'), 'longitude', 'latitude'],
iris.analysis.MEAN,
weights=grid_volume[:2, :2],
)

return _create_cube_time(src_cube, result, times)
return result


def depth_integration(cube):
Expand Down
73 changes: 48 additions & 25 deletions tests/unit/preprocessor/_volume/test_volume.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,21 @@
from cf_units import Unit

import tests
from esmvalcore.preprocessor._volume import (volume_statistics,
depth_integration,
extract_trajectory,
extract_transect,
extract_volume,
calculate_volume)
from esmvalcore.preprocessor._volume import (
calculate_volume,
depth_integration,
extract_trajectory,
extract_transect,
extract_volume,
volume_statistics,
)


class Test(tests.Test):
"""Test class for _volume_pp"""
"""Test class for _volume_pp."""

def setUp(self):
"""Prepare tests"""
"""Prepare tests."""
coord_sys = iris.coord_systems.GeogCS(iris.fileformats.pp.EARTH_RADIUS)
data1 = np.ones((3, 2, 2))
data2 = np.ma.ones((2, 3, 2, 2))
Expand Down Expand Up @@ -86,9 +88,8 @@ def test_extract_volume(self):
self.assert_array_equal(result.data, expected)

def test_extract_volume_mean(self):
"""
Test to extract the top two layers and compute the
weighted average of a cube."""
"""Test to extract the top two layers and compute the weighted average
of a cube."""
grid_volume = calculate_volume(self.grid_4d)
measure = iris.coords.CellMeasure(
grid_volume,
Expand All @@ -110,8 +111,8 @@ def test_volume_statistics(self):
self.assert_array_equal(result.data, expected)

def test_volume_statistics_cell_measure(self):
"""
Test to take the volume weighted average of a (2,3,2,2) cube.
"""Test to take the volume weighted average of a (2,3,2,2) cube.
The volume measure is pre-loaded in the cube.
"""
grid_volume = calculate_volume(self.grid_4d)
Expand All @@ -126,36 +127,58 @@ def test_volume_statistics_cell_measure(self):
self.assert_array_equal(result.data, expected)

def test_volume_statistics_long(self):
"""
Test to take the volume weighted average of a (4,3,2,2) cube.
"""Test to take the volume weighted average of a (4,3,2,2) cube.
This extra time is needed, as the volume average calculation uses
different methods for small and large cubes.
This extra time is needed, as the volume average calculation
uses different methods for small and large cubes.
"""
result = volume_statistics(self.grid_4d_2, 'mean')
expected = np.ma.array([1., 1., 1., 1.], mask=False)
self.assert_array_equal(result.data, expected)

def test_volume_statistics_masked_level(self):
"""
Test to take the volume weighted average of a (2,3,2,2) cube
where the last depth level is fully masked.
"""
"""Test to take the volume weighted average of a (2,3,2,2) cube where
the last depth level is fully masked."""
self.grid_4d.data[:, -1, :, :] = np.ma.masked_all((2, 2, 2))
result = volume_statistics(self.grid_4d, 'mean')
expected = np.ma.array([1., 1.], mask=False)
self.assert_array_equal(result.data, expected)

def test_volume_statistics_masked_timestep(self):
"""
Test to take the volume weighted average of a (2,3,2,2) cube
where the first timestep is fully masked.
"""
"""Test to take the volume weighted average of a (2,3,2,2) cube where
the first timestep is fully masked."""
self.grid_4d.data[0, :, :, :] = np.ma.masked_all((3, 2, 2))
result = volume_statistics(self.grid_4d, 'mean')
expected = np.ma.array([1., 1], mask=[True, False])
self.assert_array_equal(result.data, expected)

def test_volume_statistics_weights(self):
"""Test to take the volume weighted average of a (2,3,2,2) cube.
The data and weights are not defined as arrays of ones.
"""
data = np.ma.arange(1, 25).reshape(2, 3, 2, 2)
self.grid_4d.data = data
measure = iris.coords.CellMeasure(
data,
standard_name='ocean_volume',
units='m3',
measure='volume'
)
self.grid_4d.add_cell_measure(measure, range(0, measure.ndim))
result = volume_statistics(self.grid_4d, 'mean')
expected = np.ma.array(
[8.333333333333334, 19.144144144144143],
mask=[False, False])
self.assert_array_equal(result.data, expected)

    def test_volume_statistics_wrong_operator(self):
        """Test that an unsupported operator raises a ValueError.

        ``volume_statistics`` only accepts 'mean'; any other operator
        string must be rejected with an informative message.
        """
        with self.assertRaises(ValueError) as err:
            volume_statistics(self.grid_4d, 'wrong')
        self.assertEqual(
            'Volume operator wrong not recognised.',
            str(err.exception))

def test_depth_integration_1d(self):
"""Test to take the depth integration of a 3 layer cube."""
result = depth_integration(self.grid_3d[:, 0, 0])
Expand Down

0 comments on commit acc5377

Please sign in to comment.