
First implementation of DockerCompiler (#194)
Known TODOs:

- support for build in docker compose
- update documentation
- update examples to include dockercompiler
- update pipeline.py and move compile functionality to a
KubeflowCompiler
GeorgesLorre authored Jun 14, 2023
1 parent 0e8ebb8 commit c286c30
Showing 4 changed files with 355 additions and 12 deletions.
75 changes: 63 additions & 12 deletions docs/pipeline.md
@@ -1,17 +1,16 @@
# Pipeline

A Fondant pipeline is a tool for building complex workflows by creating a Directed Acyclic Graph (DAG) of different components that need to be executed. With Fondant, you can use both reusable components and custom components to construct your pipeline. Fondant extends the functionality of Kubeflow and provides additional features to simplify pipeline development.
A Fondant pipeline is a tool for building complex workflows by creating a Directed Acyclic Graph (DAG) of different components that need to be executed. With Fondant, you can use both reusable components and custom components to construct your pipeline. In order to build a pipeline, you register components on it with their dependencies (if any), and Fondant will construct the graph automatically.


## Building a pipeline
## Composing a pipeline

To build a pipeline, you need to define a set of component operations called `ComponentOp`. A component operation encapsulates the specifications of the component and its runtime configuration.

The component specifications include the location of the Docker image in the artifact registry. It is important to ensure that your Kubeflow service account has access to the image registry if it is not public.
The component specifications include the location of the Docker image in a registry.

The runtime configuration consists of the component's arguments and the definition of node pools and resources. For example, if a component requires GPU for model inference, you can specify the necessary GPU resources in the runtime configuration.
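For illustration, here is a minimal sketch of such a runtime configuration. The resource parameters `number_of_gpus` and `node_pool_name`, as well as the `batch_size` argument, are assumptions for illustration and may not match the actual `ComponentOp` signature:

```python
from fondant.pipeline import ComponentOp

# hypothetical runtime configuration; the resource parameter names are assumptions
caption_images_op = ComponentOp(
    "components/caption_images",            # path to the component specification
    arguments={"batch_size": 8},            # passed to the component as CLI arguments
    number_of_gpus=1,                       # assumed parameter for GPU resources
    node_pool_name="model-inference-pool",  # assumed parameter for node pool selection
)
```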

Here is an example of how to build and compile a pipeline using Fondant:
Here is an example of how to build a pipeline using Fondant:
```python
from fondant.pipeline import ComponentOp, Pipeline, Client

@@ -37,17 +36,69 @@ def build_pipeline():
    pipeline.add_op(caption_images_op, dependencies=load_from_hub_op)
    return pipeline

if __name__ == "__main__":
    client = Client(host="https://kfp-host.com/")
    pipeline = build_pipeline()
    client.compile_and_run(pipeline=pipeline)
```

In the example above, we first define our pipeline by providing a name as an identifier and a base path where the pipeline run artifacts will be stored. The base path can be a remote cloud location or a local directory, which is useful for local development.

Next, we define two operations: `load_from_hub_op`, which is based on a reusable component loaded from the Fondant registry, and `caption_images_op`, which is a custom component defined by you. We add these components to the pipeline using the `.add_op()` method and specify the dependencies between components to build the DAG.

Please note that currently Fondant supports linear DAGs with single dependencies. Support for non-linear DAGs will be available in future releases.

Once the pipeline is built, you need to initialize the client with the kubeflow host path (more info about the host path can be found in the [infrastructure documentation](https://github.com/ml6team/fondant/blob/main/docs/infrastructure.md))
and use it to compile and run the pipeline with the `compile_and_run()` method. This performs static checking to ensure that all required arguments are provided to the components and that the required input data subsets are available. If the checks pass, a URL will be provided, allowing you to visualize and monitor the execution of your pipeline.
!!! note "IMPORTANT"
    Currently Fondant supports linear DAGs with single dependencies. Support for non-linear DAGs will be available in future releases.



## Compiling a pipeline

Once all your components are added to your pipeline, you can use different compilers to run it:

### Kubeflow
TODO: update this once kubeflow compiler is implemented

~~Once the pipeline is built, you need to initialize the client with the kubeflow host path (more info about the host path can be found in the [infrastructure documentation](https://github.com/ml6team/fondant/blob/main/docs/infrastructure.md))
and use it to compile and run the pipeline with the `compile_and_run()` method. This performs static checking to ensure that all required arguments are provided to the components and that the required input data subsets are available. If the checks pass, a URL will be provided, allowing you to visualize and monitor the execution of your pipeline.~~

### Docker-Compose

The DockerCompiler will take your pipeline and create a docker-compose.yml file where every component is added as a service with the correct dependencies by leveraging the `depends_on` functionality and the `service_completed_successfully` status. See the basic example below:

```yaml
version: '3.8'
services:
  component_1:
    command: ["python", "main.py"]
    image: component_1:latest
  component_2:
    command: ["python", "main.py"]
    image: component_2:latest
    depends_on:
      component_1:
        condition: service_completed_successfully
  component_3:
    command: ["python", "main.py"]
    depends_on:
      component_2:
        condition: service_completed_successfully
    image: component_3:latest
```
In order to compile your pipeline to a `docker-compose` spec, you need to import the `DockerCompiler`:

```python
from fondant.compiler import DockerCompiler
compiler = DockerCompiler()
compiler.compile(pipeline=pipeline)
```
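The `output_path` argument is optional and defaults to `docker-compose.yml` in the current working directory, so you can also write the spec to a different location:

```python
compiler.compile(pipeline=pipeline, output_path="my-run/docker-compose.yml")
```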

The DockerCompiler will check whether the `base_path` of the pipeline is local or remote. If it is local, the `base_path` will be mounted as a bind volume on every service/component.
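For example, with a local `base_path` of `/home/user/artifacts`, every service in the generated spec would receive a bind mount along these lines (a sketch based on the compiler's `_patch_path` logic; the path is illustrative):

```yaml
volumes:
- source: /home/user/artifacts
  target: /artifacts
  type: bind
```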


#### Running a Docker compiled pipeline

Navigate to the folder where your docker-compose file is located and run the following (you need to have [docker-compose](https://docs.docker.com/compose/install/) installed):
```bash
docker compose up
```

This will start the pipeline and provide logs per component (service).
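To follow the logs of a single component while the pipeline runs, the standard docker compose commands apply (the service name `first_component` is illustrative):

```bash
docker compose logs -f first_component
```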
135 changes: 135 additions & 0 deletions fondant/compiler.py
@@ -0,0 +1,135 @@
import json
import logging
import typing as t
from abc import ABC, abstractmethod
from dataclasses import asdict, dataclass
from pathlib import Path

import yaml

from fondant.pipeline import Pipeline

logger = logging.getLogger(__name__)


class Compiler(ABC):
    """Abstract base class for a compiler."""

    @abstractmethod
    def compile(self, *args, **kwargs):
        """Abstract method to invoke compilation."""


@dataclass
class DockerVolume:
    """Dataclass representing a DockerVolume.
    (https://docs.docker.com/compose/compose-file/05-services/#volumes).
    Args:
        type: the mount type of the volume (bind, volume)
        source: the source of the mount, a path on the host for a bind mount
        target: the path in the container where the volume is mounted.
    """

    type: str
    source: str
    target: str


@dataclass
class MetaData:
    """Dataclass representing the metadata arguments of a pipeline.
    Args:
        run_id: identifier of the current pipeline run
        base_path: the base path used to store the artifacts.
    """

    run_id: str
    base_path: str


class DockerCompiler(Compiler):
    """Compiler that creates a docker-compose spec from a pipeline."""

    def compile(
        self, pipeline: Pipeline, output_path: str = "docker-compose.yml"
    ) -> None:
        """Compile a pipeline to a docker-compose spec and save it to the specified output path."""
        logger.info(f"Compiling {pipeline.name} to docker-compose.yml")
        spec = self._generate_spec(pipeline=pipeline)
        with open(output_path, "w") as outfile:
            yaml.safe_dump(spec, outfile)
        logger.info(f"Successfully compiled to {output_path}")

    @staticmethod
    def _safe_component_name(component_name: str) -> str:
        """Transform a component name to a docker-compose friendly one.
        eg: `Component A` -> `component_a`.
        """
        return component_name.replace(" ", "_").lower()

    def _patch_path(self, base_path: str) -> t.Tuple[str, t.Optional[DockerVolume]]:
        """Helper that checks if the base_path is local or remote;
        if local, it patches the base_path and prepares a bind mount.
        Returns a tuple containing the path and volume.
        """
        p_base_path = Path(base_path)
        # check if base path is an existing local folder
        if p_base_path.exists():
            volume = DockerVolume(
                type="bind", source=str(p_base_path), target=f"/{p_base_path.stem}"
            )
            path = f"/{p_base_path.stem}"
            logger.info(f"Base path set to: {path}")
        else:
            volume = None
            path = base_path
        return (path, volume)

    def _generate_spec(self, pipeline: Pipeline) -> dict:
        """Generate a docker-compose spec as a python dictionary;
        loops over the pipeline graph to create services and their dependencies.
        """
        path, volume = self._patch_path(base_path=pipeline.base_path)
        metadata = MetaData(run_id=pipeline.name, base_path=path)

        services = {}

        for component_name, component in pipeline._graph.items():
            logger.info(f"Compiling service for {component_name}")
            safe_component_name = self._safe_component_name(component_name)

            component_op = component["fondant_component_op"]

            # add metadata argument to command
            command = ["--metadata", json.dumps(asdict(metadata))]

            # add output manifest path to command
            command.extend(["--output_manifest_path", f"{path}/manifest.txt"])

            # add arguments if any to command
            for key, value in component_op.arguments.items():
                command.extend([f"--{key}", f"{value}"])

            # resolve dependencies
            depends_on = {}
            if component["dependencies"]:
                # there is only an input manifest if the component has dependencies
                command.extend(["--input_manifest_path", f"{path}/manifest.txt"])
                for dependency in component["dependencies"]:
                    safe_dependency = self._safe_component_name(dependency)
                    depends_on[safe_dependency] = {
                        "condition": "service_completed_successfully"
                    }

            volumes = [asdict(volume)] if volume else []

            services[safe_component_name] = {
                "image": component_op.component_spec.image,
                "command": command,
                "depends_on": depends_on,
                "volumes": volumes,
            }

        return {"version": "3.8", "services": services}
43 changes: 43 additions & 0 deletions tests/example_pipelines/compiled_pipeline/docker-compose.yml
@@ -0,0 +1,43 @@
services:
  first_component:
    command:
    - --metadata
    - '{"run_id": "test_pipeline", "base_path": "/foo/bar"}'
    - --output_manifest_path
    - /foo/bar/manifest.txt
    - --storage_args
    - a dummy string arg
    depends_on: {}
    image: example_component:latest
    volumes: []
  second_component:
    command:
    - --metadata
    - '{"run_id": "test_pipeline", "base_path": "/foo/bar"}'
    - --output_manifest_path
    - /foo/bar/manifest.txt
    - --storage_args
    - a dummy string arg
    - --input_manifest_path
    - /foo/bar/manifest.txt
    depends_on:
      first_component:
        condition: service_completed_successfully
    image: example_component:latest
    volumes: []
  third_component:
    command:
    - --metadata
    - '{"run_id": "test_pipeline", "base_path": "/foo/bar"}'
    - --output_manifest_path
    - /foo/bar/manifest.txt
    - --storage_args
    - a dummy string arg
    - --input_manifest_path
    - /foo/bar/manifest.txt
    depends_on:
      second_component:
        condition: service_completed_successfully
    image: example_component:latest
    volumes: []
version: '3.8'
114 changes: 114 additions & 0 deletions tests/test_compiler.py
@@ -0,0 +1,114 @@
from pathlib import Path

import pytest
import yaml

from fondant.compiler import DockerCompiler
from fondant.pipeline import ComponentOp, Pipeline

COMPONENTS_PATH = Path(__file__).parent / "example_pipelines/valid_pipeline"

VALID_DOCKER_PIPELINE = (
    Path(__file__).parent / "example_pipelines/compiled_pipeline/docker-compose.yml"
)

TEST_PIPELINES = [
    (
        "example_1",
        ["first_component.yaml", "second_component.yaml", "third_component.yaml"],
    ),
]


@pytest.fixture(params=TEST_PIPELINES)
def pipeline(request, tmp_path, monkeypatch):
    pipeline = Pipeline(
        pipeline_name="test_pipeline",
        pipeline_description="description of the test pipeline",
        base_path="/foo/bar",
    )
    example_dir, component_specs = request.param

    component_args = {"storage_args": "a dummy string arg"}
    components_path = Path(COMPONENTS_PATH / example_dir)

    prev_comp = None
    for component_spec in component_specs:
        component_op = ComponentOp(
            Path(components_path / component_spec), arguments=component_args
        )
        pipeline.add_op(component_op, dependencies=prev_comp)
        prev_comp = component_op

    pipeline.compile()

    # override the default package_path with a temporary path to avoid the creation of artifacts
    monkeypatch.setattr(pipeline, "package_path", str(tmp_path / "test_pipeline.tgz"))

    return pipeline


def test_docker_compiler(pipeline, tmp_path_factory):
    """Test compiling a pipeline to docker-compose."""
    compiler = DockerCompiler()
    with tmp_path_factory.mktemp("temp") as fn:
        output_path = str(fn / "docker-compose.yml")
        compiler.compile(pipeline=pipeline, output_path=output_path)
        with open(output_path, "r") as src, open(VALID_DOCKER_PIPELINE, "r") as truth:
            assert src.read() == truth.read()


def test_docker_local_path(pipeline, tmp_path_factory):
    """Test that a local path is applied correctly as a volume and in the arguments."""
    # volumes are only created for local existing directories
    with tmp_path_factory.mktemp("temp") as fn:
        # this is the directory mounted in the container
        work_dir = f"/{fn.stem}"
        pipeline.base_path = str(fn)
        compiler = DockerCompiler()
        compiler.compile(pipeline=pipeline, output_path=fn / "docker-compose.yml")

        # read the generated docker-compose file
        with open(fn / "docker-compose.yml") as f_spec:
            spec = yaml.safe_load(f_spec)

        for service in spec["services"].values():
            # check if volumes are defined correctly
            assert service["volumes"] == [
                {
                    "source": str(fn),
                    "target": work_dir,
                    "type": "bind",
                }
            ]
            # check if commands are patched to use the working dir
            commands_with_dir = [
                f"{work_dir}/manifest.txt",
                f'{{"run_id": "test_pipeline", "base_path": "{work_dir}"}}',
            ]
            for command in commands_with_dir:
                assert command in service["command"]


def test_docker_remote_path(pipeline, tmp_path_factory):
    """Test that a remote path is applied correctly in the arguments and no volume is created."""
    remote_dir = "gs://somebucket/artifacts"
    pipeline.base_path = remote_dir
    compiler = DockerCompiler()
    with tmp_path_factory.mktemp("temp") as fn:
        compiler.compile(pipeline=pipeline, output_path=fn / "docker-compose.yml")

        # read the generated docker-compose file
        with open(fn / "docker-compose.yml") as f_spec:
            spec = yaml.safe_load(f_spec)

        for service in spec["services"].values():
            # check that no volumes are created
            assert service["volumes"] == []
            # check if commands are patched to use the remote dir
            commands_with_dir = [
                f"{remote_dir}/manifest.txt",
                f'{{"run_id": "test_pipeline", "base_path": "{remote_dir}"}}',
            ]
            for command in commands_with_dir:
                assert command in service["command"]
