Reorganise and improve the data catalog documentation #2888

Merged · 23 commits · Aug 18, 2023
Changes from 11 commits
4 changes: 2 additions & 2 deletions docs/build-docs.sh
@@ -8,7 +8,7 @@ set -o nounset
action=$1

if [ "$action" == "linkcheck" ]; then
sphinx-build -WETan -j auto -D language=en -b linkcheck -d docs/build/doctrees docs/source docs/build/linkcheck
stichbury marked this conversation as resolved.
Show resolved Hide resolved
sphinx-build -ETan -j auto -D language=en -b linkcheck -d docs/build/doctrees docs/source docs/build/linkcheck
elif [ "$action" == "docs" ]; then
sphinx-build -WETa -j auto -D language=en -b html -d docs/build/doctrees docs/source docs/build/html
sphinx-build -ETa -j auto -D language=en -b html -d docs/build/doctrees docs/source docs/build/html
fi
2 changes: 1 addition & 1 deletion docs/source/configuration/credentials.md
@@ -3,7 +3,7 @@
For security reasons, we strongly recommend that you *do not* commit any credentials or other secrets to version control.
Kedro is set up so that, by default, if a file inside the `conf` folder (and its subfolders) contains `credentials` in its name, it will be ignored by git.

-Credentials configuration can be used on its own directly in code or [fed into the `DataCatalog`](../data/data_catalog.md#feeding-in-credentials).
+Credentials configuration can be used on its own directly in code or [fed into the `DataCatalog`](../data/data_catalog.md#dataset-access-credentials).
If you would rather store your credentials in environment variables instead of a file, you can use the `OmegaConfigLoader` [to load credentials from environment variables](advanced_configuration.md#how-to-load-credentials-through-environment-variables) as described in the advanced configuration chapter.

## How to load credentials in code
258 changes: 258 additions & 0 deletions docs/source/data/advanced_data_catalog_usage.md
@@ -0,0 +1,258 @@
# Advanced Data Catalog usage

## How to read the same file using two different datasets

Use transcoding to load and save a file via its specified `filepath` using more than one `DataSet` implementation.

### A typical example of transcoding

Parquet files can be loaded not only with `pandas` via `ParquetDataSet`, but also directly with `SparkDataSet`. This conversion is typical when coordinating a `Spark` to `pandas` workflow.

To enable transcoding, define two `DataCatalog` entries for the same dataset in a common format (Parquet, JSON, CSV, etc.) in your `conf/base/catalog.yml`:

```yaml
my_dataframe@spark:
  type: spark.SparkDataSet
  filepath: data/02_intermediate/data.parquet
  file_format: parquet

my_dataframe@pandas:
  type: pandas.ParquetDataSet
  filepath: data/02_intermediate/data.parquet
```

These entries are used in the pipeline like this:

```python
pipeline(
    [
        node(func=my_func1, inputs="spark_input", outputs="my_dataframe@spark"),
        node(func=my_func2, inputs="my_dataframe@pandas", outputs="pipeline_output"),
    ]
)
```

### How does transcoding work?

In this example, Kedro understands that `my_dataframe` is the same dataset in its `spark.SparkDataSet` and `pandas.ParquetDataSet` formats and helps resolve the node execution order.

In the pipeline, Kedro uses the `spark.SparkDataSet` implementation for saving and `pandas.ParquetDataSet`
for loading, so the first node outputs a `pyspark.sql.DataFrame`, while the second node receives a `pandas.DataFrame`.
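
Transcoding is transparent to the node functions themselves: each function simply works with the DataFrame type produced by the dataset implementation used at that point. As a minimal sketch (the function bodies and column handling below are illustrative, not part of the example above):

```python
import pandas as pd
from pyspark.sql import DataFrame as SparkDataFrame


def my_func1(spark_input: SparkDataFrame) -> SparkDataFrame:
    # The output is saved by spark.SparkDataSet, so the node returns a pyspark DataFrame
    return spark_input.dropna()


def my_func2(my_dataframe: pd.DataFrame) -> pd.DataFrame:
    # The input is loaded by pandas.ParquetDataSet, so the node receives a pandas DataFrame
    return my_dataframe.describe()
```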


## How to access the Data Catalog in code

You can define a Data Catalog in two ways. Most use cases are covered by a YAML configuration file, as [illustrated previously](./data_catalog.md), but it is also possible to access the Data Catalog programmatically through [`kedro.io.DataCatalog`](/kedro.io.DataCatalog), using an API that lets you configure data sources in code and use the IO module within notebooks.

### How to configure a Data Catalog using the `DataCatalog` API

In a file like `catalog.py`, you can construct a `DataCatalog` object programmatically. The following code uses several pre-built data loaders documented in the [API reference documentation](/kedro_datasets).

```python
from kedro.io import DataCatalog
from kedro_datasets.pandas import (
    CSVDataSet,
    SQLTableDataSet,
    SQLQueryDataSet,
    ParquetDataSet,
)

io = DataCatalog(
    {
        "bikes": CSVDataSet(filepath="../data/01_raw/bikes.csv"),
        "cars": CSVDataSet(filepath="../data/01_raw/cars.csv", load_args=dict(sep=",")),
        "cars_table": SQLTableDataSet(
            table_name="cars", credentials=dict(con="sqlite:///kedro.db")
        ),
        "scooters_query": SQLQueryDataSet(
            sql="select * from cars where gear=4",
            credentials=dict(con="sqlite:///kedro.db"),
        ),
        "ranked": ParquetDataSet(filepath="ranked.parquet"),
    }
)
```

When using `SQLTableDataSet` or `SQLQueryDataSet` you must provide a `con` key containing a [SQLAlchemy compatible](https://docs.sqlalchemy.org/en/13/core/engines.html#database-urls) database connection string. In the example above we pass it as part of the `credentials` argument. An alternative to `credentials` is to put `con` into `load_args` and `save_args` (`SQLTableDataSet` only), as sketched below.
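
For illustration, a minimal sketch of that alternative for `SQLTableDataSet`, with the connection string moved into `load_args` and `save_args`. The table name and connection string reuse the illustrative values above, and whether this variant is accepted depends on the dataset version you have installed:

```python
from kedro_datasets.pandas import SQLTableDataSet

# Assumes the dataset version in use accepts `con` via load_args/save_args
# instead of the `credentials` argument, as described above.
cars_table = SQLTableDataSet(
    table_name="cars",
    load_args=dict(con="sqlite:///kedro.db"),
    save_args=dict(con="sqlite:///kedro.db"),
)
```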

### How to view the available data sources programmatically

To review the `DataCatalog`:

```python
io.list()
```

### How to load datasets programmatically

To access each dataset by its name:

```python
cars = io.load("cars") # data is now loaded as a DataFrame in 'cars'
gear = cars["gear"].values
```

The following steps happened behind the scenes when `load` was called (the sketch after the list shows the equivalent manual calls):

- The value `cars` was located in the Data Catalog
- The corresponding `AbstractDataSet` object was retrieved
- The `load` method of this dataset was called
- This `load` method delegated the loading to the underlying pandas `read_csv` function
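
As a rough sketch of those steps performed manually, bypassing the catalog (this assumes the same `cars` entry created earlier):

```python
from kedro_datasets.pandas import CSVDataSet

# The object the catalog holds under the name "cars" is a CSVDataSet
cars_data_set = CSVDataSet(filepath="../data/01_raw/cars.csv")

# Calling load() on the dataset delegates to pandas.read_csv under the hood
cars = cars_data_set.load()
```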

### How to save data programmatically

You can save data using an API similar to the one used to load data.

```{warning}
This use is not recommended unless you are prototyping in notebooks.
```

#### How to save data to memory

```python
from kedro.io import MemoryDataSet

memory = MemoryDataSet(data=None)
io.add("cars_cache", memory)
io.save("cars_cache", "Memory can store anything.")
io.load("cars_cache")
```

#### How to save data to a SQL database for querying

To put the data in a SQLite database:

```python
import os

# This cleans up the database in case it exists at this point
try:
    os.remove("kedro.db")
except FileNotFoundError:
    pass

io.save("cars_table", cars)

# rank scooters by their mpg
ranked = io.load("scooters_query")[["brand", "mpg"]]
```

#### How to save data in Parquet

To save the processed data in Parquet format:

```python
io.save("ranked", ranked)
```

```{warning}
Saving `None` to a dataset is not allowed!
```

### How to access a dataset programmatically with credentials

Before instantiating the `DataCatalog`, Kedro will first attempt to read [the credentials from the project configuration](../configuration/credentials.md). The resulting dictionary is then passed into `DataCatalog.from_config()` as the `credentials` argument.

Let's assume that the project contains the file `conf/local/credentials.yml` with the following contents:

```yaml
dev_s3:
  client_kwargs:
    aws_access_key_id: key
    aws_secret_access_key: secret

scooters_credentials:
  con: sqlite:///kedro.db

my_gcp_credentials:
  id_token: key
```

Your code will look as follows:

```python
CSVDataSet(
    filepath="s3://test_bucket/data/02_intermediate/company/motorbikes.csv",
    load_args=dict(sep=",", skiprows=5, skipfooter=1, na_values=["#NA", "NA"]),
    credentials=dict(key="token", secret="key"),
)
```
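
For completeness, a minimal sketch of the `DataCatalog.from_config()` route mentioned above, where the credentials dictionary loaded from `conf/local/credentials.yml` resolves the `credentials` key of a catalog entry. The catalog entry itself is illustrative, not part of the example above:

```python
from kedro.io import DataCatalog

# Illustrative catalog configuration referencing the dev_s3 credentials entry
catalog_config = {
    "motorbikes": {
        "type": "pandas.CSVDataSet",
        "filepath": "s3://test_bucket/data/02_intermediate/company/motorbikes.csv",
        "credentials": "dev_s3",  # resolved against the credentials dictionary below
    }
}

# The same structure as conf/local/credentials.yml shown earlier
credentials = {
    "dev_s3": {
        "client_kwargs": {
            "aws_access_key_id": "key",
            "aws_secret_access_key": "secret",
        }
    }
}

io = DataCatalog.from_config(catalog_config, credentials=credentials)
# io.load("motorbikes") would now read the CSV from S3 using the dev_s3 credentials
```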

### How to version a dataset using the Code API

In an earlier section of the documentation we described how [Kedro enables dataset and ML model versioning](./data_catalog.md#dataset-versioning).

If you require programmatic control over load and save versions of a specific dataset, you can instantiate `Version` and pass it as a parameter to the dataset initialisation:

```python
from kedro.io import DataCatalog, Version
from kedro_datasets.pandas import CSVDataSet
import pandas as pd

data1 = pd.DataFrame({"col1": [1, 2], "col2": [4, 5], "col3": [5, 6]})
data2 = pd.DataFrame({"col1": [7], "col2": [8], "col3": [9]})
version = Version(
    load=None,  # load the latest available version
    save=None,  # generate save version automatically on each save operation
)

test_data_set = CSVDataSet(
    filepath="data/01_raw/test.csv", save_args={"index": False}, version=version
)
io = DataCatalog({"test_data_set": test_data_set})

# save the dataset to data/01_raw/test.csv/<version>/test.csv
io.save("test_data_set", data1)
# save the dataset into a new file data/01_raw/test.csv/<version>/test.csv
io.save("test_data_set", data2)

# load the latest version from data/01_raw/test.csv/*/test.csv
reloaded = io.load("test_data_set")
assert data2.equals(reloaded)
```

In the example above, we do not fix any versions. The behaviour of load and save operations becomes slightly different when we set a version:


```python
version = Version(
    load="my_exact_version",  # load exact version
    save="my_exact_version",  # save to exact version
)

test_data_set = CSVDataSet(
    filepath="data/01_raw/test.csv", save_args={"index": False}, version=version
)
io = DataCatalog({"test_data_set": test_data_set})

# save the dataset to data/01_raw/test.csv/my_exact_version/test.csv
io.save("test_data_set", data1)
# load from data/01_raw/test.csv/my_exact_version/test.csv
reloaded = io.load("test_data_set")
assert data1.equals(reloaded)

# raises DataSetError since the path
# data/01_raw/test.csv/my_exact_version/test.csv already exists
io.save("test_data_set", data2)
```

```{warning}
We do not recommend passing exact load and/or save versions, since this can lead to inconsistencies between operations. For example, if the load and save versions do not match, a save operation results in a `UserWarning` about the mismatch, and a load after save might raise an error if the corresponding load version is not found:
```

```python
version = Version(
    load="exact_load_version",  # load exact version
    save="exact_save_version",  # save to exact version
)

test_data_set = CSVDataSet(
    filepath="data/01_raw/test.csv", save_args={"index": False}, version=version
)
io = DataCatalog({"test_data_set": test_data_set})

io.save("test_data_set", data1)  # emits a UserWarning due to version inconsistency

# raises DataSetError since the data/01_raw/test.csv/exact_load_version/test.csv
# file does not exist
reloaded = io.load("test_data_set")
```