Refactor concat to use merge for non-concatenated variables (#3239)
* Add compat = 'override' and data_vars/coords='sensible'

* concat tests.

* Update docstring.

* Begin merge, combine.

* Merge non concatenated variables.

* Fix tests.

* Fix tests 2

* Fix test 3

* Cleanup: reduce number of times we loop over datasets.

* unique_variable does minimum number of loads: fixes dask test

* docstrings for compat='override'

* concat compat docstring.

* remove the sensible option.

* reduce silly changes.

* fix groupby order test.

* cleanup: var names + remove one loop through datasets.

* Add whats-new entry.

* Add note in io.rst

* fix warning.

* Update netcdf multi-file dataset section in io.rst.

* Update mfdataset in dask.rst.

* simplify parse_datasets.

* Avoid using merge_variables. unique_variable instead.

* small stuff.

* Update docs.

* minor fix.

* minor fix.

* lint.

* Better error message.

* rename to shorter variable names.

* Cleanup: fillna preserves attrs now.

* Look for concat dim in data_vars also.

* Update xarray/core/merge.py

Co-Authored-By: Stephan Hoyer <shoyer@google.com>

* avoid unnecessary computes.

* minor cleanups.
dcherian authored Sep 16, 2019
1 parent b65ce86 commit 756c941
Showing 12 changed files with 402 additions and 222 deletions.
7 changes: 4 additions & 3 deletions doc/dask.rst
@@ -75,13 +75,14 @@ entirely equivalent to opening a dataset using ``open_dataset`` and then
chunking the data using the ``chunk`` method, e.g.,
``xr.open_dataset('example-data.nc').chunk({'time': 10})``.

To open multiple files simultaneously, use :py:func:`~xarray.open_mfdataset`::
To open multiple files simultaneously in parallel using Dask delayed,
use :py:func:`~xarray.open_mfdataset`::

xr.open_mfdataset('my/files/*.nc')
xr.open_mfdataset('my/files/*.nc', parallel=True)
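
You can also pass ``chunks`` when opening; a minimal sketch (the chunk sizes
here are purely illustrative)::

    xr.open_mfdataset('my/files/*.nc', parallel=True, chunks={'time': 10})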

This function will automatically concatenate and merge datasets into one in
the simple cases that it understands (see :py:func:`~xarray.auto_combine`
for the full disclaimer). By default, ``open_mfdataset`` will chunk each
for the full disclaimer). By default, :py:func:`~xarray.open_mfdataset` will chunk each
netCDF file into a single Dask array; again, supply the ``chunks`` argument to
control the size of the resulting Dask arrays. In more complex cases, you can
open each file individually using ``open_dataset`` and merge the result, as
244 changes: 147 additions & 97 deletions doc/io.rst
@@ -99,7 +99,9 @@ netCDF
The recommended way to store xarray data structures is `netCDF`__, which
is a binary file format for self-described datasets that originated
in the geosciences. xarray is based on the netCDF data model, so netCDF files
on disk directly correspond to :py:class:`~xarray.Dataset` objects.
on disk directly correspond to :py:class:`~xarray.Dataset` objects (more accurately,
a group in a netCDF file directly corresponds to a :py:class:`~xarray.Dataset` object.
See :ref:`io.netcdf_groups` for more.)

NetCDF is supported on almost all platforms, and parsers exist
for the vast majority of scientific programming languages. Recent versions of
@@ -121,7 +123,7 @@ read/write netCDF V4 files and use the compression options described below).
__ https://github.com/Unidata/netcdf4-python

We can save a Dataset to disk using the
:py:attr:`Dataset.to_netcdf <xarray.Dataset.to_netcdf>` method:
:py:meth:`~Dataset.to_netcdf` method:

.. ipython:: python
@@ -147,19 +149,6 @@ convert the ``DataArray`` to a ``Dataset`` before saving, and then convert back
when loading, ensuring that the ``DataArray`` that is loaded is always exactly
the same as the one that was saved.

NetCDF groups are not supported as part of the
:py:class:`~xarray.Dataset` data model. Instead, groups can be loaded
individually as Dataset objects.
To do so, pass a ``group`` keyword argument to the
``open_dataset`` function. The group can be specified as a path-like
string, e.g., to access subgroup 'bar' within group 'foo' pass
'/foo/bar' as the ``group`` argument.
In a similar way, the ``group`` keyword argument can be given to the
:py:meth:`~xarray.Dataset.to_netcdf` method to write to a group
in a netCDF file.
When writing multiple groups in one file, pass ``mode='a'`` to ``to_netcdf``
to ensure that each call does not delete the file.

Data is always loaded lazily from netCDF files. You can manipulate, slice and subset
Dataset and DataArray objects, and no array values are loaded into memory until
you try to perform some sort of actual computation. For an example of how these
@@ -195,6 +184,24 @@ It is possible to append or overwrite netCDF variables using the ``mode='a'``
argument. When using this option, all variables in the dataset will be written
to the original netCDF file, regardless of whether they exist in the original dataset.
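
A minimal sketch of appending a new variable to an existing file (the variable
and file names here are made up for illustration)::

    import xarray as xr

    ds = xr.Dataset({'extra_var': ('time', [1.0, 2.0, 3.0])})
    ds.to_netcdf('existing-file.nc', mode='a')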


.. _io.netcdf_groups:

Groups
~~~~~~

NetCDF groups are not supported as part of the :py:class:`~xarray.Dataset` data model.
Instead, groups can be loaded individually as Dataset objects.
To do so, pass a ``group`` keyword argument to the
:py:func:`~xarray.open_dataset` function. The group can be specified as a path-like
string, e.g., to access subgroup ``'bar'`` within group ``'foo'`` pass
``'/foo/bar'`` as the ``group`` argument.
In a similar way, the ``group`` keyword argument can be given to the
:py:meth:`~xarray.Dataset.to_netcdf` method to write to a group
in a netCDF file.
When writing multiple groups in one file, pass ``mode='a'`` to
:py:meth:`~xarray.Dataset.to_netcdf` to ensure that each call does not delete the file.
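
As a rough sketch of how these pieces fit together (the file and group names
here are made up for illustration)::

    import xarray as xr

    ds1 = xr.Dataset({'a': ('x', [1, 2, 3])})
    ds2 = xr.Dataset({'b': ('y', [4, 5])})

    # write each Dataset to its own group in the same file;
    # mode='a' on the second call appends instead of overwriting the file
    ds1.to_netcdf('groups.nc', group='/foo/bar')
    ds2.to_netcdf('groups.nc', group='/baz', mode='a')

    # load a single group back as a Dataset
    bar = xr.open_dataset('groups.nc', group='/foo/bar')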

.. _io.encoding:

Reading encoded data
@@ -203,7 +210,7 @@ Reading encoded data
NetCDF files follow some conventions for encoding datetime arrays (as numbers
with a "units" attribute) and for packing and unpacking data (as
described by the "scale_factor" and "add_offset" attributes). If the argument
``decode_cf=True`` (default) is given to ``open_dataset``, xarray will attempt
``decode_cf=True`` (default) is given to :py:func:`~xarray.open_dataset`, xarray will attempt
to automatically decode the values in the netCDF objects according to
`CF conventions`_. Sometimes this will fail, for example, if a variable
has an invalid "units" or "calendar" attribute. For these cases, you can
@@ -247,6 +254,130 @@ will remove encoding information.
import os
os.remove('saved_on_disk.nc')
.. _combining multiple files:

Reading multi-file datasets
...........................

NetCDF files are often encountered in collections, e.g., with different files
corresponding to different model runs or one file per timestamp.
xarray can straightforwardly combine such files into a single Dataset by making use of
:py:func:`~xarray.concat`, :py:func:`~xarray.merge`, :py:func:`~xarray.combine_nested` and
:py:func:`~xarray.combine_by_coords`. For details on the difference between these
functions see :ref:`combining data`.

Xarray includes support for manipulating datasets that don't fit into memory
with dask_. If you have dask installed, you can open multiple files
simultaneously in parallel using :py:func:`~xarray.open_mfdataset`::

xr.open_mfdataset('my/files/*.nc', parallel=True)

This function automatically concatenates and merges multiple files into a
single xarray dataset.
It is the recommended way to open multiple files with xarray.
For more details on parallel reading, see :ref:`combining.multi`, :ref:`dask.io` and a
`blog post`_ by Stephan Hoyer.
:py:func:`~xarray.open_mfdataset` takes many kwargs that allow you to
control its behaviour (e.g. ``parallel``, ``combine``, ``compat``, ``join``, ``concat_dim``).
See its docstring for more details.
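
For example, here are two common invocations, sketched with illustrative paths
and an illustrative dimension name::

    # let xarray order and combine the files using the coordinate values they contain
    xr.open_mfdataset('my/files/*.nc', combine='by_coords', parallel=True)

    # or combine the files in the order given, concatenating along a known dimension
    xr.open_mfdataset('my/files/*.nc', combine='nested', concat_dim='time')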


.. note::

A common use-case involves a dataset distributed across a large number of files with
each file containing a large number of variables. Commonly a few of these variables
need to be concatenated along a dimension (say ``"time"``), while the rest are equal
across the datasets (ignoring floating point differences). The following command
with suitable modifications (such as ``parallel=True``) works well with such datasets::

    xr.open_mfdataset('my/files/*.nc', concat_dim="time",
                      data_vars='minimal', coords='minimal', compat='override')

This command concatenates variables along the ``"time"`` dimension, but only those that
already contain the ``"time"`` dimension (``data_vars='minimal', coords='minimal'``).
Variables that lack the ``"time"`` dimension are taken from the first dataset
(``compat='override'``).
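
For reference, a roughly equivalent sketch using the lower-level functions directly
(the paths and dimension name are illustrative) shows where the new ``compat='override'``
option to :py:func:`~xarray.concat` comes in::

    from glob import glob
    import xarray as xr

    datasets = [xr.open_dataset(f) for f in sorted(glob('my/files/*.nc'))]
    combined = xr.concat(datasets, dim='time', data_vars='minimal',
                         coords='minimal', compat='override', join='override')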


.. _dask: http://dask.pydata.org
.. _blog post: http://stephanhoyer.com/2015/06/11/xray-dask-out-of-core-labeled-arrays/

Sometimes multi-file datasets are not conveniently organized for easy use of :py:func:`~xarray.open_mfdataset`.
One can use the ``preprocess`` argument to provide a function that takes a dataset
and returns a modified Dataset.
:py:func:`~xarray.open_mfdataset` will call ``preprocess`` on every dataset
(corresponding to each file) prior to combining them.
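
As a small sketch, suppose each file contains many variables but you only need
one of them (the variable name ``precip`` here is hypothetical)::

    def keep_precip(ds):
        # drop everything except the variable we care about
        return ds[['precip']]

    combined = xr.open_mfdataset('my/files/*.nc', preprocess=keep_precip)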


If :py:func:`~xarray.open_mfdataset` does not meet your needs, other approaches are possible.
The general pattern for parallel reading of multiple files
using dask, modifying those datasets and then combining into a single ``Dataset`` is::

    import dask
    import xarray as xr

    def modify(ds):
        # modify ds here
        return ds


    # this is basically what open_mfdataset does
    open_kwargs = dict(decode_cf=True, decode_times=False)
    open_tasks = [dask.delayed(xr.open_dataset)(f, **open_kwargs) for f in file_names]
    tasks = [dask.delayed(modify)(task) for task in open_tasks]
    datasets = dask.compute(tasks)[0]  # dask.compute returns a tuple; [0] is the list of Datasets
    # combine_nested needs the dimension to concatenate along; 'time' is illustrative
    combined = xr.combine_nested(datasets, concat_dim='time')  # or some combination of concat, merge


As an example, here's how we could approximate ``MFDataset`` from the netCDF4
library::

    from glob import glob
    import xarray as xr

    def read_netcdfs(files, dim):
        # glob expands paths with * to a list of files, like the unix shell
        paths = sorted(glob(files))
        datasets = [xr.open_dataset(p) for p in paths]
        combined = xr.concat(datasets, dim)
        return combined

    combined = read_netcdfs('/all/my/files/*.nc', dim='time')

This function will work in many cases, but it's not very robust. First, it
never closes files, which means it will fail once you need to load more than
a few thousand files. Second, it assumes that you want all the data from each
file and that it can all fit into memory. In many situations, you only need
a small subset or an aggregated summary of the data from each file.

Here's a slightly more sophisticated example of how to remedy these
deficiencies::

    def read_netcdfs(files, dim, transform_func=None):
        def process_one_path(path):
            # use a context manager, to ensure the file gets closed after use
            with xr.open_dataset(path) as ds:
                # transform_func should do some sort of selection or
                # aggregation
                if transform_func is not None:
                    ds = transform_func(ds)
                # load all data from the transformed dataset, to ensure we can
                # use it after closing each original file
                ds.load()
                return ds

        paths = sorted(glob(files))
        datasets = [process_one_path(p) for p in paths]
        combined = xr.concat(datasets, dim)
        return combined

    # here we suppose we only care about the combined mean of each file;
    # you might also use indexing operations like .sel to subset datasets
    combined = read_netcdfs('/all/my/files/*.nc', dim='time',
                            transform_func=lambda ds: ds.mean())

This pattern works well and is very robust. We've used similar code to process
tens of thousands of files constituting 100s of GB of data.


.. _io.netcdf.writing_encoded:

Writing encoded data
@@ -817,84 +948,3 @@ For CSV files, one might also consider `xarray_extras`_.
.. _xarray_extras: https://xarray-extras.readthedocs.io/en/latest/api/csv.html

.. _IO tools: http://pandas.pydata.org/pandas-docs/stable/io.html


.. _combining multiple files:


Combining multiple files
------------------------

NetCDF files are often encountered in collections, e.g., with different files
corresponding to different model runs. xarray can straightforwardly combine such
files into a single Dataset by making use of :py:func:`~xarray.concat`,
:py:func:`~xarray.merge`, :py:func:`~xarray.combine_nested` and
:py:func:`~xarray.combine_by_coords`. For details on the difference between these
functions see :ref:`combining data`.

.. note::

Xarray includes support for manipulating datasets that don't fit into memory
with dask_. If you have dask installed, you can open multiple files
simultaneously using :py:func:`~xarray.open_mfdataset`::

xr.open_mfdataset('my/files/*.nc')

This function automatically concatenates and merges multiple files into a
single xarray dataset.
It is the recommended way to open multiple files with xarray.
For more details, see :ref:`combining.multi`, :ref:`dask.io` and a
`blog post`_ by Stephan Hoyer.

.. _dask: http://dask.pydata.org
.. _blog post: http://stephanhoyer.com/2015/06/11/xray-dask-out-of-core-labeled-arrays/

For example, here's how we could approximate ``MFDataset`` from the netCDF4
library::

    from glob import glob
    import xarray as xr

    def read_netcdfs(files, dim):
        # glob expands paths with * to a list of files, like the unix shell
        paths = sorted(glob(files))
        datasets = [xr.open_dataset(p) for p in paths]
        combined = xr.concat(datasets, dim)
        return combined

    combined = read_netcdfs('/all/my/files/*.nc', dim='time')

This function will work in many cases, but it's not very robust. First, it
never closes files, which means it will fail once you need to load more than
a few thousand files. Second, it assumes that you want all the data from each
file and that it can all fit into memory. In many situations, you only need
a small subset or an aggregated summary of the data from each file.

Here's a slightly more sophisticated example of how to remedy these
deficiencies::

    def read_netcdfs(files, dim, transform_func=None):
        def process_one_path(path):
            # use a context manager, to ensure the file gets closed after use
            with xr.open_dataset(path) as ds:
                # transform_func should do some sort of selection or
                # aggregation
                if transform_func is not None:
                    ds = transform_func(ds)
                # load all data from the transformed dataset, to ensure we can
                # use it after closing each original file
                ds.load()
                return ds

        paths = sorted(glob(files))
        datasets = [process_one_path(p) for p in paths]
        combined = xr.concat(datasets, dim)
        return combined

    # here we suppose we only care about the combined mean of each file;
    # you might also use indexing operations like .sel to subset datasets
    combined = read_netcdfs('/all/my/files/*.nc', dim='time',
                            transform_func=lambda ds: ds.mean())

This pattern works well and is very robust. We've used similar code to process
tens of thousands of files constituting 100s of GB of data.
29 changes: 24 additions & 5 deletions doc/whats-new.rst
@@ -93,7 +93,7 @@ New functions/methods
By `Deepak Cherian <https://github.com/dcherian>`_ and `David Mertz
<http://github.com/DavidMertz>`_.

- Dataset plotting API for visualizing dependencies between two `DataArray`s!
- Dataset plotting API for visualizing dependencies between two DataArrays!
Currently only :py:meth:`Dataset.plot.scatter` is implemented.
By `Yohai Bar Sinai <https://github.com/yohai>`_ and `Deepak Cherian <https://github.com/dcherian>`_

@@ -103,11 +103,30 @@ New functions/methods
Enhancements
~~~~~~~~~~~~

- Added ``join='override'``. This only checks that index sizes are equal among objects and skips
checking indexes for equality. By `Deepak Cherian <https://github.com/dcherian>`_.
- Multiple enhancements to :py:func:`~xarray.concat` and :py:func:`~xarray.open_mfdataset`.

- :py:func:`~xarray.concat` and :py:func:`~xarray.open_mfdataset` now support the ``join`` kwarg.
It is passed down to :py:func:`~xarray.align`. By `Deepak Cherian <https://github.com/dcherian>`_.
- Added ``compat='override'``. When merging, this option picks the variable from the first dataset
and skips all comparisons.

- Added ``join='override'``. When aligning, this only checks that index sizes are equal among objects
and skips checking indexes for equality.

- :py:func:`~xarray.concat` and :py:func:`~xarray.open_mfdataset` now support the ``join`` kwarg.
It is passed down to :py:func:`~xarray.align`.

- :py:func:`~xarray.concat` now calls :py:func:`~xarray.merge` on variables that are not concatenated
(i.e. variables without ``concat_dim`` when ``data_vars`` or ``coords`` are ``"minimal"``).
:py:func:`~xarray.concat` passes its new ``compat`` kwarg down to :py:func:`~xarray.merge`.
(:issue:`2064`)

Users can avoid a common bottleneck when using :py:func:`~xarray.open_mfdataset` on a large number of
files with variables that are known to be aligned and some of which need not be concatenated.
Slow equality comparisons can now be avoided, e.g.::

    data = xr.open_mfdataset(files, concat_dim='time', data_vars='minimal',
                             coords='minimal', compat='override', join='override')

By `Deepak Cherian <https://github.com/dcherian>`_:

- In :py:meth:`~xarray.Dataset.to_zarr`, passing ``mode`` is not mandatory if
``append_dim`` is set, as it will automatically be set to ``'a'`` internally.
3 changes: 2 additions & 1 deletion xarray/backends/api.py
@@ -761,7 +761,7 @@ def open_mfdataset(
`xarray.auto_combine` is used, but in the future this behavior will
switch to use `xarray.combine_by_coords` by default.
compat : {'identical', 'equals', 'broadcast_equals',
'no_conflicts'}, optional
'no_conflicts', 'override'}, optional
String indicating how to compare variables of the same name for
potential conflicts when merging:
* 'broadcast_equals': all values must be equal when variables are
@@ -772,6 +772,7 @@
* 'no_conflicts': only values which are not null in both datasets
must be equal. The returned dataset then contains the combination
of all non-null values.
* 'override': skip comparing and pick variable from first dataset
preprocess : callable, optional
If provided, call this function on each dataset prior to concatenation.
You can find the file-name from which each dataset was loaded in