Fix caching writes #469
So if I'm understanding correctly, the logic should be as follows:
But the problem is that the component propagates the run_id from the input manifest to the output manifest. If that is correct, can we change it where it's propagated instead? So in either: I believe a change there would be simpler and easier to understand.
I think it can still be further simplified. Just pass the
Thanks for the feedback Robbe! I implemented the suggestion. Please double-check to see if that's what you were implying. Also added other small fixes mainly related to propagating the
Thanks @PhilippeMoussalli!
Almost there 🙂 I would make the argument optional and rename it to just run_id. See my suggestions.
src/fondant/manifest.py
Outdated
 Args:
     component_spec: the component spec
+    write_run_id: the run id used to define the write subset path.
- write_run_id: the run id used to define the write subset path.
+ run_id: the run id to include in the evolved manifest. If no run id is provided, the run id from the original manifest is propagated.
src/fondant/manifest.py
Outdated
@@ -257,21 +257,27 @@ def remove_subset(self, name: str) -> None:
 def evolve( # noqa : PLR0912 (too many branches)
     self,
     component_spec: ComponentSpec,
+    write_run_id: str,
- write_run_id: str,
+ *,
+ run_id: t.Optional[str],
src/fondant/manifest.py
Outdated
 """
 evolved_manifest = self.copy()

 # Update `component_id` of the metadata
 component_id = component_spec.name.lower().replace(" ", "_")
+evolved_manifest.update_metadata(key="run_id", value=write_run_id)
- evolved_manifest.update_metadata(key="run_id", value=write_run_id)
+ if run_id is not None:
+     evolved_manifest.update_metadata(key="run_id", value=write_run_id)
src/fondant/manifest.py
Outdated
 evolved_manifest.update_metadata(key="component_id", value=component_id)

 # Update index location as this is currently always rewritten
 evolved_manifest.index._specification[
     "location"
-] = f"/{self.pipeline_name}/{self.run_id}/{component_id}/index"
+] = f"/{self.pipeline_name}/{write_run_id}/{component_id}/index"
- ] = f"/{self.pipeline_name}/{write_run_id}/{component_id}/index"
+ ] = f"/{self.pipeline_name}/{evolved_manifest.run_id}/{component_id}/index"
src/fondant/manifest.py
Outdated
@@ -322,7 +328,7 @@ def evolve( # noqa : PLR0912 (too many branches)
 # Update subset location as this is currently always rewritten
 evolved_manifest.subsets[subset_name]._specification[
     "location"
-] = f"/{self.pipeline_name}/{self.run_id}/{component_id}/{subset_name}"
+] = f"/{self.pipeline_name}/{write_run_id}/{component_id}/{subset_name}"
- ] = f"/{self.pipeline_name}/{write_run_id}/{component_id}/{subset_name}"
+ ] = f"/{self.pipeline_name}/{evolved_manifest.run_id}/{component_id}/{subset_name}"
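Taken together, the suggestions on src/fondant/manifest.py could look roughly like the sketch below. `ToyManifest` is a hypothetical stand-in, not Fondant's actual `Manifest` class, and it takes a plain component name instead of a `ComponentSpec`. Two details are assumptions rather than quotes from the review: the sketch gives `run_id` a default of `None` so that callers can omit it, and it passes `run_id` (not `write_run_id`) to the metadata update.

```python
import copy
import typing as t


class ToyManifest:
    """Hypothetical stand-in for the manifest; not Fondant's actual class."""

    def __init__(self, pipeline_name: str, run_id: str) -> None:
        self.metadata = {"pipeline_name": pipeline_name, "run_id": run_id}
        self.index_location = ""

    @property
    def pipeline_name(self) -> str:
        return self.metadata["pipeline_name"]

    @property
    def run_id(self) -> str:
        return self.metadata["run_id"]

    def evolve(
        self,
        component_name: str,
        *,
        run_id: t.Optional[str] = None,  # default is an assumption of this sketch
    ) -> "ToyManifest":
        evolved = copy.deepcopy(self)

        # Only replace the run id when the caller provides one; otherwise
        # the run id of the original manifest is propagated unchanged.
        if run_id is not None:
            evolved.metadata["run_id"] = run_id

        component_id = component_name.lower().replace(" ", "_")
        evolved.metadata["component_id"] = component_id

        # Build the location from the evolved manifest's run id, so the
        # path is right whether or not a new run id was passed in.
        evolved.index_location = (
            f"/{evolved.pipeline_name}/{evolved.run_id}/{component_id}/index"
        )
        return evolved
```

Reading the run id back from the evolved manifest keeps a single source of truth for the write path, which appears to be the point of the suggested change.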
src/fondant/pipeline.py
Outdated
@@ -444,7 +444,7 @@ def _validate_pipeline_definition(self, run_id: str):
     raise InvalidPipelineDefinition(
         msg,
     )
- manifest = manifest.evolve(component_spec)
+ manifest = manifest.evolve(component_spec, write_run_id=run_id)
- manifest = manifest.evolve(component_spec, write_run_id=run_id)
+ manifest = manifest.evolve(component_spec, run_id=run_id)
tests/test_manifest_evolution.py
Outdated
- evolved_manifest = manifest.evolve(component_spec=component_spec)
+ evolved_manifest = manifest.evolve(
+     component_spec=component_spec,
+     write_run_id=manifest.run_id,
Can we update this test to provide a different run_id and validate that it's set correctly?
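A test along the lines requested here might look like the following sketch. `evolve_metadata` is a hypothetical, dictionary-based stand-in for `Manifest.evolve`, reduced to the metadata it touches; the run ids and component name are made up for illustration. The real test would call `manifest.evolve(component_spec=..., run_id=...)` against the fixtures in the repository.

```python
import typing as t


def evolve_metadata(
    metadata: dict,
    component_name: str,
    *,
    run_id: t.Optional[str] = None,
) -> dict:
    """Hypothetical stand-in for `Manifest.evolve` (metadata only)."""
    evolved = dict(metadata)
    if run_id is not None:
        evolved["run_id"] = run_id
    evolved["component_id"] = component_name.lower().replace(" ", "_")
    return evolved


def test_evolve_sets_provided_run_id() -> None:
    original = {"pipeline_name": "pipe", "run_id": "run_a"}
    evolved = evolve_metadata(original, "Example Component", run_id="run_b")
    # The evolved metadata carries the run id that was passed in ...
    assert evolved["run_id"] == "run_b"
    # ... while the original metadata is left untouched.
    assert original["run_id"] == "run_a"


def test_evolve_propagates_run_id_by_default() -> None:
    original = {"pipeline_name": "pipe", "run_id": "run_a"}
    evolved = evolve_metadata(original, "Example Component")
    # Without an explicit run id, the original one is propagated.
    assert evolved["run_id"] == "run_a"
```

Passing a run id that differs from the one in the input manifest is what makes the assertion meaningful: reusing `manifest.run_id` would pass even if the argument were ignored.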
tests/test_manifest_evolution.py
Outdated
evolved_manifest = manifest.evolve(component_spec=component_spec) | ||
evolved_manifest = manifest.evolve( | ||
component_spec=component_spec, | ||
write_run_id=manifest.run_id, |
This change can be reverted.
Thanks @PhilippeMoussalli!
PR that fixes issues related to caching subsequent components.

Since cached manifests carry the run id of the pipeline run in which they originated, they can sometimes overwrite existing data if the component needs to run again (e.g. a cached first component is loaded into a second component that has different arguments and thus needs to execute). This PR makes sure that:

* The run id of the loaded **input manifest** matches that of the **current pipeline run**. This ensures that all new subsets from the component being executed are written under the corresponding path of that pipeline run.
* The run id of the **output manifest** is that of the **cached manifest** (if it applies). This ensures that subsequent components are able to tell whether the previous component execution was cached or not ([link to code](https://github.com/ml6team/fondant/blob/39436e7faaa3e5acdd5eb347ce965ad33d669f73/src/fondant/executor.py#L294)).
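To make the overwrite scenario concrete, here is a small sketch using the path layout from the diff (`/{pipeline_name}/{run_id}/{component_id}/{subset_name}`). The pipeline, component, and subset names are made up, and `was_cached` is a hypothetical helper: comparing the manifest's run id with the current run id is an assumption about how a downstream component could detect a cached predecessor, not a description of the linked executor code.

```python
def subset_location(
    pipeline_name: str, run_id: str, component_id: str, subset_name: str
) -> str:
    """Build a subset path using the layout shown in the diff."""
    return f"/{pipeline_name}/{run_id}/{component_id}/{subset_name}"


def was_cached(manifest_run_id: str, current_run_id: str) -> bool:
    """Assumption: a manifest stamped with an earlier run id came from cache."""
    return manifest_run_id != current_run_id


cached_run_id = "run_1"   # run that produced the cached manifest (hypothetical)
current_run_id = "run_2"  # run that is executing now (hypothetical)

# Data written by the second component during the earlier run.
existing_path = subset_location("pipe", "run_1", "resize", "images")

# Writing under the cached manifest's run id targets the same path,
# so re-running the component would overwrite the earlier output.
old_write_path = subset_location("pipe", cached_run_id, "resize", "images")

# Writing under the current run id keeps the two outputs apart.
new_write_path = subset_location("pipe", current_run_id, "resize", "images")
```

Under these assumptions, `old_write_path` equals `existing_path` while `new_write_path` does not, and `was_cached(cached_run_id, current_run_id)` is true only because the output manifest of the cached component keeps its original run id.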