diff --git a/doc/_static/dask-array.svg b/doc/_static/dask-array.svg
new file mode 100644
index 00000000000..bdf33c0ac70
--- /dev/null
+++ b/doc/_static/dask-array.svg
@@ -0,0 +1,349 @@
+
+
diff --git a/doc/_static/dask_array.png b/doc/_static/dask_array.png
deleted file mode 100644
index 7ddb6e400ef..00000000000
Binary files a/doc/_static/dask_array.png and /dev/null differ
diff --git a/doc/user-guide/dask.rst b/doc/user-guide/dask.rst
index bc4aaa61c68..d7fb7cbd41e 100644
--- a/doc/user-guide/dask.rst
+++ b/doc/user-guide/dask.rst
@@ -2,325 +2,243 @@
.. _dask:
-Parallel computing with Dask
+Parallel Computing with Dask
============================
-Xarray integrates with `Dask `__ to support parallel
-computations and streaming computation on datasets that don't fit into memory.
-Currently, Dask is an entirely optional feature for xarray. However, the
-benefits of using Dask are sufficiently strong that Dask may become a required
-dependency in a future version of xarray.
+Xarray integrates with `Dask `__, a general purpose library for parallel computing, to handle larger-than-memory computations.
-For a full example of how to use xarray's Dask integration, read the
-`blog post introducing xarray and Dask`_. More up-to-date examples
-may be found at the `Pangeo project's gallery `_
-and at the `Dask examples website `_.
+If you’ve been using Xarray to read in large datasets or split up data across a number of files, you may already be using Dask:
-.. _blog post introducing xarray and Dask: https://stephanhoyer.com/2015/06/11/xray-dask-out-of-core-labeled-arrays/
+.. code-block:: python
+
+ import xarray as xr
+
+ ds = xr.open_zarr("/path/to/data.zarr")
+ timeseries = ds["temp"].mean(dim=["x", "y"]).compute() # Compute result
+
+Using Dask with Xarray feels similar to working with NumPy arrays, but on much larger datasets. The Dask integration is transparent, so you usually don’t need to manage the parallelism directly; Xarray and Dask handle these aspects behind the scenes. This makes it easy to write code that scales from small, in-memory datasets on a single machine to large datasets that are distributed across a cluster, with minimal code changes.
+
+Examples
+--------
+
+If you're new to using Xarray with Dask, we recommend the `Xarray + Dask Tutorial `_.
+
+Here are some examples for using Xarray with Dask at scale:
-What is a Dask array?
----------------------
+- `Zonal averaging with the NOAA National Water Model `_
+- `CMIP6 Precipitation Frequency Analysis `_
+- `Using Dask + Cloud Optimized GeoTIFFs `_
-.. image:: ../_static/dask_array.png
- :width: 40 %
+Find more examples at the `Project Pythia cookbook gallery `_.
+
+
+Using Dask with Xarray
+----------------------
+
+.. image:: ../_static/dask-array.svg
+ :width: 50 %
:align: right
:alt: A Dask array
-Dask divides arrays into many small pieces, called *chunks*, each of which is
-presumed to be small enough to fit into memory.
+Dask divides arrays into smaller parts called chunks. These chunks are small, manageable pieces of the larger dataset that Dask can process in parallel (see the `Dask Array docs on chunks `_). Chunks are commonly set when reading data, but you can also set the chunk sizes manually at any point in your workflow using :py:meth:`Dataset.chunk` and :py:meth:`DataArray.chunk`. See :ref:`dask.chunks` for more.
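+
+As a minimal sketch (the file name and chunk size here are illustrative):
+
+.. code-block:: python
+
+    import xarray as xr
+
+    ds = xr.open_dataset("example-data.nc")  # eager, NumPy-backed arrays
+    ds = ds.chunk({"time": 10})  # convert to Dask-backed arrays, 10 time steps per chunk
+    ds.chunksizes  # mapping from dimension names to chunk sizes
+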
-Unlike NumPy, which has eager evaluation, operations on Dask arrays are lazy.
-Operations queue up a series of tasks mapped over blocks, and no computation is
-performed until you actually ask values to be computed (e.g., to print results
-to your screen or write to disk). At that point, data is loaded into memory
-and computation proceeds in a streaming fashion, block-by-block.
+Xarray operations on Dask-backed arrays are lazy. This means computations are not executed immediately, but are instead queued up as tasks in a Dask graph.
-The actual computation is controlled by a multi-processing or thread pool,
-which allows Dask to take full advantage of multiple processors available on
-most modern computers.
+When a result is requested (e.g., for plotting, writing to disk, or explicitly computing), Dask executes the task graph. The computations are carried out in parallel, with each chunk being processed independently. This parallel execution is key to handling large datasets efficiently.
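+
+A small sketch of this behaviour, assuming a Dask-backed dataset ``ds`` with a ``temperature`` variable:
+
+.. code-block:: python
+
+    lazy_mean = ds["temperature"].mean(dim="time")  # builds a task graph; nothing is computed yet
+    result = lazy_mean.compute()  # executes the graph, processing chunks in parallel
+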
-For more details, read the `Dask documentation `__.
-Note that xarray only makes use of ``dask.array`` and ``dask.delayed``.
+Nearly all Xarray methods have been extended to work automatically with Dask Arrays. This includes things like indexing, concatenating, rechunking, grouped operations, etc. Common operations are covered in more detail in each of the sections below.
.. _dask.io:
Reading and writing data
-------------------------
+~~~~~~~~~~~~~~~~~~~~~~~~
-The usual way to create a ``Dataset`` filled with Dask arrays is to load the
-data from a netCDF file or files. You can do this by supplying a ``chunks``
-argument to :py:func:`~xarray.open_dataset` or using the
-:py:func:`~xarray.open_mfdataset` function.
+When reading data, Dask divides your dataset into smaller chunks. You can specify the size of chunks with the ``chunks`` argument. Specifying ``chunks="auto"`` will set the Dask chunk sizes to be an automatically chosen multiple of the on-disk chunk sizes. This is often a reasonable default, but the most appropriate chunk size will depend on your workflow.
-.. ipython:: python
- :suppress:
+.. tab:: Zarr
- import os
+    The `Zarr `_ format is ideal for working with large datasets. Each chunk is stored in a separate file, allowing parallel reading and writing with Dask. You can also use Zarr to read/write directly from cloud storage buckets (see the `Dask documentation on connecting to remote data `__).
- import numpy as np
- import pandas as pd
- import xarray as xr
+ When you open a Zarr dataset with :py:func:`~xarray.open_zarr`, it is loaded as a Dask array by default (if Dask is installed)::
- np.random.seed(123456)
- np.set_printoptions(precision=3, linewidth=100, threshold=100, edgeitems=3)
+ ds = xr.open_zarr("path/to/directory.zarr")
- ds = xr.Dataset(
- {
- "temperature": (
- ("time", "latitude", "longitude"),
- np.random.randn(30, 180, 180),
- ),
- "time": pd.date_range("2015-01-01", periods=30),
- "longitude": np.arange(180),
- "latitude": np.arange(89.5, -90.5, -1),
- }
- )
- ds.to_netcdf("example-data.nc")
+ See :ref:`io.zarr` for more details.
-.. ipython:: python
+.. tab:: NetCDF
- ds = xr.open_dataset("example-data.nc", chunks={"time": 10})
- ds
+    Open a single netCDF file with :py:func:`~xarray.open_dataset`, supplying a ``chunks`` argument::
-In this example ``latitude`` and ``longitude`` do not appear in the ``chunks``
-dict, so only one chunk will be used along those dimensions. It is also
-entirely equivalent to opening a dataset using :py:func:`~xarray.open_dataset`
-and then chunking the data using the ``chunk`` method, e.g.,
-``xr.open_dataset('example-data.nc').chunk({'time': 10})``.
+ ds = xr.open_dataset("example-data.nc", chunks={"time": 10})
-To open multiple files simultaneously in parallel using Dask delayed,
-use :py:func:`~xarray.open_mfdataset`::
+    Or open multiple files in parallel with :py:func:`~xarray.open_mfdataset`::
- xr.open_mfdataset('my/files/*.nc', parallel=True)
+ xr.open_mfdataset('my/files/*.nc', parallel=True)
-This function will automatically concatenate and merge datasets into one in
-the simple cases that it understands (see :py:func:`~xarray.combine_by_coords`
-for the full disclaimer). By default, :py:func:`~xarray.open_mfdataset` will chunk each
-netCDF file into a single Dask array; again, supply the ``chunks`` argument to
-control the size of the resulting Dask arrays. In more complex cases, you can
-open each file individually using :py:func:`~xarray.open_dataset` and merge the result, as
-described in :ref:`combining data`. Passing the keyword argument ``parallel=True`` to
-:py:func:`~xarray.open_mfdataset` will speed up the reading of large multi-file datasets by
-executing those read tasks in parallel using ``dask.delayed``.
+ .. tip::
-.. warning::
+        When reading in many netCDF files with :py:func:`~xarray.open_mfdataset`, using ``engine="h5netcdf"`` can
+        be faster than the default, which uses the netCDF4 package.
- :py:func:`~xarray.open_mfdataset` called without ``chunks`` argument will return
- dask arrays with chunk sizes equal to the individual files. Re-chunking
- the dataset after creation with ``ds.chunk()`` will lead to an ineffective use of
- memory and is not recommended.
+ Save larger-than-memory netCDF files::
-You'll notice that printing a dataset still shows a preview of array values,
-even if they are actually Dask arrays. We can do this quickly with Dask because
-we only need to compute the first few values (typically from the first block).
-To reveal the true nature of an array, print a DataArray:
+ ds.to_netcdf("my-big-file.nc")
-.. ipython:: python
+ Or set ``compute=False`` to return a dask.delayed object that can be computed later::
- ds.temperature
+ delayed_write = ds.to_netcdf("my-big-file.nc", compute=False)
+ delayed_write.compute()
-Once you've manipulated a Dask array, you can still write a dataset too big to
-fit into memory back to disk by using :py:meth:`~xarray.Dataset.to_netcdf` in the
-usual way.
+ .. note::
-.. ipython:: python
+ When using Dask’s distributed scheduler to write NETCDF4 files, it may be necessary to set the environment variable ``HDF5_USE_FILE_LOCKING=FALSE`` to avoid competing locks within the HDF5 SWMR file locking scheme. Note that writing netCDF files with Dask’s distributed scheduler is only supported for the netcdf4 backend.
- ds.to_netcdf("manipulated-example-data.nc")
+ See :ref:`io.netcdf` for more details.
-By setting the ``compute`` argument to ``False``, :py:meth:`~xarray.Dataset.to_netcdf`
-will return a ``dask.delayed`` object that can be computed later.
+.. tab:: HDF5
-.. ipython:: python
+ Open HDF5 files with :py:func:`~xarray.open_dataset`::
- from dask.diagnostics import ProgressBar
+ xr.open_dataset("/path/to/my/file.h5", chunks='auto')
- # or distributed.progress when using the distributed scheduler
- delayed_obj = ds.to_netcdf("manipulated-example-data.nc", compute=False)
- with ProgressBar():
- results = delayed_obj.compute()
+ See :ref:`io.hdf5` for more details.
-.. ipython:: python
- :suppress:
+.. tab:: GeoTIFF
- os.remove("manipulated-example-data.nc") # Was not opened.
+ Open large geoTIFF files with rioxarray::
-.. note::
+ xds = rioxarray.open_rasterio("my-satellite-image.tif", chunks='auto')
- When using Dask's distributed scheduler to write NETCDF4 files,
- it may be necessary to set the environment variable `HDF5_USE_FILE_LOCKING=FALSE`
- to avoid competing locks within the HDF5 SWMR file locking scheme. Note that
- writing netCDF files with Dask's distributed scheduler is only supported for
- the `netcdf4` backend.
+ See :ref:`io.rasterio` for more details.
-A dataset can also be converted to a Dask DataFrame using :py:meth:`~xarray.Dataset.to_dask_dataframe`.
-.. ipython:: python
- :okwarning:
+Loading Dask Arrays
+~~~~~~~~~~~~~~~~~~~
- df = ds.to_dask_dataframe()
- df
+.. ipython:: python
+ :suppress:
-Dask DataFrames do not support multi-indexes so the coordinate variables from the dataset are included as columns in the Dask DataFrame.
+ import os
+ import numpy as np
+ import pandas as pd
+ import xarray as xr
-Using Dask with xarray
-----------------------
+ np.random.seed(123456)
+ np.set_printoptions(precision=3, linewidth=100, threshold=100, edgeitems=3)
-Nearly all existing xarray methods (including those for indexing, computation,
-concatenating and grouped operations) have been extended to work automatically
-with Dask arrays. When you load data as a Dask array in an xarray data
-structure, almost all xarray operations will keep it as a Dask array; when this
-is not possible, they will raise an exception rather than unexpectedly loading
-data into memory. Converting a Dask array into memory generally requires an
-explicit conversion step. One notable exception is indexing operations: to
-enable label based indexing, xarray will automatically load coordinate labels
-into memory.
+ ds = xr.Dataset(
+ {
+ "temperature": (
+ ("time", "latitude", "longitude"),
+ np.random.randn(30, 180, 180),
+ ),
+ "time": pd.date_range("2015-01-01", periods=30),
+ "longitude": np.arange(180),
+ "latitude": np.arange(89.5, -90.5, -1),
+ }
+ )
+ ds.to_netcdf("example-data.nc")
-.. tip::
+There are a few common cases where you may want to convert lazy Dask arrays into eager, in-memory Xarray data structures:
- By default, dask uses its multi-threaded scheduler, which distributes work across
- multiple cores and allows for processing some datasets that do not fit into memory.
- For running across a cluster, `setup the distributed scheduler `_.
+- You want to inspect smaller intermediate results when working interactively or debugging
+- You've reduced the dataset (by filtering or with a groupby, for example) and now have something much smaller that fits in memory
+- You need to compute intermediate results since Dask is unable (or struggles) to perform a certain computation. The canonical example of this is normalizing a dataset, e.g., ``ds - ds.mean()``, when ``ds`` is larger than memory. Typically, you should either save ``ds`` to disk or compute ``ds.mean()`` eagerly.
-The easiest way to convert an xarray data structure from lazy Dask arrays into
-*eager*, in-memory NumPy arrays is to use the :py:meth:`~xarray.Dataset.load` method:
+To do this, you can use :py:meth:`Dataset.compute` or :py:meth:`DataArray.compute`:
.. ipython:: python
- ds.load()
+ ds.compute()
-You can also access :py:attr:`~xarray.DataArray.values`, which will always be a
-NumPy array:
-
-.. ipython::
- :verbatim:
+.. note::
- In [5]: ds.temperature.values
- Out[5]:
- array([[[ 4.691e-01, -2.829e-01, ..., -5.577e-01, 3.814e-01],
- [ 1.337e+00, -1.531e+00, ..., 8.726e-01, -1.538e+00],
- ...
- # truncated for brevity
+    Using :py:meth:`Dataset.compute` is preferred to :py:meth:`Dataset.load`, which loads the data in-place and modifies the original object rather than returning a new one.
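+
+For the normalization case mentioned above, a minimal sketch (assuming ``ds`` is larger than memory but its mean is small) is to compute the reduced result eagerly and reuse it:
+
+.. code-block:: python
+
+    mean = ds.mean().compute()  # small, in-memory result
+    anomaly = ds - mean  # still lazy and evaluated chunk-by-chunk
+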
-Explicit conversion by wrapping a DataArray with ``np.asarray`` also works:
+You can also access :py:attr:`DataArray.values`, which will always be a NumPy array:
.. ipython::
:verbatim:
- In [5]: np.asarray(ds.temperature)
+ In [5]: ds.temperature.values
Out[5]:
array([[[ 4.691e-01, -2.829e-01, ..., -5.577e-01, 3.814e-01],
[ 1.337e+00, -1.531e+00, ..., 8.726e-01, -1.538e+00],
...
+ # truncated for brevity
-Alternatively you can load the data into memory but keep the arrays as
-Dask arrays using the :py:meth:`~xarray.Dataset.persist` method:
-
-.. ipython:: python
-
- persisted = ds.persist()
-
-:py:meth:`~xarray.Dataset.persist` is particularly useful when using a
-distributed cluster because the data will be loaded into distributed memory
-across your machines and be much faster to use than reading repeatedly from
-disk.
-
-.. warning::
-
- On a single machine :py:meth:`~xarray.Dataset.persist` will try to load all of
- your data into memory. You should make sure that your dataset is not larger than
- available memory.
-
-.. note::
-
- For more on the differences between :py:meth:`~xarray.Dataset.persist` and
- :py:meth:`~xarray.Dataset.compute` see this `Stack Overflow answer on the differences between client persist and client compute `_ and the `Dask documentation `_.
-
-For performance you may wish to consider chunk sizes. The correct choice of
-chunk size depends both on your data and on the operations you want to perform.
-With xarray, both converting data to a Dask arrays and converting the chunk
-sizes of Dask arrays is done with the :py:meth:`~xarray.Dataset.chunk` method:
+NumPy ufuncs like :py:func:`numpy.sin` transparently work on all xarray objects, including those
+that store lazy Dask arrays:
.. ipython:: python
- rechunked = ds.chunk({"latitude": 100, "longitude": 100})
+ import numpy as np
-.. warning::
+ np.sin(ds)
- Rechunking an existing dask array created with :py:func:`~xarray.open_mfdataset`
- is not recommended (see above).
+To access Dask arrays directly, use the :py:attr:`DataArray.data` attribute, which exposes the DataArray's underlying array (a Dask array, in this case).
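+
+For example (a quick sketch, reusing the ``temperature`` variable from above):
+
+.. code-block:: python
+
+    ds["temperature"].data  # a dask.array.Array when the variable is Dask-backed
+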
-You can view the size of existing chunks on an array by viewing the
-:py:attr:`~xarray.Dataset.chunks` attribute:
+If you're using a Dask cluster, you can also use :py:meth:`Dataset.persist` for quickly accessing intermediate outputs. This is most helpful after expensive operations like rechunking or setting an index. It's a way of telling the cluster that it should start executing the computations that you have defined so far, and that it should try to keep those results in memory. You will get back a new Dask array that is semantically equivalent to your old array, but now points to running data.
-.. ipython:: python
+.. code-block:: python
- rechunked.chunks
+ ds = ds.persist()
-If there are not consistent chunksizes between all the arrays in a dataset
-along a particular dimension, an exception is raised when you try to access
-``.chunks``.
+.. tip::
-.. note::
+    Remember to assign the dataset returned by ``persist``! Forgetting to do so, and continuing to work with the original, un-persisted dataset, is a common mistake.
- In the future, we would like to enable automatic alignment of Dask
- chunksizes (but not the other way around). We might also require that all
- arrays in a dataset share the same chunking alignment. Neither of these
- are currently done.
+.. _dask.chunks:
-NumPy ufuncs like ``np.sin`` transparently work on all xarray objects, including those
-that store lazy Dask arrays:
+Chunking and performance
+~~~~~~~~~~~~~~~~~~~~~~~~
-.. ipython:: python
+The way a dataset is chunked can be critical to performance when working with large datasets. You'll want chunk sizes large enough to reduce the number of chunks that Dask has to think about (to reduce overhead from the task graph) but also small enough so that many of them can fit in memory at once.
- import numpy as np
+.. tip::
- np.sin(rechunked)
+ A good rule of thumb is to create arrays with a minimum chunk size of at least one million elements (e.g., a 1000x1000 matrix). With large arrays (10+ GB), you may need larger chunks. See `Choosing good chunk sizes in Dask `_.
-To access Dask arrays directly, use the
-:py:attr:`DataArray.data ` attribute. This attribute exposes
-array data either as a Dask array or as a NumPy array, depending on whether it has been
-loaded into Dask or not:
+It can be helpful to choose chunk sizes based on your downstream analyses and to chunk as early as possible. Datasets with smaller chunks along the time axis, for example, can make time domain problems easier to parallelize, since Dask can perform the same operation on each time chunk. If you're working with a large dataset whose chunks make downstream analyses challenging, you may need to rechunk your data. Rechunking is an expensive operation, though, so it is only recommended when needed.
-.. ipython:: python
+You can chunk or rechunk a dataset by:
- ds.temperature.data
+- Specifying the ``chunks`` kwarg when reading in your dataset. If you know you'll want to do some spatial subsetting, for example, you could use ``chunks={'latitude': 10, 'longitude': 10}`` to specify small chunks across space. This can avoid loading subsets of data that span multiple chunks, thus reducing the number of file reads. Note, though, that this only works well when the requested chunks are similar to how the data is chunked on disk; otherwise it will be very slow and require a lot of network bandwidth.
+- Many array file formats are chunked on disk. You can specify ``chunks={}`` to have a single dask chunk map to a single on-disk chunk, and ``chunks="auto"`` to have a single dask chunk be an automatically chosen multiple of the on-disk chunks.
+- Using :py:meth:`Dataset.chunk` after you've already read in your dataset. For time domain problems, for example, you can use ``ds.chunk(time=TimeResampler())`` to rechunk according to a specified unit of time; ``ds.chunk(time=TimeResampler("MS"))`` will set the chunks so that a month of data is contained in one chunk (see the sketch below).
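+
+A rough sketch of these options (the file name and chunk sizes are illustrative):
+
+.. code-block:: python
+
+    from xarray.groupers import TimeResampler
+
+    # Small spatial chunks at read time, suited to spatial subsetting
+    ds = xr.open_dataset("example-data.nc", chunks={"latitude": 10, "longitude": 10})
+
+    # Rechunk so that each chunk holds one month of data along time
+    monthly_chunked = ds.chunk(time=TimeResampler("MS"))
+
+    monthly_chunked.chunksizes  # inspect the resulting chunk sizes
+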
-.. note::
- ``.data`` is also used to expose other "computable" array backends beyond Dask and
- NumPy (e.g. sparse and pint arrays).
+For large-scale rechunking tasks (e.g., converting a simulation dataset stored with chunking only along time to a dataset with chunking only across space), consider writing another copy of your data on disk and/or using dedicated tools such as `Rechunker `_.
.. _dask.automatic-parallelization:
-Automatic parallelization with ``apply_ufunc`` and ``map_blocks``
------------------------------------------------------------------
-
-.. tip::
+Parallelize custom functions with ``apply_ufunc`` and ``map_blocks``
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
- Some problems can become embarrassingly parallel and thus easy to parallelize
- automatically by rechunking to a frequency, e.g. ``ds.chunk(time=TimeResampler("YE"))``.
- See :py:meth:`Dataset.chunk` for more.
-
-Almost all of xarray's built-in operations work on Dask arrays. If you want to
-use a function that isn't wrapped by xarray, and have it applied in parallel on
+Almost all of Xarray's built-in operations work on Dask arrays. If you want to
+use a function that isn't wrapped by Xarray, and have it applied in parallel on
each block of your xarray object, you have three options:
-1. Extract Dask arrays from xarray objects (``.data``) and use Dask directly.
-2. Use :py:func:`~xarray.apply_ufunc` to apply functions that consume and return NumPy arrays.
-3. Use :py:func:`~xarray.map_blocks`, :py:meth:`Dataset.map_blocks` or :py:meth:`DataArray.map_blocks`
+1. Use :py:func:`~xarray.apply_ufunc` to apply functions that consume and return NumPy arrays.
+2. Use :py:func:`~xarray.map_blocks`, :py:meth:`Dataset.map_blocks` or :py:meth:`DataArray.map_blocks`
to apply functions that consume and return xarray objects.
+3. Extract Dask Arrays from xarray objects with :py:attr:`DataArray.data` and use Dask directly.
+
+.. tip::
+
+ See the extensive Xarray tutorial on `apply_ufunc `_.
``apply_ufunc``
-~~~~~~~~~~~~~~~
+###############
:py:func:`~xarray.apply_ufunc` automates `embarrassingly parallel
`__ "map" type operations
where a function written for processing NumPy arrays should be repeatedly
-applied to xarray objects containing Dask arrays. It works similarly to
+applied to Xarray objects containing Dask Arrays. It works similarly to
:py:func:`dask.array.map_blocks` and :py:func:`dask.array.blockwise`, but without
-requiring an intermediate layer of abstraction.
+requiring an intermediate layer of abstraction. See the `Dask documentation `__ for more details.
For the best performance when using Dask's multi-threaded scheduler, wrap a
function that already releases the global interpreter lock, which fortunately
@@ -415,9 +333,7 @@ application.
.. tip::
- For the majority of NumPy functions that are already wrapped by Dask, it's
- usually a better idea to use the pre-existing ``dask.array`` function, by
- using either a pre-existing xarray methods or
+ When possible, it's recommended to use pre-existing ``dask.array`` functions, either with existing xarray methods or
:py:func:`~xarray.apply_ufunc()` with ``dask='allowed'``. Dask can often
have a more efficient implementation that makes use of the specialized
structure of a problem, unlike the generic speedups offered by
@@ -425,10 +341,10 @@ application.
``map_blocks``
-~~~~~~~~~~~~~~
+##############
-Functions that consume and return xarray objects can be easily applied in parallel using :py:func:`map_blocks`.
-Your function will receive an xarray Dataset or DataArray subset to one chunk
+Functions that consume and return Xarray objects can be easily applied in parallel using :py:func:`map_blocks`.
+Your function will receive an Xarray Dataset or DataArray subset to one chunk
along each chunked dimension.
.. ipython:: python
@@ -455,7 +371,7 @@ Notice that the :py:meth:`map_blocks` call printed
``func`` is received 0-sized blocks! :py:meth:`map_blocks` needs to know what the final result
looks like in terms of dimensions, shapes etc. It does so by running the provided function on 0-shaped
inputs (*automated inference*). This works in many cases, but not all. If automatic inference does not
-work for your function, provide the ``template`` kwarg (see below).
+work for your function, provide the ``template`` kwarg (see :ref:`below <template-note>`).
In this case, automatic inference has worked so let's check that the result is as expected.
@@ -469,7 +385,6 @@ This executes the Dask graph in `serial` using a for loop, but allows for printi
debugging techniques. We can easily see that our function is receiving blocks of shape 10x180x180 and
the returned result is identical to ``ds.time`` as expected.
-
Here is a common example where automated inference will not work.
.. ipython:: python
@@ -489,6 +404,8 @@ what the function returns) with dimensions, shapes, chunk sizes, attributes, coo
variables that look exactly like the expected result. The variables should be dask-backed and hence not
incur much memory cost.
+.. _template-note:
+
.. note::
Note that when ``template`` is provided, ``attrs`` from ``template`` are copied over to the result. Any
@@ -533,61 +450,45 @@ Notice that the 0-shaped sizes were not printed to screen. Since ``template`` ha
As :py:func:`map_blocks` loads each block into memory, reduce as much as possible objects consumed by user functions.
For example, drop useless variables before calling ``func`` with :py:func:`map_blocks`.
+Deploying Dask
+--------------
+By default, Dask uses the multi-threaded scheduler, which distributes work across multiple cores on a single machine and allows for processing some datasets that do not fit into memory. However, this has two limitations:
-Chunking and performance
-------------------------
-
-The ``chunks`` parameter has critical performance implications when using Dask
-arrays. If your chunks are too small, queueing up operations will be extremely
-slow, because Dask will translate each operation into a huge number of
-operations mapped across chunks. Computation on Dask arrays with small chunks
-can also be slow, because each operation on a chunk has some fixed overhead from
-the Python interpreter and the Dask task executor.
-
-Conversely, if your chunks are too big, some of your computation may be wasted,
-because Dask only computes results one chunk at a time.
-
-A good rule of thumb is to create arrays with a minimum chunksize of at least
-one million elements (e.g., a 1000x1000 matrix). With large arrays (10+ GB), the
-cost of queueing up Dask operations can be noticeable, and you may need even
-larger chunksizes.
-
-.. tip::
-
- Check out the `dask documentation on chunks `_.
-
-.. tip::
+- You are limited by the size of your hard drive
+- Downloading data can be slow and expensive
- Many time domain problems become amenable to an embarrassingly parallel or blockwise solution
- (e.g. using :py:func:`xarray.map_blocks`, :py:func:`dask.array.map_blocks`, or
- :py:func:`dask.array.blockwise`) by rechunking to a frequency along the time dimension.
- Provide :py:class:`xarray.groupers.TimeResampler` objects to :py:meth:`Dataset.chunk` to do so.
- For example ``ds.chunk(time=TimeResampler("MS"))`` will set the chunks so that a month of
- data is contained in one chunk. The resulting chunk sizes need not be uniform, depending on
- the frequency of the data, and the calendar.
+Instead, it can be faster and cheaper to run your computations close to where your data is stored, distributed across many machines on a Dask cluster. Often, this means deploying Dask on HPC clusters or on the cloud. See the `Dask deployment documentation `__ for more details.
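+
+As a minimal sketch, Xarray computations run on a cluster once a ``dask.distributed`` client is active (a local cluster is shown here for illustration; HPC and cloud deployments construct the cluster object differently):
+
+.. code-block:: python
+
+    from dask.distributed import Client, LocalCluster
+
+    cluster = LocalCluster()  # swap in an HPC or cloud cluster class when deploying remotely
+    client = Client(cluster)  # subsequent Dask computations run on this cluster
+
+    ds = xr.open_zarr("/path/to/data.zarr")
+    timeseries = ds["temp"].mean(dim=["x", "y"]).compute()  # executed on the cluster's workers
+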
+Best Practices
+--------------
-Optimization Tips
------------------
+Dask is pretty easy to use but there are some gotchas, many of which are being actively worked on. Here are some tips we have found through experience. We also recommend checking out the `Dask best practices `_.
-With analysis pipelines involving both spatial subsetting and temporal resampling, Dask performance
-can become very slow or memory hungry in certain cases. Here are some optimization tips we have found
-through experience:
-
-1. Do your spatial and temporal indexing (e.g. ``.sel()`` or ``.isel()``) early in the pipeline, especially before calling ``resample()`` or ``groupby()``. Grouping and resampling triggers some computation on all the blocks, which in theory should commute with indexing, but this optimization hasn't been implemented in Dask yet. (See `Dask issue #746 `_).
+1. Do your spatial and temporal indexing (e.g. ``.sel()`` or ``.isel()``) early, especially before calling ``resample()`` or ``groupby()``. Grouping and resampling triggers some computation on all the blocks, which in theory should commute with indexing, but this optimization hasn't been implemented in Dask yet. (See `Dask issue #746 `_).
2. More generally, ``groupby()`` is a costly operation and will perform a lot better if the ``flox`` package is installed.
See the `flox documentation `_ for more. By default Xarray will use ``flox`` if installed.
3. Save intermediate results to disk as a netCDF files (using ``to_netcdf()``) and then load them again with ``open_dataset()`` for further computations. For example, if subtracting temporal mean from a dataset, save the temporal mean to disk before subtracting. Again, in theory, Dask should be able to do the computation in a streaming fashion, but in practice this is a fail case for the Dask scheduler, because it tries to keep every chunk of an array that it computes in memory. (See `Dask issue #874 `_)
-4. Specify smaller chunks across space when using :py:meth:`~xarray.open_mfdataset` (e.g., ``chunks={'latitude': 10, 'longitude': 10}``). This makes spatial subsetting easier, because there's no risk you will load subsets of data which span multiple chunks. On individual files, prefer to subset before chunking (suggestion 1).
+4. Use the `Dask dashboard `_ to identify performance bottlenecks.
+
+Here's an example of a simplified workflow putting some of these tips together:
+
+.. code-block:: python
-5. Chunk as early as possible, and avoid rechunking as much as possible. Always pass the ``chunks={}`` argument to :py:func:`~xarray.open_mfdataset` to avoid redundant file reads.
+    import xarray as xr
-6. Using the h5netcdf package by passing ``engine='h5netcdf'`` to :py:meth:`~xarray.open_mfdataset` can be quicker than the default ``engine='netcdf4'`` that uses the netCDF4 package.
+ ds = xr.open_zarr( # Since we're doing a spatial reduction, increase chunk size in x, y
+ "my-data.zarr", chunks={"x": 100, "y": 100}
+ )
+
+ time_subset = ds.sea_temperature.sel(
+ time=slice("2020-01-01", "2020-12-31") # Filter early
+ )
-7. Find `best practices specific to Dask arrays in the documentation `_.
+ # faster resampling when flox is installed
+    daily = time_subset.resample(time="D").mean()
-8. The `dask diagnostics `_ can be useful in identifying performance bottlenecks.
+ daily.load() # Pull smaller results into memory after reducing the dataset