Potential bottlenecks for large input datasets #171
Comments
@spencerahill this is a really good motivating case for looking deeper into how we integrate with dask! I've done a little bit of further investigation and have found that we are also implicitly loading data into memory when we call xr.decode_cf.

Offline, I've experimented with working around that and removing all the explicit calls to load you listed above, and was successful in having dask arrays propagated all the way through to the computation step. Does that solve the problem here? Not completely: I think this is where this discussion connects back with #169 (comment). When doing this, and using either a ...

Lastly, if we do this we'd also want to either come up with some heuristics for automatically chunking Datasets we load in for best performance, or perhaps enable the user to set the default chunk arrangement in a DataLoader constructor.

Those are my (somewhat rambling) thoughts; I'm happy to talk about this more offline if it would help.
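For concreteness, here is a minimal sketch (not aospy code) of what forwarding a user-specified chunk layout to xarray could look like; the path pattern, chunk sizes, and the idea of a constructor argument are purely illustrative assumptions:

```python
import xarray as xr

# A chunk spec a user might hand to a DataLoader-style constructor; the exact
# name and plumbing are assumptions, not aospy's actual API.
time_chunks = {'time': 1}   # e.g. one annual-mean file per chunk

ds = xr.open_mfdataset('/archive/hmz/CM2.6/ocean/ocean.*.ann.nc',
                       chunks=time_chunks)
# Each data variable is now a dask array; computations stay lazy until
# .load()/.compute() is called, ideally on a much smaller reduced result.
```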
Thanks for digging into this!
And also a few lines above via times.numpy_datetime_workaround_encode_cf, although in that case if I'm understanding correctly we do pull out the time array already, so it's not called on the full Dataset.
Sorry, how does that differ from our current use of numpy_datetime_workaround_encode_cf?
Nice! Do you recall, though, the nature of the problems that led us to add the load() calls in the first place? Wondering if those could potentially resurface.
Exactly. Our joblib-based step, though very exciting, doesn't change this one-task-per-Calc paradigm. And that does seem like a major undertaking. I've got some vague ideas that I'll try to flesh out. But this doesn't preclude proceeding with #169 or cleaning up these load() calls to the extent possible.
That particular step uses
This is the crude rewrite that sidesteps loading the Dataset into memory in _prep_time_data:

```python
def _prep_time_data(ds):
"""Prepare time coord. information in Dataset for use in aospy.
1. Edit units attribute of time variable if it contains
a Timestamp invalid date
2. If the Dataset contains a time bounds coordinate, add
attributes representing the true beginning and end dates of
the time interval used to construct the Dataset
3. Decode the times into np.datetime64 objects for time
indexing
Parameters
----------
ds : Dataset
Pre-processed Dataset with time coordinate renamed to
internal_names.TIME_STR
Returns
-------
Dataset, int
The processed Dataset and minimum year in the loaded data
"""
ds = times.ensure_time_as_dim(ds)
ds, min_year = times.numpy_datetime_workaround_encode_cf(ds)
if internal_names.TIME_BOUNDS_STR in ds:
ds = times.ensure_time_avg_has_cf_metadata(ds)
else:
logging.warning("dt array not found. Assuming equally spaced "
"values in time, even though this may not be "
"the case")
ds = times.add_uniform_time_weights(ds)
ds_temp = xr.Dataset()
for v in internal_names.ALL_TIME_VARS:
if v in ds:
ds_temp[v] = ds[v]
ds_temp = xr.decode_cf(ds_temp, decode_times=True,
decode_coords=False, mask_and_scale=False)
for v in internal_names.ALL_TIME_VARS:
if v in ds:
ds[v] = ds_temp[v]
return ds, min_year
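As a rough usage check of the rewrite above (the file pattern and variable name are made up, and it assumes the Dataset is opened without time decoding and with its time coordinate already named as aospy expects), the data variables should remain dask-backed after the time decoding:

```python
import dask.array as da
import xarray as xr

ds = xr.open_mfdataset('ocean.*.ann.nc', decode_times=False,
                       chunks={'time': 1})
ds, min_year = _prep_time_data(ds)
# Only the small temporary Dataset of time variables passed through
# xr.decode_cf; the big data variables were never read into memory.
assert isinstance(ds['temp'].data, da.Array)   # 'temp' is a placeholder name
```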
The motivation for eagerly loading data into memory was originally rooted in the concern that data might not be chunked well by default for performance. As long as we come up with a reasonable solution to that issue, I see no problem in using dask by default (rather than numpy) in most calculations. That being said, if one has more complicated operations hidden in a function defined in one's object library (for example, things in infinite-diff) that are not dask-array compatible, one will run into issues. So perhaps we should support both pathways with a flag in Calc.
agreed
No need to shame me for infinite-diff's shortcomings 😉. Yes, I think it would be prudent to have a flag along these lines:

```python
def _local_ts(self, *data):
    """Perform the computation at each gridpoint and time index."""
    if self._do_force_load:  # or however we ultimately choose to encode that option
        [d.load() for d in data]
    arr = self.function(*data)
    ...
```
Also, thanks for clarifying the distinction w/ numpy_datetime_workaround_encode_cf... that makes sense now.
Sorry, I didn't mean to single it out! I'm sure there are plenty of other examples; this was just the first that came to mind that I had a link to.
Nice! Yes, it does seem like something as simple as that could work. When I get a chance I'll give it a try.
Upon a little more reflection on this, what do you think about making this an attribute of a Var object that one could set in its constructor instead? For example:

```python
dask_incompatible_var = Var(
    name='dask_incompatible',
    ...
    func=calcs.dask_incompatible_func,
    dask_compatible=False
)
```

I think we could make the default value True.
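Tying this together with the earlier sketch, the check could then live in Calc and consult the Var rather than a Calc-wide flag. This is only one possible wiring; self.var and the dask_compatible attribute are assumptions here, not taken from the actual codebase:

```python
def _local_ts(self, *data):
    """Perform the computation at each gridpoint and time index."""
    if not self.var.dask_compatible:
        # This Var's function can't handle dask arrays, so force the
        # inputs into memory first (DataArray.load operates in place
        # and returns the array, so the same objects are reused).
        data = [d.load() for d in data]
    return self.function(*data)
```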
Haha sorry, I was totally kidding!
Great call. The point of this is to be Var-specific anyways. Maybe it's good to keep the ...
See pydata/xarray#1372, which implies that this eager loading is a bug that will eventually be fixed in xarray.
A colleague wants to use aospy on 0.1-degree ocean data; see /archive/hmz/CM2.6/ocean/ on GFDL's filesystem. This is GFDL 'av' data organized as annual means, one year per file, for 200 years: ocean.0001.ann.nc, ..., ocean.0200.ann.nc. Each file is ~14 GB, and in total it's ~3.1 TB.
While we generally use xarray.open_mfdataset and hence lazily load, there are three places where data explicitly gets loaded into memory via load(), rather than relying on a chunks={} option that would make it lazily load. In this particular case, the grid attributes can come from the smaller /archive/hmz/CM2.6/ocean.static.nc file, but that itself isn't trivially small, at 371 MB.

@spencerkclark, do you recall the nature of the bugs when we didn't force loading? Any thoughts more generally about making all of the above logic more performant with large datasets? Ideally we never call load() on a full dataset; rather we take individual variables, reduce them as much as possible (in space and time), and then load.
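As an illustration of that reduce-then-load pattern (the variable name and reduction are arbitrary placeholders; only the directory comes from above):

```python
import xarray as xr

ds = xr.open_mfdataset('/archive/hmz/CM2.6/ocean/ocean.*.ann.nc',
                       chunks={'time': 1})
sst = ds['sst']                      # 'sst' is a placeholder variable name
time_mean = sst.mean(dim='time')     # lazy reduction over the 200-year record
time_mean.load()                     # only the single reduced field hits memory
```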