Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expand cli to support kfp compiling and running #366

Merged
merged 1 commit into from
Aug 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 23 additions & 11 deletions src/fondant/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@
import sys
import textwrap

from fondant.compiler import DockerCompiler
from fondant.compiler import DockerCompiler, KubeFlowCompiler
from fondant.explorer import (
DEFAULT_CONTAINER,
DEFAULT_PORT,
DEFAULT_TAG,
run_explorer_app,
)
from fondant.pipeline import Pipeline
from fondant.runner import DockerRunner
from fondant.runner import DockerRunner, KubeflowRunner

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -141,11 +141,7 @@ def register_explore(parent_parser):

def explore(args):
if not args.data_directory:
logging.warning(
"You have not provided a data directory."
+ "To access local files, provide a local data directory"
+ " with the --data-directory flag.",
)
logging.error("")
else:
logging.info(f"Using data directory: {args.data_directory}")
logging.info(
Expand Down Expand Up @@ -237,8 +233,8 @@ def compile(args):
build_args=args.build_arg,
)
elif args.kubeflow:
msg = "Kubeflow compiler not implemented"
raise NotImplementedError(msg)
compiler = KubeFlowCompiler()
compiler.compile(pipeline=args.pipeline, output_path=args.output_path)


def register_run(parent_parser):
Expand Down Expand Up @@ -287,6 +283,7 @@ def register_run(parent_parser):
action="append",
help="Build arguments to pass to `docker build`. Format {key}={value}.",
)
parser.add_argument("--host", help="KubeFlow pipeline host url", required=False)
parser.set_defaults(func=run)


Expand All @@ -311,8 +308,23 @@ def run(args):
finally:
DockerRunner().run(spec_ref)
elif args.kubeflow:
msg = "Kubeflow runner not implemented"
raise NotImplementedError(msg)
if not args.host:
msg = "--host argument is required for running on Kubeflow"
raise ValueError(msg)
try:
pipeline = pipeline_from_string(args.ref)
except ImportFromStringError:
spec_ref = args.ref
else:
spec_ref = args.output_path
logging.info(
"Found reference to un-compiled pipeline... compiling to {spec_ref}",
)
compiler = KubeFlowCompiler()
compiler.compile(pipeline=pipeline, output_path=spec_ref)
finally:
runner = KubeflowRunner(host=args.host)
runner.run(input_spec=spec_ref)


class ImportFromStringError(Exception):
Expand Down
58 changes: 50 additions & 8 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,23 +46,32 @@ def test_pipeline_from_string_error(import_string):
pipeline_from_string(import_string)


def test_compile_logic(tmp_path_factory):
def test_local_compile(tmp_path_factory):
"""Test that the compile command works with arguments."""
with tmp_path_factory.mktemp("temp") as fn:
args = argparse.Namespace(
local=True,
kubeflow=False,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might make more sense to merge these options (especially if we have more in the future):

engine = self.parser.add_mutually_exclusive_group()
engine.add_argument('--local',
                    help='Run locally',
                    dest='engine',
                    action='store_const',
                    const='local')
engine.add_argument('--kfp',
                    help='Run on kfp',
                    dest='engine',
                    action='store_const',
                    const='kfp')

It's then available as args.engine with a value of either "local" or "kfp"

pipeline=TEST_PIPELINE,
output_path=str(fn / "docker-compose.yml"),
extra_volumes=[],
build_arg=[],
)
compile(args)
args2 = argparse.Namespace(kubeflow=True, local=False, ref="some/path")
with pytest.raises(NotImplementedError):
compile(args2)


def test_run_logic(tmp_path_factory):
def test_kfp_compile(tmp_path_factory):
with tmp_path_factory.mktemp("temp") as fn:
args = argparse.Namespace(
kubeflow=True,
local=False,
pipeline=TEST_PIPELINE,
output_path=str(fn / "kubeflow_pipelines.yml"),
)
compile(args)


def test_local_run(tmp_path_factory):
"""Test that the run command works with different arguments."""
args = argparse.Namespace(local=True, ref="some/path")
with patch("subprocess.call") as mock_call:
Expand Down Expand Up @@ -103,6 +112,39 @@ def test_run_logic(tmp_path_factory):
"--remove-orphans",
],
)
args2 = argparse.Namespace(kubeflow=True, local=False, ref="some/path")
with pytest.raises(NotImplementedError):
run(args2)


def test_kfp_run(tmp_path_factory):
"""Test that the run command works in different scenarios."""
args = argparse.Namespace(
kubeflow=True,
local=False,
ref="some/path",
host=None,
)
with pytest.raises(
ValueError,
match="--host argument is required for running on Kubeflow",
): # no host
run(args)
with patch("fondant.cli.KubeflowRunner") as mock_runner:
args = argparse.Namespace(
kubeflow=True,
local=False,
host="localhost",
ref="some/path",
)
run(args)
mock_runner.assert_called_once_with(host="localhost")
with patch("fondant.cli.KubeflowRunner") as mock_runner, tmp_path_factory.mktemp(
"temp",
) as fn:
args = argparse.Namespace(
kubeflow=True,
local=False,
host="localhost2",
output_path=str(fn / "kubeflow_pipelines.yml"),
ref=__name__ + ":TEST_PIPELINE",
)
run(args)
mock_runner.assert_called_once_with(host="localhost2")
Loading