Skip to content
forked from pydata/xarray

Commit

Permalink
__dask_tokenize__ (pydata#3446)
Browse files Browse the repository at this point in the history
* Implement __dask_tokenize__

* Fix window test

* Code review

* Test change in IndexVariable
  • Loading branch information
dcherian authored and crusaderky committed Oct 31, 2019
1 parent 8fbe1f8 commit 53c5199
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 5 deletions.
20 changes: 16 additions & 4 deletions doc/whats-new.rst
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,20 @@ v0.14.1 (unreleased)
Breaking changes
~~~~~~~~~~~~~~~~

- Minimum cftime version is now 1.0.3. By `Deepak Cherian <https://github.com/dcherian>`_.
- Broken compatibility with cftime < 1.0.3.
By `Deepak Cherian <https://github.com/dcherian>`_.

.. note::

cftime version 1.0.4 is broken (`cftime/126 <https://github.com/Unidata/cftime/issues/126>`_), use version 1.0.4.2 instead.
cftime version 1.0.4 is broken
(`cftime/126 <https://github.com/Unidata/cftime/issues/126>`_);
please use version 1.0.4.2 instead.

- All leftover support for dates from non-standard calendars through netcdftime, the
module included in versions of netCDF4 prior to 1.4 that eventually became the
cftime package, has been removed in favor of relying solely on the standalone
cftime package (:pull:`3450`). By `Spencer Clark
<https://github.com/spencerkclark>`_.
cftime package (:pull:`3450`).
By `Spencer Clark <https://github.com/spencerkclark>`_.

New Features
~~~~~~~~~~~~
Expand All @@ -52,6 +55,14 @@ New Features
for now. Enable it with :py:meth:`xarray.set_options(display_style="html")`.
(:pull:`3425`) by `Benoit Bovy <https://github.com/benbovy>`_ and
`Julia Signell <https://github.com/jsignell>`_.
- Implement `dask deterministic hashing
<https://docs.dask.org/en/latest/custom-collections.html#deterministic-hashing>`_
for xarray objects. Note that xarray objects with a dask.array backend already used
deterministic hashing in previous releases; this change implements it when whole
xarray objects are embedded in a dask graph, e.g. when :meth:`DataArray.map` is
invoked. (:issue:`3378`, :pull:`3446`)
By `Deepak Cherian <https://github.com/dcherian>`_ and
`Guido Imperiale <https://github.com/crusaderky>`_.

Bug fixes
~~~~~~~~~
Expand Down Expand Up @@ -96,6 +107,7 @@ Internal Changes
- Use Python 3.6 idioms throughout the codebase. (:pull:3419)
By `Maximilian Roos <https://github.com/max-sixty>`_


.. _whats-new.0.14.0:

v0.14.0 (14 Oct 2019)
Expand Down
3 changes: 3 additions & 0 deletions xarray/core/dataarray.py
Original file line number Diff line number Diff line change
Expand Up @@ -752,6 +752,9 @@ def reset_coords(
dataset[self.name] = self.variable
return dataset

def __dask_tokenize__(self):
return (type(self), self._variable, self._coords, self._name)

def __dask_graph__(self):
return self._to_temp_dataset().__dask_graph__()

Expand Down
3 changes: 3 additions & 0 deletions xarray/core/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,9 @@ def load(self, **kwargs) -> "Dataset":

return self

def __dask_tokenize__(self):
return (type(self), self._variables, self._coord_names, self._attrs)

def __dask_graph__(self):
graphs = {k: v.__dask_graph__() for k, v in self.variables.items()}
graphs = {k: v for k, v in graphs.items() if v is not None}
Expand Down
9 changes: 9 additions & 0 deletions xarray/core/variable.py
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,11 @@ def compute(self, **kwargs):
new = self.copy(deep=False)
return new.load(**kwargs)

def __dask_tokenize__(self):
# Use v.data, instead of v._data, in order to cope with the wrappers
# around NetCDF and the like
return type(self), self._dims, self.data, self._attrs

def __dask_graph__(self):
if isinstance(self._data, dask_array_type):
return self._data.__dask_graph__()
Expand Down Expand Up @@ -1963,6 +1968,10 @@ def __init__(self, dims, data, attrs=None, encoding=None, fastpath=False):
if not isinstance(self._data, PandasIndexAdapter):
self._data = PandasIndexAdapter(self._data)

def __dask_tokenize__(self):
# Don't waste time converting pd.Index to np.ndarray
return (type(self), self._dims, self._data.array, self._attrs)

def load(self):
# data is already loaded into memory for IndexVariable
return self
Expand Down
94 changes: 94 additions & 0 deletions xarray/tests/test_dask.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import operator
import pickle
import sys
from contextlib import suppress
from distutils.version import LooseVersion
from textwrap import dedent
Expand All @@ -21,12 +22,16 @@
assert_frame_equal,
assert_identical,
raises_regex,
requires_scipy_or_netCDF4,
)
from .test_backends import create_tmp_file

dask = pytest.importorskip("dask")
da = pytest.importorskip("dask.array")
dd = pytest.importorskip("dask.dataframe")

ON_WINDOWS = sys.platform == "win32"


class CountingScheduler:
""" Simple dask scheduler counting the number of computes.
Expand Down Expand Up @@ -1135,3 +1140,92 @@ def test_make_meta(map_ds):
for variable in map_ds.data_vars:
assert variable in meta.data_vars
assert meta.data_vars[variable].shape == (0,) * meta.data_vars[variable].ndim


@pytest.mark.parametrize(
"obj", [make_da(), make_da().compute(), make_ds(), make_ds().compute()]
)
@pytest.mark.parametrize(
"transform",
[
lambda x: x.reset_coords(),
lambda x: x.reset_coords(drop=True),
lambda x: x.isel(x=1),
lambda x: x.attrs.update(new_attrs=1),
lambda x: x.assign_coords(cxy=1),
lambda x: x.rename({"x": "xnew"}),
lambda x: x.rename({"cxy": "cxynew"}),
],
)
def test_token_changes_on_transform(obj, transform):
with raise_if_dask_computes():
assert dask.base.tokenize(obj) != dask.base.tokenize(transform(obj))


@pytest.mark.parametrize(
"obj", [make_da(), make_da().compute(), make_ds(), make_ds().compute()]
)
def test_token_changes_when_data_changes(obj):
with raise_if_dask_computes():
t1 = dask.base.tokenize(obj)

# Change data_var
if isinstance(obj, DataArray):
obj *= 2
else:
obj["a"] *= 2
with raise_if_dask_computes():
t2 = dask.base.tokenize(obj)
assert t2 != t1

# Change non-index coord
obj.coords["ndcoord"] *= 2
with raise_if_dask_computes():
t3 = dask.base.tokenize(obj)
assert t3 != t2

# Change IndexVariable
obj.coords["x"] *= 2
with raise_if_dask_computes():
t4 = dask.base.tokenize(obj)
assert t4 != t3


@pytest.mark.parametrize("obj", [make_da().compute(), make_ds().compute()])
def test_token_changes_when_buffer_changes(obj):
with raise_if_dask_computes():
t1 = dask.base.tokenize(obj)

if isinstance(obj, DataArray):
obj[0, 0] = 123
else:
obj["a"][0, 0] = 123
with raise_if_dask_computes():
t2 = dask.base.tokenize(obj)
assert t2 != t1

obj.coords["ndcoord"][0] = 123
with raise_if_dask_computes():
t3 = dask.base.tokenize(obj)
assert t3 != t2


@pytest.mark.parametrize(
"transform",
[lambda x: x, lambda x: x.copy(deep=False), lambda x: x.copy(deep=True)],
)
@pytest.mark.parametrize("obj", [make_da(), make_ds(), make_ds().variables["a"]])
def test_token_identical(obj, transform):
with raise_if_dask_computes():
assert dask.base.tokenize(obj) == dask.base.tokenize(transform(obj))
assert dask.base.tokenize(obj.compute()) == dask.base.tokenize(
transform(obj.compute())
)


@requires_scipy_or_netCDF4
def test_normalize_token_with_backend(map_ds):
with create_tmp_file(allow_cleanup_failure=ON_WINDOWS) as tmp_file:
map_ds.to_netcdf(tmp_file)
read = xr.open_dataset(tmp_file)
assert not dask.base.tokenize(map_ds) == dask.base.tokenize(read)
22 changes: 21 additions & 1 deletion xarray/tests/test_sparse.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from xarray.core.npcompat import IS_NEP18_ACTIVE
from xarray.core.pycompat import sparse_array_type

from . import assert_equal, assert_identical
from . import assert_equal, assert_identical, requires_dask

param = pytest.param
xfail = pytest.mark.xfail
Expand Down Expand Up @@ -849,3 +849,23 @@ def test_chunk():
dsc = ds.chunk(2)
assert dsc.chunks == {"dim_0": (2, 2)}
assert_identical(dsc, ds)


@requires_dask
def test_dask_token():
import dask

s = sparse.COO.from_numpy(np.array([0, 0, 1, 2]))
a = DataArray(s)
t1 = dask.base.tokenize(a)
t2 = dask.base.tokenize(a)
t3 = dask.base.tokenize(a + 1)
assert t1 == t2
assert t3 != t2
assert isinstance(a.data, sparse.COO)

ac = a.chunk(2)
t4 = dask.base.tokenize(ac)
t5 = dask.base.tokenize(ac + 1)
assert t4 != t5
assert isinstance(ac.data._meta, sparse.COO)

0 comments on commit 53c5199

Please sign in to comment.