Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix map_blocks HLG layering #3598

Merged
merged 4 commits into from
Dec 7, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions xarray/core/parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@
except ImportError:
pass

import collections
import itertools
import operator
from typing import (
Any,
Callable,
Dict,
DefaultDict,
Hashable,
Mapping,
Sequence,
Expand Down Expand Up @@ -222,6 +224,7 @@ def _wrapper(func, obj, to_array, args, kwargs):
indexes.update({k: template.indexes[k] for k in new_indexes})

graph: Dict[Any, Any] = {}
new_layers: DefaultDict[str, Dict[Any, Any]] = collections.defaultdict(dict)
gname = "{}-{}".format(
dask.utils.funcname(func), dask.base.tokenize(dataset, args, kwargs)
)
Expand Down Expand Up @@ -310,9 +313,13 @@ def _wrapper(func, obj, to_array, args, kwargs):
# unchunked dimensions in the input have one chunk in the result
key += (0,)

graph[key] = (operator.getitem, from_wrapper, name)
new_layers[gname_l][key] = (operator.getitem, from_wrapper, name)
dcherian marked this conversation as resolved.
Show resolved Hide resolved

graph = HighLevelGraph.from_collections(gname, graph, dependencies=[dataset])
hlg = HighLevelGraph.from_collections(gname, graph, dependencies=[dataset])
dcherian marked this conversation as resolved.
Show resolved Hide resolved

for gname_l, layer in new_layers.items():
dcherian marked this conversation as resolved.
Show resolved Hide resolved
hlg.dependencies[gname_l] = {gname}
hlg.layers[gname_l] = layer

result = Dataset(coords=indexes, attrs=template.attrs)
for name, gname_l in var_key_map.items():
Expand All @@ -325,7 +332,7 @@ def _wrapper(func, obj, to_array, args, kwargs):
var_chunks.append((len(indexes[dim]),))

data = dask.array.Array(
graph, name=gname_l, chunks=var_chunks, dtype=template[name].dtype
hlg, name=gname_l, chunks=var_chunks, dtype=template[name].dtype
)
result[name] = (dims, data, template[name].attrs)

Expand Down
7 changes: 7 additions & 0 deletions xarray/tests/test_dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -1189,6 +1189,13 @@ def func(obj):
assert_identical(expected.compute(), actual.compute())


def test_map_blocks_hlg_layers():
ds = xr.Dataset({"x": (("y",), dask.array.ones(10, chunks=(5,)))})
dcherian marked this conversation as resolved.
Show resolved Hide resolved
mapped = ds.map_blocks(lambda x: x)

xr.testing.assert_equal(mapped, ds) # does not work
dcherian marked this conversation as resolved.
Show resolved Hide resolved


def test_make_meta(map_ds):
from ..core.parallel import make_meta

Expand Down