From 8fb01ef28370e6900caae36906c0d653567f3e0a Mon Sep 17 00:00:00 2001
From: Afaque Ahmad
Date: Fri, 21 Jul 2023 20:44:12 +0800
Subject: [PATCH] feat(datasets): Added `DeltaTableDataSet` (#243)

* feat: added delta table dataset
Signed-off-by: Afaque Ahmad

* test: lint
Signed-off-by: k9

* chore: adjusted docstring line length
Signed-off-by: Afaque Ahmad

* chore: fix requirements order
Signed-off-by: k9

* chore: add .pylintrc to ignore line too long for url
Signed-off-by: k9

* chore: remove invalid noqa comment
Signed-off-by: k9

* fix: do not import TableNotFoundError from private module to avoid pylint complaints
Signed-off-by: k9

* fix: fixed linting issues
Signed-off-by: Afaque Ahmad

* Move pylintrc to pyproject.toml
Signed-off-by: Nok

* Fix pylint config
Signed-off-by: Nok

* test: use mocker fixture to replace unittest.mock
Signed-off-by: k9

* chore: lint for line too long
Signed-off-by: k9

* test: increase coverage for pandas delta table dataset
Signed-off-by: Kyle Chung

* chore: lint
Signed-off-by: Kyle Chung

---------

Signed-off-by: Afaque Ahmad
Signed-off-by: k9
Signed-off-by: Afaque Ahmad
Signed-off-by: Nok
Signed-off-by: Kyle Chung
Co-authored-by: k9
Co-authored-by: Nok
---
 .../kedro_datasets/pandas/__init__.py       |   3 +
 .../pandas/deltatable_dataset.py            | 258 ++++++++++++++++++
 kedro-datasets/pyproject.toml               |   4 +
 .../tests/pandas/test_deltatable_dataset.py | 168 ++++++++++++
 4 files changed, 433 insertions(+)
 create mode 100644 kedro-datasets/kedro_datasets/pandas/deltatable_dataset.py
 create mode 100644 kedro-datasets/tests/pandas/test_deltatable_dataset.py

diff --git a/kedro-datasets/kedro_datasets/pandas/__init__.py b/kedro-datasets/kedro_datasets/pandas/__init__.py
index b84015d1d..2ac29379a 100644
--- a/kedro-datasets/kedro_datasets/pandas/__init__.py
+++ b/kedro-datasets/kedro_datasets/pandas/__init__.py
@@ -2,6 +2,7 @@
 
 __all__ = [
     "CSVDataSet",
+    "DeltaTableDataSet",
     "ExcelDataSet",
     "FeatherDataSet",
     "GBQTableDataSet",
@@ -19,6 +20,8 @@
 
 with suppress(ImportError):
     from .csv_dataset import CSVDataSet
+with suppress(ImportError):
+    from .deltatable_dataset import DeltaTableDataSet
 with suppress(ImportError):
     from .excel_dataset import ExcelDataSet
 with suppress(ImportError):
diff --git a/kedro-datasets/kedro_datasets/pandas/deltatable_dataset.py b/kedro-datasets/kedro_datasets/pandas/deltatable_dataset.py
new file mode 100644
index 000000000..16eefca25
--- /dev/null
+++ b/kedro-datasets/kedro_datasets/pandas/deltatable_dataset.py
@@ -0,0 +1,258 @@
+"""``DeltaTableDataSet`` loads/saves delta tables from/to a filesystem (e.g. local,
+S3, GCS), Databricks Unity Catalog or AWS Glue Catalog. It handles load and save
+using a pandas DataFrame.
+"""
+from copy import deepcopy
+from typing import Any, Dict, List, Optional
+
+import pandas as pd
+from deltalake import DataCatalog, DeltaTable, Metadata
+from deltalake.exceptions import TableNotFoundError
+from deltalake.writer import write_deltalake
+from kedro.io.core import AbstractDataSet, DataSetError
+
+
+class DeltaTableDataSet(AbstractDataSet):  # pylint:disable=too-many-instance-attributes
+    """``DeltaTableDataSet`` loads/saves delta tables from/to a filesystem (e.g. local,
+    S3, GCS), Databricks Unity Catalog or AWS Glue Catalog. It handles load and save
+    using a pandas DataFrame. When saving data, you can specify one of two modes:
+    overwrite (default) or append. If you wish to alter the schema as part of an
+    overwrite, pass overwrite_schema=True.
+    You can overwrite a specific partition by using mode=overwrite together with
+    partition_filters. This will remove all files within the matching partition and
+    insert your data as new files.
+
+    Example usage for the `YAML API`_:
+
+    .. code-block:: yaml
+
+        boats_filesystem:
+          type: pandas.DeltaTableDataSet
+          filepath: data/01_raw/boats
+          credentials: dev_creds
+          load_args:
+            version: 7
+          save_args:
+            mode: overwrite
+
+        boats_databricks_unity_catalog:
+          type: pandas.DeltaTableDataSet
+          credentials: dev_creds
+          catalog_type: UNITY
+          database: simple_database
+          table: simple_table
+          save_args:
+            mode: overwrite
+
+        trucks_aws_glue_catalog:
+          type: pandas.DeltaTableDataSet
+          credentials: dev_creds
+          catalog_type: AWS
+          catalog_name: main
+          database: db_schema
+          table: db_table
+          save_args:
+            mode: overwrite
+
+    Example usage for the `Python API`_:
+    ::
+
+        >>> from kedro_datasets.pandas import DeltaTableDataSet
+        >>> import pandas as pd
+        >>>
+        >>> data = pd.DataFrame({'col1': [1, 2], 'col2': [4, 5], 'col3': [5, 6]})
+        >>> data_set = DeltaTableDataSet(filepath="test")
+        >>>
+        >>> data_set.save(data)
+        >>> reloaded = data_set.load()
+        >>> assert data.equals(reloaded)
+        >>>
+        >>> new_data = pd.DataFrame({'col1': [7, 8], 'col2': [9, 10], 'col3': [11, 12]})
+        >>> data_set.save(new_data)
+        >>> data_set.get_loaded_version()
+
+    """
+
+    DEFAULT_WRITE_MODE = "overwrite"
+    ACCEPTED_WRITE_MODES = ("overwrite", "append")
+
+    DEFAULT_LOAD_ARGS: Dict[str, Any] = {}
+    DEFAULT_SAVE_ARGS: Dict[str, Any] = {"mode": DEFAULT_WRITE_MODE}
+
+    def __init__(  # pylint: disable=too-many-arguments
+        self,
+        filepath: Optional[str] = None,
+        catalog_type: Optional[DataCatalog] = None,
+        catalog_name: Optional[str] = None,
+        database: Optional[str] = None,
+        table: Optional[str] = None,
+        load_args: Optional[Dict[str, Any]] = None,
+        save_args: Optional[Dict[str, Any]] = None,
+        credentials: Optional[Dict[str, Any]] = None,
+        fs_args: Optional[Dict[str, Any]] = None,
+    ) -> None:
+        """Creates a new instance of ``DeltaTableDataSet``.
+
+        Args:
+            filepath (str): Filepath to a delta lake file with the following accepted protocol:
+                ``S3``:
+                `s3://<bucket>/<path>`
+                `s3a://<bucket>/<path>`
+                ``Azure``:
+                `az://<container>/<path>`
+                `adl://<container>/<path>`
+                `abfs://<container>/<path>`
+                ``GCS``:
+                `gs://<bucket>/<path>`
+                If any of the prefix above is not provided, `file` protocol (local filesystem)
+                will be used.
+            catalog_type (DataCatalog, optional): `AWS` or `UNITY` if filepath is not provided.
+                Defaults to None.
+            catalog_name (str, optional): the name of catalog in AWS Glue or Databricks Unity.
+                Defaults to None.
+            database (str, optional): the name of the database (also referred to as schema).
+                Defaults to None.
+            table (str, optional): the name of the table. Defaults to None.
+            load_args (Dict[str, Any], optional): Additional options for loading file(s)
+                into DeltaTableDataSet. `load_args` accepts `version` to load the appropriate
+                version when loading from a filesystem.
+            save_args (Dict[str, Any], optional): Additional saving options for saving into
+                Delta lake. Here you can find all available arguments:
+                https://delta-io.github.io/delta-rs/python/api_reference.html#writing-deltatables
+            credentials (Dict[str, Any], optional): Credentials required to get access to
+                the underlying filesystem. E.g. for ``GCSFileSystem`` it should look like
+                `{"token": None}`.
+            fs_args (Dict[str, Any], optional): Extra arguments to pass into underlying
+                filesystem class constructor.
+                (e.g. `{"project": "my-project"}` for ``GCSFileSystem``).
+
+        Raises:
+            DataSetError: Invalid configuration supplied (through DeltaTableDataSet validation)
+        """
+        self._filepath = filepath
+        self._catalog_type = catalog_type
+        self._catalog_name = catalog_name
+        self._database = database
+        self._table = table
+        self._fs_args = deepcopy(fs_args) or {}
+        self._credentials = deepcopy(credentials) or {}
+
+        # DeltaTable cannot be instantiated from an empty directory
+        # for the first time creation from filepath, we need to delay the instantiation
+        self.is_empty_dir: bool = False
+        self._delta_table: Optional[DeltaTable] = None
+
+        self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS)
+        if load_args:
+            self._load_args.update(load_args)
+
+        self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS)
+        if save_args:
+            self._save_args.update(save_args)
+
+        write_mode = self._save_args.get("mode", None)
+        if write_mode not in self.ACCEPTED_WRITE_MODES:
+            raise DataSetError(
+                f"Write mode {write_mode} is not supported, "
+                f"Please use any of the following accepted modes "
+                f"{self.ACCEPTED_WRITE_MODES}"
+            )
+
+        self._version = self._load_args.get("version", None)
+
+        if self._filepath and self._catalog_type:
+            raise DataSetError(
+                "DeltaTableDataSet can either load from "
+                "filepath or catalog_type. Please provide "
+                "one of either filepath or catalog_type."
+            )
+
+        if self._filepath:
+            try:
+                self._delta_table = DeltaTable(
+                    table_uri=self._filepath,
+                    storage_options=self.fs_args,
+                    version=self._version,
+                )
+            except TableNotFoundError:
+                self.is_empty_dir = True
+        else:
+            self._delta_table = DeltaTable.from_data_catalog(
+                data_catalog=DataCatalog[self._catalog_type],
+                data_catalog_id=self._catalog_name,
+                database_name=self._database,
+                table_name=self._table,
+            )
+
+    @property
+    def fs_args(self) -> Dict[str, Any]:
+        """Appends and returns filesystem credentials to fs_args."""
+        fs_args = deepcopy(self._fs_args)
+        fs_args.update(self._credentials)
+        return fs_args
+
+    @property
+    def schema(self) -> Dict[str, Any]:
+        """Returns the schema of the DeltaTableDataSet as a dictionary."""
+        return self._delta_table.schema().json()
+
+    @property
+    def metadata(self) -> Metadata:
+        """Returns the metadata of the DeltaTableDataSet as a dictionary.
+        Metadata contains the following:
+        1. A unique id
+        2. A name, if provided
+        3. A description, if provided
+        4. The list of partition_columns.
+        5. The created_time of the table
+        6. A map of table configuration. This includes fields such as delta.appendOnly,
+        which if true indicates the table is not meant to have data deleted from it.
+
+        Returns: Metadata object containing the above metadata attributes.
+ """ + return self._delta_table.metadata() + + @property + def history(self) -> List[Dict[str, Any]]: + """Returns the history of actions on DeltaTableDataSet as a list of dictionaries.""" + return self._delta_table.history() + + def get_loaded_version(self) -> int: + """Returns the version of the DeltaTableDataSet that is currently loaded.""" + return self._delta_table.version() + + def _load(self) -> pd.DataFrame: + return self._delta_table.to_pandas() + + def _save(self, data: pd.DataFrame) -> None: + if self.is_empty_dir: + # first time creation of delta table + write_deltalake( + self._filepath, + data, + storage_options=self.fs_args, + **self._save_args, + ) + self.is_empty_dir = False + self._delta_table = DeltaTable( + table_uri=self._filepath, + storage_options=self.fs_args, + version=self._version, + ) + else: + write_deltalake( + self._delta_table, + data, + storage_options=self.fs_args, + **self._save_args, + ) + + def _describe(self) -> Dict[str, Any]: + return { + "filepath": self._filepath, + "catalog_type": self._catalog_type, + "catalog_name": self._catalog_name, + "database": self._database, + "table": self._table, + "load_args": self._load_args, + "save_args": self._save_args, + "version": self._version, + } diff --git a/kedro-datasets/pyproject.toml b/kedro-datasets/pyproject.toml index 36c74f436..d8d1d654c 100644 --- a/kedro-datasets/pyproject.toml +++ b/kedro-datasets/pyproject.toml @@ -52,6 +52,7 @@ pandas = [ pandas-base = ["pandas>=1.3, <3.0"] pandas-csvdataset = ["kedro-datasets[pandas-base]"] pandas-exceldataset = ["kedro-datasets[pandas-base]", "openpyxl>=3.0.6, <4.0"] +pandas-deltatabledataset= ["kedro-datasets[pandas-base]", "deltalake>=0.10.0"] pandas-featherdataset = ["kedro-datasets[pandas-base]"] pandas-gbqquerydataset = [ "kedro-datasets[pandas-base]", @@ -157,6 +158,7 @@ test = [ "coverage[toml]", "dask[complete]", "delta-spark~=1.2.1", + "deltalake>=0.10.0", # 1.2.0 has a bug that breaks some of our tests: https://github.com/delta-io/delta/issues/1070 "dill~=0.3.1", "filelock>=3.4.0, <4.0", @@ -249,6 +251,8 @@ enable = ["useless-suppression"] max-nested-blocks = 5 [tool.pylint.format] +# Regexp for a line that is allowed to be longer than the limit. 
+ignore-long-lines='^\s*(# )?<?https?://\S+>?$'
 indent-after-paren = 4
 indent-string = "    "
diff --git a/kedro-datasets/tests/pandas/test_deltatable_dataset.py b/kedro-datasets/tests/pandas/test_deltatable_dataset.py
new file mode 100644
index 000000000..ac75fc1ff
--- /dev/null
+++ b/kedro-datasets/tests/pandas/test_deltatable_dataset.py
@@ -0,0 +1,168 @@
+import pandas as pd
+import pytest
+from deltalake import DataCatalog, Metadata
+from kedro.io import DataSetError
+from pandas.testing import assert_frame_equal
+
+from kedro_datasets.pandas import DeltaTableDataSet
+
+
+@pytest.fixture
+def filepath(tmp_path):
+    return (tmp_path / "test-delta-table").as_posix()
+
+
+@pytest.fixture
+def dummy_df():
+    return pd.DataFrame({"col1": [1, 2], "col2": [4, 5], "col3": [5, 6]})
+
+
+@pytest.fixture
+def deltatable_data_set_from_path(filepath, load_args, save_args, fs_args):
+    return DeltaTableDataSet(
+        filepath=filepath,
+        load_args=load_args,
+        save_args=save_args,
+        fs_args=fs_args,
+    )
+
+
+class TestDeltaTableDataSet:
+    def test_save_to_empty_dir(self, deltatable_data_set_from_path, dummy_df):
+        """Test saving to an empty directory (first time creation of delta table)."""
+        deltatable_data_set_from_path.save(dummy_df)
+        reloaded = deltatable_data_set_from_path.load()
+        assert_frame_equal(dummy_df, reloaded)
+
+    def test_overwrite_with_same_schema(self, deltatable_data_set_from_path, dummy_df):
+        """Test saving with the default overwrite mode with new data of same schema."""
+        deltatable_data_set_from_path.save(dummy_df)
+        new_df = pd.DataFrame({"col1": [0, 0], "col2": [1, 1], "col3": [2, 2]})
+        deltatable_data_set_from_path.save(new_df)
+        reloaded = deltatable_data_set_from_path.load()
+        assert_frame_equal(new_df, reloaded)
+
+    def test_overwrite_with_diff_schema(self, deltatable_data_set_from_path, dummy_df):
+        """Test saving with the default overwrite mode with new data of diff schema."""
+        deltatable_data_set_from_path.save(dummy_df)
+        new_df = pd.DataFrame({"new_col": [1, 2]})
+        pattern = "Schema of data does not match table schema"
+        with pytest.raises(DataSetError, match=pattern):
+            deltatable_data_set_from_path.save(new_df)
+
+    @pytest.mark.parametrize("save_args", [{"overwrite_schema": True}], indirect=True)
+    def test_overwrite_both_data_and_schema(
+        self, deltatable_data_set_from_path, dummy_df
+    ):
+        """Test saving to overwrite both data and schema."""
+        deltatable_data_set_from_path.save(dummy_df)
+        new_df = pd.DataFrame({"new_col": [1, 2]})
+        deltatable_data_set_from_path.save(new_df)
+        reloaded = deltatable_data_set_from_path.load()
+        assert_frame_equal(new_df, reloaded)
+
+    @pytest.mark.parametrize("save_args", [{"mode": "append"}], indirect=True)
+    def test_append(self, deltatable_data_set_from_path, dummy_df):
+        """Test saving by appending new data."""
+        deltatable_data_set_from_path.save(dummy_df)
+        new_df = pd.DataFrame({"col1": [0, 0], "col2": [1, 1], "col3": [2, 2]})
+        appended = pd.concat([dummy_df, new_df], ignore_index=True)
+        deltatable_data_set_from_path.save(new_df)
+        reloaded = deltatable_data_set_from_path.load()
+        assert_frame_equal(appended, reloaded)
+
+    def test_versioning(self, filepath, dummy_df):
+        """Test loading different versions."""
+        deltatable_data_set_from_path = DeltaTableDataSet(filepath)
+        deltatable_data_set_from_path.save(dummy_df)
+        assert deltatable_data_set_from_path.get_loaded_version() == 0
+        new_df = pd.DataFrame({"col1": [0, 0], "col2": [1, 1], "col3": [2, 2]})
+        deltatable_data_set_from_path.save(new_df)
+        assert deltatable_data_set_from_path.get_loaded_version() == 1
+
+        deltatable_data_set_from_path0 = DeltaTableDataSet(
+            filepath, load_args={"version": 0}
+        )
+        version_0 = deltatable_data_set_from_path0.load()
+        assert deltatable_data_set_from_path0.get_loaded_version() == 0
+        assert_frame_equal(dummy_df, version_0)
+
+        deltatable_data_set_from_path1 = DeltaTableDataSet(
+            filepath, load_args={"version": 1}
+        )
+        version_1 = deltatable_data_set_from_path1.load()
+        assert deltatable_data_set_from_path1.get_loaded_version() == 1
+        assert_frame_equal(new_df, version_1)
+
+    def test_filepath_and_catalog_both_exist(self, filepath):
+        """Test when both filepath and catalog are provided."""
+        with pytest.raises(DataSetError):
+            DeltaTableDataSet(filepath=filepath, catalog_type="AWS")
+
+    def test_property_schema(self, deltatable_data_set_from_path, dummy_df):
+        """Test the schema property to return the underlying delta table schema."""
+        deltatable_data_set_from_path.save(dummy_df)
+        s1 = deltatable_data_set_from_path.schema
+        s2 = deltatable_data_set_from_path._delta_table.schema().json()
+        assert s1 == s2
+
+    def test_describe(self, filepath):
+        """Test the describe method."""
+        deltatable_data_set_from_path = DeltaTableDataSet(filepath)
+        desc = deltatable_data_set_from_path._describe()
+        assert desc["filepath"] == filepath
+        assert desc["version"] is None
+
+    def test_from_aws_glue_catalog(self, mocker):
+        """Test dataset creation from AWS Glue catalog."""
+        mock_delta_table = mocker.patch(
+            "kedro_datasets.pandas.deltatable_dataset.DeltaTable"
+        )
+        _ = DeltaTableDataSet(catalog_type="AWS", database="db", table="tbl")
+        mock_delta_table.from_data_catalog.assert_called_once()
+        mock_delta_table.from_data_catalog.assert_called_with(
+            data_catalog=DataCatalog.AWS,
+            data_catalog_id=None,
+            database_name="db",
+            table_name="tbl",
+        )
+
+    def test_from_databricks_unity_catalog(self, mocker):
+        """Test dataset creation from Databricks Unity catalog."""
+        mock_delta_table = mocker.patch(
+            "kedro_datasets.pandas.deltatable_dataset.DeltaTable"
+        )
+        _ = DeltaTableDataSet(
+            catalog_type="UNITY", catalog_name="id", database="db", table="tbl"
+        )
+        mock_delta_table.from_data_catalog.assert_called_once()
+        mock_delta_table.from_data_catalog.assert_called_with(
+            data_catalog=DataCatalog.UNITY,
+            data_catalog_id="id",
+            database_name="db",
+            table_name="tbl",
+        )
+
+    def test_from_unsupported_catalog(self):
+        """Test dataset creation from unsupported catalog."""
+        with pytest.raises(KeyError):
+            DeltaTableDataSet(catalog_type="unsupported", database="db", table="tbl")
+
+    def test_unsupported_write_mode(self, filepath):
+        """Test write mode not supported."""
+        pattern = "Write mode unsupported is not supported"
+        with pytest.raises(DataSetError, match=pattern):
+            DeltaTableDataSet(filepath, save_args={"mode": "unsupported"})
+
+    def test_metadata(self, deltatable_data_set_from_path, dummy_df):
+        """Test metadata property exists and return a metadata object."""
+        deltatable_data_set_from_path.save(dummy_df)
+        metadata = deltatable_data_set_from_path.metadata
+        assert isinstance(metadata, Metadata)
+
+    def test_history(self, deltatable_data_set_from_path, dummy_df):
+        """Test history property exists with a create table operation."""
+        deltatable_data_set_from_path.save(dummy_df)
+        history = deltatable_data_set_from_path.history
+        assert isinstance(history, list)
+        assert history[0]["operation"] == "CREATE TABLE"
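
Reviewer note: the class docstring mentions overwriting a specific partition by combining
mode=overwrite with partition_filters, but neither the docstring examples nor the tests
demonstrate it. Below is a minimal sketch (not part of the patch) of how that could look,
assuming deltalake>=0.10.0's write_deltalake accepts partition_by and partition_filters as
documented; the dataset simply forwards save_args to write_deltalake. The filepath and the
"year" partition column are illustrative, not taken from the PR.

    import pandas as pd
    from kedro_datasets.pandas import DeltaTableDataSet

    # First save: create the table, partitioned by the hypothetical "year" column.
    full_refresh = DeltaTableDataSet(
        filepath="data/01_raw/boats",  # illustrative path
        save_args={"mode": "overwrite", "partition_by": ["year"]},
    )
    full_refresh.save(pd.DataFrame({"year": ["2022", "2023"], "value": [1, 2]}))

    # Second save: replace only the files in the year=2023 partition; every row in the
    # frame must belong to that partition.
    partition_refresh = DeltaTableDataSet(
        filepath="data/01_raw/boats",
        save_args={
            "mode": "overwrite",
            "partition_filters": [("year", "=", "2023")],
        },
    )
    partition_refresh.save(pd.DataFrame({"year": ["2023"], "value": [3]}))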