Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cli commands for sagemaker #680

Merged
merged 6 commits into from
Nov 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 97 additions & 15 deletions src/fondant/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,13 +311,16 @@ def register_compile(parent_parser):
name="vertex",
help="vertex compiler",
)
sagemaker_parser = compiler_subparser.add_parser(
name="sagemaker",
help="Sagemaker compiler",
)

# Local runner parser
local_parser.add_argument(
"ref",
help="""Reference to the pipeline to run, can be a path to a spec file or
a module containing the pipeline instance that will be compiled first (e.g. pipeline.py)
""",
help="""Reference to the pipeline to run, a path to a to a module containing
the pipeline instance that will be compiled (e.g. my-project/pipeline.py)""",
action="store",
)
local_parser.add_argument(
Expand Down Expand Up @@ -364,9 +367,8 @@ def register_compile(parent_parser):
# Kubeflow parser
kubeflow_parser.add_argument(
"ref",
help="""Reference to the pipeline to run, can be a path to a spec file or
a module containing the pipeline instance that will be compiled first (e.g. pipeline.py)
""",
help="""Reference to the pipeline to run, a path to a to a module containing
the pipeline instance that will be compiled (e.g. my-project/pipeline.py)""",
action="store",
)
kubeflow_parser.add_argument(
Expand All @@ -379,9 +381,8 @@ def register_compile(parent_parser):
# vertex parser
vertex_parser.add_argument(
"ref",
help="""Reference to the pipeline to run, can be a path to a spec file or
a module containing the pipeline instance that will be compiled first (e.g. pipeline.py)
""",
help="""Reference to the pipeline to run, a path to a to a module containing
the pipeline instance that will be compiled (e.g. my-project/pipeline.py)""",
action="store",
)
vertex_parser.add_argument(
Expand All @@ -391,9 +392,36 @@ def register_compile(parent_parser):
default="vertex_pipeline.yml",
)

# sagemaker parser
sagemaker_parser.add_argument(
"ref",
help="""Reference to the pipeline to run, a path to a to a module containing
the pipeline instance that will be compiled (e.g. my-project/pipeline.py)""",
action="store",
)
sagemaker_parser.add_argument(
"--output-path",
"-o",
help="Output path of compiled pipeline",
default=".fondant/sagemaker_pipeline.json",
)
sagemaker_parser.add_argument(
"--instance-type",
help="""the instance type to use for the processing steps
(see: https://aws.amazon.com/ec2/instance-types/ for options).""",
default="ml.m5.large",
)

sagemaker_parser.add_argument(
"--role-arn",
help="""the Amazon Resource Name role to use for the processing steps""",
default=None,
)

local_parser.set_defaults(func=compile_local)
kubeflow_parser.set_defaults(func=compile_kfp)
vertex_parser.set_defaults(func=compile_vertex)
sagemaker_parser.set_defaults(func=compile_sagemaker)


def compile_local(args):
Expand Down Expand Up @@ -434,6 +462,19 @@ def compile_vertex(args):
compiler.compile(pipeline=pipeline, output_path=args.output_path)


def compile_sagemaker(args):
from fondant.pipeline.compiler import SagemakerCompiler

pipeline = pipeline_from_module(args.ref)
compiler = SagemakerCompiler()
compiler.compile(
pipeline=pipeline,
output_path=args.output_path,
instance_type=args.instance_type,
role_arn=args.role_arn,
)


def register_run(parent_parser):
parser = parent_parser.add_parser(
"run",
Expand Down Expand Up @@ -467,6 +508,10 @@ def register_run(parent_parser):
name="vertex",
help="Vertex runner",
)
sagemaker_parser = runner_subparser.add_parser(
name="sagemaker",
help="Sagemaker runner",
)

# Local runner parser
local_parser.add_argument(
Expand All @@ -476,12 +521,6 @@ def register_run(parent_parser):
""",
action="store",
)
local_parser.add_argument(
"--output-path",
"-o",
help="Output path of compiled pipeline",
default="docker-compose.yml",
)
local_parser.add_argument(
"--extra-volumes",
nargs="+",
Expand Down Expand Up @@ -572,6 +611,32 @@ def register_run(parent_parser):
default=None,
)

# sagemaker runner parser
sagemaker_parser.add_argument(
"ref",
help="""Reference to the pipeline to run, can be a path to a spec file or
a module containing the pipeline instance that will be compiled first (e.g. pipeline.py)
""",
action="store",
)
sagemaker_parser.add_argument(
"--pipeline-name",
help="""the name of the sagemaker pipeline to create""",
default="fondant-pipeline",
)

sagemaker_parser.add_argument(
"--role-arn",
help="""the Amazon Resource Name role to use for the processing steps""",
default=None,
)
sagemaker_parser.add_argument(
"--instance-type",
help="""the instance type to use for the processing steps
(see: https://aws.amazon.com/ec2/instance-types/ for options).""",
default="ml.m5.large",
)

local_parser.set_defaults(func=run_local)
kubeflow_parser.set_defaults(func=run_kfp)
vertex_parser.set_defaults(func=run_vertex)
Expand Down Expand Up @@ -656,6 +721,23 @@ def run_vertex(args):
raise e


def run_sagemaker(args):
from fondant.pipeline.runner import SagemakerRunner

try:
ref = pipeline_from_module(args.ref)
except ModuleNotFoundError:
ref = args.ref
finally:
runner = SagemakerRunner()
runner.run(
input=ref,
pipeline_name=args.pipeline_name,
role_arn=args.role_arn,
instance_type=args.instance_type,
)


def register_execute(parent_parser):
parser = parent_parser.add_parser(
"execute",
Expand Down
2 changes: 1 addition & 1 deletion src/fondant/pipeline/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,7 @@ def compile(

Args:
pipeline: the pipeline to compile
output_path: the path where to save the Kubeflow pipeline spec.
output_path: the path where to save the sagemaker pipeline spec.
instance_type: the instance type to use for the processing steps
(see: https://aws.amazon.com/ec2/instance-types/ for options).
role_arn: the Amazon Resource Name role to use for the processing steps,
Expand Down
4 changes: 2 additions & 2 deletions src/fondant/pipeline/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,6 @@ class SagemakerRunner(Runner):
def __init__(self):
self.__resolve_imports()
self.client = self.boto3.client("sagemaker")
self.compiler = SagemakerCompiler()

def __resolve_imports(self):
try:
Expand Down Expand Up @@ -208,7 +207,8 @@ def run(
logging.info(
"Found reference to un-compiled pipeline... compiling",
)
self.compiler.compile(
compiler = SagemakerCompiler()
compiler.compile(
input,
output_path=output_path,
instance_type=instance_type,
Expand Down
32 changes: 15 additions & 17 deletions tests/pipeline/test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,29 +183,27 @@ def test_sagemaker_runner(tmp_path_factory):
]


class MockSagemakerCompiler:
def compile(
self,
pipeline,
output_path,
*,
instance_type,
role_arn,
) -> None:
with open(output_path, "w") as f:
f.write("foo: bar")


def test_sagemaker_runner_from_pipeline():
with mock.patch(
"fondant.pipeline.runner.SagemakerCompiler",
) as mock_compiler, mock.patch("boto3.client", spec=True):
mock_compiler.configure_mock(
**{
"compile.side_effect": open( # noqa: SIM115
".fondant/sagemaker-pipeline.yaml",
"w",
).write("foo: bar"),
},
)
new=MockSagemakerCompiler,
), mock.patch("boto3.client", spec=True):
runner = SagemakerRunner()
runner.run(
input=PIPELINE,
pipeline_name=PIPELINE.name,
role_arn="arn:something",
)
assert runner.compiler.method_calls == [
mock.call.compile(
PIPELINE,
output_path=".fondant/sagemaker-pipeline.yaml",
instance_type="ml.m5.xlarge",
role_arn="arn:something",
),
]
24 changes: 24 additions & 0 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
build,
compile_kfp,
compile_local,
compile_sagemaker,
compile_vertex,
component_from_module,
execute,
Expand Down Expand Up @@ -213,6 +214,29 @@ def test_vertex_compile(tmp_path_factory):
)


def test_sagemaker_compile(tmp_path_factory):
with tmp_path_factory.mktemp("temp") as fn, patch(
"fondant.pipeline.compiler.SagemakerCompiler.compile",
) as mock_compiler:
args = argparse.Namespace(
ref=__name__,
kubeflow=False,
local=False,
vertex=False,
sagemaker=True,
output_path=str(fn / "sagemaker_pipeline.json"),
role_arn="some_role",
instance_type="some_instance_type",
)
compile_sagemaker(args)
mock_compiler.assert_called_once_with(
pipeline=TEST_PIPELINE,
output_path=str(fn / "sagemaker_pipeline.json"),
instance_type="some_instance_type",
role_arn="some_role",
)


def test_local_run():
"""Test that the run command works with different arguments."""
args = argparse.Namespace(
Expand Down
Loading