From 06cf752c6dee4f4eb96797eb5507f2ca5f8b5a2f Mon Sep 17 00:00:00 2001 From: Janick <128086870+janickspirig@users.noreply.github.com> Date: Thu, 26 Sep 2024 06:04:39 -0300 Subject: [PATCH] fix(datasets): Replace deprecated GBQDataset load/save funcs (#826) * fix:use_pd_gbq Signed-off-by: janick_spirig * fix:updated_readme Signed-off-by: janick_spirig * fix:updated_args Signed-off-by: janick_spirig * fix:updated_args_in_test Signed-off-by: janick_spirig * fix:linting Signed-off-by: janick_spirig --------- Signed-off-by: janick_spirig Co-authored-by: Merel Theisen <49397448+merelcht@users.noreply.github.com> Co-authored-by: Ankita Katiyar <110245118+ankatiyar@users.noreply.github.com> --- kedro-datasets/RELEASE.md | 2 + .../kedro_datasets/pandas/gbq_dataset.py | 20 +++++---- .../tests/pandas/test_gbq_dataset.py | 43 +++++++++++++------ 3 files changed, 44 insertions(+), 21 deletions(-) diff --git a/kedro-datasets/RELEASE.md b/kedro-datasets/RELEASE.md index 109d4e6fe..30af78fe9 100755 --- a/kedro-datasets/RELEASE.md +++ b/kedro-datasets/RELEASE.md @@ -14,12 +14,14 @@ ## Bug fixes and other changes * Refactored all datasets to set `fs_args` defaults in the same way as `load_args` and `save_args` and not have hardcoded values in the save methods. +* Fixed deprecated load and save approaches of GBQTableDataset and GBQQueryDataset by invoking save and load directly over `pandas-gbq` lib ## Breaking Changes ## Community contributions Many thanks to the following Kedroids for contributing PRs to this release: * [Brandon Meek](https://github.com/bpmeek) * [yury-fedotov](https://github.com/yury-fedotov) +* [janickspirig](https://github.com/janickspirig) # Release 4.1.0 diff --git a/kedro-datasets/kedro_datasets/pandas/gbq_dataset.py b/kedro-datasets/kedro_datasets/pandas/gbq_dataset.py index f16f828f7..e7ed3c2df 100644 --- a/kedro-datasets/kedro_datasets/pandas/gbq_dataset.py +++ b/kedro-datasets/kedro_datasets/pandas/gbq_dataset.py @@ -10,6 +10,7 @@ import fsspec import pandas as pd +import pandas_gbq as pd_gbq from google.cloud import bigquery from google.cloud.exceptions import NotFound from google.oauth2.credentials import Credentials @@ -138,16 +139,17 @@ def _describe(self) -> dict[str, Any]: def _load(self) -> pd.DataFrame: sql = f"select * from {self._dataset}.{self._table_name}" # nosec - self._load_args.setdefault("query", sql) - return pd.read_gbq( + self._load_args.setdefault("query_or_table", sql) + return pd_gbq.read_gbq( project_id=self._project_id, credentials=self._credentials, **self._load_args, ) def _save(self, data: pd.DataFrame) -> None: - data.to_gbq( - f"{self._dataset}.{self._table_name}", + pd_gbq.to_gbq( + dataframe=data, + destination_table=f"{self._dataset}.{self._table_name}", project_id=self._project_id, credentials=self._credentials, **self._save_args, @@ -176,7 +178,7 @@ def _validate_location(self): class GBQQueryDataset(AbstractDataset[None, pd.DataFrame]): """``GBQQueryDataset`` loads data from a provided SQL query from Google - BigQuery. It uses ``pandas.read_gbq`` which itself uses ``pandas-gbq`` + BigQuery. It uses ``pandas_gbq.read_gbq`` which itself uses ``pandas-gbq`` internally to read from BigQuery table. Therefore it supports all allowed pandas options on ``read_gbq``. @@ -274,7 +276,7 @@ def __init__( # noqa: PLR0913 # load sql query from arg or from file if sql: - self._load_args["query"] = sql + self._load_args["query_or_table"] = sql self._filepath = None else: # filesystem for loading sql file @@ -291,7 +293,7 @@ def __init__( # noqa: PLR0913 def _describe(self) -> dict[str, Any]: load_args = copy.deepcopy(self._load_args) desc = {} - desc["sql"] = str(load_args.pop("query", None)) + desc["sql"] = str(load_args.pop("query_or_table", None)) desc["filepath"] = str(self._filepath) desc["load_args"] = str(load_args) @@ -303,9 +305,9 @@ def _load(self) -> pd.DataFrame: if self._filepath: load_path = get_filepath_str(PurePosixPath(self._filepath), self._protocol) with self._fs.open(load_path, mode="r") as fs_file: - load_args["query"] = fs_file.read() + load_args["query_or_table"] = fs_file.read() - return pd.read_gbq( + return pd_gbq.read_gbq( project_id=self._project_id, credentials=self._credentials, **load_args, diff --git a/kedro-datasets/tests/pandas/test_gbq_dataset.py b/kedro-datasets/tests/pandas/test_gbq_dataset.py index a797708ae..63095b74e 100644 --- a/kedro-datasets/tests/pandas/test_gbq_dataset.py +++ b/kedro-datasets/tests/pandas/test_gbq_dataset.py @@ -95,7 +95,9 @@ def test_save_extra_params(self, gbq_dataset, save_args): def test_load_missing_file(self, gbq_dataset, mocker): """Check the error when trying to load missing table.""" pattern = r"Failed while loading data from data set GBQTableDataset\(.*\)" - mocked_read_gbq = mocker.patch("kedro_datasets.pandas.gbq_dataset.pd.read_gbq") + mocked_read_gbq = mocker.patch( + "kedro_datasets.pandas.gbq_dataset.pd_gbq.read_gbq" + ) mocked_read_gbq.side_effect = ValueError with pytest.raises(DatasetError, match=pattern): gbq_dataset.load() @@ -133,30 +135,43 @@ def test_save_load_data(self, gbq_dataset, dummy_dataframe, mocker): """Test saving and reloading the data set.""" sql = f"select * from {DATASET}.{TABLE_NAME}" table_id = f"{DATASET}.{TABLE_NAME}" - mocked_read_gbq = mocker.patch("kedro_datasets.pandas.gbq_dataset.pd.read_gbq") + mocked_to_gbq = mocker.patch("kedro_datasets.pandas.gbq_dataset.pd_gbq.to_gbq") + mocked_read_gbq = mocker.patch( + "kedro_datasets.pandas.gbq_dataset.pd_gbq.read_gbq" + ) mocked_read_gbq.return_value = dummy_dataframe mocked_df = mocker.Mock() gbq_dataset.save(mocked_df) loaded_data = gbq_dataset.load() - mocked_df.to_gbq.assert_called_once_with( - table_id, project_id=PROJECT, credentials=None, progress_bar=False + mocked_to_gbq.assert_called_once_with( + dataframe=mocked_df, + destination_table=table_id, + project_id=PROJECT, + credentials=None, + progress_bar=False, ) mocked_read_gbq.assert_called_once_with( - project_id=PROJECT, credentials=None, query=sql + project_id=PROJECT, credentials=None, query_or_table=sql ) assert_frame_equal(dummy_dataframe, loaded_data) - @pytest.mark.parametrize("load_args", [{"query": "Select 1"}], indirect=True) + @pytest.mark.parametrize( + "load_args", [{"query_or_table": "Select 1"}], indirect=True + ) def test_read_gbq_with_query(self, gbq_dataset, dummy_dataframe, mocker, load_args): """Test loading data set with query in the argument.""" - mocked_read_gbq = mocker.patch("kedro_datasets.pandas.gbq_dataset.pd.read_gbq") + mocked_read_gbq = mocker.patch( + "kedro_datasets.pandas.gbq_dataset.pd_gbq.read_gbq" + ) mocked_read_gbq.return_value = dummy_dataframe loaded_data = gbq_dataset.load() mocked_read_gbq.assert_called_once_with( - project_id=PROJECT, credentials=None, query=load_args["query"] + project_id=PROJECT, + credentials=None, + query_or_table=load_args["query_or_table"], ) assert_frame_equal(dummy_dataframe, loaded_data) @@ -239,26 +254,30 @@ def test_credentials_propagation(self, mocker): def test_load(self, mocker, gbq_sql_dataset, dummy_dataframe): """Test `load` method invocation""" - mocked_read_gbq = mocker.patch("kedro_datasets.pandas.gbq_dataset.pd.read_gbq") + mocked_read_gbq = mocker.patch( + "kedro_datasets.pandas.gbq_dataset.pd_gbq.read_gbq" + ) mocked_read_gbq.return_value = dummy_dataframe loaded_data = gbq_sql_dataset.load() mocked_read_gbq.assert_called_once_with( - project_id=PROJECT, credentials=None, query=SQL_QUERY + project_id=PROJECT, credentials=None, query_or_table=SQL_QUERY ) assert_frame_equal(dummy_dataframe, loaded_data) def test_load_query_file(self, mocker, gbq_sql_file_dataset, dummy_dataframe): """Test `load` method invocation using a file as input query""" - mocked_read_gbq = mocker.patch("kedro_datasets.pandas.gbq_dataset.pd.read_gbq") + mocked_read_gbq = mocker.patch( + "kedro_datasets.pandas.gbq_dataset.pd_gbq.read_gbq" + ) mocked_read_gbq.return_value = dummy_dataframe loaded_data = gbq_sql_file_dataset.load() mocked_read_gbq.assert_called_once_with( - project_id=PROJECT, credentials=None, query=SQL_QUERY + project_id=PROJECT, credentials=None, query_or_table=SQL_QUERY ) assert_frame_equal(dummy_dataframe, loaded_data)