diff --git a/src/fondant/cli.py b/src/fondant/cli.py index 4422eba6b..7b412857a 100644 --- a/src/fondant/cli.py +++ b/src/fondant/cli.py @@ -311,13 +311,16 @@ def register_compile(parent_parser): name="vertex", help="vertex compiler", ) + sagemaker_parser = compiler_subparser.add_parser( + name="sagemaker", + help="Sagemaker compiler", + ) # Local runner parser local_parser.add_argument( "ref", - help="""Reference to the pipeline to run, can be a path to a spec file or - a module containing the pipeline instance that will be compiled first (e.g. pipeline.py) - """, + help="""Reference to the pipeline to run, a path to a to a module containing + the pipeline instance that will be compiled (e.g. my-project/pipeline.py)""", action="store", ) local_parser.add_argument( @@ -364,9 +367,8 @@ def register_compile(parent_parser): # Kubeflow parser kubeflow_parser.add_argument( "ref", - help="""Reference to the pipeline to run, can be a path to a spec file or - a module containing the pipeline instance that will be compiled first (e.g. pipeline.py) - """, + help="""Reference to the pipeline to run, a path to a to a module containing + the pipeline instance that will be compiled (e.g. my-project/pipeline.py)""", action="store", ) kubeflow_parser.add_argument( @@ -379,9 +381,8 @@ def register_compile(parent_parser): # vertex parser vertex_parser.add_argument( "ref", - help="""Reference to the pipeline to run, can be a path to a spec file or - a module containing the pipeline instance that will be compiled first (e.g. pipeline.py) - """, + help="""Reference to the pipeline to run, a path to a to a module containing + the pipeline instance that will be compiled (e.g. my-project/pipeline.py)""", action="store", ) vertex_parser.add_argument( @@ -391,9 +392,36 @@ def register_compile(parent_parser): default="vertex_pipeline.yml", ) + # sagemaker parser + sagemaker_parser.add_argument( + "ref", + help="""Reference to the pipeline to run, a path to a to a module containing + the pipeline instance that will be compiled (e.g. my-project/pipeline.py)""", + action="store", + ) + sagemaker_parser.add_argument( + "--output-path", + "-o", + help="Output path of compiled pipeline", + default=".fondant/sagemaker_pipeline.json", + ) + sagemaker_parser.add_argument( + "--instance-type", + help="""the instance type to use for the processing steps + (see: https://aws.amazon.com/ec2/instance-types/ for options).""", + default="ml.m5.large", + ) + + sagemaker_parser.add_argument( + "--role-arn", + help="""the Amazon Resource Name role to use for the processing steps""", + default=None, + ) + local_parser.set_defaults(func=compile_local) kubeflow_parser.set_defaults(func=compile_kfp) vertex_parser.set_defaults(func=compile_vertex) + sagemaker_parser.set_defaults(func=compile_sagemaker) def compile_local(args): @@ -434,6 +462,19 @@ def compile_vertex(args): compiler.compile(pipeline=pipeline, output_path=args.output_path) +def compile_sagemaker(args): + from fondant.pipeline.compiler import SagemakerCompiler + + pipeline = pipeline_from_module(args.ref) + compiler = SagemakerCompiler() + compiler.compile( + pipeline=pipeline, + output_path=args.output_path, + instance_type=args.instance_type, + role_arn=args.role_arn, + ) + + def register_run(parent_parser): parser = parent_parser.add_parser( "run", @@ -467,6 +508,10 @@ def register_run(parent_parser): name="vertex", help="Vertex runner", ) + sagemaker_parser = runner_subparser.add_parser( + name="sagemaker", + help="Sagemaker runner", + ) # Local runner parser local_parser.add_argument( @@ -476,12 +521,6 @@ def register_run(parent_parser): """, action="store", ) - local_parser.add_argument( - "--output-path", - "-o", - help="Output path of compiled pipeline", - default="docker-compose.yml", - ) local_parser.add_argument( "--extra-volumes", nargs="+", @@ -572,6 +611,32 @@ def register_run(parent_parser): default=None, ) + # sagemaker runner parser + sagemaker_parser.add_argument( + "ref", + help="""Reference to the pipeline to run, can be a path to a spec file or + a module containing the pipeline instance that will be compiled first (e.g. pipeline.py) + """, + action="store", + ) + sagemaker_parser.add_argument( + "--pipeline-name", + help="""the name of the sagemaker pipeline to create""", + default="fondant-pipeline", + ) + + sagemaker_parser.add_argument( + "--role-arn", + help="""the Amazon Resource Name role to use for the processing steps""", + default=None, + ) + sagemaker_parser.add_argument( + "--instance-type", + help="""the instance type to use for the processing steps + (see: https://aws.amazon.com/ec2/instance-types/ for options).""", + default="ml.m5.large", + ) + local_parser.set_defaults(func=run_local) kubeflow_parser.set_defaults(func=run_kfp) vertex_parser.set_defaults(func=run_vertex) @@ -656,6 +721,23 @@ def run_vertex(args): raise e +def run_sagemaker(args): + from fondant.pipeline.runner import SagemakerRunner + + try: + ref = pipeline_from_module(args.ref) + except ModuleNotFoundError: + ref = args.ref + finally: + runner = SagemakerRunner() + runner.run( + input=ref, + pipeline_name=args.pipeline_name, + role_arn=args.role_arn, + instance_type=args.instance_type, + ) + + def register_execute(parent_parser): parser = parent_parser.add_parser( "execute", diff --git a/src/fondant/pipeline/compiler.py b/src/fondant/pipeline/compiler.py index b0ccf2b02..dc00528ec 100644 --- a/src/fondant/pipeline/compiler.py +++ b/src/fondant/pipeline/compiler.py @@ -591,7 +591,7 @@ def compile( Args: pipeline: the pipeline to compile - output_path: the path where to save the Kubeflow pipeline spec. + output_path: the path where to save the sagemaker pipeline spec. instance_type: the instance type to use for the processing steps (see: https://aws.amazon.com/ec2/instance-types/ for options). role_arn: the Amazon Resource Name role to use for the processing steps, diff --git a/src/fondant/pipeline/runner.py b/src/fondant/pipeline/runner.py index f1b9a044a..2ef18429d 100644 --- a/src/fondant/pipeline/runner.py +++ b/src/fondant/pipeline/runner.py @@ -170,7 +170,6 @@ class SagemakerRunner(Runner): def __init__(self): self.__resolve_imports() self.client = self.boto3.client("sagemaker") - self.compiler = SagemakerCompiler() def __resolve_imports(self): try: @@ -208,7 +207,8 @@ def run( logging.info( "Found reference to un-compiled pipeline... compiling", ) - self.compiler.compile( + compiler = SagemakerCompiler() + compiler.compile( input, output_path=output_path, instance_type=instance_type, diff --git a/tests/pipeline/test_runner.py b/tests/pipeline/test_runner.py index 16a58db9b..7da895b98 100644 --- a/tests/pipeline/test_runner.py +++ b/tests/pipeline/test_runner.py @@ -183,29 +183,27 @@ def test_sagemaker_runner(tmp_path_factory): ] +class MockSagemakerCompiler: + def compile( + self, + pipeline, + output_path, + *, + instance_type, + role_arn, + ) -> None: + with open(output_path, "w") as f: + f.write("foo: bar") + + def test_sagemaker_runner_from_pipeline(): with mock.patch( "fondant.pipeline.runner.SagemakerCompiler", - ) as mock_compiler, mock.patch("boto3.client", spec=True): - mock_compiler.configure_mock( - **{ - "compile.side_effect": open( # noqa: SIM115 - ".fondant/sagemaker-pipeline.yaml", - "w", - ).write("foo: bar"), - }, - ) + new=MockSagemakerCompiler, + ), mock.patch("boto3.client", spec=True): runner = SagemakerRunner() runner.run( input=PIPELINE, pipeline_name=PIPELINE.name, role_arn="arn:something", ) - assert runner.compiler.method_calls == [ - mock.call.compile( - PIPELINE, - output_path=".fondant/sagemaker-pipeline.yaml", - instance_type="ml.m5.xlarge", - role_arn="arn:something", - ), - ] diff --git a/tests/test_cli.py b/tests/test_cli.py index cd5b58768..9da652a72 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -10,6 +10,7 @@ build, compile_kfp, compile_local, + compile_sagemaker, compile_vertex, component_from_module, execute, @@ -213,6 +214,29 @@ def test_vertex_compile(tmp_path_factory): ) +def test_sagemaker_compile(tmp_path_factory): + with tmp_path_factory.mktemp("temp") as fn, patch( + "fondant.pipeline.compiler.SagemakerCompiler.compile", + ) as mock_compiler: + args = argparse.Namespace( + ref=__name__, + kubeflow=False, + local=False, + vertex=False, + sagemaker=True, + output_path=str(fn / "sagemaker_pipeline.json"), + role_arn="some_role", + instance_type="some_instance_type", + ) + compile_sagemaker(args) + mock_compiler.assert_called_once_with( + pipeline=TEST_PIPELINE, + output_path=str(fn / "sagemaker_pipeline.json"), + instance_type="some_instance_type", + role_arn="some_role", + ) + + def test_local_run(): """Test that the run command works with different arguments.""" args = argparse.Namespace(