Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor local runner and add run command #234

Merged
merged 7 commits into from
Jun 28, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
294 changes: 174 additions & 120 deletions fondant/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import logging
import shutil
import sys
import typing as t

from fondant.compiler import DockerCompiler
from fondant.explorer import (
Expand All @@ -28,91 +27,77 @@
run_explorer_app,
)
from fondant.pipeline import Pipeline
from fondant.runner import DockerRunner

logging.basicConfig(level=logging.INFO)
cli = argparse.ArgumentParser(description="Fondant CLI")
subparsers = cli.add_subparsers()
logger = logging.getLogger(__name__)


def entrypoint():
"""Entrypoint for the fondant CLI."""
parser = argparse.ArgumentParser(description="Fondant CLI")
subparsers = parser.add_subparsers()
register_explore(subparsers)
register_compile(subparsers)
register_run(subparsers)

sys.path.append(".")
args = cli.parse_args()

args = parser.parse_args(sys.argv[1:])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
args = parser.parse_args(sys.argv[1:])
args = parser.parse_args()

I don't think explicitly providing the arguments is necessary.

args.func(args)


def argument(*name_or_flags, **kwargs):
"""Helper function to create an argument tuple for the subcommand decorator."""
return (list(name_or_flags), kwargs)


def distill_arguments(args: argparse.Namespace, remove: t.Optional[t.List[str]] = None):
"""Helper function to distill arguments to be passed on to the function."""
args_dict = vars(args)
args_dict.pop("func")
if remove is not None:
for arg in remove:
args_dict.pop(arg)
return args_dict


def subcommand(name, parent_parser=subparsers, help=None, args=None):
"""Decorator to add a subcommand to the CLI."""

def decorator(func):
parser = parent_parser.add_parser(name, help=help)
if args is not None:
for arg in args:
parser.add_argument(*arg[0], **arg[1])
parser.set_defaults(func=func)

return decorator


@subcommand(
"explore",
help="Explore a fondant pipeline",
args=[
argument(
"--data-directory",
"-d",
help="""Path to the source directory that contains the data produced
by a fondant pipeline.""",
required=False,
type=str,
),
argument(
"--container",
"-r",
default=DEFAULT_CONTAINER,
help="Docker container to use. Defaults to ghcr.io/ml6team/data_explorer.",
),
argument("--tag", "-t", default=DEFAULT_TAG, help="Docker image tag to use."),
argument(
"--port",
"-p",
default=DEFAULT_PORT,
type=int,
help="Port to expose the container on.",
),
argument(
"--credentials",
"-c",
help="""Path mapping of the source (local) and target (docker file system)
credential paths in the format of src:target
\nExamples:\n
Google Cloud: $HOME/.config/gcloud/application_default_credentials.json:/root/."
+ "config/gcloud/application_default_credentials.json
AWS: HOME/.aws/credentials:/root/.aws/credentials
More info on
Google Cloud:
https://cloud.google.com/docs/authentication/application-default-credentials
AWS: https: // docs.aws.amazon.com/sdkref/latest/guide/file-location.html
def register_explore(parent_parser):
parser = parent_parser.add_parser(
"explore",
help="Explore a fondant pipeline.",
)
parser.add_argument(
"--data-directory",
"-d",
help="""Path to the source directory that contains the data produced
by a fondant pipeline.""",
required=False,
type=str,
)
parser.add_argument(
"--container",
"-r",
default=DEFAULT_CONTAINER,
help="Docker container to use. Defaults to ghcr.io/ml6team/data_explorer.",
)
parser.add_argument(
"--tag",
"-t",
default=DEFAULT_TAG,
help="Docker image tag to use.",
)
parser.add_argument(
"--port",
"-p",
default=DEFAULT_PORT,
type=int,
help="Port to expose the container on.",
)
parser.add_argument(
"--credentials",
"-c",
help="""Path mapping of the source (local) and target (docker file system)
credential paths in the format of src:target
\nExamples:\n
Google Cloud: $HOME/.config/gcloud/application_default_credentials.json:/root/."
+ "config/gcloud/application_default_credentials.json
AWS: HOME/.aws/credentials:/root/.aws/credentials
More info on
Google Cloud:
https://cloud.google.com/docs/authentication/application-default-credentials
AWS: https: // docs.aws.amazon.com/sdkref/latest/guide/file-location.html
""",
),
],
)
)

parser.set_defaults(func=explore)


def explore(args):
"""Defines the explore subcommand."""
if not args.data_directory:
logging.warning(
"You have not provided a data directory."
Expand All @@ -121,7 +106,9 @@ def explore(args):
)
else:
logging.info(f"Using data directory: {args.data_directory}")
logging.info("This directory will be mounted to /artifacts in the container.")
logging.info(
"This directory will be mounted to /artifacts in the container.",
)

if not args.credentials:
logging.warning(
Expand All @@ -130,17 +117,123 @@ def explore(args):
)

if not shutil.which("docker"):
logging.error("Docker runtime not found. Please install Docker and try again.")
logging.error(
"Docker runtime not found. Please install Docker and try again.",
)

run_explorer_app(
data_directory=args.data_directory,
container=args.container,
tag=args.tag,
port=args.port,
credentials=args.credentials,
)


def register_compile(parent_parser):
parser = parent_parser.add_parser(
"compile",
help="Compile a fondant pipeline.",
)
parser.add_argument(
"pipeline",
help="Path to the fondant pipeline: path.to.module:instance",
type=pipeline_from_string,
)
# add a mutually exclusive group for the mode
mode_group = parser.add_mutually_exclusive_group(required=True)
mode_group.add_argument("--local", action="store_true")
mode_group.add_argument("--kubeflow", action="store_true")

parser.add_argument(
"--output-path",
"-o",
help="Output directory",
default="docker-compose.yml",
)
parser.add_argument(
"--extra-volumes",
help="Extra volumes to mount in containers",
nargs="+",
)

parser.set_defaults(func=compile)


def compile(args):
if args.local:
compiler = DockerCompiler()
compiler.compile(
pipeline=args.pipeline,
extra_volumes=args.extra_volumes,
output_path=args.output_path,
)
elif args.kubeflow:
msg = "Kubeflow compiler not implemented"
raise NotImplementedError(msg)

function_args = distill_arguments(args)
run_explorer_app(**function_args)

def register_run(parent_parser):
parser = parent_parser.add_parser(
"run",
help="Run a fondant pipeline.",
)
parser.add_argument(
"ref",
help="""Reference to the pipeline to run, can be a path to a spec file or
a pipeline instance that will be compiled first""",
action="store",
)
# add a mutually exclusive group for the mode
mode_group = parser.add_mutually_exclusive_group(required=True)
mode_group.add_argument("--local", action="store_true")
mode_group.add_argument("--kubeflow", action="store_true")

parser.add_argument(
"--output-path",
"-o",
help="Output directory",
default="docker-compose.yml",
)
parser.add_argument(
"--extra-volumes",
help="Extra volumes to mount in containers",
nargs="+",
)
parser.set_defaults(func=run)


def run(args):
if args.local:
try:
pipeline = pipeline_from_string(args.ref)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We only need the try except around this line I think.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bump

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could move the logic around but I believe this is the clearest. Try to see if it is an importstring if it is then compile it otherwise assume it is a reference to an already compiled pipeline.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use

try:
    ...
except:
    ...
else:
    ...
finally:
    ...


spec_ref = args.output_path
logging.info(
"Found reference to un-compiled pipeline... compiling to {spec_ref}",
)
compiler = DockerCompiler()
compiler.compile(
pipeline=pipeline,
extra_volumes=args.extra_volumes,
output_path=spec_ref,
)

except ImportFromStringError:
spec_ref = args.ref

DockerRunner().run(spec_ref)
elif args.kubeflow:
msg = "Kubeflow runner not implemented"
raise NotImplementedError(msg)


class ImportFromStringError(Exception):
pass
"""Error raised when an import string is not valid."""


def pipeline_from_string(import_string: str) -> Pipeline:
"""Try to import a pipeline from a string otherwise raise an ImportFromStringError."""
module_str, _, attr_str = import_string.rpartition(":")
if not attr_str or not module_str:
raise ImportFromStringError(
Expand Down Expand Up @@ -168,42 +261,3 @@ def pipeline_from_string(import_string: str) -> Pipeline:
raise ImportFromStringError(msg)

return instance


@subcommand(
"compile",
help="Compile a fondant pipeline",
args=[
argument(
"pipeline",
help="Path to the fondant pipeline: path.to.module:instance",
type=pipeline_from_string,
),
argument(
"--mode",
"-m",
help="Mode to run the pipeline in. Defaults to 'local'",
default="local",
choices=["local", "kubeflow"],
),
argument(
"--output-path",
"-o",
help="Output directory",
default="docker-compose.yml",
),
argument(
"--extra-volumes",
help="Extra volumes to mount in containers",
nargs="+",
),
],
)
def compile(args):
if args.mode == "local":
compiler = DockerCompiler()
function_args = distill_arguments(args, remove=["mode"])
compiler.compile(**function_args)
else:
msg = "Kubeflow mode is not implemented yet."
raise NotImplementedError(msg)
18 changes: 18 additions & 0 deletions fondant/runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import subprocess # nosec
from abc import ABC, abstractmethod


class Runner(ABC):
"""Abstract base class for a runner."""

@abstractmethod
def run(self, *args, **kwargs):
"""Abstract method to invoke running."""


class DockerRunner(Runner):
def run(cls, input_spec: str, *args, **kwargs):
"""Run a docker-compose spec."""
cmd = ["docker", "compose", "-f", input_spec, "up", "--build"]

subprocess.call(cmd, stdout=subprocess.PIPE) # nosec
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
subprocess.call(cmd, stdout=subprocess.PIPE) # nosec
subprocess.call(cmd) # nosec

As discussed, this intercepts the build logs and docker compose log colors.

1 change: 1 addition & 0 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
"fondant --help",
"fondant explore --help",
"fondant compile --help",
"fondant run --help",
]


Expand Down