feat(datasets): create separate ibis.FileDataset #842

Status: Open. Wants to merge 6 commits into base: main.
1 change: 1 addition & 0 deletions kedro-datasets/docs/source/api/kedro_datasets.rst
@@ -21,6 +21,7 @@ kedro_datasets
holoviews.HoloviewsWriter
huggingface.HFDataset
huggingface.HFTransformerPipelineDataset
ibis.FileDataset
ibis.TableDataset
json.JSONDataset
matlab.MatlabDataset
4 changes: 3 additions & 1 deletion kedro-datasets/kedro_datasets/ibis/__init__.py
@@ -4,8 +4,10 @@
import lazy_loader as lazy

# https://github.com/pylint-dev/pylint/issues/4300#issuecomment-1043601901
FileDataset: Any
TableDataset: Any

__getattr__, __dir__, __all__ = lazy.attach(
__name__, submod_attrs={"table_dataset": ["TableDataset"]}
__name__,
submod_attrs={"file_dataset": ["FileDataset"], "table_dataset": ["TableDataset"]},
)
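For context, ``lazy_loader.attach`` defers importing the submodule until the attribute is first accessed. A minimal sketch of the effect (assumes kedro-datasets with this change installed):

    from kedro_datasets import ibis

    # Importing the subpackage does not yet import file_dataset.py or
    # table_dataset.py; the module-level __getattr__ from lazy_loader
    # performs the import on first attribute access.
    dataset_cls = ibis.FileDataset  # triggers import of kedro_datasets.ibis.file_dataset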
184 changes: 184 additions & 0 deletions kedro-datasets/kedro_datasets/ibis/file_dataset.py
@@ -0,0 +1,184 @@
"""Provide file loading and saving functionality for Ibis's backends."""
from __future__ import annotations

from copy import deepcopy
from pathlib import Path, PurePosixPath
from typing import TYPE_CHECKING, Any, ClassVar

import ibis.expr.types as ir
from kedro.io import AbstractVersionedDataset, DatasetError, Version

if TYPE_CHECKING:
from ibis import BaseBackend


class FileDataset(AbstractVersionedDataset[ir.Table, ir.Table]):
"""``FileDataset`` loads/saves data from/to a specified file format.

Example usage for the
`YAML API <https://docs.kedro.org/en/stable/data/data_catalog_yaml_examples.html>`_:

.. code-block:: yaml

cars:
type: ibis.FileDataset
filepath: data/01_raw/company/cars.csv
file_format: csv
table_name: cars
connection:
backend: duckdb
database: company.db
load_args:
sep: ","
nullstr: "#NA"
save_args:
sep: ","
nullstr: "#NA"

Example usage for the
`Python API <https://docs.kedro.org/en/stable/data/\
advanced_data_catalog_usage.html>`_:

.. code-block:: pycon

>>> import ibis
>>> from kedro_datasets.ibis import FileDataset
>>>
>>> data = ibis.memtable({"col1": [1, 2], "col2": [4, 5], "col3": [5, 6]})
>>>
>>> dataset = FileDataset(
... filepath=tmp_path / "test.csv",
... file_format="csv",
... table_name="test",
... connection={"backend": "duckdb", "database": tmp_path / "file.db"},
... )
>>> dataset.save(data)
>>> reloaded = dataset.load()
>>> assert data.execute().equals(reloaded.execute())

"""

DEFAULT_LOAD_ARGS: ClassVar[dict[str, Any]] = {}
DEFAULT_SAVE_ARGS: ClassVar[dict[str, Any]] = {}

_connections: ClassVar[dict[tuple[tuple[str, str]], BaseBackend]] = {}

def __init__( # noqa: PLR0913
self,
filepath: str,
file_format: str,
*,
table_name: str | None = None,
connection: dict[str, Any] | None = None,
load_args: dict[str, Any] | None = None,
save_args: dict[str, Any] | None = None,
version: Version | None = None,
metadata: dict[str, Any] | None = None,
) -> None:
"""Creates a new ``FileDataset`` pointing to the given filepath.

``FileDataset`` connects to the Ibis backend object constructed
from the connection configuration. The `backend` key provided in
the config can be any of the `supported backends <https://ibis-\
project.org/install>`_. The remaining dictionary entries will be
passed as arguments to the underlying ``connect()`` method (e.g.
`ibis.duckdb.connect() <https://ibis-project.org/backends/duckdb\
#ibis.duckdb.connect>`_).

The read method corresponding to the given ``file_format`` (e.g.
`read_csv() <https://ibis-project.org/backends/\
duckdb#ibis.backends.duckdb.Backend.read_csv>`_) is used to load
the file with the backend. Note that only the data is loaded; no
link to the underlying file exists past ``FileDataset.load()``.

Args:
filepath: Path to a file to register as a table. Most useful
for loading data into your data warehouse (for testing).
On save, the backend exports data to the specified path.
file_format: String specifying the file format for the file.
table_name: The name to use for the created table (on load).
connection: Configuration for connecting to an Ibis backend.
load_args: Additional arguments passed to the Ibis backend's
`read_{file_format}` method.
save_args: Additional arguments passed to the Ibis backend's
`to_{file_format}` method.
version: If specified, should be an instance of
``kedro.io.core.Version``. If its ``load`` attribute is
None, the latest version will be loaded. If its ``save``
attribute is None, save version will be autogenerated.
metadata: Any arbitrary metadata. This is ignored by Kedro,
but may be consumed by users or external plugins.
"""
self._filepath = filepath
self._file_format = file_format
self._table_name = table_name
self._connection_config = connection
self.metadata = metadata

super().__init__(
filepath=PurePosixPath(filepath),
version=version,
exists_function=lambda filepath: Path(filepath).exists(),
)

# Set load and save arguments, overwriting defaults if provided.
self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS)
if load_args is not None:
self._load_args.update(load_args)

self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS)
if save_args is not None:
self._save_args.update(save_args)

@property
def connection(self) -> BaseBackend:
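# Cache one backend connection per unique connection configuration at the
# class level; ``hashable`` below turns the (possibly nested) config dict
# into nested tuples so it can serve as a dictionary key.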
def hashable(value):
Reviewer comment (Contributor): I think this section needs comments because it's super clever and complicated, but someone else maintaining the class won't be able to understand what's going on without a lot of reverse engineering.

if isinstance(value, dict):
return tuple((k, hashable(v)) for k, v in sorted(value.items()))
if isinstance(value, list):
return tuple(hashable(x) for x in value)
return value

cls = type(self)
key = hashable(self._connection_config)
if key not in cls._connections:
import ibis

config = deepcopy(self._connection_config)
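# The "backend" entry selects the Ibis backend module (e.g. ``ibis.duckdb``);
# every other entry is forwarded to that backend's ``connect()`` method.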
backend_attr = config.pop("backend") if config else None
backend = getattr(ibis, backend_attr)
cls._connections[key] = backend.connect(**config)

return cls._connections[key]

def _load(self) -> ir.Table:
load_path = self._get_load_path()
reader = getattr(self.connection, f"read_{self._file_format}")
return reader(load_path, self._table_name, **self._load_args)

def _save(self, data: ir.Table) -> None:
save_path = self._get_save_path()
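# Ensure the (possibly versioned) parent directory exists before the backend
# writes the file.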
Path(save_path).parent.mkdir(parents=True, exist_ok=True)
writer = getattr(self.connection, f"to_{self._file_format}")
writer(data, save_path, **self._save_args)

def _describe(self) -> dict[str, Any]:
return {
"filepath": self._filepath,
"file_format": self._file_format,
"table_name": self._table_name,
"backend": self._connection_config.get("backend")
if self._connection_config
else None,
"load_args": self._load_args,
"save_args": self._save_args,
"version": self._version,
}

def _exists(self) -> bool:
try:
load_path = self._get_load_path()
except DatasetError:
return False

return Path(load_path).exists()
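As a minimal sketch of the connection caching in the ``connection`` property above (file paths and the DuckDB backend are illustrative assumptions; requires ibis with the duckdb extra installed), two datasets built from the same connection dictionary resolve to one shared backend object:

    from kedro_datasets.ibis import FileDataset

    cars = FileDataset(
        filepath="data/01_raw/cars.csv",
        file_format="csv",
        connection={"backend": "duckdb", "database": "company.db"},
    )
    trips = FileDataset(
        filepath="data/01_raw/trips.parquet",
        file_format="parquet",
        connection={"backend": "duckdb", "database": "company.db"},
    )

    # Both configs hash to the same key, so the class-level cache returns the
    # same DuckDB connection for both datasets.
    assert cars.connection is trips.connection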
26 changes: 12 additions & 14 deletions kedro-datasets/kedro_datasets/ibis/table_dataset.py
@@ -1,12 +1,15 @@
"""Provide data loading and saving functionality for Ibis's backends."""
from __future__ import annotations

import warnings
from copy import deepcopy
from typing import TYPE_CHECKING, Any, ClassVar

import ibis.expr.types as ir
from kedro.io import AbstractDataset, DatasetError

from kedro_datasets import KedroDeprecationWarning

if TYPE_CHECKING:
from ibis import BaseBackend

@@ -21,15 +24,10 @@ class TableDataset(AbstractDataset[ir.Table, ir.Table]):

cars:
type: ibis.TableDataset
filepath: data/01_raw/company/cars.csv
file_format: csv
table_name: cars
connection:
backend: duckdb
database: company.db
load_args:
sep: ","
nullstr: "#NA"
save_args:
materialized: table

@@ -91,12 +89,6 @@ def __init__( # noqa: PLR0913
`ibis.duckdb.connect() <https://ibis-project.org/backends/duckdb\
#ibis.duckdb.connect>`_).

If ``filepath`` and ``file_format`` are given, the corresponding
read method (e.g. `read_csv() <https://ibis-project.org/backends/\
duckdb#ibis.backends.duckdb.Backend.read_csv>`_) is used to load
the file with the backend. Note that only the data is loaded; no
link to the underlying file exists past ``TableDataset.load()``.

If ``table_name`` is given (and ``filepath`` isn't), the dataset
establishes a connection to the relevant table for the execution
backend. Therefore, Ibis doesn't fetch data on load; all compute
@@ -105,9 +97,6 @@ def __init__( # noqa: PLR0913
is saved, after running code defined across one or more nodes.

Args:
filepath: Path to a file to register as a table. Most useful
for loading data into your data warehouse (for testing).
file_format: Specifies the input file format for `filepath`.
table_name: The name of the table or view to read or create.
connection: Configuration for connecting to an Ibis backend.
load_args: Additional arguments passed to the Ibis backend's
@@ -125,6 +114,15 @@ def __init__( # noqa: PLR0913
"Must provide at least one of `filepath` or `table_name`."
)

if filepath is not None or file_format is not None:
warnings.warn(
"Use 'FileDataset' to load and save files with an Ibis "
"backend; the functionality will be removed from 'Table"
"Dataset' in Kedro-Datasets 6.0.0",
KedroDeprecationWarning,
stacklevel=2,
)

self._filepath = filepath
self._file_format = file_format
self._table_name = table_name
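For users hitting the new deprecation warning, a minimal migration sketch (paths, table name, and the DuckDB backend are illustrative assumptions) might look like this:

    from kedro_datasets.ibis import FileDataset, TableDataset

    # Before: TableDataset handled file I/O and now emits KedroDeprecationWarning
    # when filepath/file_format are supplied.
    # cars = TableDataset(
    #     filepath="data/01_raw/cars.csv",
    #     file_format="csv",
    #     connection={"backend": "duckdb", "database": "company.db"},
    # )

    # After: FileDataset owns file loading/saving, while TableDataset reads or
    # materialises tables and views in the backend.
    cars_file = FileDataset(
        filepath="data/01_raw/cars.csv",
        file_format="csv",
        connection={"backend": "duckdb", "database": "company.db"},
    )
    cars_table = TableDataset(
        table_name="cars",
        connection={"backend": "duckdb", "database": "company.db"},
    )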