From a0028b2aa110c6672ce5f728a1f7764c9055c1e7 Mon Sep 17 00:00:00 2001 From: Revital Sur Date: Mon, 3 Jun 2024 15:06:46 +0300 Subject: [PATCH 1/3] Fixes after testing. Signed-off-by: Revital Sur --- kfp/kfp_ray_components/Dockerfile | 3 +- kfp/kfp_ray_components/Makefile | 13 ++++- .../createRayClusterComponent.yaml | 2 +- .../deleteRayClusterComponent.yaml | 2 +- .../executeRayJobComponent.yaml | 2 +- .../executeRayJobComponent_multi_s3.yaml | 2 +- .../executeSubWorkflowComponent.yaml | 2 +- .../src/create_ray_cluster.py | 3 +- .../src/delete_ray_cluster.py | 2 +- kfp/kfp_ray_components/src/execute_ray_job.py | 2 +- .../src/execute_ray_job_multi_s3.py | 2 +- .../kfp_v2_workflow_support/pyproject.toml | 1 + .../compile_utils/__init__.py | 5 +- .../compile_utils/component.py | 6 ++- .../runtime_utils/__init__.py | 4 +- .../runtime_utils/remote_jobs_utils.py | 6 +-- kfp/requirements.env | 10 ++-- transforms/universal/noop/Makefile | 18 ++++++- .../universal/noop/kfp_ray/v2/noop_wf.py | 47 ++++++++++--------- 19 files changed, 85 insertions(+), 47 deletions(-) diff --git a/kfp/kfp_ray_components/Dockerfile b/kfp/kfp_ray_components/Dockerfile index ab0ef7588..81b391eec 100644 --- a/kfp/kfp_ray_components/Dockerfile +++ b/kfp/kfp_ray_components/Dockerfile @@ -2,6 +2,7 @@ FROM docker.io/rayproject/ray:2.9.3-py310 ARG BUILD_DATE ARG GIT_COMMIT +ARG KFP_v2 LABEL build-date=$BUILD_DATE LABEL git-commit=$GIT_COMMIT @@ -22,7 +23,7 @@ RUN cd python_apiserver_client && pip install --no-cache-dir -e . COPY --chown=ray:users workflow_support_lib workflow_support_lib/ RUN cd workflow_support_lib && pip install --no-cache-dir -e . - +ENV KFP_v2=$KFP_v2 # remove credentials-containing file RUN rm requirements.txt # components diff --git a/kfp/kfp_ray_components/Makefile b/kfp/kfp_ray_components/Makefile index 717ad7754..a1e4fe730 100644 --- a/kfp/kfp_ray_components/Makefile +++ b/kfp/kfp_ray_components/Makefile @@ -26,6 +26,17 @@ endif #DOCKER_IMG=${DOCKER_HOSTNAME}/${DOCKER_NAMESPACE}/${DOCKER_IMAGE_NAME}:${DOCKER_IMAGE_VERSION} DOCKER_IMG=$(DOCKER_LOCAL_IMAGE) +.PHONY: .kfp_image +.kfp_image:: # Must be called with a DOCKER_IMAGE= settings. + @# Help: Create the docker image $(DOCKER_LOCAL_IMAGE) and a tag for $(DOCKER_REMOTE_IMAGE) + $(DOCKER) build -t $(DOCKER_LOCAL_IMAGE) \ + -f $(DOCKER_FILE) \ + --build-arg EXTRA_INDEX_URL=$(EXTRA_INDEX_URL) \ + --build-arg BASE_IMAGE=$(BASE_IMAGE) \ + --build-arg BUILD_DATE=$(shell date -u +'%Y-%m-%dT%H:%M:%SZ') \ + --build-arg KFP_v2=$(KFPv2) \ + --build-arg GIT_COMMIT=$(shell git log -1 --format=%h) . + $(DOCKER) tag $(DOCKER_LOCAL_IMAGE) $(DOCKER_REMOTE_IMAGE) .PHONY: .lib-src-image .lib-src-image:: @@ -33,7 +44,7 @@ DOCKER_IMG=$(DOCKER_LOCAL_IMAGE) $(MAKE) .defaults.copy-lib LIB_PATH=$(DPK_PYTHON_LIB_DIR) LIB_NAME=data-processing-lib-python $(MAKE) .defaults.copy-lib LIB_PATH=$(REPOROOT)/kfp/kfp_support_lib/python_apiserver_client LIB_NAME=python_apiserver_client $(MAKE) .defaults.copy-lib LIB_PATH=$(REPOROOT)/kfp/kfp_support_lib/$(WORKFLOW_SUPPORT_LIB) LIB_NAME=workflow_support_lib - $(MAKE) .defaults.image + $(MAKE) .kfp_image rm -rf data-processing-lib-ray rm -rf data-processing-lib-python rm -rf python_apiserver_client diff --git a/kfp/kfp_ray_components/createRayClusterComponent.yaml b/kfp/kfp_ray_components/createRayClusterComponent.yaml index 71df1893a..f86af3991 100644 --- a/kfp/kfp_ray_components/createRayClusterComponent.yaml +++ b/kfp/kfp_ray_components/createRayClusterComponent.yaml @@ -11,7 +11,7 @@ inputs: implementation: container: - image: quay.io/dataprep1/data-prep-kit/kfp-data-processing:0.2.0-v2 + image: quay.io/dataprep1/data-prep-kit/kfp-data-processing_v2:0.2.0-v2 # command is a list of strings (command-line arguments). # The YAML language has two syntaxes for lists and you can use either of them. # Here we use the "flow syntax" - comma-separated strings inside square brackets. diff --git a/kfp/kfp_ray_components/deleteRayClusterComponent.yaml b/kfp/kfp_ray_components/deleteRayClusterComponent.yaml index 41d03fd5d..d62312d0c 100644 --- a/kfp/kfp_ray_components/deleteRayClusterComponent.yaml +++ b/kfp/kfp_ray_components/deleteRayClusterComponent.yaml @@ -8,7 +8,7 @@ inputs: implementation: container: - image: quay.io/dataprep1/data-prep-kit/kfp-data-processing:0.2.0-v2 + image: quay.io/dataprep1/data-prep-kit/kfp-data-processing_v2:0.2.0-v2 # command is a list of strings (command-line arguments). # The YAML language has two syntaxes for lists and you can use either of them. # Here we use the "flow syntax" - comma-separated strings inside square brackets. diff --git a/kfp/kfp_ray_components/executeRayJobComponent.yaml b/kfp/kfp_ray_components/executeRayJobComponent.yaml index b6589dcfb..d339bd05b 100644 --- a/kfp/kfp_ray_components/executeRayJobComponent.yaml +++ b/kfp/kfp_ray_components/executeRayJobComponent.yaml @@ -12,7 +12,7 @@ inputs: implementation: container: - image: quay.io/dataprep1/data-prep-kit/kfp-data-processing:0.2.0-v2 + image: quay.io/dataprep1/data-prep-kit/kfp-data-processing_v2:0.2.0-v2 # command is a list of strings (command-line arguments). # The YAML language has two syntaxes for lists and you can use either of them. # Here we use the "flow syntax" - comma-separated strings inside square brackets. diff --git a/kfp/kfp_ray_components/executeRayJobComponent_multi_s3.yaml b/kfp/kfp_ray_components/executeRayJobComponent_multi_s3.yaml index ca8f44a55..0c6c549fa 100644 --- a/kfp/kfp_ray_components/executeRayJobComponent_multi_s3.yaml +++ b/kfp/kfp_ray_components/executeRayJobComponent_multi_s3.yaml @@ -13,7 +13,7 @@ inputs: implementation: container: - image: quay.io/dataprep1/data-prep-kit/kfp-data-processing:0.2.0-v2 + image: quay.io/dataprep1/data-prep-kit/kfp-data-processing_v2:0.2.0-v2 # command is a list of strings (command-line arguments). # The YAML language has two syntaxes for lists and you can use either of them. # Here we use the "flow syntax" - comma-separated strings inside square brackets. diff --git a/kfp/kfp_ray_components/executeSubWorkflowComponent.yaml b/kfp/kfp_ray_components/executeSubWorkflowComponent.yaml index d4b862747..4187d0893 100644 --- a/kfp/kfp_ray_components/executeSubWorkflowComponent.yaml +++ b/kfp/kfp_ray_components/executeSubWorkflowComponent.yaml @@ -27,7 +27,7 @@ outputs: implementation: container: - image: quay.io/dataprep1/data-prep-kit/kfp-data-processing:0.2.0-v2 + image: quay.io/dataprep1/data-prep-kit/kfp-data-processing_v2:0.2.0-v2 # command is a list of strings (command-line arguments). # The YAML language has two syntaxes for lists, and you can use either of them. # Here we use the "flow syntax" - comma-separated strings inside square brackets. diff --git a/kfp/kfp_ray_components/src/create_ray_cluster.py b/kfp/kfp_ray_components/src/create_ray_cluster.py index ee8312b1b..900e482ca 100644 --- a/kfp/kfp_ray_components/src/create_ray_cluster.py +++ b/kfp/kfp_ray_components/src/create_ray_cluster.py @@ -13,7 +13,8 @@ import sys kfp_v2 = os.getenv("KFP_v2", 0) -if kfp_v2 == 1: +print(kfp_v2) +if kfp_v2 == "1": from workflow_support.runtime_utils import KFPUtils, RayRemoteJobs print(f"Load KFPv2 libs") else: diff --git a/kfp/kfp_ray_components/src/delete_ray_cluster.py b/kfp/kfp_ray_components/src/delete_ray_cluster.py index ccbb31b93..02eeeb650 100644 --- a/kfp/kfp_ray_components/src/delete_ray_cluster.py +++ b/kfp/kfp_ray_components/src/delete_ray_cluster.py @@ -14,7 +14,7 @@ import sys kfp_v2 = os.getenv("KFP_v2", 0) -if kfp_v2 == 1: +if kfp_v2 == "1": from workflow_support.runtime_utils import KFPUtils, RayRemoteJobs print(f"Load KFPv2 libs") else: diff --git a/kfp/kfp_ray_components/src/execute_ray_job.py b/kfp/kfp_ray_components/src/execute_ray_job.py index 037a3baaa..efbb8e723 100644 --- a/kfp/kfp_ray_components/src/execute_ray_job.py +++ b/kfp/kfp_ray_components/src/execute_ray_job.py @@ -13,7 +13,7 @@ import os kfp_v2 = os.getenv("KFP_v2", 0) -if kfp_v2 == 1: +if kfp_v2 == "1": from workflow_support.runtime_utils import KFPUtils, execute_ray_jobs print(f"Load KFPv2 libs") else: diff --git a/kfp/kfp_ray_components/src/execute_ray_job_multi_s3.py b/kfp/kfp_ray_components/src/execute_ray_job_multi_s3.py index 7a9246cdf..7493c247f 100644 --- a/kfp/kfp_ray_components/src/execute_ray_job_multi_s3.py +++ b/kfp/kfp_ray_components/src/execute_ray_job_multi_s3.py @@ -13,7 +13,7 @@ import os kfp_v2 = os.getenv("KFP_v2", 0) -if kfp_v2 == 1: +if kfp_v2 == "1": from workflow_support.runtime_utils import KFPUtils, execute_ray_jobs print(f"Load KFPv2 libs") else: diff --git a/kfp/kfp_support_lib/kfp_v2_workflow_support/pyproject.toml b/kfp/kfp_support_lib/kfp_v2_workflow_support/pyproject.toml index bedc6f334..05e39be76 100644 --- a/kfp/kfp_support_lib/kfp_v2_workflow_support/pyproject.toml +++ b/kfp/kfp_support_lib/kfp_v2_workflow_support/pyproject.toml @@ -13,6 +13,7 @@ authors = [ ] dependencies = [ "kfp==2.7.0", + "kfp-kubernetes==1.2.0", "ray==2.9.3", "requests", "data-prep-toolkit==0.2.0", diff --git a/kfp/kfp_support_lib/kfp_v2_workflow_support/src/workflow_support/compile_utils/__init__.py b/kfp/kfp_support_lib/kfp_v2_workflow_support/src/workflow_support/compile_utils/__init__.py index bbe1476fb..6b99a6be1 100644 --- a/kfp/kfp_support_lib/kfp_v2_workflow_support/src/workflow_support/compile_utils/__init__.py +++ b/kfp/kfp_support_lib/kfp_v2_workflow_support/src/workflow_support/compile_utils/__init__.py @@ -1,3 +1,6 @@ -from kfp_support.workflow_support.compile_utils.component import ( +from workflow_support.compile_utils.component import ( + ONE_HOUR_SEC, + ONE_DAY_SEC, + ONE_WEEK_SEC, ComponentUtils ) diff --git a/kfp/kfp_support_lib/kfp_v2_workflow_support/src/workflow_support/compile_utils/component.py b/kfp/kfp_support_lib/kfp_v2_workflow_support/src/workflow_support/compile_utils/component.py index 1f66bf59f..93a604d22 100644 --- a/kfp/kfp_support_lib/kfp_v2_workflow_support/src/workflow_support/compile_utils/component.py +++ b/kfp/kfp_support_lib/kfp_v2_workflow_support/src/workflow_support/compile_utils/component.py @@ -4,6 +4,10 @@ RUN_NAME = "KFP_RUN_NAME" +ONE_HOUR_SEC = 60 * 60 +ONE_DAY_SEC = ONE_HOUR_SEC * 24 +ONE_WEEK_SEC = ONE_DAY_SEC * 7 + class ComponentUtils: """ Class containing methods supporting building pipelines @@ -67,7 +71,7 @@ def default_compute_execution_params( import sys from data_processing.utils import GB, get_logger - from kfp_support.workflow_support.runtime_utils import KFPUtils + from workflow_support.runtime_utils import KFPUtils logger = get_logger(__name__) diff --git a/kfp/kfp_support_lib/kfp_v2_workflow_support/src/workflow_support/runtime_utils/__init__.py b/kfp/kfp_support_lib/kfp_v2_workflow_support/src/workflow_support/runtime_utils/__init__.py index d2301bd0a..8d2cdd648 100644 --- a/kfp/kfp_support_lib/kfp_v2_workflow_support/src/workflow_support/runtime_utils/__init__.py +++ b/kfp/kfp_support_lib/kfp_v2_workflow_support/src/workflow_support/runtime_utils/__init__.py @@ -1,2 +1,2 @@ -from kfp_support.workflow_support.runtime_utils.kfp_utils import KFPUtils -from kfp_support.workflow_support.runtime_utils.remote_jobs_utils import RayRemoteJobs, execute_ray_jobs +from workflow_support.runtime_utils.kfp_utils import KFPUtils +from workflow_support.runtime_utils.remote_jobs_utils import RayRemoteJobs, execute_ray_jobs diff --git a/kfp/kfp_support_lib/kfp_v2_workflow_support/src/workflow_support/runtime_utils/remote_jobs_utils.py b/kfp/kfp_support_lib/kfp_v2_workflow_support/src/workflow_support/runtime_utils/remote_jobs_utils.py index c7e7cbe45..0b20b28c4 100644 --- a/kfp/kfp_support_lib/kfp_v2_workflow_support/src/workflow_support/runtime_utils/remote_jobs_utils.py +++ b/kfp/kfp_support_lib/kfp_v2_workflow_support/src/workflow_support/runtime_utils/remote_jobs_utils.py @@ -17,8 +17,8 @@ from data_processing.data_access import DataAccess, DataAccessFactory from data_processing.utils import ParamsUtils, get_logger -from kfp_support.api_server_client import KubeRayAPIs -from kfp.kfp_support_lib.python_apiserver_client.src.python_apiserver_client.params import ( +from python_apiserver_client import KubeRayAPIs +from python_apiserver_client.params import ( DEFAULT_HEAD_START_PARAMS, DEFAULT_WORKER_START_PARAMS, Cluster, @@ -30,7 +30,7 @@ environment_variables_decoder, volume_decoder, ) -from kfp_support.workflow_support.runtime_utils import KFPUtils +from workflow_support.runtime_utils import KFPUtils from ray.job_submission import JobStatus diff --git a/kfp/requirements.env b/kfp/requirements.env index c5f60ed03..6fa707df5 100644 --- a/kfp/requirements.env +++ b/kfp/requirements.env @@ -3,9 +3,9 @@ KFP_v2=2.7.0 KFP_v1=1.8.22 ifeq ($(KFPv2), 1) -KFP=$(KFP_v2) -WORKFLOW_SUPPORT_LIB=kfp_v2_workflow_support + KFP=$(KFP_v2) + WORKFLOW_SUPPORT_LIB=kfp_v2_workflow_support else -KFP=$(KFP_v1) -WORKFLOW_SUPPORT_LIB=kfp_v1_workflow_support -endif \ No newline at end of file + KFP=$(KFP_v1) + WORKFLOW_SUPPORT_LIB=kfp_v1_workflow_support +endif diff --git a/transforms/universal/noop/Makefile b/transforms/universal/noop/Makefile index 02fd06dc2..6ca460863 100644 --- a/transforms/universal/noop/Makefile +++ b/transforms/universal/noop/Makefile @@ -47,15 +47,29 @@ workflow-venv: .PHONY: workflow-build workflow-build: +ifeq ($(KFPv2), 0) $(MAKE) -C kfp_ray/v1 workflow-build +else + $(MAKE) -C kfp_ray/v2 workflow-build +endif + .PHONY: workflow-test workflow-test: - $(MAKE) -C $(PIPELINE_PATH) workflow-test +ifeq ($(KFPv2), 0) + $(MAKE) -C kfp_ray/v2 workflow-test +else + $(MAKE) -C kfp_ray/v2 workflow-test +endif .PHONY: workflow-upload workflow-upload: - $(MAKE) -C $(PIPELINE_PATH) workflow-upload +ifeq ($(KFPv2), 0) + $(MAKE) -C kfp_ray/v1 workflow-upload +else + $(MAKE) -C kfp_ray/v2 workflow-upload +endif + .PHONY: workflow-reconcile-requirements workflow-reconcile-requirements: diff --git a/transforms/universal/noop/kfp_ray/v2/noop_wf.py b/transforms/universal/noop/kfp_ray/v2/noop_wf.py index 613b362fd..a77e3a2b4 100644 --- a/transforms/universal/noop/kfp_ray/v2/noop_wf.py +++ b/transforms/universal/noop/kfp_ray/v2/noop_wf.py @@ -23,13 +23,13 @@ # FIXME: create a component to get run id RUN_ID = uuid.uuid4().hex -task_image = "quay.io/dataprep1/data-prep-kit/noop:0.8.0" +task_image = "quay.io/dataprep1/data-prep-kit/noop:0.9.0" # the name of the job script EXEC_SCRIPT_NAME: str = "noop_transform.py" # components -base_kfp_image = "quay.io/dataprep1/data-prep-kit/kfp-data-processing:0.1.1-kfp-v21" +base_kfp_image = "quay.io/dataprep1/data-prep-kit/kfp-data-processing_v2:0.2.0-v2" # path to kfp component specifications files component_spec_path = "../../../../../kfp/kfp_ray_components/" @@ -37,9 +37,12 @@ # compute execution parameters. Here different tranforms might need different implementations. As # a result, instead of creating a component we are creating it in place here. @dsl.component(base_image=base_kfp_image) -compute_exec_params_op = comp.func_to_container_op( - func=ComponentUtils.default_compute_execution_params -) +def compute_exec_params(worker_options: str, actor_options: str) -> str: + from workflow_support.compile_utils import ComponentUtils + + return ComponentUtils.default_compute_execution_params(worker_options, actor_options) + + # create Ray cluster create_ray_op = comp.load_component_from_file(component_spec_path + "createRayClusterComponent.yaml") # execute job @@ -111,49 +114,49 @@ def noop( """ # create clean_up task clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=RUN_ID, server_url=server_url) - ComponentUtils.add_settings_to_component(clean_up_task, 60, image_pull_policy="Always") + ComponentUtils.add_settings_to_component(clean_up_task, 60) # pipeline definition with dsl.ExitHandler(clean_up_task): - # compute execution params - compute_exec_params = compute_exec_params_op( + # compute execution params + compute_exec_params_task = compute_exec_params( worker_options=ray_worker_options, actor_options=runtime_actor_options, ) - ComponentUtils.add_settings_to_component(compute_exec_params, ONE_HOUR_SEC * 2,image_pull_policy="Always") + ComponentUtils.add_settings_to_component(compute_exec_params_task, ONE_HOUR_SEC * 2) # start Ray cluster - ray_cluster = create_ray_op( + ray_cluster = create_ray_op( ray_name=ray_name, run_id=RUN_ID, ray_head_options=ray_head_options, ray_worker_options=ray_worker_options, server_url=server_url, additional_params=additional_params, - ) - ComponentUtils.add_settings_to_component(ray_cluster, ONE_HOUR_SEC * 2, image_pull_policy="Always") + ) + ComponentUtils.add_settings_to_component(ray_cluster, ONE_HOUR_SEC * 2) #ray_cluster.after(compute_exec_params) # Execute job - execute_job = execute_ray_jobs_op( + execute_job = execute_ray_jobs_op( ray_name=ray_name, run_id=RUN_ID, additional_params=additional_params, # note that the parameters below are specific for NOOP transform exec_params={ "data_s3_config": "{'input_folder': 'dev-code-datasets/data-prep-labs/kfp-v2/noop/input/', 'output_folder': 'dev-code-datasets/data-prep-labs/kfp-v2/noop/output/'}", - "data_max_files": data_max_files, - "data_num_samples": data_num_samples, + "data_max_files": -1, + "data_num_samples": -1, "runtime_num_workers": "1", "runtime_worker_options": "{'num_cpus': 0.8}", - "runtime_pipeline_id": runtime_actor_options, + "runtime_pipeline_id": "{'num_cpus': 0.8}", "runtime_job_id": RUN_ID, - "runtime_code_location": runtime_code_location, - "noop_sleep_sec": noop_sleep_sec, + "runtime_code_location": "{'github': 'github', 'commit_hash': '12345', 'path': 'path'}", + "noop_sleep_sec": 10, }, exec_script_name=EXEC_SCRIPT_NAME, server_url=server_url, - ) - ComponentUtils.add_settings_to_component(execute_job, ONE_WEEK_SEC,image_pull_policy="Always") - ComponentUtils.set_s3_env_vars_to_component(execute_job, data_s3_access_secret) - execute_job.after(ray_cluster) + ) + ComponentUtils.add_settings_to_component(execute_job, ONE_WEEK_SEC) + ComponentUtils.set_s3_env_vars_to_component(execute_job, data_s3_access_secret) + execute_job.after(ray_cluster) # Configure the pipeline level to one week (in seconds) # dsl.get_pipeline_conf().set_timeout(ONE_WEEK_SEC) From 47c4c687786e671879c7390948a2062d040275cf Mon Sep 17 00:00:00 2001 From: Revital Sur Date: Mon, 3 Jun 2024 17:30:52 +0300 Subject: [PATCH 2/3] Fix compute_exec_params Signed-off-by: Revital Sur --- .../compile_utils/component.py | 47 ------------------- .../runtime_utils/kfp_utils.py | 47 +++++++++++++++++++ .../universal/noop/kfp_ray/v2/noop_wf.py | 4 +- 3 files changed, 49 insertions(+), 49 deletions(-) diff --git a/kfp/kfp_support_lib/kfp_v2_workflow_support/src/workflow_support/compile_utils/component.py b/kfp/kfp_support_lib/kfp_v2_workflow_support/src/workflow_support/compile_utils/component.py index 93a604d22..4fa47290f 100644 --- a/kfp/kfp_support_lib/kfp_v2_workflow_support/src/workflow_support/compile_utils/component.py +++ b/kfp/kfp_support_lib/kfp_v2_workflow_support/src/workflow_support/compile_utils/component.py @@ -56,50 +56,3 @@ def set_s3_env_vars_to_component( for env_name, _ in env2key.items(): env2key[prefix + "_" + env_name] = env2key.pop(env_name) kubernetes.use_secret_as_env(task=task, secret_name='s3-secret', secret_key_to_env=env2key) - - @staticmethod - def default_compute_execution_params( - worker_options: str, # ray worker configuration - actor_options: str, # cpus per actor - ) -> str: - """ - This is the most simplistic transform execution parameters computation - :param worker_options: configuration of ray workers - :param actor_options: actor request requirements - :return: number of actors - """ - import sys - - from data_processing.utils import GB, get_logger - from workflow_support.runtime_utils import KFPUtils - - logger = get_logger(__name__) - - # convert input - w_options = KFPUtils.load_from_json(worker_options.replace("'", '"')) - a_options = KFPUtils.load_from_json(actor_options.replace("'", '"')) - # Compute available cluster resources - cluster_cpu = w_options["replicas"] * w_options["cpu"] - cluster_mem = w_options["replicas"] * w_options["memory"] - cluster_gpu = w_options["replicas"] * w_options.get("gpu", 0.0) - logger.info(f"Cluster available CPUs {cluster_cpu}, Memory {cluster_mem}, GPUs {cluster_gpu}") - # compute number of actors - n_actors_cpu = int(cluster_cpu * 0.85 / a_options.get("num_cpus", 0.5)) - n_actors_memory = int(cluster_mem * 0.85 / (a_options.get("memory", GB) / GB)) - n_actors = min(n_actors_cpu, n_actors_memory) - # Check if we need gpu calculations as well - actor_gpu = a_options.get("num_gpus", 0) - if actor_gpu > 0: - n_actors_gpu = int(cluster_gpu / actor_gpu) - n_actors = min(n_actors, n_actors_gpu) - logger.info(f"Number of actors - {n_actors}") - if n_actors < 1: - logger.warning( - f"Not enough cpu/gpu/memory to run transform, " - f"required cpu {a_options.get('num_cpus', .5)}, available {cluster_cpu}, " - f"required memory {a_options.get('memory', 1)}, available {cluster_mem}, " - f"required cpu {actor_gpu}, available {cluster_gpu}" - ) - sys.exit(1) - - return str(n_actors) \ No newline at end of file diff --git a/kfp/kfp_support_lib/kfp_v2_workflow_support/src/workflow_support/runtime_utils/kfp_utils.py b/kfp/kfp_support_lib/kfp_v2_workflow_support/src/workflow_support/runtime_utils/kfp_utils.py index ef00b0e92..0e9951282 100644 --- a/kfp/kfp_support_lib/kfp_v2_workflow_support/src/workflow_support/runtime_utils/kfp_utils.py +++ b/kfp/kfp_support_lib/kfp_v2_workflow_support/src/workflow_support/runtime_utils/kfp_utils.py @@ -111,3 +111,50 @@ def load_from_json(js: str) -> dict[str, Any]: except Exception as e: logger.warning(f"Failed to load parameters {js} with error {e}") sys.exit(1) + + @staticmethod + def default_compute_execution_params( + worker_options: str, # ray worker configuration + actor_options: str, # cpus per actor + ) -> str: + """ + This is the most simplistic transform execution parameters computation + :param worker_options: configuration of ray workers + :param actor_options: actor request requirements + :return: number of actors + """ + import sys + + from data_processing.utils import GB, get_logger + from workflow_support.runtime_utils import KFPUtils + + logger = get_logger(__name__) + + # convert input + w_options = KFPUtils.load_from_json(worker_options.replace("'", '"')) + a_options = KFPUtils.load_from_json(actor_options.replace("'", '"')) + # Compute available cluster resources + cluster_cpu = w_options["replicas"] * w_options["cpu"] + cluster_mem = w_options["replicas"] * w_options["memory"] + cluster_gpu = w_options["replicas"] * w_options.get("gpu", 0.0) + logger.info(f"Cluster available CPUs {cluster_cpu}, Memory {cluster_mem}, GPUs {cluster_gpu}") + # compute number of actors + n_actors_cpu = int(cluster_cpu * 0.85 / a_options.get("num_cpus", 0.5)) + n_actors_memory = int(cluster_mem * 0.85 / (a_options.get("memory", GB) / GB)) + n_actors = min(n_actors_cpu, n_actors_memory) + # Check if we need gpu calculations as well + actor_gpu = a_options.get("num_gpus", 0) + if actor_gpu > 0: + n_actors_gpu = int(cluster_gpu / actor_gpu) + n_actors = min(n_actors, n_actors_gpu) + logger.info(f"Number of actors - {n_actors}") + if n_actors < 1: + logger.warning( + f"Not enough cpu/gpu/memory to run transform, " + f"required cpu {a_options.get('num_cpus', .5)}, available {cluster_cpu}, " + f"required memory {a_options.get('memory', 1)}, available {cluster_mem}, " + f"required cpu {actor_gpu}, available {cluster_gpu}" + ) + sys.exit(1) + + return str(n_actors) diff --git a/transforms/universal/noop/kfp_ray/v2/noop_wf.py b/transforms/universal/noop/kfp_ray/v2/noop_wf.py index a77e3a2b4..4c9ce970e 100644 --- a/transforms/universal/noop/kfp_ray/v2/noop_wf.py +++ b/transforms/universal/noop/kfp_ray/v2/noop_wf.py @@ -38,9 +38,9 @@ # a result, instead of creating a component we are creating it in place here. @dsl.component(base_image=base_kfp_image) def compute_exec_params(worker_options: str, actor_options: str) -> str: - from workflow_support.compile_utils import ComponentUtils + from workflow_support.runtime_utils import KFPUtils - return ComponentUtils.default_compute_execution_params(worker_options, actor_options) + return KFPUtils.default_compute_execution_params(worker_options, actor_options) # create Ray cluster From 8ff943493beff1b2dc7cdb70b20e3f83f39a1297 Mon Sep 17 00:00:00 2001 From: Revital Sur Date: Mon, 3 Jun 2024 17:35:09 +0300 Subject: [PATCH 3/3] Minor fix. Signed-off-by: Revital Sur --- transforms/universal/noop/kfp_ray/v2/noop_wf.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/transforms/universal/noop/kfp_ray/v2/noop_wf.py b/transforms/universal/noop/kfp_ray/v2/noop_wf.py index 4c9ce970e..4107fee2b 100644 --- a/transforms/universal/noop/kfp_ray/v2/noop_wf.py +++ b/transforms/universal/noop/kfp_ray/v2/noop_wf.py @@ -133,7 +133,7 @@ def noop( additional_params=additional_params, ) ComponentUtils.add_settings_to_component(ray_cluster, ONE_HOUR_SEC * 2) - #ray_cluster.after(compute_exec_params) + ray_cluster.after(compute_exec_params) # Execute job execute_job = execute_ray_jobs_op( ray_name=ray_name,