From 96f8dd45d89bc9167d3542e159ace65a2a7271af Mon Sep 17 00:00:00 2001 From: Alexey Roytman Date: Mon, 3 Jun 2024 11:08:49 +0300 Subject: [PATCH] update noop kfpv2 --- .make.versions | 4 +- .../src/create_ray_cluster.py | 2 +- .../src/delete_ray_cluster.py | 2 +- kfp/kfp_ray_components/src/execute_ray_job.py | 2 +- .../src/execute_ray_job_multi_s3.py | 2 +- kfp/kfp_ray_components/src/subworkflow.py | 2 +- .../utils/pipelines_tests_utils.py | 2 +- .../kfp_v2_workflow_support/Makefile | 2 + .../kfp_v2_workflow_support/pyproject.toml | 8 +- .../compile_utils/__init__.py | 0 .../compile_utils/component.py | 0 .../pipeline_utils/__init__.py | 1 + .../pipeline_utils/pipeline_utils.py | 173 ++++++++++++++++++ .../pipeline_utils/pipelines_tests_utils.py | 75 ++++++++ .../runtime_utils/__init__.py | 0 .../runtime_utils/kfp_utils.py | 0 .../runtime_utils/remote_jobs_utils.py | 0 .../test/pipeline_utils_test.py | 34 ++++ .../test/ray_remote_jobs_test.py | 91 +++++++++ .../universal/noop/kfp_ray/v2/noop_wf.py | 2 +- 20 files changed, 390 insertions(+), 12 deletions(-) rename kfp/kfp_support_lib/kfp_v2_workflow_support/src/{ => workflow_support}/compile_utils/__init__.py (100%) rename kfp/kfp_support_lib/kfp_v2_workflow_support/src/{ => workflow_support}/compile_utils/component.py (100%) create mode 100644 kfp/kfp_support_lib/kfp_v2_workflow_support/src/workflow_support/pipeline_utils/__init__.py create mode 100644 kfp/kfp_support_lib/kfp_v2_workflow_support/src/workflow_support/pipeline_utils/pipeline_utils.py create mode 100644 kfp/kfp_support_lib/kfp_v2_workflow_support/src/workflow_support/pipeline_utils/pipelines_tests_utils.py rename kfp/kfp_support_lib/kfp_v2_workflow_support/src/{ => workflow_support}/runtime_utils/__init__.py (100%) rename kfp/kfp_support_lib/kfp_v2_workflow_support/src/{ => workflow_support}/runtime_utils/kfp_utils.py (100%) rename kfp/kfp_support_lib/kfp_v2_workflow_support/src/{ => workflow_support}/runtime_utils/remote_jobs_utils.py (100%) create mode 100644 kfp/kfp_support_lib/kfp_v2_workflow_support/test/pipeline_utils_test.py create mode 100644 kfp/kfp_support_lib/kfp_v2_workflow_support/test/ray_remote_jobs_test.py diff --git a/.make.versions b/.make.versions index 8900a58d4..73bb98ce0 100644 --- a/.make.versions +++ b/.make.versions @@ -7,7 +7,7 @@ # Data prep lab wheel version DPK_LIB_VERSION=0.2.0 DPK_LIB_KFP_VERSION=0.2.0 -DPK_LIB_KFP_VERSION_v2=0.1.1-dev1 +DPK_LIB_KFP_VERSION_v2=0.2.0 # Begin transform versions/tags BLOCKLIST_VERSION=0.4.0 @@ -28,5 +28,5 @@ CODE_QUALITY_VERSION=0.4.0 DOC_QUALITY_VERSION=0.4.0 INGEST_TO_PARQUET_VERSION=0.4.0 -KFP_DOCKER_VERSION_v2=0.1.1 +KFP_DOCKER_VERSION_v2=0.2.0-v2 KFP_DOCKER_VERSION=0.2.0-v2 diff --git a/kfp/kfp_ray_components/src/create_ray_cluster.py b/kfp/kfp_ray_components/src/create_ray_cluster.py index 42cace863..ee8312b1b 100644 --- a/kfp/kfp_ray_components/src/create_ray_cluster.py +++ b/kfp/kfp_ray_components/src/create_ray_cluster.py @@ -14,7 +14,7 @@ kfp_v2 = os.getenv("KFP_v2", 0) if kfp_v2 == 1: - from kfp_v1_workflow_support.utils import KFPUtils, RayRemoteJobs + from workflow_support.runtime_utils import KFPUtils, RayRemoteJobs print(f"Load KFPv2 libs") else: from workflow_support.utils import KFPUtils, RayRemoteJobs diff --git a/kfp/kfp_ray_components/src/delete_ray_cluster.py b/kfp/kfp_ray_components/src/delete_ray_cluster.py index 886799453..ccbb31b93 100644 --- a/kfp/kfp_ray_components/src/delete_ray_cluster.py +++ b/kfp/kfp_ray_components/src/delete_ray_cluster.py @@ -15,7 +15,7 @@ kfp_v2 = 
os.getenv("KFP_v2", 0) if kfp_v2 == 1: - from kfp_v1_workflow_support.utils import KFPUtils, RayRemoteJobs + from workflow_support.runtime_utils import KFPUtils, RayRemoteJobs print(f"Load KFPv2 libs") else: from workflow_support.utils import KFPUtils, RayRemoteJobs diff --git a/kfp/kfp_ray_components/src/execute_ray_job.py b/kfp/kfp_ray_components/src/execute_ray_job.py index 4a80f3ae0..037a3baaa 100644 --- a/kfp/kfp_ray_components/src/execute_ray_job.py +++ b/kfp/kfp_ray_components/src/execute_ray_job.py @@ -14,7 +14,7 @@ kfp_v2 = os.getenv("KFP_v2", 0) if kfp_v2 == 1: - from workflow_support.utils import KFPUtils, execute_ray_jobs + from workflow_support.runtime_utils import KFPUtils, execute_ray_jobs print(f"Load KFPv2 libs") else: from workflow_support.utils import KFPUtils, execute_ray_jobs diff --git a/kfp/kfp_ray_components/src/execute_ray_job_multi_s3.py b/kfp/kfp_ray_components/src/execute_ray_job_multi_s3.py index 123c5a8e7..7a9246cdf 100644 --- a/kfp/kfp_ray_components/src/execute_ray_job_multi_s3.py +++ b/kfp/kfp_ray_components/src/execute_ray_job_multi_s3.py @@ -14,7 +14,7 @@ kfp_v2 = os.getenv("KFP_v2", 0) if kfp_v2 == 1: - from workflow_support.utils import KFPUtils, execute_ray_jobs + from workflow_support.runtime_utils import KFPUtils, execute_ray_jobs print(f"Load KFPv2 libs") else: from workflow_support.utils import KFPUtils, execute_ray_jobs diff --git a/kfp/kfp_ray_components/src/subworkflow.py b/kfp/kfp_ray_components/src/subworkflow.py index 4771390c2..a57e1406d 100644 --- a/kfp/kfp_ray_components/src/subworkflow.py +++ b/kfp/kfp_ray_components/src/subworkflow.py @@ -3,7 +3,7 @@ kfp_v2 = os.getenv("KFP_v2", 0) if kfp_v2 == 1: - from workflow_support.utils import KFPUtils, PipelinesUtils + from workflow_support.runtime_utils import KFPUtils, PipelinesUtils print(f"Load KFPv2 libs") else: from workflow_support.utils import KFPUtils, PipelinesUtils diff --git a/kfp/kfp_support_lib/kfp_v1_workflow_support/src/workflow_support/utils/pipelines_tests_utils.py b/kfp/kfp_support_lib/kfp_v1_workflow_support/src/workflow_support/utils/pipelines_tests_utils.py index 1e7ff9cf7..5fd43ca6b 100644 --- a/kfp/kfp_support_lib/kfp_v1_workflow_support/src/workflow_support/utils/pipelines_tests_utils.py +++ b/kfp/kfp_support_lib/kfp_v1_workflow_support/src/workflow_support/utils/pipelines_tests_utils.py @@ -3,7 +3,7 @@ from data_processing.utils import get_logger, str2bool -from . import PipelinesUtils +from workflow_support.utils import PipelinesUtils logger = get_logger(__name__) diff --git a/kfp/kfp_support_lib/kfp_v2_workflow_support/Makefile b/kfp/kfp_support_lib/kfp_v2_workflow_support/Makefile index 135e29514..6d6540d84 100644 --- a/kfp/kfp_support_lib/kfp_v2_workflow_support/Makefile +++ b/kfp/kfp_support_lib/kfp_v2_workflow_support/Makefile @@ -49,6 +49,8 @@ venv:: pyproject.toml .check-env rm -rf venv $(PYTHON) -m venv venv . 
${VENV_ACTIVATE}; \ + pip install -e ../python_apiserver_client; \ + pip install -e ../../../data-processing-lib/python; \ pip install -e .; \ pip install ray==${RAY} \ pip install pytest pytest-cov diff --git a/kfp/kfp_support_lib/kfp_v2_workflow_support/pyproject.toml b/kfp/kfp_support_lib/kfp_v2_workflow_support/pyproject.toml index 4238e0417..bedc6f334 100644 --- a/kfp/kfp_support_lib/kfp_v2_workflow_support/pyproject.toml +++ b/kfp/kfp_support_lib/kfp_v2_workflow_support/pyproject.toml @@ -12,9 +12,11 @@ authors = [ { name = "Revital Eres", email = "eres@il.ibm.com" }, ] dependencies = [ - "kfp==2.2.0", + "kfp==2.7.0", + "ray==2.9.3", "requests", - "data-prep-toolkit==0.1.1", + "data-prep-toolkit==0.2.0", + "python_apiserver_client", ] [build-system] @@ -36,7 +38,7 @@ dev = [ package_dir = ["src"] [options.packages.find] -where = ["src/kfp_support"] +where = ["src/workflow_support"] [tool.pytest.ini_options] addopts = "--cov --cov-report term-missing --cov-fail-under 10" diff --git a/kfp/kfp_support_lib/kfp_v2_workflow_support/src/compile_utils/__init__.py b/kfp/kfp_support_lib/kfp_v2_workflow_support/src/workflow_support/compile_utils/__init__.py similarity index 100% rename from kfp/kfp_support_lib/kfp_v2_workflow_support/src/compile_utils/__init__.py rename to kfp/kfp_support_lib/kfp_v2_workflow_support/src/workflow_support/compile_utils/__init__.py diff --git a/kfp/kfp_support_lib/kfp_v2_workflow_support/src/compile_utils/component.py b/kfp/kfp_support_lib/kfp_v2_workflow_support/src/workflow_support/compile_utils/component.py similarity index 100% rename from kfp/kfp_support_lib/kfp_v2_workflow_support/src/compile_utils/component.py rename to kfp/kfp_support_lib/kfp_v2_workflow_support/src/workflow_support/compile_utils/component.py diff --git a/kfp/kfp_support_lib/kfp_v2_workflow_support/src/workflow_support/pipeline_utils/__init__.py b/kfp/kfp_support_lib/kfp_v2_workflow_support/src/workflow_support/pipeline_utils/__init__.py new file mode 100644 index 000000000..0e80d97a2 --- /dev/null +++ b/kfp/kfp_support_lib/kfp_v2_workflow_support/src/workflow_support/pipeline_utils/__init__.py @@ -0,0 +1 @@ +from workflow_support.pipeline_utils.pipeline_utils import PipelinesUtils diff --git a/kfp/kfp_support_lib/kfp_v2_workflow_support/src/workflow_support/pipeline_utils/pipeline_utils.py b/kfp/kfp_support_lib/kfp_v2_workflow_support/src/workflow_support/pipeline_utils/pipeline_utils.py new file mode 100644 index 000000000..7566f6b2e --- /dev/null +++ b/kfp/kfp_support_lib/kfp_v2_workflow_support/src/workflow_support/pipeline_utils/pipeline_utils.py @@ -0,0 +1,173 @@ +# (C) Copyright IBM Corp. 2024. +# Licensed under the Apache License, Version 2.0 (the “License”); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an “AS IS” BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################
+
+import datetime
+import time
+from typing import Any, Optional
+
+from data_processing.utils import get_logger
+from kfp_server_api import models
+
+from kfp import Client
+
+
+logger = get_logger(__name__)
+
+
+class PipelinesUtils:
+    """
+    Helper class for pipeline management
+    """
+
+    def __init__(self, host: str = "http://localhost:8080"):
+        """
+        Initialization
+        :param host: host to connect to
+        """
+        self.kfp_client = Client(host=host)
+
+    def upload_pipeline(
+        self,
+        pipeline_package_path: str = None,
+        pipeline_name: str = None,
+        overwrite: bool = False,
+        description: str = None,
+    ) -> models.api_pipeline.ApiPipeline:
+        """
+        Uploads the pipeline
+        :param pipeline_package_path: Local path to the pipeline package.
+        :param pipeline_name: Optional. Name of the pipeline to be shown in the UI.
+        :param overwrite: Optional. If the pipeline exists, delete it before creating a new one.
+        :param description: Optional. Description of the pipeline to be shown in the UI.
+        :return: Server response object containing pipeline id and other information.
+        """
+        if overwrite:
+            pipeline = self.get_pipeline_by_name(name=pipeline_name)
+            if pipeline is not None:
+                try:
+                    logger.info(f"Pipeline {pipeline_name} already exists. Trying to delete it.")
+                    self.kfp_client.delete_pipeline(pipeline_id=pipeline.id)
+                except Exception as e:
+                    logger.warning(f"Exception deleting pipeline before uploading: {e}")
+                    return None
+        try:
+            pipeline = self.kfp_client.upload_pipeline(
+                pipeline_package_path=pipeline_package_path, pipeline_name=pipeline_name, description=description
+            )
+        except Exception as e:
+            logger.warning(f"Exception uploading pipeline: {e}")
+            return None
+        if pipeline is None:
+            logger.warning(f"Failed to upload pipeline {pipeline_name}.")
+            return None
+        logger.info("Pipeline uploaded")
+        return pipeline
+
+    def delete_pipeline(self, pipeline_id):
+        """
+        Delete pipeline.
+        :param pipeline_id: id of the pipeline.
+        :return:
+        Returns:
+              Object. If the method is called asynchronously, returns the request thread.
+        Raises:
+             kfp_server_api.ApiException: If the pipeline is not found.
+        """
+        return self.kfp_client.delete_pipeline(pipeline_id)
+
+    def start_pipeline(
+        self,
+        pipeline: models.api_pipeline.ApiPipeline,
+        experiment: models.api_experiment.ApiExperiment,
+        params: Optional[dict[str, Any]],
+    ) -> str:
+        """
+        Start a specified pipeline.
+        :param pipeline: pipeline definition
+        :param experiment: experiment to use
+        :param params: pipeline parameters
+        :return: the id of the run object
+        """
+        job_name = pipeline.name + " " + datetime.datetime.now().strftime("%Y_%m_%d_%H_%M_%S")
+        try:
+            run = self.kfp_client.run_pipeline(
+                experiment_id=experiment.id, job_name=job_name, pipeline_id=pipeline.id, params=params
+            )
+            logger.info(f"Pipeline run {job_name} submitted")
+            return run.id
+        except Exception as e:
+            logger.warning(f"Exception starting pipeline: {e}")
+            return None
+
+    def get_experiment_by_name(self, name: str = "Default") -> models.api_experiment.ApiExperiment:
+        """
+        Get experiment by name
+        :param name: name
+        :return: experiment
+        """
+        try:
+            return self.kfp_client.get_experiment(experiment_name=name)
+        except Exception as e:
+            logger.warning(f"Exception getting experiment: {e}")
+            return None
+
+    def get_pipeline_by_name(self, name: str, np: int = 100) -> models.api_pipeline.ApiPipeline:
+        """
+        Given a pipeline name, return the pipeline
+        :param name: pipeline name
+        :param np: page size for the pipeline query. For large clusters with many pipelines, you might need to
+        increase this number
+        :return: pipeline
+        """
+        try:
+            # Get all pipelines
+            pipelines = self.kfp_client.list_pipelines(page_size=np).pipelines
+            required = list(filter(lambda p: name in p.name, pipelines))
+            if len(required) != 1:
+                logger.warning(f"Failed to get pipeline. Number of pipelines with name {name} is {len(required)}")
+                return None
+            return required[0]
+
+        except Exception as e:
+            logger.warning(f"Exception getting pipeline: {e}")
+            return None
+
+    def wait_pipeline_completion(self, run_id: str, timeout: int = -1, wait: int = 600) -> tuple[str, str]:
+        """
+        Waits for a pipeline run to complete
+        :param run_id: run id
+        :param timeout: timeout (sec) (-1 to wait forever)
+        :param wait: polling interval (sec)
+        :return: completion status and an error message, if one exists
+        """
+        try:
+            if timeout > 0:
+                end = time.time() + timeout
+            else:
+                end = 2**63 - 1
+            run_details = self.kfp_client.get_run(run_id=run_id)
+            status = run_details.run.status
+            while status is None or status.lower() not in ["succeeded", "completed", "failed", "skipped", "error"]:
+                time.sleep(wait)
+                if (end - time.time()) < 0:
+                    return "failed", "Execution is taking too long"
+                run_details = self.kfp_client.get_run(run_id=run_id)
+                status = run_details.run.status
+                logger.info(f"Got pipeline execution status {status}")
+
+            if status.lower() in ["succeeded", "completed"]:
+                return status, ""
+            return status, run_details.run.error
+
+        except Exception as e:
+            logger.warning(f"Failed waiting for pipeline completion: {e}")
+            return "failed", str(e)
diff --git a/kfp/kfp_support_lib/kfp_v2_workflow_support/src/workflow_support/pipeline_utils/pipelines_tests_utils.py b/kfp/kfp_support_lib/kfp_v2_workflow_support/src/workflow_support/pipeline_utils/pipelines_tests_utils.py
new file mode 100644
index 000000000..1e7ff9cf7
--- /dev/null
+++ b/kfp/kfp_support_lib/kfp_v2_workflow_support/src/workflow_support/pipeline_utils/pipelines_tests_utils.py
@@ -0,0 +1,75 @@
+import os
+import sys
+
+from data_processing.utils import get_logger, str2bool
+
+from . import PipelinesUtils
+
+
+logger = get_logger(__name__)
+
+
+def run_test(pipeline_package_path: str, endpoint: str = "http://localhost:8080/", overwrite: bool = True):
+    """
+    Upload and run a single pipeline
+
+    :param pipeline_package_path: Local path to the pipeline package.
+    :param endpoint: endpoint to kfp service.
+    :return: the pipeline name as it appears in the kfp GUI.
+    """
+    tmout: int = 800
+    wait: int = 60
+    file_name = os.path.basename(pipeline_package_path)
+    pipeline_name = os.path.splitext(file_name)[0]
+    utils = PipelinesUtils(host=endpoint)
+    pipeline = utils.upload_pipeline(
+        pipeline_package_path=pipeline_package_path,
+        pipeline_name=pipeline_name,
+        overwrite=overwrite,
+    )
+    if pipeline is None:
+        return None
+    experiment = utils.get_experiment_by_name()
+    run_id = utils.start_pipeline(pipeline, experiment, params={})
+    status, error = utils.wait_pipeline_completion(run_id=run_id, timeout=tmout, wait=wait)
+    if status.lower() not in ["succeeded", "completed"]:
+        # Execution failed
+        logger.warning(f"Pipeline {pipeline_name} failed with error {error} and status {status}")
+        return None
+    logger.info(f"Pipeline {pipeline_name} successfully completed")
+    return pipeline_name
+
+
+if __name__ == "__main__":
+    import argparse
+
+    parser = argparse.ArgumentParser(description="Run sanity test")
+    parser.add_argument("-c", "--command", type=str, choices=["upload", "sanity-test"])
+    parser.add_argument("-e", "--endpoint", type=str, default="http://localhost:8080/")
+    parser.add_argument("-p", "--pipeline_package_path", type=str, default="")
+    parser.add_argument("-o", "--overwrite", type=str, default="True")
+
+    args = parser.parse_args()
+    match args.command:
+        case "upload":
+            file_name = os.path.basename(args.pipeline_package_path)
+            pipeline_name = os.path.splitext(file_name)[0]
+            utils = PipelinesUtils(host=args.endpoint)
+            pipeline = utils.upload_pipeline(
+                pipeline_package_path=args.pipeline_package_path,
+                pipeline_name=pipeline_name,
+                overwrite=str2bool(args.overwrite),
+            )
+            if pipeline is None:
+                sys.exit(1)
+        case "sanity-test":
+            run = run_test(
+                endpoint=args.endpoint,
+                pipeline_package_path=args.pipeline_package_path,
+                overwrite=str2bool(args.overwrite),
+            )
+            if run is None:
+                sys.exit(1)
+        case _:
+            logger.warning("Unsupported command")
+            sys.exit(1)
diff --git a/kfp/kfp_support_lib/kfp_v2_workflow_support/src/runtime_utils/__init__.py b/kfp/kfp_support_lib/kfp_v2_workflow_support/src/workflow_support/runtime_utils/__init__.py
similarity index 100%
rename from kfp/kfp_support_lib/kfp_v2_workflow_support/src/runtime_utils/__init__.py
rename to kfp/kfp_support_lib/kfp_v2_workflow_support/src/workflow_support/runtime_utils/__init__.py
diff --git a/kfp/kfp_support_lib/kfp_v2_workflow_support/src/runtime_utils/kfp_utils.py b/kfp/kfp_support_lib/kfp_v2_workflow_support/src/workflow_support/runtime_utils/kfp_utils.py
similarity index 100%
rename from kfp/kfp_support_lib/kfp_v2_workflow_support/src/runtime_utils/kfp_utils.py
rename to kfp/kfp_support_lib/kfp_v2_workflow_support/src/workflow_support/runtime_utils/kfp_utils.py
diff --git a/kfp/kfp_support_lib/kfp_v2_workflow_support/src/runtime_utils/remote_jobs_utils.py b/kfp/kfp_support_lib/kfp_v2_workflow_support/src/workflow_support/runtime_utils/remote_jobs_utils.py
similarity index 100%
rename from kfp/kfp_support_lib/kfp_v2_workflow_support/src/runtime_utils/remote_jobs_utils.py
rename to kfp/kfp_support_lib/kfp_v2_workflow_support/src/workflow_support/runtime_utils/remote_jobs_utils.py
diff --git a/kfp/kfp_support_lib/kfp_v2_workflow_support/test/pipeline_utils_test.py b/kfp/kfp_support_lib/kfp_v2_workflow_support/test/pipeline_utils_test.py
new file mode 100644
index 000000000..77cca5635
--- /dev/null
+++ b/kfp/kfp_support_lib/kfp_v2_workflow_support/test/pipeline_utils_test.py
@@ -0,0 +1,34 @@
+# (C) Copyright IBM Corp. 2024.
+# Licensed under the Apache License, Version 2.0 (the “License”);
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an “AS IS” BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from workflow_support.pipeline_utils import PipelinesUtils
+
+server_url = "http://localhost:8080/"
+
+def test_pipelines():
+    """
+    Test pipelines utils
+    """
+    utils = PipelinesUtils(host=server_url)
+    # get pipeline by name
+    pipeline = utils.get_pipeline_by_name("[Tutorial] Data passing in python components")
+    assert pipeline is not None
+    # get default experiment
+    experiment = utils.get_experiment_by_name()
+    assert experiment is not None
+    # start pipeline
+    run = utils.start_pipeline(pipeline=pipeline, experiment=experiment, params={})
+    assert run is not None
+    # wait for completion
+    status, error = utils.wait_pipeline_completion(run_id=run, wait=10)
+    assert status.lower() == "succeeded"
+    assert error == ""
diff --git a/kfp/kfp_support_lib/kfp_v2_workflow_support/test/ray_remote_jobs_test.py b/kfp/kfp_support_lib/kfp_v2_workflow_support/test/ray_remote_jobs_test.py
new file mode 100644
index 000000000..7b9ad2c13
--- /dev/null
+++ b/kfp/kfp_support_lib/kfp_v2_workflow_support/test/ray_remote_jobs_test.py
@@ -0,0 +1,91 @@
+# (C) Copyright IBM Corp. 2024.
+# Licensed under the Apache License, Version 2.0 (the “License”);
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an “AS IS” BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from configmaps import ConfigmapsManager
+from python_apiserver_client.params import ConfigMapVolume
+from workflow_support.runtime_utils import RayRemoteJobs
+
+server_url = "http://localhost:8080/ray/"
+
+def test_ray_remote_jobs():
+    """
+    Test the full cycle of job submission
+    :return:
+    """
+    # This shows how to create the volumes dictionary
+    volumes = [
+        ConfigMapVolume(
+            name="code-sample",
+            mount_path="/home/ray/samples",
+            source="ray-job-code-sample",
+            items={"sample_code.py": "sample_code.py"},
+        )
+    ]
+    dct_volumes = {"volumes": [v.to_dict() for v in volumes]}
+
+    head_node = {
+        "cpu": 2,
+        "memory": 4,
+        "image": "rayproject/ray:2.9.3-py310",
+        # Ray start params, just to show
+        "ray_start_params": {"metrics-export-port": "8080", "num-cpus": "0", "dashboard-host": "0.0.0.0"},
+        "image_pull_policy": "Always",
+    } | dct_volumes
+
+    worker_node = {
+        "cpu": 2,
+        "memory": 4,
+        "image": "rayproject/ray:2.9.3-py310",
+        "replicas": 1,
+        "min_replicas": 1,
+        "max_replicas": 1,
+        "image_pull_policy": "Always",
+    } | dct_volumes
+
+    # Create configmap for testing
+    cm_manager = ConfigmapsManager()
+    cm_manager.delete_code_map()
+    cm_manager.create_code_map()
+
+    # create cluster
+    remote_jobs = RayRemoteJobs(server_url=server_url)
+    status, error = remote_jobs.create_ray_cluster(
+        name="job-test", namespace="default", head_node=head_node, worker_nodes=[worker_node]
+    )
+    print(f"Created cluster - status: {status}, error: {error}")
+    assert status == 200
+    assert error is None
+    # submitting ray job
+    runtime_env = """
+    pip:
+      - requests==2.26.0
+      - pendulum==2.1.2
+    env_vars:
+      counter_name: test_counter
+    """
+    status, error, submission = remote_jobs.submit_job(
+        name="job-test",
+        namespace="default",
+        request={},
+        runtime_env=runtime_env,
+        executor="/home/ray/samples/sample_code.py",
+    )
+    print(f"submit job - status: {status}, error: {error}, submission id {submission}")
+    assert status == 200
+    assert error is None
+    # print execution log
+    remote_jobs.follow_execution(name="job-test", namespace="default", submission_id=submission, print_timeout=20)
+    # cleanup
+    status, error = remote_jobs.delete_ray_cluster(name="job-test", namespace="default")
+    print(f"Deleted cluster - status: {status}, error: {error}")
+    assert status == 200
+    assert error is None
diff --git a/transforms/universal/noop/kfp_ray/v2/noop_wf.py b/transforms/universal/noop/kfp_ray/v2/noop_wf.py
index 02a4bea6a..613b362fd 100644
--- a/transforms/universal/noop/kfp_ray/v2/noop_wf.py
+++ b/transforms/universal/noop/kfp_ray/v2/noop_wf.py
@@ -13,7 +13,7 @@
 import kfp.compiler as compiler
 import kfp.components as comp
 import kfp.dsl as dsl
-from kfp_support.workflow_support.runtime_utils import (
+from workflow_support.compile_utils import (
     ONE_HOUR_SEC,
     ONE_WEEK_SEC,
     ComponentUtils,
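
Usage sketch: with this layout, the KFP v2 support code is imported from the workflow_support package (compile_utils, pipeline_utils, runtime_utils). The snippet below is a minimal, hypothetical driver for the relocated PipelinesUtils; the endpoint URL, the compiled package path "noop_wf.yaml", and the pipeline name are illustrative placeholders, not values taken from this patch.

    # Minimal sketch, assuming a reachable KFP endpoint and a compiled
    # pipeline package on disk; the host URL and file name are placeholders.
    from workflow_support.pipeline_utils import PipelinesUtils

    utils = PipelinesUtils(host="http://localhost:8080/")
    # Upload, replacing any existing pipeline of the same name.
    pipeline = utils.upload_pipeline(
        pipeline_package_path="noop_wf.yaml",
        pipeline_name="noop_wf",
        overwrite=True,
    )
    experiment = utils.get_experiment_by_name()  # the "Default" experiment
    run_id = utils.start_pipeline(pipeline=pipeline, experiment=experiment, params={})
    status, error = utils.wait_pipeline_completion(run_id=run_id, timeout=800, wait=60)
    print(f"Run finished - status: {status}, error: {error}")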