From 6c3335b47b3ecbac70f10f562440f5fc9254d0f3 Mon Sep 17 00:00:00 2001 From: Georges Lorre Date: Thu, 17 Aug 2023 11:41:23 +0200 Subject: [PATCH] Expand cli to support kfp compiling and running --- src/fondant/cli.py | 34 ++++++++++++++++++--------- tests/test_cli.py | 58 +++++++++++++++++++++++++++++++++++++++------- 2 files changed, 73 insertions(+), 19 deletions(-) diff --git a/src/fondant/cli.py b/src/fondant/cli.py index 5eecf404d..1afa56f6a 100644 --- a/src/fondant/cli.py +++ b/src/fondant/cli.py @@ -21,7 +21,7 @@ import sys import textwrap -from fondant.compiler import DockerCompiler +from fondant.compiler import DockerCompiler, KubeFlowCompiler from fondant.explorer import ( DEFAULT_CONTAINER, DEFAULT_PORT, @@ -29,7 +29,7 @@ run_explorer_app, ) from fondant.pipeline import Pipeline -from fondant.runner import DockerRunner +from fondant.runner import DockerRunner, KubeflowRunner logger = logging.getLogger(__name__) @@ -141,11 +141,7 @@ def register_explore(parent_parser): def explore(args): if not args.data_directory: - logging.warning( - "You have not provided a data directory." - + "To access local files, provide a local data directory" - + " with the --data-directory flag.", - ) + logging.error("") else: logging.info(f"Using data directory: {args.data_directory}") logging.info( @@ -237,8 +233,8 @@ def compile(args): build_args=args.build_arg, ) elif args.kubeflow: - msg = "Kubeflow compiler not implemented" - raise NotImplementedError(msg) + compiler = KubeFlowCompiler() + compiler.compile(pipeline=args.pipeline, output_path=args.output_path) def register_run(parent_parser): @@ -287,6 +283,7 @@ def register_run(parent_parser): action="append", help="Build arguments to pass to `docker build`. Format {key}={value}.", ) + parser.add_argument("--host", help="KubeFlow pipeline host url", required=False) parser.set_defaults(func=run) @@ -311,8 +308,23 @@ def run(args): finally: DockerRunner().run(spec_ref) elif args.kubeflow: - msg = "Kubeflow runner not implemented" - raise NotImplementedError(msg) + if not args.host: + msg = "--host argument is required for running on Kubeflow" + raise ValueError(msg) + try: + pipeline = pipeline_from_string(args.ref) + except ImportFromStringError: + spec_ref = args.ref + else: + spec_ref = args.output_path + logging.info( + "Found reference to un-compiled pipeline... compiling to {spec_ref}", + ) + compiler = KubeFlowCompiler() + compiler.compile(pipeline=pipeline, output_path=spec_ref) + finally: + runner = KubeflowRunner(host=args.host) + runner.run(input_spec=spec_ref) class ImportFromStringError(Exception): diff --git a/tests/test_cli.py b/tests/test_cli.py index 35f401dcc..b8253e089 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -46,23 +46,32 @@ def test_pipeline_from_string_error(import_string): pipeline_from_string(import_string) -def test_compile_logic(tmp_path_factory): +def test_local_compile(tmp_path_factory): """Test that the compile command works with arguments.""" with tmp_path_factory.mktemp("temp") as fn: args = argparse.Namespace( local=True, + kubeflow=False, pipeline=TEST_PIPELINE, output_path=str(fn / "docker-compose.yml"), extra_volumes=[], build_arg=[], ) compile(args) - args2 = argparse.Namespace(kubeflow=True, local=False, ref="some/path") - with pytest.raises(NotImplementedError): - compile(args2) -def test_run_logic(tmp_path_factory): +def test_kfp_compile(tmp_path_factory): + with tmp_path_factory.mktemp("temp") as fn: + args = argparse.Namespace( + kubeflow=True, + local=False, + pipeline=TEST_PIPELINE, + output_path=str(fn / "kubeflow_pipelines.yml"), + ) + compile(args) + + +def test_local_run(tmp_path_factory): """Test that the run command works with different arguments.""" args = argparse.Namespace(local=True, ref="some/path") with patch("subprocess.call") as mock_call: @@ -103,6 +112,39 @@ def test_run_logic(tmp_path_factory): "--remove-orphans", ], ) - args2 = argparse.Namespace(kubeflow=True, local=False, ref="some/path") - with pytest.raises(NotImplementedError): - run(args2) + + +def test_kfp_run(tmp_path_factory): + """Test that the run command works in different scenarios.""" + args = argparse.Namespace( + kubeflow=True, + local=False, + ref="some/path", + host=None, + ) + with pytest.raises( + ValueError, + match="--host argument is required for running on Kubeflow", + ): # no host + run(args) + with patch("fondant.cli.KubeflowRunner") as mock_runner: + args = argparse.Namespace( + kubeflow=True, + local=False, + host="localhost", + ref="some/path", + ) + run(args) + mock_runner.assert_called_once_with(host="localhost") + with patch("fondant.cli.KubeflowRunner") as mock_runner, tmp_path_factory.mktemp( + "temp", + ) as fn: + args = argparse.Namespace( + kubeflow=True, + local=False, + host="localhost2", + output_path=str(fn / "kubeflow_pipelines.yml"), + ref=__name__ + ":TEST_PIPELINE", + ) + run(args) + mock_runner.assert_called_once_with(host="localhost2")