Skip to content

Commit

Permalink
Add method to estimate caching key (#318)
Browse files Browse the repository at this point in the history
Related to #313 #292 

The cache key is a unique identifier for the component that will be used
to decide whether a component should be executed or not.
  • Loading branch information
PhilippeMoussalli authored Aug 21, 2023
1 parent 3525963 commit b040a30
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 0 deletions.
41 changes: 41 additions & 0 deletions src/fondant/pipeline.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
"""This module defines classes to represent a Fondant Pipeline."""
import hashlib
import json
import logging
import re
import typing as t
Expand Down Expand Up @@ -139,6 +141,45 @@ def from_registry(
node_pool_name=node_pool_name,
)

def get_component_cache_key(self) -> str:
"""Calculate a cache key representing the unique identity of this ComponentOp.
The cache key is computed based on the component specification, image hash, arguments, and
other attributes of the ComponentOp. It is used to uniquely identify a specific instance
of the ComponentOp and is used for caching.
Returns:
A cache key representing the unique identity of this ComponentOp.
"""

def get_nested_dict_hash(input_dict):
"""Calculate the hash of a nested dictionary.
Args:
input_dict: The nested dictionary to calculate the hash for.
Returns:
The hash value (MD5 digest) of the nested dictionary.
"""
sorted_json_string = json.dumps(input_dict, sort_keys=True)
hash_object = hashlib.md5(sorted_json_string.encode()) # nosec
return hash_object.hexdigest()

component_spec_dict = self.component_spec.specification
arguments = (
get_nested_dict_hash(self.arguments) if self.arguments is not None else None
)

component_op_uid_dict = {
"component_spec_hash": get_nested_dict_hash(component_spec_dict),
"arguments": arguments,
"input_partition_rows": self.input_partition_rows,
"number_of_gpus": self.number_of_gpus,
"node_pool_name": self.node_pool_name,
}

return get_nested_dict_hash(component_op_uid_dict)


class Pipeline:
"""Class representing a Fondant Pipeline."""
Expand Down
48 changes: 48 additions & 0 deletions tests/test_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Fondant pipelines test."""
import copy
from pathlib import Path

import pytest
Expand Down Expand Up @@ -52,6 +53,53 @@ def test_component_op(
)


@pytest.mark.parametrize(
"valid_pipeline_example",
[
(
"example_1",
["first_component", "second_component", "third_component"],
),
],
)
def test_component_op_hash(
valid_pipeline_example,
monkeypatch,
):
example_dir, component_names = valid_pipeline_example
components_path = Path(valid_pipeline_path / example_dir)

comp_0_op_spec_0 = ComponentOp(
Path(components_path / component_names[0]),
arguments={"storage_args": "a dummy string arg"},
)

comp_0_op_spec_1 = ComponentOp(
Path(components_path / component_names[0]),
arguments={"storage_args": "a different string arg"},
)

comp_1_op_spec_0 = ComponentOp(
Path(components_path / component_names[1]),
arguments={"storage_args": "a dummy string arg"},
)

comp_0_op_spec_0_copy = copy.deepcopy(comp_0_op_spec_0)

assert (
comp_0_op_spec_0.get_component_cache_key()
!= comp_0_op_spec_1.get_component_cache_key()
)
assert (
comp_0_op_spec_0.get_component_cache_key()
== comp_0_op_spec_0_copy.get_component_cache_key()
)
assert (
comp_0_op_spec_0.get_component_cache_key()
!= comp_1_op_spec_0.get_component_cache_key()
)


@pytest.mark.parametrize(
"valid_pipeline_example",
[
Expand Down

0 comments on commit b040a30

Please sign in to comment.