Merge pull request #223 from revit13/test1
Fixes after testing.
roytman authored Jun 3, 2024
2 parents 96f8dd4 + 8ff9434 commit de204fa
Showing 20 changed files with 132 additions and 94 deletions.
3 changes: 2 additions & 1 deletion kfp/kfp_ray_components/Dockerfile
@@ -2,6 +2,7 @@ FROM docker.io/rayproject/ray:2.9.3-py310

ARG BUILD_DATE
ARG GIT_COMMIT
+ARG KFP_v2

LABEL build-date=$BUILD_DATE
LABEL git-commit=$GIT_COMMIT
@@ -22,7 +23,7 @@ RUN cd python_apiserver_client && pip install --no-cache-dir -e .

COPY --chown=ray:users workflow_support_lib workflow_support_lib/
RUN cd workflow_support_lib && pip install --no-cache-dir -e .

+ENV KFP_v2=$KFP_v2
# remove credentials-containing file
RUN rm requirements.txt
# components
13 changes: 12 additions & 1 deletion kfp/kfp_ray_components/Makefile
@@ -26,14 +26,25 @@ endif
#DOCKER_IMG=${DOCKER_HOSTNAME}/${DOCKER_NAMESPACE}/${DOCKER_IMAGE_NAME}:${DOCKER_IMAGE_VERSION}
DOCKER_IMG=$(DOCKER_LOCAL_IMAGE)

+.PHONY: .kfp_image
+.kfp_image:: # Must be called with a DOCKER_IMAGE= setting.
+	@# Help: Create the docker image $(DOCKER_LOCAL_IMAGE) and a tag for $(DOCKER_REMOTE_IMAGE)
+	$(DOCKER) build -t $(DOCKER_LOCAL_IMAGE) \
+		-f $(DOCKER_FILE) \
+		--build-arg EXTRA_INDEX_URL=$(EXTRA_INDEX_URL) \
+		--build-arg BASE_IMAGE=$(BASE_IMAGE) \
+		--build-arg BUILD_DATE=$(shell date -u +'%Y-%m-%dT%H:%M:%SZ') \
+		--build-arg KFP_v2=$(KFPv2) \
+		--build-arg GIT_COMMIT=$(shell git log -1 --format=%h) .
+	$(DOCKER) tag $(DOCKER_LOCAL_IMAGE) $(DOCKER_REMOTE_IMAGE)

.PHONY: .lib-src-image
.lib-src-image::
$(MAKE) .defaults.copy-lib LIB_PATH=$(DPK_RAY_LIB_DIR) LIB_NAME=data-processing-lib-ray
$(MAKE) .defaults.copy-lib LIB_PATH=$(DPK_PYTHON_LIB_DIR) LIB_NAME=data-processing-lib-python
$(MAKE) .defaults.copy-lib LIB_PATH=$(REPOROOT)/kfp/kfp_support_lib/python_apiserver_client LIB_NAME=python_apiserver_client
$(MAKE) .defaults.copy-lib LIB_PATH=$(REPOROOT)/kfp/kfp_support_lib/$(WORKFLOW_SUPPORT_LIB) LIB_NAME=workflow_support_lib
$(MAKE) .defaults.image
$(MAKE) .kfp_image
rm -rf data-processing-lib-ray
rm -rf data-processing-lib-python
rm -rf python_apiserver_client
2 changes: 1 addition & 1 deletion kfp/kfp_ray_components/createRayClusterComponent.yaml
@@ -11,7 +11,7 @@ inputs:

implementation:
container:
-image: quay.io/dataprep1/data-prep-kit/kfp-data-processing:0.2.0-v2
+image: quay.io/dataprep1/data-prep-kit/kfp-data-processing_v2:0.2.0-v2
# command is a list of strings (command-line arguments).
# The YAML language has two syntaxes for lists and you can use either of them.
# Here we use the "flow syntax" - comma-separated strings inside square brackets.
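
The "flow syntax" comment in these component specs is plain YAML behavior: flow and block sequences parse to the same structure. A minimal sketch of the equivalence (assumes PyYAML is installed; the command values are illustrative):

    # Both spellings of `command` load to the same Python object.
    import yaml  # PyYAML

    flow = yaml.safe_load("command: [python, create_ray_cluster.py]")
    block = yaml.safe_load("command:\n  - python\n  - create_ray_cluster.py")
    assert flow == block == {"command": ["python", "create_ray_cluster.py"]}
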
2 changes: 1 addition & 1 deletion kfp/kfp_ray_components/deleteRayClusterComponent.yaml
@@ -8,7 +8,7 @@ inputs:

implementation:
container:
-image: quay.io/dataprep1/data-prep-kit/kfp-data-processing:0.2.0-v2
+image: quay.io/dataprep1/data-prep-kit/kfp-data-processing_v2:0.2.0-v2
# command is a list of strings (command-line arguments).
# The YAML language has two syntaxes for lists and you can use either of them.
# Here we use the "flow syntax" - comma-separated strings inside square brackets.
2 changes: 1 addition & 1 deletion kfp/kfp_ray_components/executeRayJobComponent.yaml
@@ -12,7 +12,7 @@ inputs:

implementation:
container:
-image: quay.io/dataprep1/data-prep-kit/kfp-data-processing:0.2.0-v2
+image: quay.io/dataprep1/data-prep-kit/kfp-data-processing_v2:0.2.0-v2
# command is a list of strings (command-line arguments).
# The YAML language has two syntaxes for lists and you can use either of them.
# Here we use the "flow syntax" - comma-separated strings inside square brackets.
@@ -13,7 +13,7 @@ inputs:

implementation:
container:
-image: quay.io/dataprep1/data-prep-kit/kfp-data-processing:0.2.0-v2
+image: quay.io/dataprep1/data-prep-kit/kfp-data-processing_v2:0.2.0-v2
# command is a list of strings (command-line arguments).
# The YAML language has two syntaxes for lists and you can use either of them.
# Here we use the "flow syntax" - comma-separated strings inside square brackets.
2 changes: 1 addition & 1 deletion kfp/kfp_ray_components/executeSubWorkflowComponent.yaml
@@ -27,7 +27,7 @@ outputs:

implementation:
container:
-image: quay.io/dataprep1/data-prep-kit/kfp-data-processing:0.2.0-v2
+image: quay.io/dataprep1/data-prep-kit/kfp-data-processing_v2:0.2.0-v2
# command is a list of strings (command-line arguments).
# The YAML language has two syntaxes for lists, and you can use either of them.
# Here we use the "flow syntax" - comma-separated strings inside square brackets.
3 changes: 2 additions & 1 deletion kfp/kfp_ray_components/src/create_ray_cluster.py
@@ -13,7 +13,8 @@
import sys

kfp_v2 = os.getenv("KFP_v2", 0)
-if kfp_v2 == 1:
+print(kfp_v2)
+if kfp_v2 == "1":
from workflow_support.runtime_utils import KFPUtils, RayRemoteJobs
print(f"Load KFPv2 libs")
else:
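
The comparison fix above is needed because os.getenv returns a str whenever the variable is set, so the old integer test `if kfp_v2 == 1:` could never be true. A minimal sketch (KFP_v2 here stands in for the value baked into the image by the Dockerfile's ENV instruction):

    import os

    os.environ["KFP_v2"] = "1"       # simulate ENV KFP_v2=$KFP_v2 from the image
    kfp_v2 = os.getenv("KFP_v2", 0)  # returns the string "1", not the integer 1
    assert kfp_v2 == "1"             # the corrected comparison matches
    assert kfp_v2 != 1               # the old comparison never did

The same string comparison fix is applied to delete_ray_cluster.py and the two execute_ray_job scripts below.
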
2 changes: 1 addition & 1 deletion kfp/kfp_ray_components/src/delete_ray_cluster.py
@@ -14,7 +14,7 @@
import sys

kfp_v2 = os.getenv("KFP_v2", 0)
-if kfp_v2 == 1:
+if kfp_v2 == "1":
from workflow_support.runtime_utils import KFPUtils, RayRemoteJobs
print(f"Load KFPv2 libs")
else:
2 changes: 1 addition & 1 deletion kfp/kfp_ray_components/src/execute_ray_job.py
@@ -13,7 +13,7 @@
import os

kfp_v2 = os.getenv("KFP_v2", 0)
-if kfp_v2 == 1:
+if kfp_v2 == "1":
from workflow_support.runtime_utils import KFPUtils, execute_ray_jobs
print(f"Load KFPv2 libs")
else:
2 changes: 1 addition & 1 deletion kfp/kfp_ray_components/src/execute_ray_job_multi_s3.py
@@ -13,7 +13,7 @@
import os

kfp_v2 = os.getenv("KFP_v2", 0)
-if kfp_v2 == 1:
+if kfp_v2 == "1":
from workflow_support.runtime_utils import KFPUtils, execute_ray_jobs
print(f"Load KFPv2 libs")
else:
1 change: 1 addition & 0 deletions kfp/kfp_support_lib/kfp_v2_workflow_support/pyproject.toml
@@ -13,6 +13,7 @@ authors = [
]
dependencies = [
"kfp==2.7.0",
"kfp-kubernetes==1.2.0",
"ray==2.9.3",
"requests",
"data-prep-toolkit==0.2.0",
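
The new kfp-kubernetes pin provides the `kfp.kubernetes` helpers that `ComponentUtils.set_s3_env_vars_to_component` below builds on. A minimal sketch of the call it enables (the component, pipeline, and secret keys are hypothetical; the "s3-secret" name matches the code below):

    from kfp import dsl, kubernetes  # kubernetes comes from the kfp-kubernetes package

    @dsl.component
    def probe() -> str:
        import os
        return os.getenv("ACCESS_KEY", "unset")

    @dsl.pipeline
    def pipe():
        task = probe()
        # Expose keys of the "s3-secret" Kubernetes secret as env vars of the task.
        kubernetes.use_secret_as_env(
            task,
            secret_name="s3-secret",
            secret_key_to_env={"s3_access": "ACCESS_KEY"},
        )
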
@@ -1,3 +1,6 @@
-from kfp_support.workflow_support.compile_utils.component import (
+from workflow_support.compile_utils.component import (
+ONE_HOUR_SEC,
+ONE_DAY_SEC,
+ONE_WEEK_SEC,
ComponentUtils
)
@@ -4,6 +4,10 @@

RUN_NAME = "KFP_RUN_NAME"

+ONE_HOUR_SEC = 60 * 60
+ONE_DAY_SEC = ONE_HOUR_SEC * 24
+ONE_WEEK_SEC = ONE_DAY_SEC * 7

class ComponentUtils:
"""
Class containing methods supporting building pipelines
@@ -52,50 +56,3 @@ def set_s3_env_vars_to_component
for env_name, _ in env2key.items():
env2key[prefix + "_" + env_name] = env2key.pop(env_name)
kubernetes.use_secret_as_env(task=task, secret_name='s3-secret', secret_key_to_env=env2key)

@staticmethod
def default_compute_execution_params(
worker_options: str, # ray worker configuration
actor_options: str, # cpus per actor
) -> str:
"""
This is the most simplistic transform execution parameters computation
:param worker_options: configuration of ray workers
:param actor_options: actor request requirements
:return: number of actors
"""
import sys

from data_processing.utils import GB, get_logger
from kfp_support.workflow_support.runtime_utils import KFPUtils

logger = get_logger(__name__)

# convert input
w_options = KFPUtils.load_from_json(worker_options.replace("'", '"'))
a_options = KFPUtils.load_from_json(actor_options.replace("'", '"'))
# Compute available cluster resources
cluster_cpu = w_options["replicas"] * w_options["cpu"]
cluster_mem = w_options["replicas"] * w_options["memory"]
cluster_gpu = w_options["replicas"] * w_options.get("gpu", 0.0)
logger.info(f"Cluster available CPUs {cluster_cpu}, Memory {cluster_mem}, GPUs {cluster_gpu}")
# compute number of actors
n_actors_cpu = int(cluster_cpu * 0.85 / a_options.get("num_cpus", 0.5))
n_actors_memory = int(cluster_mem * 0.85 / (a_options.get("memory", GB) / GB))
n_actors = min(n_actors_cpu, n_actors_memory)
# Check if we need gpu calculations as well
actor_gpu = a_options.get("num_gpus", 0)
if actor_gpu > 0:
n_actors_gpu = int(cluster_gpu / actor_gpu)
n_actors = min(n_actors, n_actors_gpu)
logger.info(f"Number of actors - {n_actors}")
if n_actors < 1:
logger.warning(
f"Not enough cpu/gpu/memory to run transform, "
f"required cpu {a_options.get('num_cpus', .5)}, available {cluster_cpu}, "
f"required memory {a_options.get('memory', 1)}, available {cluster_mem}, "
f"required cpu {actor_gpu}, available {cluster_gpu}"
)
sys.exit(1)

return str(n_actors)
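
One caveat worth flagging in the `set_s3_env_vars_to_component` hunk above: its prefix loop pops and reinserts keys while iterating over `env2key.items()`, which Python dictionaries do not support reliably. A sketch of the same renaming done over a snapshot of the keys (the dictionary contents here are hypothetical):

    env2key = {"access_key": "S3_KEY", "secret_key": "S3_SECRET"}
    prefix = "noop"

    for env_name in list(env2key):  # iterate over a copy so mutation is safe
        env2key[prefix + "_" + env_name] = env2key.pop(env_name)

    assert env2key == {"noop_access_key": "S3_KEY", "noop_secret_key": "S3_SECRET"}
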
@@ -1,2 +1,2 @@
-from kfp_support.workflow_support.runtime_utils.kfp_utils import KFPUtils
-from kfp_support.workflow_support.runtime_utils.remote_jobs_utils import RayRemoteJobs, execute_ray_jobs
+from workflow_support.runtime_utils.kfp_utils import KFPUtils
+from workflow_support.runtime_utils.remote_jobs_utils import RayRemoteJobs, execute_ray_jobs
@@ -111,3 +111,50 @@ def load_from_json(js: str) -> dict[str, Any]:
except Exception as e:
logger.warning(f"Failed to load parameters {js} with error {e}")
sys.exit(1)

@staticmethod
def default_compute_execution_params(
worker_options: str, # ray worker configuration
actor_options: str, # cpus per actor
) -> str:
"""
This is the most simplistic transform execution parameters computation
:param worker_options: configuration of ray workers
:param actor_options: actor request requirements
:return: number of actors
"""
import sys

from data_processing.utils import GB, get_logger
from workflow_support.runtime_utils import KFPUtils

logger = get_logger(__name__)

# convert input
w_options = KFPUtils.load_from_json(worker_options.replace("'", '"'))
a_options = KFPUtils.load_from_json(actor_options.replace("'", '"'))
# Compute available cluster resources
cluster_cpu = w_options["replicas"] * w_options["cpu"]
cluster_mem = w_options["replicas"] * w_options["memory"]
cluster_gpu = w_options["replicas"] * w_options.get("gpu", 0.0)
logger.info(f"Cluster available CPUs {cluster_cpu}, Memory {cluster_mem}, GPUs {cluster_gpu}")
# compute number of actors
n_actors_cpu = int(cluster_cpu * 0.85 / a_options.get("num_cpus", 0.5))
n_actors_memory = int(cluster_mem * 0.85 / (a_options.get("memory", GB) / GB))
n_actors = min(n_actors_cpu, n_actors_memory)
# Check if we need gpu calculations as well
actor_gpu = a_options.get("num_gpus", 0)
if actor_gpu > 0:
n_actors_gpu = int(cluster_gpu / actor_gpu)
n_actors = min(n_actors, n_actors_gpu)
logger.info(f"Number of actors - {n_actors}")
if n_actors < 1:
logger.warning(
f"Not enough cpu/gpu/memory to run transform, "
f"required cpu {a_options.get('num_cpus', .5)}, available {cluster_cpu}, "
f"required memory {a_options.get('memory', 1)}, available {cluster_mem}, "
f"required cpu {actor_gpu}, available {cluster_gpu}"
)
sys.exit(1)

return str(n_actors)
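
A worked pass through `default_compute_execution_params` (values hypothetical; assumes the workflow_support package is importable). The options arrive as single-quoted JSON, which the function normalizes before parsing; worker memory is given in GB while actor memory is in bytes, hence the division by GB:

    from workflow_support.runtime_utils import KFPUtils

    worker_options = "{'replicas': 3, 'cpu': 4, 'memory': 16}"  # 12 CPUs, 48 GB in total
    actor_options = "{'num_cpus': 0.5, 'memory': 2147483648}"   # 0.5 CPU, 2 GB per actor

    # n_actors_cpu    = int(12 * 0.85 / 0.5) = 20
    # n_actors_memory = int(48 * 0.85 / 2)   = 20
    # no GPUs requested, so n_actors = min(20, 20) = 20
    assert KFPUtils.default_compute_execution_params(worker_options, actor_options) == "20"
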
@@ -17,8 +17,8 @@

from data_processing.data_access import DataAccess, DataAccessFactory
from data_processing.utils import ParamsUtils, get_logger
-from kfp_support.api_server_client import KubeRayAPIs
-from kfp.kfp_support_lib.python_apiserver_client.src.python_apiserver_client.params import (
+from python_apiserver_client import KubeRayAPIs
+from python_apiserver_client.params import (
DEFAULT_HEAD_START_PARAMS,
DEFAULT_WORKER_START_PARAMS,
Cluster,
@@ -30,7 +30,7 @@
environment_variables_decoder,
volume_decoder,
)
-from kfp_support.workflow_support.runtime_utils import KFPUtils
+from workflow_support.runtime_utils import KFPUtils
from ray.job_submission import JobStatus


10 changes: 5 additions & 5 deletions kfp/requirements.env
@@ -3,9 +3,9 @@ KFP_v2=2.7.0
KFP_v1=1.8.22

ifeq ($(KFPv2), 1)
-KFP=$(KFP_v2)
-WORKFLOW_SUPPORT_LIB=kfp_v2_workflow_support
+KFP=$(KFP_v2)
+WORKFLOW_SUPPORT_LIB=kfp_v2_workflow_support
else
-KFP=$(KFP_v1)
-WORKFLOW_SUPPORT_LIB=kfp_v1_workflow_support
-endif
+KFP=$(KFP_v1)
+WORKFLOW_SUPPORT_LIB=kfp_v1_workflow_support
+endif
18 changes: 16 additions & 2 deletions transforms/universal/noop/Makefile
@@ -47,15 +47,29 @@ workflow-venv:

.PHONY: workflow-build
workflow-build:
+ifeq ($(KFPv2), 0)
	$(MAKE) -C kfp_ray/v1 workflow-build
+else
+	$(MAKE) -C kfp_ray/v2 workflow-build
+endif


.PHONY: workflow-test
workflow-test:
-	$(MAKE) -C $(PIPELINE_PATH) workflow-test
+ifeq ($(KFPv2), 0)
+	$(MAKE) -C kfp_ray/v2 workflow-test
+else
+	$(MAKE) -C kfp_ray/v2 workflow-test
+endif

.PHONY: workflow-upload
workflow-upload:
-	$(MAKE) -C $(PIPELINE_PATH) workflow-upload
+ifeq ($(KFPv2), 0)
+	$(MAKE) -C kfp_ray/v1 workflow-upload
+else
+	$(MAKE) -C kfp_ray/v2 workflow-upload
+endif


.PHONY: workflow-reconcile-requirements
workflow-reconcile-requirements: