Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Vertex cli #519

Merged
merged 5 commits into from
Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 85 additions & 2 deletions src/fondant/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@
from collections import defaultdict
from types import ModuleType

from fondant.compiler import DockerCompiler, KubeFlowCompiler
from fondant.compiler import DockerCompiler, KubeFlowCompiler, VertexCompiler
from fondant.component import BaseComponent, Component
from fondant.executor import ExecutorFactory
from fondant.explorer import (
run_explorer_app,
)
from fondant.pipeline import Pipeline
from fondant.runner import DockerRunner, KubeflowRunner
from fondant.runner import DockerRunner, KubeflowRunner, VertexRunner

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -192,6 +192,10 @@ def register_compile(parent_parser):
name="kubeflow",
help="Kubeflow compiler",
)
vertex_parser = compiler_subparser.add_parser(
name="vertex",
help="vertex compiler",
)

# Local runner parser
local_parser.add_argument(
Expand Down Expand Up @@ -236,8 +240,24 @@ def register_compile(parent_parser):
default="pipeline.yaml",
)

# vertex parser
vertex_parser.add_argument(
"ref",
help="""Reference to the pipeline to run, can be a path to a spec file or
a module containing the pipeline instance that will be compiled first (e.g. pipeline.py)
""",
action="store",
)
vertex_parser.add_argument(
"--output-path",
"-o",
help="Output path of compiled pipeline",
default="vertex_pipeline.yml",
)

local_parser.set_defaults(func=compile_local)
kubeflow_parser.set_defaults(func=compile_kfp)
vertex_parser.set_defaults(func=compile_vertex)


def compile_local(args):
Expand All @@ -257,6 +277,12 @@ def compile_kfp(args):
compiler.compile(pipeline=pipeline, output_path=args.output_path)


def compile_vertex(args):
    """Compile the pipeline referenced by ``args.ref`` with the Vertex compiler.

    The reference is resolved through ``pipeline_from_module`` and the resulting
    pipeline is handed to ``VertexCompiler.compile`` together with
    ``args.output_path`` as the output path.
    """
    # Resolve the pipeline before instantiating the compiler so that a bad
    # reference fails first.
    resolved_pipeline = pipeline_from_module(args.ref)
    VertexCompiler().compile(
        pipeline=resolved_pipeline,
        output_path=args.output_path,
    )


def register_run(parent_parser):
parser = parent_parser.add_parser(
"run",
Expand Down Expand Up @@ -284,6 +310,10 @@ def register_run(parent_parser):
name="kubeflow",
help="Kubeflow runner",
)
vertex_parser = runner_subparser.add_parser(
name="vertex",
help="Vertex runner",
)

# Local runner parser
local_parser.add_argument(
Expand Down Expand Up @@ -335,6 +365,38 @@ def register_run(parent_parser):

kubeflow_parser.set_defaults(func=run_kfp)

# Vertex runner parser
vertex_parser.add_argument(
"ref",
help="""Reference to the pipeline to run, can be a path to a spec file or
a module containing the pipeline instance that will be compiled first (e.g. pipeline.py)
""",
action="store",
)
vertex_parser.add_argument(
"--project-id",
help="""The project id of the GCP project used to submit the pipeline""",
)
vertex_parser.add_argument(
"--project-region",
help="The project region of the GCP project used to submit the pipeline ",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is on a per-pipeline basis, right? I tested it and it works. So can we update the help message to reflect this? Right now it seems like the region is tied to the project.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, this is the region where the pipeline is run, and it can be different from the project region. I updated the description.

)

vertex_parser.add_argument(
"--output-path",
"-o",
help="Output path of compiled pipeline",
default="vertex_pipeline.yaml",
)

vertex_parser.add_argument(
"--service-account",
help="The service account used to launch jobs",
default=None,
)

vertex_parser.set_defaults(func=run_vertex)


def run_local(args):
try:
Expand Down Expand Up @@ -377,6 +439,27 @@ def run_kfp(args):
runner.run(input_spec=spec_ref)


def run_vertex(args):
    """Run a pipeline on Vertex, compiling it first when ``args.ref`` is a module.

    ``args.ref`` is first resolved with ``pipeline_from_module``. When that
    raises ``ModuleNotFoundError`` the reference is handed to the runner
    unchanged as the spec to run. Otherwise the resolved pipeline is compiled
    to ``args.output_path`` and that compiled file is run.

    Args:
        args: Parsed CLI namespace providing ``ref``, ``output_path``,
            ``project_id``, ``project_region`` and ``service_account``.
    """
    try:
        pipeline = pipeline_from_module(args.ref)
    except ModuleNotFoundError:
        # Not importable as a module: treat the reference as the spec itself.
        spec_ref = args.ref
    else:
        spec_ref = args.output_path
        # Lazy %-formatting so the target path is actually interpolated
        # (the message previously logged a literal "{spec_ref}").
        logger.info(
            "Found reference to un-compiled pipeline... compiling to %s",
            spec_ref,
        )
        compiler = VertexCompiler()
        compiler.compile(pipeline=pipeline, output_path=spec_ref)

    # Submission is intentionally NOT in a `finally` clause: if resolving or
    # compiling the pipeline fails, the error must propagate instead of
    # submitting a missing/stale spec (or masking the original error with an
    # UnboundLocalError on `spec_ref`).
    runner = VertexRunner(
        project_id=args.project_id,
        project_region=args.project_region,
        service_account=args.service_account,
    )
    runner.run(input_spec=spec_ref)


def register_execute(parent_parser):
parser = parent_parser.add_parser(
"execute",
Expand Down
73 changes: 71 additions & 2 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@
PipelineImportError,
compile_kfp,
compile_local,
compile_vertex,
component_from_module,
execute,
get_module,
pipeline_from_module,
run_kfp,
run_local,
run_vertex,
)
from fondant.component import DaskLoadComponent
from fondant.executor import Executor, ExecutorFactory
Expand Down Expand Up @@ -135,6 +137,7 @@ def test_local_logic(tmp_path_factory):
ref=__name__,
local=True,
kubeflow=False,
vertex=False,
output_path=str(fn / "docker-compose.yml"),
extra_volumes=[],
build_arg=[],
Expand All @@ -150,12 +153,31 @@ def test_kfp_compile(tmp_path_factory):
ref=__name__,
kubeflow=True,
local=False,
output_path=str(fn / "kubeflow_pipelines.yml"),
vertex=False,
output_path=str(fn / "kubeflow_pipeline.yml"),
)
compile_kfp(args)
mock_compiler.assert_called_once_with(
pipeline=TEST_PIPELINE,
output_path=str(fn / "kubeflow_pipelines.yml"),
output_path=str(fn / "kubeflow_pipeline.yml"),
)


def test_vertex_compile(tmp_path_factory):
    """Test that ``compile_vertex`` forwards the pipeline and output path."""
    # `mktemp` returns a plain `pathlib.Path`; it is not used as a context
    # manager because that support was removed from pathlib in Python 3.13.
    output_path = str(tmp_path_factory.mktemp("temp") / "vertex_pipeline.yml")

    with patch(
        "fondant.compiler.VertexCompiler.compile",
    ) as mock_compiler:
        args = argparse.Namespace(
            ref=__name__,
            kubeflow=False,
            local=False,
            vertex=True,
            output_path=output_path,
        )
        compile_vertex(args)
        mock_compiler.assert_called_once_with(
            pipeline=TEST_PIPELINE,
            output_path=output_path,
        )


Expand All @@ -181,6 +203,8 @@ def test_local_run(tmp_path_factory):
with patch("subprocess.call") as mock_call, tmp_path_factory.mktemp("temp") as fn:
args1 = argparse.Namespace(
local=True,
vertex=False,
kubeflow=False,
ref=__name__,
output_path=str(fn / "docker-compose.yml"),
extra_volumes=[],
Expand All @@ -207,6 +231,7 @@ def test_kfp_run(tmp_path_factory):
args = argparse.Namespace(
kubeflow=True,
local=False,
vertex=False,
output_path=None,
ref="some/path",
host=None,
Expand Down Expand Up @@ -241,3 +266,47 @@ def test_kfp_run(tmp_path_factory):
)
run_kfp(args)
mock_runner.assert_called_once_with(host="localhost2")


def test_vertex_run(tmp_path_factory):
    """Test that the run command works in different scenarios."""
    # Scenario 1: the reference is not an importable module, so it is passed
    # to the runner as-is and nothing is compiled.
    with patch("fondant.cli.VertexRunner") as mock_runner:
        args = argparse.Namespace(
            kubeflow=False,
            local=False,
            vertex=True,
            output_path=None,
            project_region="europe-west-1",
            project_id="project-123",
            service_account=None,
            ref="some/path",
        )
        run_vertex(args)
        mock_runner.assert_called_once_with(
            project_id="project-123",
            project_region="europe-west-1",
            service_account=None,
        )
        mock_runner.return_value.run.assert_called_once_with(
            input_spec="some/path",
        )

    # Scenario 2: the reference is a module holding a pipeline, so it is
    # compiled to `output_path` first and the compiled spec is then run.
    # `mktemp` returns a plain `pathlib.Path`; it is not used as a context
    # manager because that support was removed from pathlib in Python 3.13.
    output_path = str(tmp_path_factory.mktemp("temp") / "vertex_pipeline.yml")

    with patch("fondant.cli.VertexRunner") as mock_runner, patch(
        "fondant.cli.VertexCompiler",
    ) as mock_compiler:
        args = argparse.Namespace(
            kubeflow=False,
            local=False,
            vertex=True,
            output_path=output_path,
            ref=__name__,
            project_region="europe-west-1",
            project_id="project-123",
            service_account=None,
        )
        run_vertex(args)
        # `VertexCompiler()` yields `mock_compiler.return_value`, so the
        # compile call must be checked on the instance mock.
        mock_compiler.return_value.compile.assert_called_once_with(
            pipeline=TEST_PIPELINE,
            output_path=output_path,
        )
        mock_runner.assert_called_once_with(
            project_id="project-123",
            project_region="europe-west-1",
            service_account=None,
        )
        mock_runner.return_value.run.assert_called_once_with(
            input_spec=output_path,
        )
Loading