distributed.Client.compute fails on DataArray #3171

Closed
crusaderky opened this issue Jul 31, 2019 · 2 comments · Fixed by #3173

Comments

@crusaderky
Contributor

crusaderky commented Jul 31, 2019

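Reproduced as of:

  • dask 2.1.0
  • distributed 2.1.0
  • xarray 0.12.1 or git head (older versions not tested)

Computing a chunked Dataset with client.compute works, but computing one of its DataArrays fails: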
>>> import xarray
>>> import distributed
>>> client = distributed.Client(set_as_default=False)
>>> ds = xarray.Dataset({'d': ('x', [1, 2])}).chunk(1)
>>> client.compute(ds).result()
<xarray.Dataset>
Dimensions:  (x: 2)
Dimensions without coordinates: x
Data variables:
    d        (x) int64 1 2

>>> client.compute(ds.d).result()
distributed.worker - WARNING -  Compute Failed
Function:  _dask_finalize
args:      ([[array([1]), array([2])]], <function Dataset._dask_postcompute at 0x316a1db70>, ([(True, <this-array>, (<function Variable._dask_finalize at 0x3168f7f28>, (<function finalize at 0x1166bb8c8>, (), ('x',), OrderedDict(), None)))], set(), {'x': 2}, None, None, None, None), 'd')
kwargs:    {}
Exception: KeyError(<this-array>)

---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
<ipython-input-8-2dbfe1b2ff17> in <module>
----> 1 client.compute(ds.d).result()

/anaconda3/lib/python3.7/site-packages/distributed/client.py in result(self, timeout)
    226         result = self.client.sync(self._result, callback_timeout=timeout, raiseit=False)
    227         if self.status == "error":
--> 228             six.reraise(*result)
    229         elif self.status == "cancelled":
    230             raise result

/anaconda3/lib/python3.7/site-packages/six.py in reraise(tp, value, tb)
    690                 value = tp()
    691             if value.__traceback__ is not tb:
--> 692                 raise value.with_traceback(tb)
    693             raise value
    694         finally:

~/PycharmProjects/xarray/xarray/core/dataarray.py in _dask_finalize()
    706     def _dask_finalize(results, func, args, name):
    707         ds = func(results, *args)
--> 708         variable = ds._variables.pop(_THIS_ARRAY)
    709         coords = ds._variables
    710         return DataArray(variable, coords, name=name, fastpath=True)
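The log shows that _dask_finalize ran on a worker ("distributed.worker - WARNING - Compute Failed"), so its arguments, including the <this-array> sentinel key, presumably reached the worker through pickle. The dask-free sketch below reproduces that failure mode under stated assumptions: Sentinel, THIS_ARRAY and finalize are hypothetical stand-ins for xarray's ReprObject, _THIS_ARRAY and DataArray._dask_finalize, and the sentinel is assumed to inherit Python's default identity-based __eq__ and __hash__.

```python
import pickle


class Sentinel:
    """Hypothetical stand-in for ReprObject: it overrides __repr__ only,
    so equality and hashing fall back to object identity (an assumption)."""

    def __init__(self, value):
        self._value = value

    def __repr__(self):
        return self._value


# Hypothetical module-level sentinel, playing the role of _THIS_ARRAY.
THIS_ARRAY = Sentinel("<this-array>")


def finalize(variables):
    """Hypothetical simplification of _dask_finalize: pop the variable
    stored under the module-level sentinel key."""
    variables = dict(variables)  # copy so the caller's dict is untouched
    return variables.pop(THIS_ARRAY)


variables = {THIS_ARRAY: [1, 2], "x": [0, 1]}

# Same process: the dict key *is* THIS_ARRAY, so the lookup succeeds.
print(finalize(variables))  # [1, 2]

# Simulated worker: the arguments go through pickle, which builds a new
# Sentinel instance that neither equals nor hashes like THIS_ARRAY.
shipped = pickle.loads(pickle.dumps(variables))
try:
    finalize(shipped)
except KeyError as exc:
    print("KeyError:", exc)  # KeyError: <this-array>
```

Only the unpickled copy misses: the original dict still works because its key is the very same object as the module-level sentinel.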
@crusaderky
Contributor Author

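The problem is that the hash of ReprObject, the type of the _THIS_ARRAY sentinel that _dask_finalize pops from the dataset's variables, changes on a pickle round-trip, so the unpickled key no longer matches: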

>>> import pickle
>>> from xarray.core.dataarray import _THIS_ARRAY
>>> hash(_THIS_ARRAY)
840169974
>>> hash(pickle.loads(pickle.dumps(_THIS_ARRAY)))
286354559
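One way to make such a sentinel survive a pickle round-trip is to base equality and hashing on the wrapped string instead of on object identity. The class below is a hypothetical sketch of that idea (StableReprObject is an illustrative name), not necessarily what the fix in #3173 does.

```python
import pickle


class StableReprObject:
    """Hypothetical pickle-safe sentinel: equality and hashing depend on
    the wrapped string, not on object identity."""

    def __init__(self, value: str):
        self._value = value

    def __repr__(self) -> str:
        return self._value

    def __eq__(self, other) -> bool:
        if isinstance(other, StableReprObject):
            return self._value == other._value
        return False

    def __hash__(self) -> int:
        # Mix in the class so the hash differs from that of the bare string.
        # It only needs to be consistent within one process, which is all
        # a dict lookup requires.
        return hash((StableReprObject, self._value))


THIS_ARRAY = StableReprObject("<this-array>")
copy = pickle.loads(pickle.dumps(THIS_ARRAY))
print(copy == THIS_ARRAY, hash(copy) == hash(THIS_ARRAY))  # True True
print({THIS_ARRAY: "d"}[copy])  # d
```

Defining __eq__ without __hash__ would make instances unhashable in Python 3 (the class's __hash__ is set to None), so both have to be defined together for the sentinel to keep working as a dict key.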

@crusaderky
Contributor Author

crusaderky commented Aug 1, 2019

I could not figure out why DataArray.compute, and dask.compute with a registered distributed.Client, work where Client.compute fails.
I understand that in some cases DataArray._dask_finalize is triggered while in others it is DataArray._from_temp_dataset, but the two functions look the same to me.

2 participants