feat(datasets): Add NetCDFDataSet class #360

Merged · 68 commits · Feb 28, 2024
Commits
cfd040c  initialize template and early additions (riley-brady, Sep 29, 2023)
fa8f922  add placeholder for remote file system load (riley-brady, Sep 29, 2023)
b3ec640  switch to versioned dataset (riley-brady, Sep 29, 2023)
3d1b1f3  add initial remote -> local get for S3 (riley-brady, Sep 29, 2023)
37ba9c2  further generalize remote retrieval (riley-brady, Sep 29, 2023)
0ccd58a  add in credentials (riley-brady, Sep 29, 2023)
de0b044  make temppath optional for remote datasets (riley-brady, Sep 29, 2023)
532fad8  add initial idea for multifile glob (riley-brady, Sep 29, 2023)
526a0ce  style: Introduce `ruff` for linting in all plugins. (#354) (merelcht, Oct 2, 2023)
84df521  add suggested style changes (riley-brady, Oct 12, 2023)
7bcef79  add temppath to attributes (riley-brady, Oct 12, 2023)
4dce2a5  more temppath fixes (riley-brady, Oct 12, 2023)
c9b320b  more temppath updates (riley-brady, Oct 12, 2023)
b67aabc  add better tempfile deletion and work on saving files (riley-brady, Oct 12, 2023)
0f018fe  make __del__ flexible (riley-brady, Oct 12, 2023)
0bff0fb  formatting (riley-brady, Oct 12, 2023)
b776e9e  feat(datasets): create custom `DeprecationWarning` (#356) (deepyaman, Oct 2, 2023)
9bb8063  docs(datasets): add note about DataSet deprecation (#357) (deepyaman, Oct 3, 2023)
99d80fd  test(datasets): skip `tensorflow` tests on Windows (#363) (deepyaman, Oct 4, 2023)
004203a  ci: Pin `tables` version (#370) (ankatiyar, Oct 5, 2023)
755ec17  build(datasets): Release `1.7.1` (#378) (merelcht, Oct 6, 2023)
037846d  docs: Update CONTRIBUTING.md and add one for `kedro-datasets` (#379) (ankatiyar, Oct 6, 2023)
76b32e6  ci(datasets): Run tensorflow tests separately from other dataset test… (merelcht, Oct 6, 2023)
283002b  feat: Kedro-Airflow convert all pipelines option (#335) (sbrugman, Oct 9, 2023)
50b84e9  docs(datasets): blacken code in rst literal blocks (#362) (deepyaman, Oct 10, 2023)
f6b1168  docs: cloudpickle is an interesting extension of the pickle functiona… (hfwittmann, Oct 10, 2023)
5ea49f1  fix(datasets): Fix secret scan entropy error (#383) (merelcht, Oct 11, 2023)
9cd98b7  style: Rename mentions of `DataSet` to `Dataset` in `kedro-airflow` a… (merelcht, Oct 11, 2023)
5468c65  feat(datasets): Migrated `PartitionedDataSet` and `IncrementalDataSet… (PtrBld, Oct 11, 2023)
6f93d70  fix: backwards compatibility for `kedro-airflow` (#381) (sbrugman, Oct 12, 2023)
b68bf41  fix(datasets): Don't warn for SparkDataset on Databricks when using s… (alamastor, Oct 12, 2023)
0aa1965  update docs API and release notes (riley-brady, Oct 12, 2023)
1d65b81  add netcdf requirements to setup (riley-brady, Oct 12, 2023)
4369f03  lint (riley-brady, Oct 12, 2023)
dfbf94f  add initial tests (riley-brady, Oct 13, 2023)
249deb7  update dataset exists for multifile (riley-brady, Oct 13, 2023)
df83360  Add full test suite for NetCDFDataSet (riley-brady, Oct 13, 2023)
ff2e0c2  Add docstring examples (riley-brady, Oct 13, 2023)
d17fa53  change xarray version req (riley-brady, Oct 15, 2023)
bf2235e  Merge branch 'main' into add_netcdf_zarr (riley-brady, Oct 15, 2023)
b09d927  update dask req (riley-brady, Oct 15, 2023)
9ff704a  rename DataSet -> Dataset (riley-brady, Oct 16, 2023)
7437e5d  Update xarray reqs for earlier python versions (riley-brady, Oct 16, 2023)
de0f135  fix setup (riley-brady, Oct 16, 2023)
0e93a62  update test coverage (riley-brady, Oct 16, 2023)
fb898d5  exclude init from test coverage (riley-brady, Oct 16, 2023)
32be659  Sub in pathlib for os.remove (riley-brady, Oct 17, 2023)
1cb07f8  add metadata to dataset (riley-brady, Oct 17, 2023)
ed5ca39  Merge branch 'main' into add_netcdf_zarr (riley-brady, Oct 17, 2023)
7130d2c  Merge branch 'main' into add_netcdf_zarr (merelcht, Oct 18, 2023)
50e093c  Merge branch 'main' into add_netcdf_zarr (noklam, Oct 31, 2023)
380ca34  add doctest for the new datasets (noklam, Oct 31, 2023)
35f9b11  Merge branch 'main' into add_netcdf_zarr (merelcht, Nov 2, 2023)
feb37b7  add patch for supporting http/https (riley-brady, Jan 10, 2024)
51feeab  Merge branch 'main' into add_netcdf_zarr (astrojuanlu, Jan 31, 2024)
411a057  Small fixes post-merge (astrojuanlu, Jan 31, 2024)
8588573  Lint (astrojuanlu, Jan 31, 2024)
b6ae60b  Fix import (astrojuanlu, Feb 1, 2024)
83e523c  Merge branch 'main' into add_netcdf_zarr (merelcht, Feb 5, 2024)
a2caff8  Merge branch 'main' into add_netcdf_zarr (riley-brady, Feb 14, 2024)
25c7c5c  Un-ignore NetCDF doctest (astrojuanlu, Feb 15, 2024)
f838783  Add fixture (ankatiyar, Feb 19, 2024)
195be05  Mark problematic test as xfail (astrojuanlu, Feb 25, 2024)
120a757  Merge branch 'main' into add_netcdf_zarr (astrojuanlu, Feb 25, 2024)
fc57ba2  Skip problematic test instead of making it fail (astrojuanlu, Feb 26, 2024)
16f906f  Merge branch 'main' into add_netcdf_zarr (ankatiyar, Feb 28, 2024)
210e4ed  Skip problematic tests and fix failing tests (ankatiyar, Feb 28, 2024)
88a63ea  Remove comment (ankatiyar, Feb 28, 2024)
Files changed (showing changes from 49 of 68 commits)
1 change: 1 addition & 0 deletions kedro-datasets/RELEASE.md
@@ -1,6 +1,7 @@
# Upcoming Release
## Major features and improvements
* Moved `PartitionedDataSet` and `IncrementalDataSet` from the core Kedro repo to `kedro-datasets` and renamed to `PartitionedDataset` and `IncrementalDataset`.
* Added `NetCDFDataset` for loading and saving `*.nc` files.

## Bug fixes and other changes
* Fix erroneous warning when using a cloud protocol file path with SparkDataSet on Databricks.
1 change: 1 addition & 0 deletions kedro-datasets/docs/source/kedro_datasets.rst
@@ -27,6 +27,7 @@ kedro_datasets
kedro_datasets.json.JSONDataSet
kedro_datasets.json.JSONDataset
kedro_datasets.matplotlib.MatplotlibWriter
kedro_datasets.netcdf.NetCDFDataset
kedro_datasets.networkx.GMLDataSet
kedro_datasets.networkx.GMLDataset
kedro_datasets.networkx.GraphMLDataSet
14 changes: 14 additions & 0 deletions kedro-datasets/kedro_datasets/netcdf/__init__.py
@@ -0,0 +1,14 @@
"""``NetCDFDataset`` is an ``AbstractDataset`` to save and load NetCDF files."""
from __future__ import annotations

from typing import Any

import lazy_loader as lazy

# https://github.com/pylint-dev/pylint/issues/4300#issuecomment-1043601901
NetCDFDataset: type[NetCDFDataset]
NetCDFDataset: Any

__getattr__, __dir__, __all__ = lazy.attach(
    __name__, submod_attrs={"netcdf_dataset": ["NetCDFDataset"]}
)
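
`lazy_loader.attach` defers importing `netcdf_dataset` (and with it heavy dependencies such as `xarray`) until the attribute is first accessed. A minimal sketch of the resulting behaviour, assuming `kedro-datasets` is installed with NetCDF support:

import kedro_datasets.netcdf as netcdf

# At this point netcdf_dataset.py (and xarray) have not been imported yet;
# lazy.attach installed a module-level __getattr__ that imports on demand.
dataset_cls = netcdf.NetCDFDataset  # first attribute access triggers the real import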
222 changes: 222 additions & 0 deletions kedro-datasets/kedro_datasets/netcdf/netcdf_dataset.py
@@ -0,0 +1,222 @@
"""NetCDFDataset loads and saves data to a local netcdf (.nc) file."""
import logging
from copy import deepcopy
from glob import glob
from pathlib import Path, PurePosixPath
from typing import Any, Dict

import fsspec
import xarray as xr
from kedro.io.core import (
    AbstractDataset,
    DatasetError,
    get_filepath_str,
    get_protocol_and_path,
)

logger = logging.getLogger(__name__)


class NetCDFDataset(AbstractDataset):
"""``NetCDFDataset`` loads/saves data from/to a NetCDF file using an underlying
filesystem (e.g.: local, S3, GCS). It uses xarray to handle the NetCDF file.

Example usage for the
`YAML API <https://kedro.readthedocs.io/en/stable/data/\
data_catalog_yaml_examples.html>`_:

.. code-block:: yaml

single-file:
type: netcdf.NetCDFDataset
filepath: s3://bucket_name/path/to/folder/data.nc
save_args:
mode: a
load_args:
decode_times: False

multi-file:
type: netcdf.NetCDFDataset
filepath: s3://bucket_name/path/to/folder/data*.nc
load_args:
concat_dim: time
combine: nested
parallel: True

Example usage for the
`Python API <https://kedro.readthedocs.io/en/stable/data/\
advanced_data_catalog_usage.html>`_:

.. code-block:: pycon

>>> from kedro_datasets.netcdf import NetCDFDataset
>>> import xarray as xr
>>> ds = xr.DataArray(
... [0, 1, 2], dims=["x"], coords={"x": [0, 1, 2]}, name="data"
... ).to_dataset()
>>> dataset = NetCDFDataset(
... filepath="path/to/folder",
... save_args={"mode": "w"},
... )
>>> dataset.save(ds)
>>> reloaded = dataset.load()
"""

DEFAULT_LOAD_ARGS: Dict[str, Any] = {}
DEFAULT_SAVE_ARGS: Dict[str, Any] = {}

def __init__( # noqa
        self,
        filepath: str,
        temppath: str = None,
        load_args: Dict[str, Any] = None,
        save_args: Dict[str, Any] = None,
        fs_args: Dict[str, Any] = None,
        credentials: Dict[str, Any] = None,
        metadata: Dict[str, Any] = None,
    ):
        """Creates a new instance of ``NetCDFDataset`` pointing to a concrete NetCDF
        file on a specific filesystem.

        Args:
            filepath: Filepath in POSIX format to a NetCDF file prefixed with a
                protocol like `s3://`. If prefix is not provided, `file` protocol
                (local filesystem) will be used. The prefix should be any protocol
                supported by ``fsspec``. It can also be a path to a glob. If a
                glob is provided then it can be used for reading multiple NetCDF
                files.
            temppath: Local temporary directory, used when reading from remote storage,
                since NetCDF files cannot be directly read from remote storage.
            load_args: Additional options for loading NetCDF file(s).
                Here you can find all available arguments when reading a single file:
                https://xarray.pydata.org/en/stable/generated/xarray.open_dataset.html
                Here you can find all available arguments when reading multiple files:
                https://xarray.pydata.org/en/stable/generated/xarray.open_mfdataset.html
                All defaults are preserved.
            save_args: Additional saving options for saving NetCDF file(s).
                Here you can find all available arguments:
                https://xarray.pydata.org/en/stable/generated/xarray.Dataset.to_netcdf.html
                All defaults are preserved.
            fs_args: Extra arguments to pass into underlying filesystem class
                constructor (e.g. `{"cache_regions": "us-east-1"}` for
                ``s3fs.S3FileSystem``).
            credentials: Credentials required to get access to the underlying filesystem.
                E.g. for ``GCSFileSystem`` it should look like `{"token": None}`.
            metadata: Any arbitrary metadata.
                This is ignored by Kedro, but may be consumed by users or external plugins.
        """
        self._fs_args = deepcopy(fs_args) or {}
        self._credentials = deepcopy(credentials) or {}
        self._temppath = Path(temppath) if temppath is not None else None
        protocol, path = get_protocol_and_path(filepath)
        if protocol == "file":
            self._fs_args.setdefault("auto_mkdir", True)
        elif protocol != "file" and self._temppath is None:
            raise ValueError(
                "Need to set temppath in catalog if NetCDF file exists on remote "
                + "filesystem"
            )
        self._protocol = protocol
        self._filepath = PurePosixPath(path)

        self._storage_options = {**self._credentials, **self._fs_args}
        self._fs = fsspec.filesystem(self._protocol, **self._storage_options)

        self.metadata = metadata

        # Handle default load and save arguments
        self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS)
        if load_args is not None:
            self._load_args.update(load_args)
        self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS)
        if save_args is not None:
            self._save_args.update(save_args)

        # Determine if multiple NetCDF files are being loaded in.
        self._is_multifile = True if "*" in str(self._filepath.stem) else False

    def _load(self) -> xr.Dataset:
        load_path = get_filepath_str(self._filepath, self._protocol)

        # If NetCDF(s) are on any type of remote storage, need to sync to local to open.
        # Kerchunk could be implemented here in the future for direct remote reading.
        if self._protocol != "file":
            logger.info("Syncing remote NetCDF file to local storage.")

            # `get_filepath_str` drops remote protocol prefix.
            load_path = self._protocol + "://" + load_path
Review discussion on the lines above:

riley-brady (contributor, author): Tested with AWS and GCS, but not sure if this is generalized enough. `get_filepath_str` consistently drops the prefix of URIs from object storage, which is problematic for the subsequent `fs.get`.

Maintainer: Yeah, filepath mangling in fsspec is tricky... we have a related open issue about this: kedro-org/kedro#3196
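
For illustration, a minimal sketch of the path mangling under discussion, using a hypothetical bucket name; `get_filepath_str` strips the `s3://` prefix, so the dataset restores it before calling `fs.get`:

from pathlib import PurePosixPath

import fsspec
from kedro.io.core import get_filepath_str, get_protocol_and_path

protocol, path = get_protocol_and_path("s3://bucket_name/data.nc")  # ("s3", "bucket_name/data.nc")
load_path = get_filepath_str(PurePosixPath(path), protocol)  # "bucket_name/data.nc", prefix dropped
load_path = protocol + "://" + load_path  # "s3://bucket_name/data.nc", prefix restored

fs = fsspec.filesystem(protocol)
# fs.get(load_path, "/tmp/")  # would now resolve correctly against object storage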

            if self._is_multifile:
                load_path = sorted(self._fs.glob(load_path))

            self._fs.get(load_path, f"{self._temppath}/")
            load_path = f"{self._temppath}/{self._filepath.stem}.nc"

        if "*" in str(load_path):
            data = xr.open_mfdataset(str(load_path), **self._load_args)
        else:
            data = xr.open_dataset(load_path, **self._load_args)

        return data

    def _save(self, data: xr.Dataset):
        if self._is_multifile:
            raise DatasetError(
                "Globbed multifile datasets with '*' in filepath cannot be saved. "
                + "Create an alternate NetCDFDataset with a single .nc output file."
            )
        else:
            save_path = get_filepath_str(self._filepath, self._protocol)

            if self._protocol != "file":
                # `get_filepath_str` drops remote protocol prefix.
                save_path = self._protocol + "://" + save_path

            bytes_buffer = data.to_netcdf(**self._save_args)

            with self._fs.open(save_path, mode="wb") as fs_file:
                fs_file.write(bytes_buffer)

            self._invalidate_cache()

    def _describe(self) -> Dict[str, Any]:
        return dict(
            filepath=self._filepath,
            protocol=self._protocol,
            load_args=self._load_args,
            save_args=self._save_args,
        )

    def _exists(self) -> bool:
        load_path = get_filepath_str(self._filepath, self._protocol)

        if self._is_multifile:
            files = self._fs.glob(load_path)
            exists = True if files else False
        else:
            exists = self._fs.exists(load_path)

        return exists

    def _invalidate_cache(self):
        """Invalidate underlying filesystem caches."""
        filepath = get_filepath_str(self._filepath, self._protocol)
        self._fs.invalidate_cache(filepath)

    def __del__(self):
        """Cleanup temporary directory"""
        if self._temppath is not None:
            logger.info("Deleting local temporary files.")
            temp_filepath = self._temppath / self._filepath.stem
            if self._is_multifile:
                temp_files = glob(str(temp_filepath))
                for file in temp_files:
                    try:
                        Path(file).unlink()
                    except FileNotFoundError:  # pragma: no cover
                        pass  # pragma: no cover
            else:
                temp_filepath = str(temp_filepath) + self._filepath.suffix
                try:
                    Path(temp_filepath).unlink()
                except FileNotFoundError:
                    pass
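
Taken together, a hedged end-to-end sketch of the remote multi-file flow this class implements; the bucket, credentials, and temppath values below are hypothetical:

from kedro_datasets.netcdf import NetCDFDataset

# temppath is required for remote storage: files are synced to local
# storage before xarray opens them (see _load above).
dataset = NetCDFDataset(
    filepath="s3://bucket_name/path/to/folder/data*.nc",  # glob -> multi-file load
    temppath="/tmp/netcdf_scratch",
    load_args={"concat_dim": "time", "combine": "nested", "parallel": True},
    credentials={"key": "...", "secret": "..."},
)
ds = dataset.load()  # uses xarray.open_mfdataset under the hood
# dataset.save(ds) would raise DatasetError here: globbed datasets are read-only.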
2 changes: 1 addition & 1 deletion kedro-datasets/pyproject.toml
@@ -31,7 +31,7 @@ version = {attr = "kedro_datasets.__version__"}
[tool.coverage.report]
fail_under = 100
show_missing = true
-omit = ["tests/*", "kedro_datasets/holoviews/*", "kedro_datasets/snowflake/*", "kedro_datasets/tensorflow/*", "kedro_datasets/__init__.py"]
+omit = ["tests/*", "kedro_datasets/__init__.py", "kedro_datasets/holoviews/*", "kedro_datasets/snowflake/*", "kedro_datasets/tensorflow/*"]
exclude_lines = ["pragma: no cover", "raise NotImplementedError"]

[tool.pytest.ini_options]
19 changes: 16 additions & 3 deletions kedro-datasets/setup.py
@@ -26,6 +26,14 @@ def _collect_requirements(requires):
}
holoviews_require = {"holoviews.HoloviewsWriter": ["holoviews~=1.13.0"]}
matplotlib_require = {"matplotlib.MatplotlibWriter": ["matplotlib>=3.0.3, <4.0"]}
netcdf_require = {
    "netcdf.NetCDFDataset": [
        "h5netcdf>=1.2.0",
        "netcdf4>=1.6.4",
        "xarray<=0.20.2; python_version == '3.7'",
        "xarray>=2023.1.0; python_version >= '3.8'",
    ]
}
networkx_require = {"networkx.NetworkXDataSet": ["networkx~=2.4"]}
pandas_require = {
"pandas.CSVDataSet": [PANDAS],
@@ -55,9 +63,11 @@ def _collect_requirements(requires):
}
polars_require = {
"polars.CSVDataSet": [POLARS],
"polars.GenericDataSet":
[
POLARS, "pyarrow>=4.0", "xlsx2csv>=0.8.0", "deltalake >= 0.6.2"
"polars.GenericDataSet": [
POLARS,
"pyarrow>=4.0",
"xlsx2csv>=0.8.0",
"deltalake >= 0.6.2",
],
}
redis_require = {"redis.PickleDataSet": ["redis~=4.1"]}
@@ -94,6 +104,7 @@ def _collect_requirements(requires):
"geopandas": _collect_requirements(geopandas_require),
"holoviews": _collect_requirements(holoviews_require),
"matplotlib": _collect_requirements(matplotlib_require),
"netcdf": _collect_requirements(netcdf_require),
"networkx": _collect_requirements(networkx_require),
"pandas": _collect_requirements(pandas_require),
"pickle": _collect_requirements(pickle_require),
@@ -217,6 +228,8 @@ def _collect_requirements(requires):
"tensorflow~=2.0; platform_system != 'Darwin' or platform_machine != 'arm64'",
"triad>=0.6.7, <1.0",
"trufflehog~=2.1",
"xarray<=0.20.2; python_version == '3.7'",
"xarray>=2023.1.0; python_version >= '3.8'",
"xlsxwriter~=1.0",
]

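With the `netcdf` extra registered in `extras_require` above, the dataset's dependencies would typically be installed via the extras syntax, e.g. `pip install "kedro-datasets[netcdf]"` (a hypothetical invocation; the extra name comes from the wiring in this diff).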