Data first #2392

Merged: 41 commits, Feb 23, 2017

Commits:
667a191 not my data (marqh, Feb 16, 2017)
dd593e8 missing data and fill values (marqh, Feb 17, 2017)
d519ea3 graph array reference (marqh, Feb 18, 2017)
02ec854 b0rked referencing (marqh, Feb 18, 2017)
eee7580 make data primate once called (marqh, Feb 18, 2017)
beb5189 fill_value not done yet (marqh, Feb 18, 2017)
10704cf fill value handling and naming (marqh, Feb 19, 2017)
a7c79f3 _my_data is always masked, data is not (marqh, Feb 19, 2017)
ad1ad55 retire _lazy_data (marqh, Feb 19, 2017)
aa0d9d7 Merge branch 'dmerge' into data_first (marqh, Feb 19, 2017)
966af7e ensure proxies are wrapped in dask arrays (marqh, Feb 19, 2017)
59dcb80 developer docs (marqh, Feb 20, 2017)
6c08ae4 make data_graph private (marqh, Feb 20, 2017)
4e5ec49 array types, ensure nans in lazy array (marqh, Feb 21, 2017)
235ef13 avoid copying (marqh, Feb 21, 2017)
3567369 mask handling (marqh, Feb 21, 2017)
d182e72 centralise is_dask_array (marqh, Feb 21, 2017)
1755ede rename; _my_data: _numpy_array ; _data_graph: _dask_array (marqh, Feb 21, 2017)
18795ad util.is_dask_array (marqh, Feb 21, 2017)
4f21dc7 coding standards (marqh, Feb 21, 2017)
96d60ae fill_value, dtype, shape on cube; proxies always return nan arrays (marqh, Feb 21, 2017)
5716327 test bug fixing (marqh, Feb 21, 2017)
e9f444f test failure fixing (marqh, Feb 21, 2017)
cb9e675 remove biggus from tests (marqh, Feb 21, 2017)
2bcd16b skip intersection tests (marqh, Feb 21, 2017)
7dee954 test fixing (marqh, Feb 22, 2017)
64794b4 further test bugs (marqh, Feb 22, 2017)
582625e test patching (marqh, Feb 22, 2017)
d40a4e6 f (marqh, Feb 22, 2017)
9b2dce3 minimal tests (marqh, Feb 22, 2017)
1e7b6cb various test skippers (marqh, Feb 22, 2017)
26a1028 f (marqh, Feb 22, 2017)
426d48f review changes (marqh, Feb 22, 2017)
6488aba return _lazy_data helpers (marqh, Feb 22, 2017)
0bcb822 cube data setter (marqh, Feb 22, 2017)
fda4732 pp tests (marqh, Feb 22, 2017)
af9c942 test bugs (marqh, Feb 22, 2017)
b4ab240 testing (marqh, Feb 22, 2017)
3586a28 review actions (marqh, Feb 23, 2017)
ca61115 removed pp data_cache (marqh, Feb 23, 2017)
02ff5ba skip mo_pack patch (marqh, Feb 23, 2017)
23 changes: 23 additions & 0 deletions docs/iris/src/developers_guide/dask_interface.rst
@@ -0,0 +1,23 @@
Iris Dask Interface
*******************

Iris uses dask (http://dask.pydata.org) to manage lazy data interfaces and processing graphs. The key principles that define this interface are:

* A call to `cube.data` will always load all of the data.
* Once this has happened:

  * `cube.data` is a mutable numpy masked array or ndarray;
  * `cube._numpy_array` is a private numpy masked array, accessible via `cube.data`, which may strip off the mask and return a reference to the bare ndarray.

* `cube.data` may be used to set the data; this accepts:

  * a numpy array (including masked array), which is assigned to `cube._numpy_array`;
  * a dask array, which is assigned to `cube._dask_array`, in which case `cube._numpy_array` is set to None.

* `cube._dask_array` may be None, otherwise it is expected to be a dask graph:

  * this may wrap a proxy to a file collection;
  * this may wrap the numpy array in `cube._numpy_array`.

* All dask graphs wrap array-like objects where missing data is represented by `nan`:

  * masked arrays derived from these arrays shall create their mask using the nan locations;
  * where dask-wrapped `int` arrays require masks, these will first be cast to `float`.

* To support this mask conversion, cubes have a `fill_value` as part of their metadata, which may be None.
* Array copying is kept to an absolute minimum:

  * array references should always be passed, not new arrays created, unless an explicit copy operation is requested.

* To test for the presence of a dask array of any sort, we use:

  * `iris._lazy_data.is_lazy_data`, which is implemented as `hasattr(data, 'compute')`.
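
To illustrate the nan-based convention described above, here is a minimal sketch (not Iris code; the variable names are invented for the example):

    import dask.array as da
    import numpy as np

    masked = np.ma.masked_array([1.0, 2.0, 4.0], mask=[False, True, False])

    # Missing points are stored as NaNs, since dask has no masked array support.
    nans = masked.filled(np.nan)
    lazy = da.from_array(nans, chunks=nans.shape)

    # On realisation, the mask is rebuilt from the NaN locations.
    real = lazy.compute()
    result = np.ma.masked_array(real, mask=np.isnan(real))
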
1 change: 1 addition & 0 deletions docs/iris/src/developers_guide/index.rst
@@ -38,3 +38,4 @@
tests.rst
deprecations.rst
release.rst
dask_interface.rst
104 changes: 6 additions & 98 deletions lib/iris/_lazy_data.py
@@ -27,14 +27,6 @@
import numpy as np


# Whether to recognise biggus arrays as lazy, *as well as* dask.
# NOTE: in either case, this module will not *make* biggus arrays, only dask.
_SUPPORT_BIGGUS = True

if _SUPPORT_BIGGUS:
    import biggus


def is_lazy_data(data):
    """
    Return whether the argument is an Iris 'lazy' data array.
@@ -45,102 +37,18 @@ def is_lazy_data(data):

    """
    result = hasattr(data, 'compute')
    if not result and _SUPPORT_BIGGUS:
        result = isinstance(data, biggus.Array)
    return result
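
A minimal usage sketch of this duck-typed check (illustrative only, not part of the diff):

    import dask.array as da
    import numpy as np

    print(is_lazy_data(da.from_array(np.arange(4), chunks=2)))  # True
    print(is_lazy_data(np.arange(4)))                           # False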


def as_concrete_data(data):
    """
    Return the actual content of the argument, as a numpy masked array.

    If lazy, return the realised data, otherwise return the argument unchanged.

    """
    if is_lazy_data(data):
        if _SUPPORT_BIGGUS and isinstance(data, biggus.Array):
            # Realise biggus array.
            # treat all as masked, for standard cube.data behaviour.
            data = data.masked_array()
        else:
            # Grab a fill value, in case this is just a converted masked array.
            fill_value = getattr(data, 'fill_value', None)
            # Realise dask array.
            data = data.compute()
            # Convert NaN arrays into masked arrays for Iris' consumption.
            mask = np.isnan(data)
            if np.all(~mask):
                mask = None
            data = np.ma.masked_array(data, mask=mask,
                                      fill_value=fill_value)
    return data


# A magic value, borrowed from biggus
_MAX_CHUNK_SIZE = 8 * 1024 * 1024 * 2


def as_lazy_data(data):
    """
    Return a lazy equivalent of the argument, as a lazy array.

    For an existing lazy array, return it unchanged.
    Otherwise, return the argument wrapped with dask.array.from_array.
    This assumes the underlying object has numpy-array-like properties.

    .. Note::

        For now at least, chunksize is set to an arbitrary fixed value.

    """
    if not is_lazy_data(data):
        # record the original fill value.
        fill_value = getattr(data, 'fill_value', None)
        if isinstance(data, np.ma.MaskedArray):
            # Use with NaNs replacing the mask.
            data = array_masked_to_nans(data)
        data = da.from_array(data, chunks=_MAX_CHUNK_SIZE)
        # Attach any fill value to the dask object.
        # Note: this is not passed on to dask arrays derived from this one.
        data.fill_value = fill_value
    elif not hasattr(data, 'fill_value'):
        data.fill_value = None  # make it look more like a biggus Array ?
    return data


def array_masked_to_nans(array):
def array_masked_to_nans(array, mask=None):
    """
    Convert a masked array to a normal array with NaNs at masked points.

    This is used for dask integration, as dask does not support masked arrays.
    Note that any fill value will be lost.

    """
    if np.ma.is_masked(array):
        # Array has some masked points : use unmasked near-equivalent.
        if array.dtype.kind == 'f':
            # Floating : convert the masked points to NaNs.
            array = array.filled(np.nan)
        else:
            # Integer : no conversion (i.e. do *NOT* fill with fill value)
            # array = array.filled()
            array = array.data
    else:
        # Ensure result is not masked (converts arrays with empty masks).
        if isinstance(array, np.ma.MaskedArray):
            array = array.data
    return array


def array_nans_to_masked(array):
    """
    Convert an array into a masked array, masking any NaN points.

    """
    if (not isinstance(array, np.ma.masked_array) and
            array.dtype.kind == 'f'):
        mask = np.isnan(array)
        if np.any(mask):
            # Turn any unmasked array with NaNs into a masked array.
            array = np.ma.masked_array(array, mask=mask)
    if mask is None:
        mask = array.mask
@bjlittle (Member) commented on Feb 22, 2017:
@marqh Of course, if you don't pass in a masked array, then this will raise an exception ...

... and (pretty much) every time this function is called, it's within an if statement checking whether the array is a masked array.

Why don't we refactor this so that any array can be passed in? If the array is not a MaskedArray, just pass it through. If it is a MaskedArray, do the NaN-ification, as is.

That would save the if isinstance(array, ma.MaskedArray) ... array_masked_to_nans(...) pattern.

If you buy into that, then I'd simply suggest calling this function array_to_nans ... but that's minor relative to my previous points.

@marqh (Member, Author) replied:
There have been multiple requests to factor out this logic, but care is required, as it is used in different ways in different contexts.

"... every time this function is called, it's within an if statement checking whether the array is a masked array."

That is not the case: this function is called 3 times so far, once with a plain numpy array and twice with masked arrays. I am confident we will want more numpy-only uses; in these cases the mask has to be passed in explicitly.

Plausible approaches:

  • as is: if you use this wrong, it fails; it is in a private module, so it should only be used by Iris code
  • test for type within the function, so that if it is called without a mask the type must be a masked array
  • demand that a mask is always provided, so that the call becomes array_masked_to_nans(mymaskedarray, mymaskedarray.mask)
  • ...??

(A sketch of the pass-through option appears after the diff below.)

    if array.dtype.kind == 'i':
        array = array.astype(np.dtype('f8'))
    array[mask] = np.nan
    return array
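
As referenced in the reply above, a hedged sketch of the pass-through refactor suggested in this thread (the name array_to_nans follows the review comment; this is not the merged code):

    import numpy as np

    def array_to_nans(array):
        # Plain ndarrays pass straight through untouched.
        if not isinstance(array, np.ma.MaskedArray):
            return array
        if array.dtype.kind == 'i':
            # Integers cannot hold NaN, so cast to float first.
            array = array.astype('f8')
        # Fill masked points with NaN and drop the mask.
        return array.filled(np.nan)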
10 changes: 7 additions & 3 deletions lib/iris/_merge.py
@@ -33,10 +33,10 @@
import numpy as np
import numpy.ma as ma

from iris._lazy_data import is_lazy_data, array_masked_to_nans
import iris.cube
import iris.coords
import iris.exceptions
from iris._lazy_data import is_lazy_data, as_concrete_data, as_lazy_data
import iris.util


@@ -1231,14 +1231,18 @@ def merge(self, unique=True):
        if is_lazy_data(data):
            all_have_data = False
        else:
            data = as_lazy_data(data)
            if isinstance(data, ma.MaskedArray):
                if ma.is_masked(data):
                    data = array_masked_to_nans(data)
                data = data.data
            data = da.from_array(data, chunks=data.shape)
        stack[nd_index] = data

    merged_data = _multidim_daskstack(stack)
    if all_have_data:
        # All inputs were concrete, so turn the result back into a
        # normal array.
        merged_data = as_concrete_data(merged_data)
        merged_data = merged_data.compute()
A reviewer (Member) commented:

@marqh dask is pretty greedy, and why not. By default dask.array uses the dask.threaded.get scheduler and will also default to using as many workers as there are cores.

I'd suggest that whenever we use compute we factor in a bit of grace and not hand over all of the cores to dask. For me, a sensible initial default might be something like multiprocessing.cpu_count() - 1.

We'll need to think (perhaps later, once the dust settles) about how we pass options down into dask to control the scheduler of choice (see here), which may be relevant for users targeting clusters and opting to use distributed.

For now, I'd suggest:

import multiprocessing
...
    merged_data = merged_data.compute(num_workers=multiprocessing.cpu_count() - 1)
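
A possible alternative is to configure the pool once rather than on every compute call; a sketch under the assumption that the threaded scheduler accepts a shared pool (dask.config.set in newer dask releases; dask.set_options in releases of this era):

    import multiprocessing
    from multiprocessing.pool import ThreadPool
    import dask

    # Hand the threaded scheduler a pool with one core held back.
    dask.config.set(pool=ThreadPool(multiprocessing.cpu_count() - 1))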

        # Unmask the array only if it is filled.
        if (ma.isMaskedArray(merged_data) and
                ma.count_masked(merged_data) == 0):
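
A simplified sketch of the stack-then-realise pattern used in this merge code (da.stack stands in for the private _multidim_daskstack helper, which handles arbitrary dimensionality):

    import dask.array as da
    import numpy as np

    slices = [np.arange(6.).reshape(2, 3) for _ in range(4)]

    # Wrap each concrete slice as a single-chunk dask array, then stack.
    stack = [da.from_array(s, chunks=s.shape) for s in slices]
    merged = da.stack(stack)     # lazy, shape (4, 2, 3)

    # All inputs were concrete, so realise the result immediately.
    result = merged.compute()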