Add support for remote string paths to h5netcdf engine #8424

Open

wants to merge 9 commits into base: main
8 changes: 7 additions & 1 deletion xarray/backends/file_manager.py
@@ -157,11 +157,17 @@ def __init__(

    def _make_key(self):
        """Make a key for caching files in the LRU cache."""
        kwargs = self._kwargs
        # storage_options is a non-hashable dict, so we implement special logic for hashing
        if self._kwargs.get("storage_options", None) is not None:
            kwargs = self._kwargs.copy()
            kwargs["storage_options"] = tuple(sorted(kwargs["storage_options"].items()))

        value = (
            self._opener,
            self._args,
            "a" if self._mode == "w" else self._mode,
            tuple(sorted(self._kwargs.items())),
            tuple(sorted(kwargs.items())),
            self._manager_id,
        )
        return _HashedSequence(value)
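The copy above is needed because storage_options is a plain dict, which Python cannot hash and therefore cannot go directly into the LRU cache key. A minimal standalone sketch of the problem and of the sorted-tuple workaround (the option names below are arbitrary examples, not ones xarray requires):

storage_options = {"anon": True, "default_fill_cache": False}

try:
    hash(storage_options)  # dicts are mutable and therefore unhashable
except TypeError as err:
    print(err)  # unhashable type: 'dict'

# A sorted tuple of items is hashable and insensitive to key order, so it can
# participate in the cache key. This assumes the option values themselves are
# hashable; a nested dict value would still raise TypeError on hash().
key = tuple(sorted(storage_options.items()))
print(hash(key))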
24 changes: 21 additions & 3 deletions xarray/backends/h5netcdf_.py
@@ -88,6 +88,20 @@ def _h5netcdf_create_group(dataset, name):
    return dataset.create_group(name)


def _h5netcdf_opener(filename, mode, storage_options=None, **kwargs):
    import h5netcdf

    if isinstance(filename, str) and is_remote_uri(filename):
        import fsspec

        mode_ = "rb" if mode == "r" else mode
        fs, _, _ = fsspec.get_fs_token_paths(
            filename, mode=mode_, storage_options=storage_options
        )
        filename = fs.open(filename, mode=mode_)
@dcherian (Contributor) commented on Nov 8, 2023:
Not an expert in this bit, but there's a _find_absolute_paths in backends/common.py that shares a lot of code with the first three lines here. It includes a nice error message if fsspec is not installed.

Contributor:

Do we really want to unconditionally open remote urls with fsspec? This contradicts the usage of native implementations (via "driver"-kwarg, see #8360) in h5py/hdf5.
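For reference, the native route mentioned above would look roughly like this. This is only a sketch: it assumes an h5py/libhdf5 build with the ros3 virtual file driver enabled, and the URL is a placeholder.

import h5py

# Read-only remote access through HDF5's ros3 driver, bypassing fsspec entirely.
f = h5py.File("https://example.org/data.nc", mode="r", driver="ros3")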

Contributor Author:

> Do we really want to unconditionally open remote urls with fsspec?

My guess is yes by default. That's what other pydata libraries like pandas, Dask, Zarr, etc. have converged on for file handling, so it would be familiar to many users. That said, if driver= offers some benefits over fsspec (again, I'm not familiar with the new driver functionality), it'd be easy for these two approaches to live alongside each other, roughly as sketched after this list:

  • Use fsspec by default if no driver is specified.
  • If driver is specified, use that instead.
  • If there are conflicting options provided for some reason (e.g. driver= and storage_options=), then raise an informative error.
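A rough sketch of that dispatch; the function name and signature below are illustrative only, not the PR's API:

import fsspec


def _open_remote_or_local(filename, mode="r", driver=None, storage_options=None):
    """Hypothetical dispatch between fsspec and h5py's native drivers."""
    if driver is not None and storage_options is not None:
        raise ValueError("driver= and storage_options= conflict; pass only one of them")
    if driver is not None:
        # Defer to h5py's native remote support (e.g. driver='ros3'): return the
        # path untouched along with the driver kwargs for h5netcdf/h5py.
        return filename, {"driver": driver}
    if "://" in str(filename):  # crude stand-in for is_remote_uri()
        mode_ = "rb" if mode == "r" else mode
        # fsspec yields an open file-like object that h5netcdf can consume.
        return fsspec.open(filename, mode=mode_, **(storage_options or {})).open(), {}
    return filename, {}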

Contributor:

The algorithm shown here looks good to me. I also think fsspec is more widely used, although keeping the ros3 alternative is desirable too (see this).

Contributor:

These changes indeed look very similar to _find_absolute_paths; can we instead get that one working for this case as well?

def _find_absolute_paths(
    paths: str | os.PathLike | NestedSequence[str | os.PathLike], **kwargs
) -> list[str]:
    """
    Find absolute paths from the pattern.

    Parameters
    ----------
    paths :
        Path(s) to file(s). Can include wildcards like * .
    **kwargs :
        Extra kwargs. Mainly for fsspec.

    Examples
    --------
    >>> from pathlib import Path
    >>> directory = Path(xr.backends.common.__file__).parent
    >>> paths = str(Path(directory).joinpath("comm*n.py"))  # Find common with wildcard
    >>> paths = xr.backends.common._find_absolute_paths(paths)
    >>> [Path(p).name for p in paths]
    ['common.py']
    """
    if isinstance(paths, str):
        if is_remote_uri(paths) and kwargs.get("engine", None) == "zarr":
            try:
                from fsspec.core import get_fs_token_paths
            except ImportError as e:
                raise ImportError(
                    "The use of remote URLs for opening zarr requires the package fsspec"
                ) from e

            fs, _, _ = get_fs_token_paths(
                paths,
                mode="rb",
                storage_options=kwargs.get("backend_kwargs", {}).get(
                    "storage_options", {}
                ),
                expand=False,
            )
            tmp_paths = fs.glob(fs._strip_protocol(paths))  # finds directories
            paths = [fs.get_mapper(path) for path in tmp_paths]
        elif is_remote_uri(paths):
            raise ValueError(
                "cannot do wild-card matching for paths that are remote URLs "
                f"unless engine='zarr' is specified. Got paths: {paths}. "
                "Instead, supply paths as an explicit list of strings."
            )
        else:
            paths = sorted(glob(_normalize_path(paths)))
    elif isinstance(paths, os.PathLike):
        paths = [os.fspath(paths)]
    else:
        paths = [os.fspath(p) if isinstance(p, os.PathLike) else p for p in paths]

    return paths

    return h5netcdf.File(filename, mode=mode, **kwargs)


class H5NetCDFStore(WritableCFDataStore):
"""Store for reading and writing data via h5netcdf"""

@@ -140,9 +154,8 @@ def open(
        invalid_netcdf=None,
        phony_dims=None,
        decode_vlen_strings=True,
        storage_options=None,
    ):
        import h5netcdf

        if isinstance(filename, bytes):
            raise ValueError(
                "can't open netCDF4/HDF5 as bytes "
@@ -161,6 +174,7 @@
        kwargs = {
            "invalid_netcdf": invalid_netcdf,
            "decode_vlen_strings": decode_vlen_strings,
            "storage_options": storage_options,
Contributor:
If we can obtain the fsspec first, we could simplify this significantly along those lines:

# get open fsspec-handle first
if storage_options is not None:
    filename = _find_absolute_paths(filename, engine="h5netcdf", backend_kwargs=dict(storage_options=storage_options))

# other code

manager = CachingFileManager(
            h5netcdf.File, filename, mode=mode, kwargs=kwargs
        )

_find_absolute_paths would need changes to cover for this, though.

        }
        if phony_dims is not None:
            kwargs["phony_dims"] = phony_dims
@@ -171,7 +185,9 @@
        else:
            lock = combine_locks([HDF5_LOCK, get_write_lock(filename)])

        manager = CachingFileManager(h5netcdf.File, filename, mode=mode, kwargs=kwargs)
        manager = CachingFileManager(
            _h5netcdf_opener, filename, mode=mode, kwargs=kwargs
        )
        return cls(manager, group=group, mode=mode, lock=lock, autoclose=autoclose)

    def _acquire(self, needs_lock=True):
@@ -397,6 +413,7 @@ def open_dataset(  # type: ignore[override]  # allow LSP violation, not supporti
        invalid_netcdf=None,
        phony_dims=None,
        decode_vlen_strings=True,
        storage_options=None,
    ) -> Dataset:
        filename_or_obj = _normalize_path(filename_or_obj)
        store = H5NetCDFStore.open(
@@ -407,6 +424,7 @@
            invalid_netcdf=invalid_netcdf,
            phony_dims=phony_dims,
            decode_vlen_strings=decode_vlen_strings,
            storage_options=storage_options,
        )

        store_entrypoint = StoreBackendEntrypoint()
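Taken together, the changes in this file would allow opening a remote path roughly like this. A usage sketch only: the S3 URL is a placeholder, and anonymous access is just one example of an fsspec storage option.

import xarray as xr

ds = xr.open_dataset(
    "s3://some-bucket/some-file.nc",  # remote string path
    engine="h5netcdf",
    storage_options={"anon": True},  # forwarded to fsspec
)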
21 changes: 21 additions & 0 deletions xarray/tests/test_backends.py
@@ -2915,6 +2915,27 @@
assert_identical(ds, ds_a)


@requires_h5netcdf
@requires_fsspec
def test_h5netcdf_storage_options() -> None:
    with create_tmp_files(2) as (f1, f2):
        ds1 = create_test_data()
        ds1.to_netcdf(f1, engine="h5netcdf")

        ds2 = create_test_data()
        ds2.to_netcdf(f2, engine="h5netcdf")

        files = [f"file://{f}" for f in [f1, f2]]
        ds = xr.open_mfdataset(
            files,
            engine="h5netcdf",
            concat_dim="time",
            combine="nested",
            storage_options={"skip_instance_cache": False},
        )
        assert_identical(xr.concat([ds1, ds2], dim="time"), ds)


@requires_scipy
class TestScipyInMemoryData(CFEncodedBase, NetCDF3Only):
    engine: T_NetcdfEngine = "scipy"
@@ -4217,14 +4238,14 @@
assert_identical(data, on_disk)

def test_deterministic_names(self) -> None:
with create_tmp_file() as tmp:

data = create_test_data()
data.to_netcdf(tmp)
with open_mfdataset(tmp, combine="by_coords") as ds:
original_names = {k: v.data.name for k, v in ds.data_vars.items()}
with open_mfdataset(tmp, combine="by_coords") as ds:
repeat_names = {k: v.data.name for k, v in ds.data_vars.items()}
for var_name, dask_name in original_names.items():

assert var_name in dask_name
assert dask_name[:13] == "open_dataset-"
assert original_names == repeat_names
@@ -4242,7 +4263,7 @@
def test_save_mfdataset_compute_false_roundtrip(self) -> None:
from dask.delayed import Delayed

original = Dataset({"foo": ("x", np.random.randn(10))}).chunk()

datasets = [original.isel(x=slice(5)), original.isel(x=slice(5, 10))]
with create_tmp_file(allow_cleanup_failure=ON_WINDOWS) as tmp1:
with create_tmp_file(allow_cleanup_failure=ON_WINDOWS) as tmp2:
@@ -4255,7 +4276,7 @@
[tmp1, tmp2], combine="nested", concat_dim="x"
) as actual:
assert_identical(actual, original)

def test_load_dataset(self) -> None:
with create_tmp_file() as tmp:
original = Dataset({"foo": ("x", np.random.randn(10))})
@@ -4264,7 +4285,7 @@
# this would fail if we used open_dataset instead of load_dataset
ds.to_netcdf(tmp)

def test_load_dataarray(self) -> None:

with create_tmp_file() as tmp:
original = Dataset({"foo": ("x", np.random.randn(10))})
original.to_netcdf(tmp)
@@ -4272,7 +4293,7 @@
# this would fail if we used open_dataarray instead of
# load_dataarray
ds.to_netcdf(tmp)

@pytest.mark.skipif(
ON_WINDOWS,
reason="counting number of tasks in graph fails on windows for some reason",
@@ -4285,7 +4306,7 @@

def num_graph_nodes(obj):
return len(obj.__dask_graph__())

not_inlined_ds = open_dataset(tmp, inline_array=False, chunks=chunks)
inlined_ds = open_dataset(tmp, inline_array=True, chunks=chunks)
assert num_graph_nodes(inlined_ds) < num_graph_nodes(not_inlined_ds)
@@ -4332,7 +4353,7 @@

# global attributes should be global attributes on the dataset
assert "NC_GLOBAL" not in actual.attrs
assert "history" in actual.attrs


# we don't check attributes exactly with assertDatasetIdentical()
# because the test DAP server seems to insert some extra
@@ -4366,7 +4387,7 @@
actual.to_netcdf(tmp_file)
with open_dataset(tmp_file) as actual2:
actual2["bears"] = actual2["bears"].astype(str)
assert_equal(actual2, expected)


@requires_dask
def test_dask(self) -> None:
@@ -4375,7 +4396,7 @@


@network
@requires_scipy_or_netCDF4

@requires_pydap
class TestPydapOnline(TestPydap):
@contextlib.contextmanager