Data first #2392
Changes from 38 commits
New file, `dask_interface.rst` (@@ -0,0 +1,23 @@):

Iris Dask Interface
*******************

Iris uses dask (http://dask.pydata.org) to manage lazy data interfaces and processing graphs. The key principles which define this interface are:

* A call to `cube.data` will always load all of the data.
* Once this has happened:
  * `cube.data` is a mutable numpy masked array or ndarray;
  * `cube._numpy_array` is a private numpy masked array, accessible via `cube.data`, which may strip off the mask and return a reference to the bare ndarray.
* `cube.data` may be used to set the data; this accepts:
  * a numpy array (including masked array), which is assigned to `cube._numpy_array`;
  * a dask array, which is assigned to `cube._dask_array`, in which case `cube._numpy_array` is set to None.
* `cube._dask_array` may be None, otherwise it is expected to be a dask graph:
  * this may wrap a proxy to a file collection;
  * this may wrap the numpy array in `cube._numpy_array`.
* All dask graphs wrap array-like objects in which missing data is represented by `nan`:
  * masked arrays derived from these arrays shall create their mask using the nan locations;
  * where dask-wrapped `int` arrays require masks, these will first be cast to `float`.
* In order to support this mask conversion, cubes have a `fill_value` as part of their metadata, which may be None.
* Array copying is kept to an absolute minimum:
  * array references should always be passed, not new arrays created, unless an explicit copy operation is requested.
* To test for the presence of a dask array of any sort, we use `iris._lazy_data.is_lazy_data`, which is implemented as `hasattr(data, 'compute')`.
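The nan-for-mask convention and the `is_lazy_data` test described above can be demonstrated in a few lines. This is a sketch of the convention only, not Iris code; rebuilding the mask via `masked_invalid` is an assumption about how the nan locations would be turned back into a mask:

```python
import dask.array as da
import numpy as np

# Masked integer data must be cast to float first, so that masked
# points can be represented as nan inside the dask graph.
masked = np.ma.masked_array([1, 2, 3, 4], mask=[0, 1, 0, 0])
nan_filled = masked.astype(float).filled(np.nan)

# Wrap the nan-filled ndarray as a dask array.
lazy = da.from_array(nan_filled, chunks=nan_filled.shape)

# The is_lazy_data test: anything with a `compute` method is lazy.
assert hasattr(lazy, 'compute')

# On realisation, rebuild the mask from the nan locations.
realised = np.ma.masked_invalid(lazy.compute())
print(realised)  # [1.0 -- 3.0 4.0]
```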
The new page is added to the documentation contents listing:

```diff
@@ -38,3 +38,4 @@
    tests.rst
    deprecations.rst
    release.rst
+   dask_interface.rst
```
Changes to the merge code, starting with the imports:

```diff
@@ -36,8 +36,8 @@
 import iris.cube
 import iris.coords
 import iris.exceptions
-from iris._lazy_data import is_lazy_data, as_concrete_data, as_lazy_data
 import iris.util
+from iris._lazy_data import is_lazy_data, array_masked_to_nans
```

Review comment on the new import line: @marqh Minor point but [...]
Within `merge`, concrete inputs are now converted by hand before being wrapped as dask arrays:

```diff
@@ -1231,14 +1231,18 @@ def merge(self, unique=True):
             if is_lazy_data(data):
                 all_have_data = False
             else:
-                data = as_lazy_data(data)
+                if isinstance(data, np.ma.MaskedArray):
+                    if np.ma.is_masked(data):
+                        data = array_masked_to_nans(data)
+                    data = data.data
```

Review comment on `if isinstance(data, np.ma.MaskedArray):`: @marqh This should be [...]

Review comment on `data = data.data`: @marqh Okay, I've been waiting to see this kinda pattern appear in the code base, and here it is ... On [...] `data = biggus.NumpyArrayAdapter(data)`, which takes the concrete data and wraps it up as a biggus thing, so that all data is on an equal footing, i.e. a biggus array adapter of some form. For convenience, we could opt to do something similar when we want a concrete array to be a lazy thing, and ensure that masking is dealt with appropriately. This could be achieved by having a new `DataProxy` [...]. So, for the above we would have something simply along the lines of `data = da.from_array(DataProxy(data), chunks=data.shape)` ... and when (if) dask supports masked arrays, this line would become (something like) `data = da.from_array(data, chunks=data.shape)`. That's pretty attractive to me. It compartmentalizes the data munging to [...]

Reply: @pp-mo and I had a similar conversation about this pattern. I think there is a neater abstraction to be had. Please may we continue with this slightly awkward implementation whilst this PR gets test hardy, then implement an improved pattern as a follow-up?
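For illustration, here is a minimal sketch of the proxy idea floated in that comment. `DataProxy` is hypothetical (proposed, not implemented, in this PR), and the nan conversion in `__getitem__` is an assumption based on the masking convention in `dask_interface.rst`:

```python
import dask.array as da
import numpy as np

class DataProxy:
    """Hypothetical adapter presenting a (possibly masked) numpy array
    to dask as a plain array, converting masked points to nan on access."""

    def __init__(self, data):
        self._data = data
        self.shape = data.shape
        self.ndim = data.ndim
        # nan needs a float dtype, so masked integer data reads as float.
        self.dtype = np.dtype(float) if np.ma.isMaskedArray(data) else data.dtype

    def __getitem__(self, keys):
        chunk = self._data[keys]
        if np.ma.isMaskedArray(chunk):
            chunk = chunk.astype(float).filled(np.nan)
        return chunk

masked = np.ma.masked_array([1.0, 2.0, 3.0], mask=[0, 1, 0])
lazy = da.from_array(DataProxy(masked), chunks=masked.shape)
print(lazy.compute())  # [ 1. nan  3.]
```

With something like this in place, the isinstance/is_masked special-casing in `merge` would collapse to the single `da.from_array(DataProxy(data), ...)` call suggested above.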
```diff
+                data = da.from_array(data, chunks=data.shape)
                 stack[nd_index] = data

         merged_data = _multidim_daskstack(stack)
         if all_have_data:
             # All inputs were concrete, so turn the result back into a
             # normal array.
-            merged_data = as_concrete_data(merged_data)
+            merged_data = merged_data.compute()
```

Review comment on `merged_data = merged_data.compute()`: @marqh I'd suggest that whenever we use `compute` [...]. We'll need to think (perhaps later, once the dust settles) about how we pass options down into dask to control the scheduler of choice, see here, which may be relevant for those users targeting clusters and opting to use [...]. For now, I'd suggest:

```python
import multiprocessing
...
merged_data = merged_data.compute(num_workers=multiprocessing.cpu_count() - 1)
```
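A self-contained version of that suggestion (a sketch only; the array and chunking are invented for the example):

```python
import multiprocessing

import dask.array as da
import numpy as np

lazy = da.from_array(np.arange(1e6), chunks=100000)

# Scheduler options can be passed at compute time; here we leave one
# core free for the rest of the system.
result = lazy.sum().compute(num_workers=multiprocessing.cpu_count() - 1)
print(result)  # 499999500000.0
```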
```diff
             # Unmask the array only if it is filled.
             if (ma.isMaskedArray(merged_data) and
                     ma.count_masked(merged_data) == 0):
```
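`_multidim_daskstack` is a helper internal to this PR whose body is not shown in this excerpt; a plausible reading (an assumption, not the PR's actual code) is a recursive `da.stack` over a nested object array of dask arrays:

```python
import dask.array as da
import numpy as np

def multidim_daskstack(stack):
    # Recursively stack a (possibly nested) object array of dask arrays
    # into a single dask array, one leading dimension per nesting level.
    if isinstance(stack, da.Array):
        return stack
    return da.stack([multidim_daskstack(item) for item in stack])

# A 2x2 object array of 3-element dask arrays -> a (2, 2, 3) dask array.
stack = np.empty((2, 2), dtype=object)
for index in np.ndindex(stack.shape):
    stack[index] = da.from_array(np.arange(3.0), chunks=(3,))
print(multidim_daskstack(stack).compute().shape)  # (2, 2, 3)
```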
Review comment on `data = array_masked_to_nans(data)`: @marqh Of course, if you don't pass in a masked array then this will raise an exception ... every time this function is called, (pretty) much, it's always within an `if` statement to check whether the array is a masked array. Why don't we refactor this so that any array can be passed in? If the array is not a `MaskedArray`, just pass it through; if it is a `MaskedArray`, do the NaN-ification, as is. That would save on the use of the `if isinstance(array, ma.MaskedArray): ... array_masked_to_nans(...)` pattern. If you buy into that, then I'd simply suggest calling this function `array_to_nans` ... but that's minor to my previous points.

Reply: There have been multiple requests to factor out this logic, but care is required, as it is used in different ways in different contexts. "Every time this function is called it's within an `if` statement": that is not the case; this function is called 3 times so far, one of which is with a numpy array and 2 with masks. We will want more numpy-only uses, I am confident, and in these cases the mask test has to be passed in explicitly. A plausible approach: pass the mask in alongside the array, e.g. `array_masked_to_nans(mymaskedarray, mymaskedarray.mask)`, rather than requiring that the type must be a masked array.