Add documentation about nodes with generator functions #2302

Merged on Feb 13, 2023 (15 commits; 156 additions in docs/source/nodes_and_pipelines/nodes.md)
```{note}
You can also call a node as a regular Python function: `adder_node(dict(a=2, b=3))`. This will call `adder_node.run(dict(a=2, b=3))` behind the scenes.
```

## How to use generator functions in a node

[Generator functions](https://learnpython.org/en/Generators) were introduced with [PEP 255](https://www.python.org/dev/peps/pep-0255). They are a special kind of function that returns a lazy iterator, so they do not store their entire contents in memory all at once.
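
As a minimal, framework-agnostic sketch (the function name is illustrative), the generator below yields one slice of a sequence at a time instead of building the full list of slices up front:

```python
def iter_chunks(items: list, size: int):
    """Yield successive ``size``-sized slices of ``items`` lazily."""
    for start in range(0, len(items), size):
        yield items[start : start + size]


# Each slice is produced on demand; nothing is materialised until requested.
for chunk in iter_chunks(list(range(10)), size=4):
    print(chunk)  # [0, 1, 2, 3], then [4, 5, 6, 7], then [8, 9]
```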

The following code uses the pandas `chunksize` option as a generator to process large datasets within the [`pandas-iris` starter](../kedro_project_setup/starters.md). From this point on, we assume you have already set up a Kedro `pandas-iris` starter project. If you have not, follow the [Kedro starters guidelines](../kedro_project_setup/starters.md) first.
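
If you still need to create the project, the starter can be generated with the `kedro new` command (interactive prompts follow; see the starters guide linked above for details):

```bash
kedro new --starter=pandas-iris
```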

Create a [custom dataset](../extend_kedro/custom_datasets.md) called `ChunkWiseCSVDataSet` in `src/YOUR_PROJECT_NAME/extras/datasets/chunkwise_dataset.py` for your `pandas-iris` project. This dataset is a simplified version of `pandas.CSVDataSet`; the main change is to the `_save` method, which saves the data in append-or-create mode (`a+`).

<details>
<summary><b>Click to expand</b></summary>

```python
from copy import deepcopy
from io import BytesIO
from pathlib import PurePosixPath
from typing import Any, Dict

import fsspec
import pandas as pd

from kedro.io.core import (
    AbstractVersionedDataSet,
    Version,
    get_filepath_str,
    get_protocol_and_path,
)


class ChunkWiseCSVDataSet(AbstractVersionedDataSet[pd.DataFrame, pd.DataFrame]):
    """``ChunkWiseCSVDataSet`` loads/saves data from/to a CSV file using an underlying
    filesystem. It uses pandas to handle the CSV file.
    """

    DEFAULT_LOAD_ARGS = {}  # type: Dict[str, Any]
    DEFAULT_SAVE_ARGS = {"index": False}  # type: Dict[str, Any]

    def __init__(
        self,
        filepath: str,
        load_args: Dict[str, Any] = None,
        save_args: Dict[str, Any] = None,
        version: Version = None,
        credentials: Dict[str, Any] = None,
        fs_args: Dict[str, Any] = None,
    ) -> None:
        """Creates a new instance of ``ChunkWiseCSVDataSet`` pointing to a concrete
        CSV file on a specific filesystem.
        """
        _fs_args = deepcopy(fs_args) or {}
        _credentials = deepcopy(credentials) or {}

        protocol, path = get_protocol_and_path(filepath, version)
        if protocol == "file":
            _fs_args.setdefault("auto_mkdir", True)

        self._protocol = protocol
        self._storage_options = {**_credentials, **_fs_args}
        self._fs = fsspec.filesystem(self._protocol, **self._storage_options)

        super().__init__(
            filepath=PurePosixPath(path),
            version=version,
            exists_function=self._fs.exists,
            glob_function=self._fs.glob,
        )

        # Handle default load and save arguments
        self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS)
        if load_args is not None:
            self._load_args.update(load_args)
        self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS)
        if save_args is not None:
            self._save_args.update(save_args)

    def _describe(self) -> Dict[str, Any]:
        return {
            "filepath": self._filepath,
            "protocol": self._protocol,
            "load_args": self._load_args,
            "save_args": self._save_args,
            "version": self._version,
        }

    def _load(self) -> pd.DataFrame:
        load_path = str(self._get_load_path())
        return pd.read_csv(load_path, **self._load_args)

    def _save(self, data: pd.DataFrame) -> None:
        save_path = get_filepath_str(self._get_save_path(), self._protocol)

        buf = BytesIO()
        data.to_csv(path_or_buf=buf, **self._save_args)

        # Append-or-create mode so that successive chunks accumulate in one file
        with self._fs.open(save_path, mode="a+") as fs_file:
            fs_file.write(buf.getvalue())
```
</details>

Modify `example_iris_data` in `catalog.yml` by changing `type` to the custom dataset you created above. Add `chunksize: 100` to `load_args`; this is passed through to `pandas.read_csv`, so loading the dataset returns an iterable of chunks rather than a single `DataFrame`. The `chunksize` parameter is the number of rows in each chunk.

```yaml
example_iris_data:
type: YOUR_PROJECT_NAME.extras.datasets.chunkwise_dataset.ChunkWiseCSVDataSet
filepath: data/01_raw/iris.csv
load_args:
chunksize: 100
```
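
For context, this is standard pandas behaviour rather than anything Kedro-specific: when `chunksize` is set, `pandas.read_csv` returns a lazy `TextFileReader` iterator instead of a single `DataFrame`:

```python
import pandas as pd

reader = pd.read_csv("data/01_raw/iris.csv", chunksize=100)  # TextFileReader, not DataFrame
for chunk in reader:
    # Each chunk is an ordinary DataFrame of up to 100 rows
    print(type(chunk), len(chunk))
```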

Next, in `nodes.py` we repurpose the [`split_data`](https://github.com/kedro-org/kedro-starters/blob/dacdd56f1c1afde00a03a1e342fc0f44e1567a1e/pandas-iris/%7B%7B%20cookiecutter.repo_name%20%7D%7D/src/%7B%7B%20cookiecutter.python_package%20%7D%7D/nodes.py#L13) function to process chunk-wise data:

```python
from typing import Any, Dict, Tuple

import pandas as pd


def split_data(
    data: pd.DataFrame, parameters: Dict[str, Any]
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.Series, pd.Series]:
    """Splits data into features and target training and test sets.

    Args:
        data: Data containing features and target.
        parameters: Parameters defined in parameters.yml.
    Yields:
        Split data, one tuple per chunk.
    """
    # Loop through the data in chunks, building up the training and test sets
    for chunk in data:  # Iterate over the chunks yielded by the TextFileReader
        full_data = pd.concat([chunk])  # Convert the chunk into a standalone DataFrame
        data_train = full_data.sample(
            frac=parameters["train_fraction"], random_state=parameters["random_state"]
        )
        data_test = full_data.drop(data_train.index)

        X_train = data_train.drop(columns=parameters["target_column"])
        X_test = data_test.drop(columns=parameters["target_column"])
        y_train = data_train[parameters["target_column"]]
        y_test = data_test[parameters["target_column"]]
        yield X_train, X_test, y_train, y_test  # yield, not return, makes this a generator
```
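
No change to the node declaration itself is needed. As a sketch of the wiring implied by the run log below (assuming the starter's default pipeline and dataset names), the generator function is registered like any other node:

```python
from kedro.pipeline import node, pipeline

pipeline(
    [
        node(
            func=split_data,
            inputs=["example_iris_data", "parameters"],
            outputs=["X_train", "X_test", "y_train", "y_test"],
            name="split",
        ),
    ]
)
```

Because the node function is a generator, the runner saves each yielded tuple as it is produced, which is why each output dataset appears more than once in the log.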

We can now execute `kedro run` in the terminal. The output shows `X_train`, `X_test`, `y_train` and `y_test` being saved in chunks, one set of saves per yield:

```
...
[02/10/23 12:42:55] INFO Loading data from 'example_iris_data' (ChunkWiseCSVDataSet)... data_catalog.py:343
INFO Loading data from 'parameters' (MemoryDataSet)... data_catalog.py:343
INFO Running node: split: split_data([example_iris_data,parameters]) -> node.py:329
[X_train,X_test,y_train,y_test]
INFO Saving data to 'X_train' (MemoryDataSet)... data_catalog.py:382
INFO Saving data to 'X_test' (MemoryDataSet)... data_catalog.py:382
INFO Saving data to 'y_train' (MemoryDataSet)... data_catalog.py:382
INFO Saving data to 'y_test' (MemoryDataSet)... data_catalog.py:382
INFO Saving data to 'X_train' (MemoryDataSet)... data_catalog.py:382
INFO Saving data to 'X_test' (MemoryDataSet)... data_catalog.py:382
INFO Saving data to 'y_train' (MemoryDataSet)... data_catalog.py:382
INFO Saving data to 'y_test' (MemoryDataSet)... data_catalog.py:382
INFO Completed 1 out of 3 tasks sequential_runner.py:85
...
```