Commit

Add documentation about nodes with generator functions (#2302)
* add generator functions documentation

* add example output of kedro run

* Update nodes.md

* changes based on review

* add permalink to original split_data in pandas_iris

* grammar

* Update nodes.md

* changes based on review II

* Update nodes.md

* point to links on how to create starters

* comments

* changes based on review III

---------

Signed-off-by: SajidAlamQB <90610031+SajidAlamQB@users.noreply.github.com>
SajidAlamQB committed Feb 13, 2023
1 parent fa8c56f commit c83ce9a
Showing 1 changed file with 156 additions and 0 deletions: docs/source/nodes_and_pipelines/nodes.md
@@ -181,3 +181,159 @@ Out[2]: {'sum': 5}
```{note}
You can also call a node as a regular Python function: `adder_node(dict(a=2, b=3))`. This will call `adder_node.run(dict(a=2, b=3))` behind the scenes.
```

## How to use generator functions in a node

[Generator functions](https://learnpython.org/en/Generators) were introduced with [PEP 255](https://www.python.org/dev/peps/pep-0255). They are a special kind of function that returns a lazy iterator, producing items one at a time rather than storing the entire contents in memory at once.
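As a minimal illustration (not part of the starter code; the file name and chunk size below are arbitrary), a generator function uses `yield` to hand back one piece of work at a time:

```python
def read_in_chunks(path: str, chunk_size: int = 1024):
    """Yield a file's contents piece by piece instead of reading it all at once."""
    with open(path) as file:
        while True:
            chunk = file.read(chunk_size)
            if not chunk:
                break
            yield chunk  # produced lazily, only when the caller asks for the next item
```

Calling `read_in_chunks("some_file.txt")` does not read anything yet; it returns a generator object, and each iteration reads only the next `chunk_size` characters.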

The following code uses the pandas `chunksize` argument to process a large dataset in chunks within the [`pandas-iris` starter](../kedro_project_setup/starters.md). First, set up a project by following the [get started guide](../get_started/new_project.md#create-the-example-project) to create a Kedro project with the `pandas-iris` starter example code.
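For context, here is the underlying pandas behaviour (a standalone sketch, independent of Kedro): passing `chunksize` to `pandas.read_csv` returns a lazy `TextFileReader` iterator instead of a `DataFrame`:

```python
import pandas as pd

# With `chunksize`, read_csv returns an iterator of DataFrames, not a DataFrame
reader = pd.read_csv("data/01_raw/iris.csv", chunksize=100)
for chunk in reader:
    print(type(chunk), len(chunk))  # each chunk is a DataFrame of up to 100 rows
```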

Create a [custom dataset](../extend_kedro/custom_datasets.md) called `ChunkWiseCSVDataSet` in `src/YOUR_PROJECT_NAME/extras/datasets/chunkwise_dataset.py` for your `pandas-iris` project. This dataset is a simplified version of `pandas.CSVDataSet`; the main change is to the `_save` method, which writes the data in append-or-create mode (`a+`) so that each chunk is appended to the output file rather than overwriting it.

<details>
<summary><b>Click to expand</b></summary>

```python
from copy import deepcopy
from io import BytesIO
from pathlib import PurePosixPath
from typing import Any, Dict

import fsspec
import pandas as pd

from kedro.io.core import (
    AbstractVersionedDataSet,
    Version,
    get_filepath_str,
    get_protocol_and_path,
)


class ChunkWiseCSVDataSet(AbstractVersionedDataSet[pd.DataFrame, pd.DataFrame]):
    """``ChunkWiseCSVDataSet`` loads/saves data from/to a CSV file using an underlying
    filesystem. It uses pandas to handle the CSV file.
    """

    DEFAULT_LOAD_ARGS = {}  # type: Dict[str, Any]
    DEFAULT_SAVE_ARGS = {"index": False}  # type: Dict[str, Any]

    def __init__(
        self,
        filepath: str,
        load_args: Dict[str, Any] = None,
        save_args: Dict[str, Any] = None,
        version: Version = None,
        credentials: Dict[str, Any] = None,
        fs_args: Dict[str, Any] = None,
    ) -> None:
        """Creates a new instance of ``ChunkWiseCSVDataSet`` pointing to a concrete CSV file
        on a specific filesystem.
        """
        _fs_args = deepcopy(fs_args) or {}
        _credentials = deepcopy(credentials) or {}

        protocol, path = get_protocol_and_path(filepath, version)
        if protocol == "file":
            _fs_args.setdefault("auto_mkdir", True)

        self._protocol = protocol
        self._storage_options = {**_credentials, **_fs_args}
        self._fs = fsspec.filesystem(self._protocol, **self._storage_options)

        super().__init__(
            filepath=PurePosixPath(path),
            version=version,
            exists_function=self._fs.exists,
            glob_function=self._fs.glob,
        )

        # Handle default load and save arguments
        self._load_args = deepcopy(self.DEFAULT_LOAD_ARGS)
        if load_args is not None:
            self._load_args.update(load_args)
        self._save_args = deepcopy(self.DEFAULT_SAVE_ARGS)
        if save_args is not None:
            self._save_args.update(save_args)

    def _describe(self) -> Dict[str, Any]:
        return {
            "filepath": self._filepath,
            "protocol": self._protocol,
            "load_args": self._load_args,
            "save_args": self._save_args,
            "version": self._version,
        }

    def _load(self) -> pd.DataFrame:
        load_path = str(self._get_load_path())
        return pd.read_csv(load_path, **self._load_args)

    def _save(self, data: pd.DataFrame) -> None:
        save_path = get_filepath_str(self._get_save_path(), self._protocol)

        buf = BytesIO()
        data.to_csv(path_or_buf=buf, **self._save_args)

        # Open in append-or-create mode so each saved chunk is appended to the file
        with self._fs.open(save_path, mode="a+") as fs_file:
            fs_file.write(buf.getvalue())
```
</details>
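Because `_save` opens the file in append-or-create mode, every chunk the node yields is appended to the same output file. A consequence worth noting: if you re-run the pipeline against a file-based output, new chunks are appended after the old ones, so you may want to delete or version the previous output file first.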

Modify `example_iris_data` in `catalog.yml` by changing its `type` to the custom dataset you created above. Add `chunksize: 100` to `load_args`; this is passed through to `pandas.read_csv`, so loading now returns an iterable object rather than a single DataFrame. The `chunksize` parameter is the number of rows in each chunk.

```yaml
example_iris_data:
  type: YOUR_PROJECT_NAME.extras.datasets.chunkwise_dataset.ChunkWiseCSVDataSet
  filepath: data/01_raw/iris.csv
  load_args:
    chunksize: 100
```
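To sanity-check the catalog change (optional; this assumes you start `kedro ipython` from the project root, which preloads a `catalog` variable), load the dataset and confirm it now yields chunks:

```python
# Inside a `kedro ipython` session
chunks = catalog.load("example_iris_data")
for chunk in chunks:
    print(len(chunk))  # iris has 150 rows, so with chunksize=100 this prints 100, then 50
```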
Next, in `nodes.py`, we repurpose the [`split_data`](https://github.com/kedro-org/kedro-starters/blob/dacdd56f1c1afde00a03a1e342fc0f44e1567a1e/pandas-iris/%7B%7B%20cookiecutter.repo_name%20%7D%7D/src/%7B%7B%20cookiecutter.python_package%20%7D%7D/nodes.py#L13) function to process the data chunk by chunk:

```python
from typing import Any, Dict, Tuple

import pandas as pd


def split_data(
    data: pd.DataFrame, parameters: Dict[str, Any]
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.Series, pd.Series]:
    """Splits data into features and target training and test sets.

    Args:
        data: Data containing features and target.
        parameters: Parameters defined in parameters.yml.
    Returns:
        Split data.
    """
    # Loop through the data, building up the training and test sets chunk by chunk
    for chunk in data:  # Iterate over the chunks yielded by the TextFileReader
        full_data = pd.concat([chunk])  # Copy the chunk into a standalone DataFrame
        data_train = full_data.sample(
            frac=parameters["train_fraction"], random_state=parameters["random_state"]
        )
        data_test = full_data.drop(data_train.index)
        X_train = data_train.drop(columns=parameters["target_column"])
        X_test = data_test.drop(columns=parameters["target_column"])
        y_train = data_train[parameters["target_column"]]
        y_test = data_test[parameters["target_column"]]
        # `yield` instead of `return` makes split_data a generator function
        yield X_train, X_test, y_train, y_test
```
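For reference, the node wiring is unchanged from the starter. A sketch of the relevant entry in `pipeline.py` (matching the `split` node shown in the run output below) looks like this:

```python
from kedro.pipeline import Pipeline, node, pipeline

from .nodes import split_data


def create_pipeline(**kwargs) -> Pipeline:
    return pipeline(
        [
            node(
                func=split_data,
                inputs=["example_iris_data", "parameters"],
                outputs=["X_train", "X_test", "y_train", "y_test"],
                name="split",
            ),
            # ... the starter's other nodes (make_predictions, report_accuracy) follow
        ]
    )
```

Because `split_data` is now a generator function, the runner saves each set of yielded outputs as it is produced, rather than saving a single return value once.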

We can now run `kedro run` in the terminal. The output shows `X_train`, `X_test`, `y_train` and `y_test` being saved repeatedly, once per chunk:

```
...
[02/10/23 12:42:55] INFO     Loading data from 'example_iris_data' (ChunkWiseCSVDataSet)...     data_catalog.py:343
                    INFO     Loading data from 'parameters' (MemoryDataSet)...                  data_catalog.py:343
                    INFO     Running node: split: split_data([example_iris_data,parameters]) ->         node.py:329
                             [X_train,X_test,y_train,y_test]
                    INFO     Saving data to 'X_train' (MemoryDataSet)...                        data_catalog.py:382
                    INFO     Saving data to 'X_test' (MemoryDataSet)...                         data_catalog.py:382
                    INFO     Saving data to 'y_train' (MemoryDataSet)...                        data_catalog.py:382
                    INFO     Saving data to 'y_test' (MemoryDataSet)...                         data_catalog.py:382
                    INFO     Saving data to 'X_train' (MemoryDataSet)...                        data_catalog.py:382
                    INFO     Saving data to 'X_test' (MemoryDataSet)...                         data_catalog.py:382
                    INFO     Saving data to 'y_train' (MemoryDataSet)...                        data_catalog.py:382
                    INFO     Saving data to 'y_test' (MemoryDataSet)...                         data_catalog.py:382
                    INFO     Completed 1 out of 3 tasks                                     sequential_runner.py:85
...
```
