From c83ce9aa1f4b3ab29a35f06be1bbc56341cf71e5 Mon Sep 17 00:00:00 2001
From: Sajid Alam <90610031+SajidAlamQB@users.noreply.github.com>
Date: Mon, 13 Feb 2023 12:08:05 +0000
Subject: [PATCH] Add documentation about nodes with generator functions (#2302)

* add generator functions documentation

Signed-off-by: SajidAlamQB <90610031+SajidAlamQB@users.noreply.github.com>

* add example output of kedro run

Signed-off-by: SajidAlamQB <90610031+SajidAlamQB@users.noreply.github.com>

* Update nodes.md

Signed-off-by: SajidAlamQB <90610031+SajidAlamQB@users.noreply.github.com>

* changes based on review

Signed-off-by: SajidAlamQB <90610031+SajidAlamQB@users.noreply.github.com>

* add permalink to original split_data in pandas_iris

Signed-off-by: SajidAlamQB <90610031+SajidAlamQB@users.noreply.github.com>

* grammar

Signed-off-by: SajidAlamQB <90610031+SajidAlamQB@users.noreply.github.com>

* Update nodes.md

Signed-off-by: SajidAlamQB <90610031+SajidAlamQB@users.noreply.github.com>

* changes based on review II

Signed-off-by: SajidAlamQB <90610031+SajidAlamQB@users.noreply.github.com>

* Update nodes.md

Signed-off-by: SajidAlamQB <90610031+SajidAlamQB@users.noreply.github.com>

* point to links on how to create starters

Signed-off-by: SajidAlamQB <90610031+SajidAlamQB@users.noreply.github.com>

* comments

Signed-off-by: SajidAlamQB <90610031+SajidAlamQB@users.noreply.github.com>

* changes based on review III

Signed-off-by: SajidAlamQB <90610031+SajidAlamQB@users.noreply.github.com>

---------

Signed-off-by: SajidAlamQB <90610031+SajidAlamQB@users.noreply.github.com>
---
 docs/source/nodes_and_pipelines/nodes.md | 156 +++++++++++++++++++++++
 1 file changed, 156 insertions(+)

diff --git a/docs/source/nodes_and_pipelines/nodes.md b/docs/source/nodes_and_pipelines/nodes.md
index 6ccad80786..dad812edbd 100644
--- a/docs/source/nodes_and_pipelines/nodes.md
+++ b/docs/source/nodes_and_pipelines/nodes.md
@@ -181,3 +181,159 @@ Out[2]: {'sum': 5}
```{note}
You can also call a node as a regular Python function: `adder_node(dict(a=2, b=3))`. This will call `adder_node.run(dict(a=2, b=3))` behind the scenes.
```

## How to use generator functions in a node

[Generator functions](https://learnpython.org/en/Generators) were introduced with [PEP 255](https://www.python.org/dev/peps/pep-0255). They are a special kind of function that returns a lazy iterator, so the data they produce is never held in memory all at once.
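As a quick, framework-agnostic illustration (the function and values below are invented for this example), a generator uses `yield` to hand back one value at a time instead of building the full result up front:

```python
def read_in_chunks(numbers, chunk_size=2):
    """Yield successive slices of `numbers` instead of returning one big list."""
    for i in range(0, len(numbers), chunk_size):
        # Execution pauses here until the caller asks for the next chunk
        yield numbers[i : i + chunk_size]


# Each chunk is produced lazily, only when the loop requests it
for chunk in read_in_chunks([1, 2, 3, 4, 5]):
    print(chunk)
```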
The following code uses a pandas `chunksize` generator to process large datasets within the [`pandas-iris` starter](../kedro_project_setup/starters.md). First, set up a project by following the [get started guide](../get_started/new_project.md#create-the-example-project) to create a Kedro project with the `pandas-iris` starter example code.

Create a [custom dataset](../extend_kedro/custom_datasets.md) called `ChunkWiseCSVDataSet` in `src/YOUR_PROJECT_NAME/extras/datasets/chunkwise_dataset.py` for your `pandas-iris` project. This dataset is a simplified version of `pandas.CSVDataSet`, where the main change is to the `_save` method, which saves the data in append-or-create mode, `a+`.

<details>
<summary>Click to expand</summary>

```python
from copy import deepcopy
from io import StringIO
from pathlib import PurePosixPath
from typing import Any, Dict

import fsspec
import pandas as pd

from kedro.io.core import (
    AbstractVersionedDataSet,
    Version,
    get_filepath_str,
    get_protocol_and_path,
)


class ChunkWiseCSVDataSet(AbstractVersionedDataSet[pd.DataFrame, pd.DataFrame]):
    """``ChunkWiseCSVDataSet`` loads/saves data from/to a CSV file using an underlying
    filesystem. It uses pandas to handle the CSV file.
    """

    DEFAULT_LOAD_ARGS = {}  # type: Dict[str, Any]
    DEFAULT_SAVE_ARGS = {"index": False}  # type: Dict[str, Any]

    def __init__(
        self,
        filepath: str,
        load_args: Dict[str, Any] = None,
        save_args: Dict[str, Any] = None,
        version: Version = None,
        credentials: Dict[str, Any] = None,
        fs_args: Dict[str, Any] = None,
    ) -> None:
        """Creates a new instance of ``ChunkWiseCSVDataSet`` pointing to a concrete CSV file
        on a specific filesystem.
        """
        _fs_args = deepcopy(fs_args) or {}
        _credentials = deepcopy(credentials) or {}

        protocol, path = get_protocol_and_path(filepath, version)
        if protocol == "file":
            _fs_args.setdefault("auto_mkdir", True)

        self._protocol = protocol
        self._storage_options = {**_credentials, **_fs_args}
        self._fs = fsspec.filesystem(self._protocol, **self._storage_options)

        super().__init__(
            filepath=PurePosixPath(path),
            version=version,
            exists_function=self._fs.exists,
            glob_function=self._fs.glob,
        )

        # Handle default load and save arguments
        self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS)
        if load_args is not None:
            self._load_args.update(load_args)
        self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS)
        if save_args is not None:
            self._save_args.update(save_args)

    def _describe(self) -> Dict[str, Any]:
        return {
            "filepath": self._filepath,
            "protocol": self._protocol,
            "load_args": self._load_args,
            "save_args": self._save_args,
            "version": self._version,
        }

    def _load(self) -> pd.DataFrame:
        load_path = str(self._get_load_path())
        return pd.read_csv(load_path, **self._load_args)

    def _save(self, data: pd.DataFrame) -> None:
        save_path = get_filepath_str(self._get_save_path(), self._protocol)

        # Serialise the chunk to CSV text, then append it to the target file,
        # creating the file if it does not exist yet
        buf = StringIO()
        data.to_csv(path_or_buf=buf, **self._save_args)

        with self._fs.open(save_path, mode="a+") as fs_file:
            fs_file.write(buf.getvalue())
```
</details>
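Before wiring this into the catalog, it can help to see what the `chunksize` load argument does on its own. This is a minimal standalone sketch, assuming the starter's `data/01_raw/iris.csv` is in place; with `chunksize` set, `pandas.read_csv` returns a lazy reader that yields one `DataFrame` per chunk instead of loading the whole file:

```python
import pandas as pd

# With chunksize set, read_csv returns a lazy TextFileReader rather than a DataFrame
chunks = pd.read_csv("data/01_raw/iris.csv", chunksize=100)

for i, chunk in enumerate(chunks):
    # Each chunk is an ordinary DataFrame with up to 100 rows
    print(f"Chunk {i}: {len(chunk)} rows")
```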
Modify `example_iris_data` in `catalog.yml` by changing `type` to the custom dataset you created above. Add `chunksize: 100` to `load_args`, so that loading the dataset returns an iterable of chunks instead of a single DataFrame. The `chunksize` parameter sets the number of rows in each chunk.

```yaml
example_iris_data:
  type: YOUR_PROJECT_NAME.extras.datasets.chunkwise_dataset.ChunkWiseCSVDataSet
  filepath: data/01_raw/iris.csv
  load_args:
    chunksize: 100
```

Next, in `nodes.py` we repurpose the [`split_data`](https://github.com/kedro-org/kedro-starters/blob/dacdd56f1c1afde00a03a1e342fc0f44e1567a1e/pandas-iris/%7B%7B%20cookiecutter.repo_name%20%7D%7D/src/%7B%7B%20cookiecutter.python_package%20%7D%7D/nodes.py#L13) function to process the data chunk by chunk:

```python
def split_data(
    data: pd.DataFrame, parameters: Dict[str, Any]
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.Series, pd.Series]:
    """Splits data into features and target training and test sets.

    Args:
        data: Data containing features and target.
        parameters: Parameters defined in parameters.yml.
    Returns:
        Split data.
    """
    # Loop through the data chunk by chunk, yielding one train/test split per chunk
    for chunk in data:  # `data` is an iterable of DataFrame chunks because `chunksize` is set
        full_data = pd.concat([chunk])  # Treat the current chunk as a standalone DataFrame
        data_train = full_data.sample(
            frac=parameters["train_fraction"], random_state=parameters["random_state"]
        )
        data_test = full_data.drop(data_train.index)

        X_train = data_train.drop(columns=parameters["target_column"])
        X_test = data_test.drop(columns=parameters["target_column"])
        y_train = data_train[parameters["target_column"]]
        y_test = data_test[parameters["target_column"]]
        yield X_train, X_test, y_train, y_test  # Use yield instead of return to make the function a generator
```

We can now run `kedro run` in the terminal. The output shows `X_train`, `X_test`, `y_train` and `y_test` being saved once per chunk:

```
...
[02/10/23 12:42:55] INFO     Loading data from 'example_iris_data' (ChunkWiseCSVDataSet)...  data_catalog.py:343
                    INFO     Loading data from 'parameters' (MemoryDataSet)...               data_catalog.py:343
                    INFO     Running node: split: split_data([example_iris_data,parameters]) ->      node.py:329
                             [X_train,X_test,y_train,y_test]
                    INFO     Saving data to 'X_train' (MemoryDataSet)...                     data_catalog.py:382
                    INFO     Saving data to 'X_test' (MemoryDataSet)...                      data_catalog.py:382
                    INFO     Saving data to 'y_train' (MemoryDataSet)...                     data_catalog.py:382
                    INFO     Saving data to 'y_test' (MemoryDataSet)...                      data_catalog.py:382
                    INFO     Saving data to 'X_train' (MemoryDataSet)...                     data_catalog.py:382
                    INFO     Saving data to 'X_test' (MemoryDataSet)...                      data_catalog.py:382
                    INFO     Saving data to 'y_train' (MemoryDataSet)...                     data_catalog.py:382
                    INFO     Saving data to 'y_test' (MemoryDataSet)...                      data_catalog.py:382
                    INFO     Completed 1 out of 3 tasks                                 sequential_runner.py:85
...
```
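Each group of `Saving data to ...` lines corresponds to one chunk yielded by `split_data`: the runner saves the node's outputs every time the generator yields. To inspect the chunked splits outside of a pipeline run, you can also call `split_data` directly on a chunked reader. This is an illustrative sketch only; the parameter values are assumed to mirror the starter's `parameters.yml` and may need adjusting for your project:

```python
import pandas as pd

# Assumed values, mirroring conf/base/parameters.yml in the pandas-iris starter
parameters = {
    "train_fraction": 0.8,
    "random_state": 3,
    "target_column": "species",
}

# The chunked reader plays the role of the `example_iris_data` input
chunked_data = pd.read_csv("data/01_raw/iris.csv", chunksize=100)

# split_data is a generator, so nothing is computed until we iterate over it
for X_train, X_test, y_train, y_test in split_data(chunked_data, parameters):
    print(X_train.shape, X_test.shape, y_train.shape, y_test.shape)
```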