Skip to content

Commit

Permalink
Update caching strategy (#407)
Browse files Browse the repository at this point in the history
PR that updates the way we check for cached executions. The previous
proposal was to use glob patterns to check for a manifest under a
certain directory, but this turned out to be too slow because glob
patterns behave differently on remote filesystems, where they scan
all nested directories. More details about the issue
[here](#368 (comment)).

This PR changes this by writing a reference to a manifest file in a
folder separate from the subsets.

```
base_path/
├── pipelines/
    └── <pipeline_name>/
        ├── <pipeline_id>/
        │   ├── component_1/
        │   │   ├── manifest.json
        │   │   └── subsets/
        │   │       ├── <subset_1>/
        │   │       │   └── part.0.parquet
        │   │       └── <subset_2>/
        │   │           └── part.0.parquet
        │   └── component_2/
        │       └── manifest.json
        └── cache/
            ├── <hash_key1>
            ├── <hash_key2>
            └── <hash_key3>
```

As a consequence, we also reverted to using basic fsspec
functionality, which makes it easier to resolve files without having
to explicitly specify which protocol they belong to. Closing [this
ticket](#402) in favor of this
one.

Tested with all different combinations (remote/local runner/basepath)
  • Loading branch information
PhilippeMoussalli authored Sep 5, 2023
1 parent 910d3eb commit 187148a
Show file tree
Hide file tree
Showing 17 changed files with 146 additions and 150 deletions.
8 changes: 2 additions & 6 deletions src/fondant/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,6 @@ def _generate_spec(

pipeline.validate(run_id=run_id)

cache_key_previous_component = None

for component_name, component in pipeline._graph.items():
component_op = component["fondant_component_op"]

Expand All @@ -142,7 +140,7 @@ def _generate_spec(
[
"--output_manifest_path",
f"{path}/{metadata.pipeline_name}/{metadata.run_id}/"
f"{component_name}/manifest_{metadata.cache_key}.json",
f"{component_name}/manifest.json",
],
)

Expand All @@ -165,12 +163,10 @@ def _generate_spec(
[
"--input_manifest_path",
f"{path}/{metadata.pipeline_name}/{metadata.run_id}/"
f"{dependency}/manifest_{cache_key_previous_component}.json",
f"{dependency}/manifest.json",
],
)

cache_key_previous_component = metadata.cache_key

volumes: t.List[t.Union[str, dict]] = []
if volume:
volumes.append(asdict(volume))
Expand Down
94 changes: 58 additions & 36 deletions src/fondant/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import dask.dataframe as dd
import pandas as pd
from fsspec import open as fs_open

from fondant.component import (
Component,
Expand All @@ -25,7 +26,6 @@
)
from fondant.component_spec import Argument, ComponentSpec, kubeflow2python_type
from fondant.data_io import DaskDataLoader, DaskDataWriter
from fondant.filesystem import get_filesystem
from fondant.manifest import Manifest, Metadata
from fondant.schema import validate_partition_number

Expand Down Expand Up @@ -53,7 +53,6 @@ def __init__(
self.metadata = Metadata.from_dict(metadata)
self.user_arguments = user_arguments
self.input_partition_rows = input_partition_rows
self.filesystem = get_filesystem(self.metadata.base_path)

@classmethod
def from_args(cls) -> "Executor":
Expand Down Expand Up @@ -191,44 +190,34 @@ def _write_data(self, dataframe: dd.DataFrame, *, manifest: Manifest):

data_writer.write_dataframe(dataframe)

def _get_latest_matching_manifest(self) -> t.Union[Manifest, None]:
def _get_cached_manifest(self) -> t.Union[Manifest, None]:
"""
Find and return the most recent matching execution's Manifest for the component,
if it exists.
Find and return the matching execution's Manifest for the component, if it exists.
This function searches for previous execution manifests that match the component's metadata.
Returns:
The Manifest object representing the most recent matching execution,
or None if no matching execution is found.
"""
matching_manifest_glob_pattern = (
f"{self.metadata.base_path}/{self.metadata.pipeline_name}/*/"
f"{self.metadata.component_id}/manifest_{self.metadata.cache_key}.json"
manifest_reference_path = (
f"{self.metadata.base_path}/{self.metadata.pipeline_name}/cache/"
f"{self.metadata.cache_key}.txt"
)

matching_manifests = self.filesystem.glob(matching_manifest_glob_pattern)

if matching_manifests:
logger.info("Matching execution for component detected.")
nb_matching_manifest = len(matching_manifests)

if nb_matching_manifest > 1:
try:
with fs_open(manifest_reference_path, mode="rt", encoding="utf-8") as file_:
cached_manifest_path = file_.read()
manifest = Manifest.from_file(cached_manifest_path)
logger.info(
f"Multiple matching executions for the component were found: "
f"{nb_matching_manifest}. Picking the most recent one.",
f"Matching execution detected for component. The last execution of the"
f" component originated from `{manifest.run_id}`.",
)
return manifest

# Get the most recent file based on the file creation date time
manifest_file = max(matching_manifests, key=self.filesystem.created)
else:
manifest_file = matching_manifests[0]

return Manifest.from_file(manifest_file, self.filesystem)

logger.info("No matching execution for component detected")

return None
except FileNotFoundError:
logger.info("No matching execution for component detected")
return None

def _is_previous_cached(self, input_manifest: Manifest) -> bool:
"""
Expand Down Expand Up @@ -288,7 +277,7 @@ def execute(self, component_cls: t.Type[Component]) -> None:
input_manifest = self._load_or_create_manifest()

if self.cache and self._is_previous_cached(input_manifest):
output_manifest = self._get_latest_matching_manifest()
output_manifest = self._get_cached_manifest()
if output_manifest is not None:
logger.info("Skipping component execution")
else:
Expand All @@ -300,6 +289,32 @@ def execute(self, component_cls: t.Type[Component]) -> None:

self.upload_manifest(output_manifest, save_path=self.output_manifest_path)

def _upload_cache_key(
self,
manifest: Manifest,
manifest_save_path: t.Union[str, Path],
):
"""
Write the cache key containing the reference to the location of the written manifest..
This function creates a file with the format "<cache_key>.txt" at the specified
'manifest_save_path' to store the manifest location for future retrieval of
cached component executions.
Args:
manifest: The reference manifest.
manifest_save_path (str): The path where the manifest is saved.
"""
manifest_reference_path = (
f"{manifest.base_path}/{manifest.pipeline_name}/cache/"
f"{self.metadata.cache_key}.txt"
)

logger.info(f"Writing cache key to {manifest_reference_path}")

with fs_open(manifest_reference_path, mode="wt", encoding="utf-8") as file_:
file_.write(str(manifest_save_path))

def upload_manifest(self, manifest: Manifest, save_path: t.Union[str, Path]):
"""
Uploads the manifest to the specified destination.
Expand All @@ -320,19 +335,26 @@ def upload_manifest(self, manifest: Manifest, save_path: t.Union[str, Path]):
# Save to the expected base path directory
save_path_base_path = (
f"{manifest.base_path}/{manifest.pipeline_name}/{manifest.run_id}/"
f"{manifest.component_id}/manifest_{manifest.cache_key}.json"
f"{manifest.component_id}/manifest.json"
)
Path(save_path_base_path).parent.mkdir(parents=True, exist_ok=True)
manifest.to_file(save_path_base_path, self.filesystem)
# Upload manifest and it's reference if cache is False
manifest.to_file(save_path_base_path)
logger.info(f"Saving output manifest to {save_path_base_path}")
self._upload_cache_key(
manifest=manifest,
manifest_save_path=save_path_base_path,
)
# Write manifest to the native kfp artifact path that will be passed as an artifact
# and read by the next component
manifest.to_file(save_path, self.filesystem)
manifest.to_file(save_path)
else:
# Local runner
Path(save_path).parent.mkdir(parents=True, exist_ok=True)
manifest.to_file(save_path, self.filesystem)
manifest.to_file(save_path)
logger.info(f"Saving output manifest to {save_path}")
self._upload_cache_key(
manifest=manifest,
manifest_save_path=save_path,
)


class DaskLoadExecutor(Executor[DaskLoadComponent]):
Expand Down Expand Up @@ -372,7 +394,7 @@ class TransformExecutor(Executor[Component]):
"""Base class for a Fondant transform component."""

def _load_or_create_manifest(self) -> Manifest:
return Manifest.from_file(self.input_manifest_path, self.filesystem)
return Manifest.from_file(self.input_manifest_path)

def _execute_component(
self,
Expand Down Expand Up @@ -514,7 +536,7 @@ def optional_fondant_arguments() -> t.List[str]:
return ["output_manifest_path"]

def _load_or_create_manifest(self) -> Manifest:
return Manifest.from_file(self.input_manifest_path, self.filesystem)
return Manifest.from_file(self.input_manifest_path)

def _execute_component(
self,
Expand Down
26 changes: 0 additions & 26 deletions src/fondant/filesystem.py

This file was deleted.

10 changes: 5 additions & 5 deletions src/fondant/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from pathlib import Path

import jsonschema.exceptions
from fsspec import AbstractFileSystem
from fsspec import open as fs_open
from jsonschema import Draft4Validator
from referencing import Registry, Resource
from referencing.jsonschema import DRAFT4
Expand Down Expand Up @@ -177,15 +177,15 @@ def create(
return cls(specification)

@classmethod
def from_file(cls, path: t.Union[str, Path], fs: AbstractFileSystem) -> "Manifest":
def from_file(cls, path: t.Union[str, Path]) -> "Manifest":
"""Load the manifest from the file specified by the provided path."""
with fs.open(path, encoding="utf-8") as file_:
with fs_open(path, encoding="utf-8") as file_:
specification = json.load(file_)
return cls(specification)

def to_file(self, path: t.Union[str, Path], fs: AbstractFileSystem) -> None:
def to_file(self, path: t.Union[str, Path]) -> None:
"""Dump the manifest to the file specified by the provided path."""
with fs.open(path, "w", encoding="utf-8") as file_:
with fs_open(path, "w", encoding="utf-8") as file_:
json.dump(self._specification, file_)

def copy(self) -> "Manifest":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ services:
- '{"base_path": "/foo/bar", "pipeline_name": "test_pipeline", "run_id": "test_pipeline-20230101000000",
"component_id": "first_component", "cache_key": "1"}'
- --output_manifest_path
- /foo/bar/test_pipeline/test_pipeline-20230101000000/first_component/manifest_1.json
- /foo/bar/test_pipeline/test_pipeline-20230101000000/first_component/manifest.json
- --storage_args
- a dummy string arg
- --input_partition_rows
Expand Down Expand Up @@ -40,7 +40,7 @@ services:
- '{"base_path": "/foo/bar", "pipeline_name": "test_pipeline", "run_id": "test_pipeline-20230101000000",
"component_id": "second_component", "cache_key": "2"}'
- --output_manifest_path
- /foo/bar/test_pipeline/test_pipeline-20230101000000/second_component/manifest_2.json
- /foo/bar/test_pipeline/test_pipeline-20230101000000/second_component/manifest.json
- --storage_args
- a dummy string arg
- --input_partition_rows
Expand All @@ -54,7 +54,7 @@ services:
"array", "items": {"type": "float32"}}}}}, "args": {"storage_args": {"description":
"Storage arguments", "type": "str"}}}'
- --input_manifest_path
- /foo/bar/test_pipeline/test_pipeline-20230101000000/first_component/manifest_1.json
- /foo/bar/test_pipeline/test_pipeline-20230101000000/first_component/manifest.json
depends_on:
first_component:
condition: service_completed_successfully
Expand All @@ -68,7 +68,7 @@ services:
- '{"base_path": "/foo/bar", "pipeline_name": "test_pipeline", "run_id": "test_pipeline-20230101000000",
"component_id": "third_component", "cache_key": "3"}'
- --output_manifest_path
- /foo/bar/test_pipeline/test_pipeline-20230101000000/third_component/manifest_3.json
- /foo/bar/test_pipeline/test_pipeline-20230101000000/third_component/manifest.json
- --storage_args
- a dummy string arg
- --input_partition_rows
Expand All @@ -84,7 +84,7 @@ services:
false}, "args": {"storage_args": {"description": "Storage arguments", "type":
"str"}}}'
- --input_manifest_path
- /foo/bar/test_pipeline/test_pipeline-20230101000000/second_component/manifest_2.json
- /foo/bar/test_pipeline/test_pipeline-20230101000000/second_component/manifest.json
depends_on:
second_component:
condition: service_completed_successfully
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ services:
- '{"base_path": "/foo/bar", "pipeline_name": "test_pipeline", "run_id": "test_pipeline-20230101000000",
"component_id": "first_component", "cache_key": "1"}'
- --output_manifest_path
- /foo/bar/test_pipeline/test_pipeline-20230101000000/first_component/manifest_1.json
- /foo/bar/test_pipeline/test_pipeline-20230101000000/first_component/manifest.json
- --storage_args
- a dummy string arg
- --input_partition_rows
Expand All @@ -29,7 +29,7 @@ services:
- '{"base_path": "/foo/bar", "pipeline_name": "test_pipeline", "run_id": "test_pipeline-20230101000000",
"component_id": "image_cropping", "cache_key": "2"}'
- --output_manifest_path
- /foo/bar/test_pipeline/test_pipeline-20230101000000/image_cropping/manifest_2.json
- /foo/bar/test_pipeline/test_pipeline-20230101000000/image_cropping/manifest.json
- --cropping_threshold
- '0'
- --padding
Expand All @@ -50,7 +50,7 @@ services:
for the image cropping. The padding is added to all borders of the image.",
"type": "int", "default": 10}}}'
- --input_manifest_path
- /foo/bar/test_pipeline/test_pipeline-20230101000000/first_component/manifest_1.json
- /foo/bar/test_pipeline/test_pipeline-20230101000000/first_component/manifest.json
depends_on:
first_component:
condition: service_completed_successfully
Expand Down
7 changes: 4 additions & 3 deletions tests/example_specs/components/arguments/input_manifest.json
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
{
"metadata": {
"pipeline_name": "example_pipeline",
"base_path": "/bucket",
"run_id": "2024",
"component_id": "component_1"
"base_path": "tests/example_data/subsets_input/mock_base_path",
"run_id": "example_pipeline_123",
"component_id": "component_1",
"cache_key": "00"
},
"index": {
"location": "/index"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
tests/example_specs/mock_base_path/example_pipeline/example_pipeline_2023/component_1/manifest.json
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
{
"metadata": {
"pipeline_name": "test_pipeline",
"base_path": "gs://bucket",
"run_id": "test_pipeline_2022",
"pipeline_name": "example_pipeline",
"base_path": "tests/example_data/subsets_input/mock_base_path",
"run_id": "example_pipeline_2023",
"component_id": "component_1",
"cache_key": "1"
"cache_key": "42"
},
"index": {
"location": "/index"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
{
"metadata": {
"pipeline_name": "test_pipeline",
"base_path": "gs://bucket",
"run_id": "test_pipeline_2023",
"pipeline_name": "example_pipeline",
"base_path": "tests/example_data/subsets_input/mock_base_path",
"run_id": "example_pipeline_2023",
"component_id": "component_2",
"cache_key": "2"
"cache_key": "42"
},
"index": {
"location": "/index"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
{
"metadata": {
"pipeline_name": "test_pipeline",
"base_path": "gs://bucket",
"run_id": "test_pipeline_2023",
"pipeline_name": "example_pipeline",
"base_path": "tests/example_data/subsets_input/mock_base_path",
"run_id": "example_pipeline_2024",
"component_id": "component_1",
"cache_key": "1"
"cache_key": "42"
},
"index": {
"location": "/index"
Expand Down
Loading

0 comments on commit 187148a

Please sign in to comment.