
Partial pipeline execution #292

Closed

Conversation

PhilippeMoussalli
Contributor

@PhilippeMoussalli commented Jul 12, 2023

PR that enables executing components with the local runner via partial execution (starting from a specific checkpoint). The user can either resume from the last un-run component or resume from a specific, earlier-executed component. This makes it easier to work on a specific component during development, especially if the pipeline consists of multiple components or if one of the components is difficult to run (e.g. LAION or a GPU-dependent component).

Usage example:

 fondant run pipeline:pipeline --local --run-id controlnet-pipeline-20230712171928      

or specify a specific component to resume the run from, in order to re-run from an earlier-executed component:

 fondant run pipeline:pipeline --local --run-id controlnet-pipeline-20230712171928 --resume-component laion_retrieval      
  • I changed the structure in which the manifest was written locally from
    component_name/manifest.json to component_name/run_id/manifest.json to avoid overwriting the manifest on different pipeline runs

  • The approach depends on modifying the docker compose file.

services:
  first_component:
    depends_on: {}
  second_component:
    depends_on:
      - first_component
  third_component:
    depends_on:
      - second_component

If you only want to resume from the second component, it becomes:

services:
  second_component:
    depends_on: {}
  third_component:
    depends_on:
      - second_component

I don't think there is a way to work only with the original docker-compose file and just specify the services to run, because the chaining of the components needs to be modified in the file. More details on this here.
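To make this concrete, here is a rough sketch of how a compiler could trim an already-compiled docker-compose spec so the run starts from a given component. The function name and the assumption of a strictly linear depends_on chain are mine, for illustration only:

import yaml

def trim_compose_spec(spec_path: str, resume_component: str) -> dict:
    """Drop all services upstream of `resume_component` from a compose spec."""
    with open(spec_path) as f:
        spec = yaml.safe_load(f)

    services = spec["services"]

    # Walk the chain downstream from the resume point, collecting the
    # services to keep (assumes each service lists its predecessor
    # in `depends_on`).
    keep = set()
    current = resume_component
    while current is not None:
        keep.add(current)
        current = next(
            (name for name, service in services.items()
             if current in service.get("depends_on", [])),
            None,
        )

    spec["services"] = {
        name: service for name, service in services.items() if name in keep
    }
    # The resume component no longer has any upstream dependency.
    spec["services"][resume_component]["depends_on"] = {}
    return spec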

  • Some things will have to be written and optimized for the remote runner.

@RobbeSneyders
Member

Thanks @PhilippeMoussalli!

I think we first need to think about how we want the partial execution to work:

  • Via caching. Fondant detects if a component and its inputs are unchanged and skips the step if that's the case, similar to KFP caching. I see two ways to implement this:
    • At runtime, so the runner selects which components to execute. I think this would be the ideal case, but it might not be possible with every runner (e.g. I can't find how to do it with KFP in their documentation).
    • In the component, so the component gets executed by the runner, but decides itself if it should actually execute or just produce a manifest referring to the data from the cached run.
  • Via command line arguments at runtime. The pipeline definition stays the same, but only a part of it is executed based on arguments passed by the user. Same issue as above, might not be supported by every runner.
  • At compilation time like in this PR. This is my least favorite approach, since it requires re-compilation and is not compatible with caching (which by definition happens at runtime).

Full blown caching is probably too much at this moment, but I'd like to make sure that the steps we take are towards it.

@PhilippeMoussalli
Contributor Author

PhilippeMoussalli commented Jul 13, 2023

Thanks for the elaborate feedback.

  • In the component, so the component gets executed by the runner, but decides itself if it should actually execute or just produce a manifest referring to the data from the cached run.

I think this might be the easiest approach, since I'm not sure it can be done at the runner level (at least for the Docker-based one).

We need to check two things in this case:

  • Which components have been executed: this can be done by checking whether a component has produced a manifest, as I already have implemented.
  • Whether executed components need to be re-run: this helps decide if we can start from the latest un-run component or need to re-run previous components. We need to check the pipeline and component task specification (base image, args, commands); I think all of those can be found in the docker-compose.yml file. Based on that, we can pass a flag internally to decide whether a component needs to run or whether it should just produce a manifest (adding another hidden internal argument to the components, like input_manifest). The new cached run would have the same run_id and would overwrite data from subsequent components. We would also need to store and tag the docker-compose.yml; I would propose storing them in base_path/local_runner/docker_compose_<pipeline_run_id>.yml. One thing we still need to check is whether a component has been updated: a component can keep the same tag (e.g. latest) but still have been modified. I would propose storing the image ID/hash as part of the metadata fields. We can fetch this with the docker package:
import docker

# Create a Docker client
client = docker.from_env()

# Specify the image URL
image_url = "ghcr.io/ml6team/caption_images:latest"

# Get the image object
image = client.images.get(image_url)

# Get the image ID (content hash)
image_id = image.attrs["Id"]

So the advantage over the currently implemented approach is that we get automatic detection of when components have changed, and the user no longer has to specify a resume_component argument, only the run_id.

That being said, it's still not very clear to me where the comparison of the components should happen. The easy way would be to generate a new docker-compose spec and compare it with the old one. If everything matches (args, component hash, ...), we resume the run from the last un-run component; if not (e.g. arguments changed), we need to re-compile to generate a new docker-compose file with the updated arguments. However, you mentioned that recompiling is something you want to avoid, since caching only happens at runtime.

Let me know what you think.

@RobbeSneyders
Member

Thanks for the response @PhilippeMoussalli and sorry for the delay, I needed some time to wrap my head around this 🤯 I'm just going to try and summarize my view based on your response:

I think there are two high level ways to tackle this:

  • At compilation time. This means that the compiler checks whether a component should run and generates only a partial pipeline definition (as you did in this PR). I'm not a big fan of this:
    • It means you need to recompile every time and cannot re-run the same pipeline definition when it fails
    • You already decide which components to run at compilation time, while the situation might have changed by runtime
  • At runtime. This means that the runner checks whether a component should run without changing anything to the pipeline definition. We then need a mechanism to indicate which components should actually execute. I see two options here:
    • We integrate with the underlying orchestration framework (kfp, docker compose, ...) to tell it to skip the execution of a component. This would require custom integration for every framework, and I even doubt that every orchestration framework supports this.
    • We pass in an argument to the component which tells it to skip its execution, as you propose.

So let's continue with that last option.
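As a rough sketch of that option: the runner always launches the component, but injects an argument telling it to skip real work and forward a cached manifest instead. The cached_manifest_path argument is hypothetical, not an actual Fondant argument:

import argparse
import shutil

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--output_manifest_path", required=True)
    parser.add_argument(
        "--cached_manifest_path",
        default=None,
        help="If set, skip execution and reuse this cached manifest.",
    )
    args = parser.parse_args()

    if args.cached_manifest_path:
        # Cache hit: forward the cached manifest instead of executing.
        shutil.copy(args.cached_manifest_path, args.output_manifest_path)
        return

    # Cache miss: execute the component as usual (not shown here).

if __name__ == "__main__":
    main()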

I would not leverage the docker_compose.yml, but let every component log a summary of its execution after it finishes. As you mention, it should probably include the image digest instead of the tag. I would then rely completely on this log to decide whether a component should run or be skipped.
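Such an execution summary could look like the sketch below; the field names and helper are my own illustration, not a defined Fondant format:

import json
from datetime import datetime, timezone

def write_execution_summary(path: str, component_name: str,
                            image_digest: str, arguments: dict) -> None:
    """Record what a component ran with, so later runs can compare and skip."""
    summary = {
        "component": component_name,
        "image_digest": image_digest,  # digest, not a mutable tag
        "arguments": arguments,
        "finished_at": datetime.now(timezone.utc).isoformat(),
    }
    with open(path, "w") as f:
        json.dump(summary, f, indent=2)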

The new run should always have a new run id, we should never overwrite any older runs, as it breaks the lineage. We do need the run_id of the previous run to compare though, and I'm not yet sure how we should decide which run this is. We can probably start by having the user pass it in manually.

We probably also need a flag to disable caching, as you might want to re-run a component because something external changed (e.g. external data you're reading).

@PhilippeMoussalli
Contributor Author

I still think it needs to be a combination of compile time and runtime. I don't see how else we would run the pipeline with different arguments without generating a different pipeline spec.
The compilation will generate the specs needed for comparison (args, specs, ...). Based on those, we can calculate a cache key that represents a unique component-run identifier for each component during compilation. Using this key, we can decide whether a component needs to be executed or can simply return the manifest of a previous component run with a matching cache key.
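A minimal sketch of such a cache key, hashing the component's task specification with the standard library; the exact fields and function name are assumptions for illustration:

import hashlib
import json

def component_cache_key(image_digest: str, arguments: dict, command: list) -> str:
    """Two identical component executions produce the same key."""
    task_spec = {
        "image": image_digest,
        "args": arguments,
        "command": command,
    }
    # Canonical JSON so dict ordering doesn't change the hash.
    serialized = json.dumps(task_spec, sort_keys=True)
    return hashlib.sha256(serialized.encode()).hexdigest()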

I've drawn a diagram to make this a little more concrete:

[diagram: proposed cache-key-based partial execution flow]

Excalidraw link

This has the advantage that you don't need to specify which pipeline run to resume from. It is a bit similar to the caching approach Vertex AI describes (link); it seems like they also compile again and check for matching component executions.

Some caveats based on your previous comments:

  • You already decide which components to run at compilation time, while the situation might have changed by runtime

Yes, this is especially relevant if your images get updated while you're running the same specification twice; we could instead move the calculation of the cache key to the component itself and calculate it at runtime.

I would not leverage the docker_compose.yml, but let every component log a summary of its execution after it's finished. As you mention, it should probably include the image digest instead of the tag. I would then completely rely on this log to decide if an image should run or be skipped.

This can still be equivalent to the hash key; I'm not sure we need to log a full summary. Otherwise, I would just include it in the manifest as part of the metadata.

The new run should always have a new run id, we should never overwrite any older runs, as it breaks the lineage. We do need the run_id of the previous run to compare though, and I'm not yet sure how we should decide which run this is. We can probably start by having the user pass it in manually.

I agree with running it with a new run_id. The comparison with a specific run_id from previous runs is not needed if we follow the approach specified above, where we check for matching executions in the base_path (equivalent to Vertex's ML metadata store).

We probably also need a flag to disable caching, as you might want to re-run a component because something external changed (eg. external data you're reading).

Yes, I agree. We can introduce this at the ComponentOp level and have it enabled by default.
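Sketched as a flag on the operation (purely illustrative, not Fondant's actual ComponentOp signature):

class ComponentOp:
    def __init__(
        self,
        component_dir: str,
        arguments: dict | None = None,
        cache: bool = True,  # set to False to force re-execution
    ):
        self.component_dir = component_dir
        self.arguments = arguments or {}
        self.cache = cache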

@PhilippeMoussalli
Contributor Author

Closed in favor of #313.

RobbeSneyders pushed a commit that referenced this pull request Aug 21, 2023
Related to #313 #292 

The cache key is a unique identifier for the component that will be used
to decide whether a component should be executed or not.
Hakimovich99 pushed a commit that referenced this pull request Oct 16, 2023
Related to #313 #292 

The cache key is a unique identifier for the component that will be used
to decide whether a component should be executed or not.
@RobbeSneyders deleted the partial-pipeline-execution branch January 11, 2024 09:08