Merge branch 'main' into test/pipeline-slow-run-example
lrcouto authored Oct 24, 2024
2 parents 7462464 + c2d7100 commit 3a1ce58
Showing 9 changed files with 181 additions and 23 deletions.
2 changes: 2 additions & 0 deletions RELEASE.md
@@ -6,6 +6,8 @@
**Note:** ``KedroDataCatalog`` is an experimental feature and is under active development. Therefore, it is possible we'll introduce breaking changes to this class, so be mindful of that if you decide to use it already. Let us know if you have any feedback about the ``KedroDataCatalog`` or ideas for new features.

## Bug fixes and other changes
* Added I/O support for Oracle Cloud Infrastructure (OCI) Object Storage filesystem

## Breaking changes to the API
## Documentation changes
* Updated CLI autocompletion docs with new Click syntax.
8 changes: 6 additions & 2 deletions benchmarks/benchmark_datacatalog.py
@@ -62,11 +62,15 @@ def time_release(self):

    def time_add_all(self):
        """Benchmark the time to add all datasets"""
        self.catalog.add_all(self.datasets)
        # Have to initialise a new DataCatalog to avoid failing with DatasetAlreadyExistsError
        catalog = DataCatalog.from_config(base_catalog)
        catalog.add_all(self.datasets)

    def time_feed_dict(self):
        """Benchmark the time to add feed dict"""
        self.catalog.add_feed_dict(self.feed_dict)
        # Have to initialise a new DataCatalog to avoid failing with DatasetAlreadyExistsError
        catalog = DataCatalog.from_config(base_catalog)
        catalog.add_feed_dict(self.feed_dict)

    def time_list(self):
        """Benchmark the time to list all datasets"""
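The comments in `time_add_all` and `time_feed_dict` refer to `DatasetAlreadyExistsError`; a minimal sketch of that behaviour, assuming kedro is installed (the dataset name `existing` is made up for illustration):

```python
# Re-registering a name that is already in the catalog raises
# DatasetAlreadyExistsError, which is why the benchmarks above rebuild the
# catalog from config instead of reusing self.catalog. "existing" is a made-up name.
from kedro.io import DataCatalog, DatasetAlreadyExistsError, MemoryDataset

catalog = DataCatalog({"existing": MemoryDataset(1)})
try:
    catalog.add("existing", MemoryDataset(2))
except DatasetAlreadyExistsError as err:
    print(err)
```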
130 changes: 130 additions & 0 deletions benchmarks/benchmark_kedrodatacatalog.py
@@ -0,0 +1,130 @@
import pandas as pd
from kedro_datasets.pandas import CSVDataset

from kedro.io import KedroDataCatalog

base_catalog = {
    f"dataset_{i}": {
        "type": "pandas.CSVDataset",
        "filepath": f"data_{i}.csv",
    } for i in range(1, 1001)
}
# Add datasets with the same filepath for loading
base_catalog.update({
    f"dataset_load_{i}": {
        "type": "pandas.CSVDataset",
        "filepath": "data.csv",
    } for i in range(1, 1001)
})
# Add a factory pattern
base_catalog.update({
    "dataset_factory_{placeholder}": {
        "type": "pandas.CSVDataset",
        "filepath": "data_{placeholder}.csv",
    }
})

runtime_patterns = {
    "{placeholder}": {
        "type": "pandas.CSVDataset",
        "filepath": "{placeholder}.csv",
    }
}

class TimeKedroDataCatalog:
    def setup(self):
        self.catalog = KedroDataCatalog.from_config(base_catalog)
        self.dataframe = pd.DataFrame({"column": [1, 2, 3]})
        self.dataframe.to_csv("data.csv", index=False)
        self.datasets = {
            f"dataset_new_{i}": CSVDataset(filepath="data.csv") for i in range(1, 1001)
        }
        self.feed_dict = {
            f"param_{i}": i for i in range(1, 1001)
        }

    def time_init(self):
        """Benchmark the time to initialize the catalog"""
        KedroDataCatalog.from_config(base_catalog)

    def time_contains(self):
        """Benchmark the time to check if a dataset exists"""
        for i in range(1,1001):
            f"dataset_{i}" in self.catalog

    def time_getitem(self):
        """Benchmark the time to get a dataset"""
        for i in range(1,1001):
            self.catalog[f"dataset_{i}"]

    def time_get(self):
        """Benchmark the time to get a dataset"""
        for i in range(1,1001):
            self.catalog.get(f"dataset_{i}")

    def time_iter(self):
        """Benchmark the time to iterate over the catalog"""
        for dataset in self.catalog:
            pass

    def time_keys(self):
        """Benchmark the time to get the keys of the catalog"""
        self.catalog.keys()

    def time_values(self):
"""Benchmark the time to get the items of the catalog"""
        self.catalog.values()

    def time_items(self):
        """Benchmark the time to get the items of the catalog"""
        self.catalog.items()

    def time_setitem(self):
        """Benchmark the time to set a dataset"""
        for i in range(1,1001):
            self.catalog[f"dataset_new_{i}"] = CSVDataset(filepath="data.csv")

    def time_setitem_raw(self):
        """Benchmark the time to add a memory dataset"""
        for i in range(1,1001):
            self.catalog[f"param_{i}"] = self.feed_dict[f"param_{i}"]

    def time_save(self):
        """Benchmark the time to save datasets"""
        for i in range(1,1001):
            self.catalog.save(f"dataset_{i}", self.dataframe)

    def time_load(self):
        """Benchmark the time to load datasets"""
        for i in range(1,1001):
            self.catalog.load(f"dataset_load_{i}")

    def time_exists(self):
        """Benchmark the time to check if datasets exist"""
        for i in range(1,1001):
            self.catalog.exists(f"dataset_{i}")

    def time_release(self):
        """Benchmark the time to release datasets"""
        for i in range(1,1001):
            self.catalog.release(f"dataset_{i}")

    def time_list(self):
        """Benchmark the time to list all datasets"""
        self.catalog.list()

    def time_shallow_copy(self):
        """Benchmark the time to shallow copy the catalog"""
        # Will be removed
        self.catalog.shallow_copy()

    def time_resolve_factory(self):
        """Benchmark the time to resolve factory"""
        for i in range(1,1001):
            self.catalog.get(f"dataset_factory_{i}")

    def time_add_runtime_patterns(self):
        """Benchmark the time to add runtime patterns"""
        for i in range(1,1001):
            self.catalog.config_resolver.add_runtime_patterns(runtime_patterns)
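As a quick sanity check outside the benchmark harness, a sketch like the following can time a single method by hand; the direct import and the use of `timeit` are assumptions for illustration, not part of this commit:

```python
# Hypothetical smoke check: run one benchmark method manually and time it.
# Assumes kedro and kedro-datasets are installed and that this snippet sits
# next to benchmark_kedrodatacatalog.py so the import resolves.
import timeit

from benchmark_kedrodatacatalog import TimeKedroDataCatalog

bench = TimeKedroDataCatalog()
bench.setup()  # builds the catalog entries and writes data.csv

elapsed = timeit.timeit(bench.time_getitem, number=1)
print(f"time_getitem over 1000 datasets took {elapsed:.3f}s")
```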
2 changes: 1 addition & 1 deletion features/windows_reqs.txt
@@ -3,7 +3,7 @@
# everything, so just this subset will be enough for CI
behave==1.2.6
pandas~=1.3
psutil~=6.0
psutil~=6.1
requests~=2.32
toml~=0.10.1
PyYAML>=4.2, <7.0
9 changes: 6 additions & 3 deletions kedro/io/core.py
@@ -51,6 +51,7 @@
"gcs",
"gdrive",
"gs",
"oci",
"oss",
"s3",
"s3a",
@@ -820,9 +821,11 @@ def _parse_filepath(filepath: str) -> dict[str, str]:
        host_with_port = parsed_path.netloc.rsplit("@", 1)[-1]
        host = host_with_port.rsplit(":", 1)[0]
        options["path"] = host + options["path"]
        # Azure Data Lake Storage Gen2 URIs can store the container name in the
        # 'username' field of a URL (@ syntax), so we need to add it to the path
        if protocol == "abfss" and parsed_path.username:
        # - Azure Data Lake Storage Gen2 URIs can store the container name in the
        # 'username' field of a URL (@ syntax), so we need to add it to the path
        # - Oracle Cloud Infrastructure (OCI) Object Storage filesystem (ocifs) also
        # uses the @ syntax for I/O operations: "oci://bucket@namespace/path_to_file"
        if protocol in ["abfss", "oci"] and parsed_path.username:
            options["path"] = parsed_path.username + "@" + options["path"]

    return options
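With the extended check above, an `oci://` URI keeps the bucket from the URL's username field in the parsed path, mirroring the test case added in `tests/io/test_core.py` below. A small sketch, assuming `get_protocol_and_path` in `kedro.io.core` is the public wrapper around this parsing logic:

```python
# Expected parsing of the new "oci" protocol; values mirror the added test case.
# get_protocol_and_path is assumed to be the public entry point to _parse_filepath.
from kedro.io.core import get_protocol_and_path

protocol, path = get_protocol_and_path("oci://bucket@namespace/file.txt")
print(protocol, path)  # expected: oci bucket@namespace/file.txt
```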
39 changes: 22 additions & 17 deletions kedro/runner/runner.py
@@ -22,7 +22,7 @@
from more_itertools import interleave

from kedro.framework.hooks.manager import _NullPluginManager
from kedro.io import CatalogProtocol, MemoryDataset
from kedro.io import CatalogProtocol, MemoryDataset, SharedMemoryDataset
from kedro.pipeline import Pipeline

if TYPE_CHECKING:
@@ -84,11 +84,8 @@ def run(
            by the node outputs.
        """

        hook_or_null_manager = hook_manager or _NullPluginManager()

        # Check which datasets used in the pipeline are in the catalog or match
        # a pattern in the catalog
        # a pattern in the catalog, not including extra dataset patterns
        registered_ds = [ds for ds in pipeline.datasets() if ds in catalog]

        # Check if there are any input datasets that aren't in the catalog and
@@ -100,22 +97,17 @@
f"Pipeline input(s) {unsatisfied} not found in the {catalog.__class__.__name__}"
)

# Identify MemoryDataset in the catalog
memory_datasets = {
ds_name
for ds_name, ds in catalog._datasets.items()
if isinstance(ds, MemoryDataset)
}

# Check if there's any output datasets that aren't in the catalog and don't match a pattern
# in the catalog and include MemoryDataset.
free_outputs = pipeline.outputs() - (set(registered_ds) - memory_datasets)

# Register the default dataset pattern with the catalog
catalog = catalog.shallow_copy(
extra_dataset_patterns=self._extra_dataset_patterns
)

hook_or_null_manager = hook_manager or _NullPluginManager()

# Check which datasets used in the pipeline are in the catalog or match
# a pattern in the catalog, including added extra_dataset_patterns
registered_ds = [ds for ds in pipeline.datasets() if ds in catalog]

if self._is_async:
self._logger.info(
"Asynchronous mode is enabled for loading and saving data"
@@ -124,7 +116,20 @@

self._logger.info("Pipeline execution completed successfully.")

return {ds_name: catalog.load(ds_name) for ds_name in free_outputs}
# Identify MemoryDataset in the catalog
memory_datasets = {
ds_name
for ds_name, ds in catalog._datasets.items()
if isinstance(ds, MemoryDataset) or isinstance(ds, SharedMemoryDataset)
}

# Check if there's any output datasets that aren't in the catalog and don't match a pattern
# in the catalog and include MemoryDataset.
free_outputs = pipeline.outputs() - (set(registered_ds) - memory_datasets)

run_output = {ds_name: catalog.load(ds_name) for ds_name in free_outputs}

return run_output

def run_only_missing(
self, pipeline: Pipeline, catalog: CatalogProtocol, hook_manager: PluginManager
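The free-output handling moved to the end of `run()` is plain set arithmetic: any pipeline output that is unregistered, or registered only as a (shared) memory dataset, gets loaded and returned to the caller. A toy illustration with invented dataset names:

```python
# Toy illustration of the free_outputs expression in run(); the names are
# invented and no Kedro objects are involved.
pipeline_outputs = {"model", "metrics", "report"}
registered_ds = ["model", "metrics"]  # datasets found in the catalog
memory_datasets = {"metrics"}         # registered, but only in memory

free_outputs = pipeline_outputs - (set(registered_ds) - memory_datasets)
print(free_outputs)  # {'metrics', 'report'} -> returned from the run
```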
@@ -1,5 +1,7 @@
# {{ cookiecutter.project_name }}

[![Powered by Kedro](https://img.shields.io/badge/powered_by-kedro-ffc900?logo=kedro)](https://kedro.org)

## Overview

This is your new Kedro project, which was generated using `kedro {{ cookiecutter.kedro_version }}`.
1 change: 1 addition & 0 deletions tests/io/test_core.py
@@ -263,6 +263,7 @@ def test_get_filepath_str(self):
"abfss://mycontainer@mystorageaccount.dfs.core.windows.net/mypath",
("abfss", "mycontainer@mystorageaccount.dfs.core.windows.net/mypath"),
),
("oci://bucket@namespace/file.txt", ("oci", "bucket@namespace/file.txt")),
("hdfs://namenode:8020/file.txt", ("hdfs", "/file.txt")),
("file:///tmp/file.txt", ("file", "/tmp/file.txt")),
("/tmp/file.txt", ("file", "/tmp/file.txt")),
11 changes: 11 additions & 0 deletions tests/runner/test_sequential_runner.py
@@ -40,6 +40,17 @@ def test_log_not_using_async(self, fan_out_fan_in, catalog, caplog):
        SequentialRunner().run(fan_out_fan_in, catalog)
        assert "Using synchronous mode for loading and saving data." in caplog.text

    def test_run_twice_giving_same_result(self, fan_out_fan_in, catalog):
        catalog.add_feed_dict({"A": 42})
        result_first_run = SequentialRunner().run(
            fan_out_fan_in, catalog, hook_manager=_create_hook_manager()
        )
        result_second_run = SequentialRunner().run(
            fan_out_fan_in, catalog, hook_manager=_create_hook_manager()
        )

        assert result_first_run == result_second_run


@pytest.mark.parametrize("is_async", [False, True])
class TestSeqentialRunnerBranchlessPipeline:
