Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow optional path in from_zarr and to_zarr #391

Merged
merged 1 commit into from
Feb 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 23 additions & 8 deletions cubed/core/ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,15 @@ def _from_array(block, input_array, outchunks=None, asarray=None, block_id=None)
return out


def from_zarr(store, spec=None) -> "Array":
def from_zarr(store, path=None, spec=None) -> "Array":
"""Load an array from Zarr storage.

Parameters
----------
store : string
Path to input Zarr store
store : string or Zarr Store
Input Zarr store
path : string, optional
Group path
spec : cubed.Spec, optional
The spec to use for the computation.

Expand All @@ -107,7 +109,7 @@ def from_zarr(store, spec=None) -> "Array":
The array loaded from Zarr storage.
"""
name = gensym()
target = zarr.open(store, mode="r")
target = zarr.open(store, path=path, mode="r")

from cubed.array_api import Array

Expand Down Expand Up @@ -162,7 +164,7 @@ def store(sources: Union["Array", Sequence["Array"]], targets, executor=None, **
compute(*arrays, executor=executor, _return_in_memory_array=False, **kwargs)


def to_zarr(x: "Array", store, executor=None, **kwargs):
def to_zarr(x: "Array", store, path=None, executor=None, **kwargs):
"""Save an array to Zarr storage.

Note that this operation is eager, and will run the computation
Expand All @@ -172,8 +174,10 @@ def to_zarr(x: "Array", store, executor=None, **kwargs):
----------
x : cubed.Array
Array to save
store : string
Path to output Zarr store
store : string or Zarr Store
Output Zarr store
path : string, optional
Group path
executor : cubed.runtime.types.Executor, optional
The executor to use to run the computation.
Defaults to using the in-process Python executor.
Expand All @@ -183,7 +187,14 @@ def to_zarr(x: "Array", store, executor=None, **kwargs):
identity = lambda a: a
ind = tuple(range(x.ndim))
out = blockwise(
identity, ind, x, ind, dtype=x.dtype, align_arrays=False, target_store=store
identity,
ind,
x,
ind,
dtype=x.dtype,
align_arrays=False,
target_store=store,
target_path=path,
)
out.compute(executor=executor, _return_in_memory_array=False, **kwargs)

Expand All @@ -197,6 +208,7 @@ def blockwise(
new_axes=None,
align_arrays=True,
target_store=None,
target_path=None,
extra_func_kwargs=None,
**kwargs,
) -> "Array":
Expand Down Expand Up @@ -286,6 +298,7 @@ def blockwise(
reserved_mem=spec.reserved_mem,
extra_projected_mem=extra_projected_mem,
target_store=target_store,
target_path=target_path,
shape=shape,
dtype=dtype,
chunks=_chunks,
Expand Down Expand Up @@ -318,6 +331,7 @@ def general_blockwise(
dtype,
chunks,
target_store=None,
target_path=None,
extra_func_kwargs=None,
**kwargs,
) -> "Array":
Expand Down Expand Up @@ -346,6 +360,7 @@ def general_blockwise(
reserved_mem=spec.reserved_mem,
extra_projected_mem=extra_projected_mem,
target_store=target_store,
target_path=target_path,
shape=shape,
dtype=dtype,
chunks=chunks,
Expand Down
5 changes: 4 additions & 1 deletion cubed/primitive/blockwise.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ def blockwise(
allowed_mem: int,
reserved_mem: int,
target_store: T_Store,
target_path: Optional[str] = None,
shape: T_Shape,
dtype: T_DType,
chunks: T_Chunks,
Expand Down Expand Up @@ -203,6 +204,7 @@ def blockwise(
allowed_mem=allowed_mem,
reserved_mem=reserved_mem,
target_store=target_store,
target_path=target_path,
shape=shape,
dtype=dtype,
chunks=chunks,
Expand All @@ -222,6 +224,7 @@ def general_blockwise(
allowed_mem: int,
reserved_mem: int,
target_store: T_Store,
target_path: Optional[str] = None,
shape: T_Shape,
dtype: T_DType,
chunks: T_Chunks,
Expand Down Expand Up @@ -277,7 +280,7 @@ def general_blockwise(
target_array = target_store
else:
target_array = lazy_empty(
shape, dtype=dtype, chunks=chunksize, store=target_store
shape, dtype=dtype, chunks=chunksize, store=target_store, path=target_path
)

func_kwargs = extra_func_kwargs or {}
Expand Down
21 changes: 17 additions & 4 deletions cubed/storage/zarr.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any, Union
from typing import Any, Optional, Union

import zarr

Expand All @@ -19,6 +19,7 @@ def __init__(
dtype: T_DType,
chunks: T_RegularChunks,
store: T_Store,
path: Optional[str] = None,
fill_value: Any = None,
**kwargs,
):
Expand All @@ -33,6 +34,7 @@ def __init__(
self.nbytes = template.nbytes

self.store = store
self.path = path
self.fill_value = fill_value
self.kwargs = kwargs

Expand All @@ -54,6 +56,7 @@ def create(self, mode: str = "w-") -> zarr.Array:
shape=self.shape,
dtype=self.dtype,
chunks=self.chunks,
path=self.path,
fill_value=self.fill_value,
**self.kwargs,
)
Expand All @@ -71,6 +74,7 @@ def open(self) -> zarr.Array:
shape=self.shape,
dtype=self.dtype,
chunks=self.chunks,
path=self.path,
)

def __repr__(self) -> str:
Expand All @@ -81,9 +85,15 @@ def __repr__(self) -> str:


def lazy_empty(
shape: T_Shape, *, dtype: T_DType, chunks: T_RegularChunks, store: T_Store, **kwargs
shape: T_Shape,
*,
dtype: T_DType,
chunks: T_RegularChunks,
store: T_Store,
path: Optional[str] = None,
**kwargs,
) -> LazyZarrArray:
return LazyZarrArray(shape, dtype, chunks, store, **kwargs)
return LazyZarrArray(shape, dtype, chunks, store, path=path, **kwargs)


def lazy_full(
Expand All @@ -93,9 +103,12 @@ def lazy_full(
dtype: T_DType,
chunks: T_RegularChunks,
store: T_Store,
path: Optional[str] = None,
**kwargs,
) -> LazyZarrArray:
return LazyZarrArray(shape, dtype, chunks, store, fill_value=fill_value, **kwargs)
return LazyZarrArray(
shape, dtype, chunks, store, path=path, fill_value=fill_value, **kwargs
)


def open_if_lazy_zarr_array(array: T_ZarrArray) -> zarr.Array:
Expand Down
15 changes: 9 additions & 6 deletions cubed/tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,16 @@ def test_from_array_zarr(tmp_path, spec):
assert_array_equal(a, za)


def test_from_zarr(tmp_path, spec, executor):
@pytest.mark.parametrize("path", [None, "sub", "sub/group"])
def test_from_zarr(tmp_path, spec, executor, path):
store = store = tmp_path / "source.zarr"
create_zarr(
[[1, 2, 3], [4, 5, 6], [7, 8, 9]],
chunks=(2, 2),
store=store,
path=path,
)
a = cubed.from_zarr(store, spec=spec)
a = cubed.from_zarr(store, path=path, spec=spec)
assert_array_equal(
a.compute(executor=executor), np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]])
)
Expand Down Expand Up @@ -150,11 +152,12 @@ def test_store_fails(tmp_path, spec):
cubed.store([1], [target])


def test_to_zarr(tmp_path, spec, executor):
@pytest.mark.parametrize("path", [None, "sub", "sub/group"])
def test_to_zarr(tmp_path, spec, executor, path):
a = xp.asarray([[1, 2, 3], [4, 5, 6], [7, 8, 9]], chunks=(2, 2), spec=spec)
output = tmp_path / "output.zarr"
cubed.to_zarr(a, output, executor=executor)
res = zarr.open_array(output)
store = zarr.storage.DirectoryStore(tmp_path / "output.zarr")
cubed.to_zarr(a, store, path=path, executor=executor)
res = zarr.open_array(store, path=path)
assert_array_equal(res[:], np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]]))


Expand Down
6 changes: 4 additions & 2 deletions cubed/tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def on_task_end(self, event):
self.value += event.num_tasks


def create_zarr(a, /, store, *, dtype=None, chunks=None):
def create_zarr(a, /, store, *, dtype=None, chunks=None, path=None):
# from dask.asarray
if not isinstance(getattr(a, "shape", None), Iterable):
# ensure blocks are arrays
Expand All @@ -91,7 +91,9 @@ def create_zarr(a, /, store, *, dtype=None, chunks=None):
dtype = a.dtype

# write to zarr
za = zarr.open(store, mode="w", shape=a.shape, dtype=dtype, chunks=chunks)
za = zarr.open(
store, mode="w", shape=a.shape, dtype=dtype, chunks=chunks, path=path
)
za[:] = a
return za

Expand Down
2 changes: 1 addition & 1 deletion cubed/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,4 @@
int, T_RegularChunks, T_RectangularChunks, Dict[Any, Any], Literal["auto"]
]

T_Store = str # TODO: expand this
T_Store = Any # TODO: improve this
Loading