Skip to content

Commit

Permalink
Data first (#2392)
Browse files Browse the repository at this point in the history
Core refactor for dask usage
  • Loading branch information
marqh committed Feb 23, 2017
1 parent 758e8e1 commit d2116d9
Show file tree
Hide file tree
Showing 50 changed files with 406 additions and 539 deletions.
23 changes: 23 additions & 0 deletions docs/iris/src/developers_guide/dask_interface.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
Iris Dask Interface
*******************

Iris uses dask (http://dask.pydata.org) to manage lazy data interfaces and processing graphs. The key principles which define this interface are:

* A call to `cube.data` will always load all of the data.
* Once this has happened:
* `cube.data` is a mutable numpy masked array or ndarray;
* `cube._numpy_array` is a private numpy masked array, accessible via `cube.data`, which may strip off the mask and return a reference to the bare ndarray.
* `cube.data` may be used to set the data; this accepts:
* a numpy array (including masked array), which is assigned to `cube._numpy_array`;
* a dask array, which is assigned to `cube._dask_array`, and `cube._numpy_array` is set to None.
* `cube._dask_array` may be None, otherwise it is expected to be a dask graph:
* this may wrap a proxy to a file collection;
* this may wrap the numpy array in `cube._numpy_array`.
* All dask graphs wrap array-like objects where missing data is represented by `nan`:
* masked arrays derived from these arrays shall create their mask using the `nan` locations;
* where dask wrapped `int` arrays require masks, these will first be cast to `float`.
* In order to support this mask conversion, cubes have a `fill_value` as part of their metadata, which may be None.
* Array copying is kept to an absolute minimum:
* array references should always be passed, and new arrays should not be created, unless an explicit copy operation is requested.
* To test for the presence of a dask array of any sort, we use:
* `iris._lazy_data.is_lazy_data` which is implemented as `hasattr(data, 'compute')`.
1 change: 1 addition & 0 deletions docs/iris/src/developers_guide/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,4 @@
tests.rst
deprecations.rst
release.rst
dask_interface.rst
104 changes: 6 additions & 98 deletions lib/iris/_lazy_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,6 @@
import numpy as np


# Whether to recognise biggus arrays as lazy, *as well as* dask.
# NOTE: in either case, this module will not *make* biggus arrays, only dask.
_SUPPORT_BIGGUS = True

if _SUPPORT_BIGGUS:
import biggus


def is_lazy_data(data):
"""
Return whether the argument is an Iris 'lazy' data array.
Expand All @@ -45,102 +37,18 @@ def is_lazy_data(data):
"""
result = hasattr(data, 'compute')
if not result and _SUPPORT_BIGGUS:
result = isinstance(data, biggus.Array)
return result


def as_concrete_data(data):
"""
Return the actual content of the argument, as a numpy masked array.
If lazy, return the realised data, otherwise return the argument unchanged.
"""
if is_lazy_data(data):
if _SUPPORT_BIGGUS and isinstance(data, biggus.Array):
# Realise biggus array.
# treat all as masked, for standard cube.data behaviour.
data = data.masked_array()
else:
# Grab a fill value, in case this is just a converted masked array.
fill_value = getattr(data, 'fill_value', None)
# Realise dask array.
data = data.compute()
# Convert NaN arrays into masked arrays for Iris' consumption.
mask = np.isnan(data)
if np.all(~mask):
mask = None
data = np.ma.masked_array(data, mask=mask,
fill_value=fill_value)
return data


# A magic value, borrowed from biggus
_MAX_CHUNK_SIZE = 8 * 1024 * 1024 * 2


def as_lazy_data(data):
"""
Return a lazy equivalent of the argument, as a lazy array.
For an existing lazy array, return it unchanged.
Otherwise, return the argument wrapped with dask.array.from_array.
This assumes the underlying object has numpy-array-like properties.
.. Note::
For now at least, chunksize is set to an arbitrary fixed value.
"""
if not is_lazy_data(data):
# record the original fill value.
fill_value = getattr(data, 'fill_value', None)
if isinstance(data, np.ma.MaskedArray):
# Use with NaNs replacing the mask.
data = array_masked_to_nans(data)
data = da.from_array(data, chunks=_MAX_CHUNK_SIZE)
# Attach any fill value to the dask object.
# Note: this is not passed on to dask arrays derived from this one.
data.fill_value = fill_value
elif not hasattr(data, 'fill_value'):
data.fill_value = None # make it look more like a biggus Array ?
return data


def array_masked_to_nans(array):
def array_masked_to_nans(array, mask=None):
"""
Convert a masked array to a normal array with NaNs at masked points.
This is used for dask integration, as dask does not support masked arrays.
Note that any fill value will be lost.
"""
if np.ma.is_masked(array):
# Array has some masked points : use unmasked near-equivalent.
if array.dtype.kind == 'f':
# Floating : convert the masked points to NaNs.
array = array.filled(np.nan)
else:
# Integer : no conversion (i.e. do *NOT* fill with fill value)
# array = array.filled()
array = array.data
else:
# Ensure result is not masked (converts arrays with empty masks).
if isinstance(array, np.ma.MaskedArray):
array = array.data
return array


def array_nans_to_masked(array):
"""
Convert an array into a masked array, masking any NaN points.
"""
if (not isinstance(array, np.ma.masked_array) and
array.dtype.kind == 'f'):
mask = np.isnan(array)
if np.any(mask):
# Turn any unmasked array with NaNs into a masked array.
array = np.ma.masked_array(array, mask=mask)
if mask is None:
mask = array.mask
if array.dtype.kind == 'i':
array = array.astype(np.dtype('f8'))
array[mask] = np.nan
return array
10 changes: 7 additions & 3 deletions lib/iris/_merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@
import numpy as np
import numpy.ma as ma

from iris._lazy_data import is_lazy_data, array_masked_to_nans
import iris.cube
import iris.coords
import iris.exceptions
from iris._lazy_data import is_lazy_data, as_concrete_data, as_lazy_data
import iris.util


Expand Down Expand Up @@ -1231,14 +1231,18 @@ def merge(self, unique=True):
if is_lazy_data(data):
all_have_data = False
else:
data = as_lazy_data(data)
if isinstance(data, ma.MaskedArray):
if ma.is_masked(data):
data = array_masked_to_nans(data)
data = data.data
data = da.from_array(data, chunks=data.shape)
stack[nd_index] = data

merged_data = _multidim_daskstack(stack)
if all_have_data:
# All inputs were concrete, so turn the result back into a
# normal array.
merged_data = as_concrete_data(merged_data)
merged_data = merged_data.compute()
# Unmask the array only if it is filled.
if (ma.isMaskedArray(merged_data) and
ma.count_masked(merged_data) == 0):
Expand Down
Loading

0 comments on commit d2116d9

Please sign in to comment.