Skip to content

Commit

Permalink
Fix compute_exec_params
Browse files Browse the repository at this point in the history
Signed-off-by: Revital Sur <eres@il.ibm.com>
  • Loading branch information
revit13 committed Jun 3, 2024
1 parent a0028b2 commit 47c4c68
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,50 +56,3 @@ def set_s3_env_vars_to_component(
for env_name, _ in env2key.items():
env2key[prefix + "_" + env_name] = env2key.pop(env_name)
kubernetes.use_secret_as_env(task=task, secret_name='s3-secret', secret_key_to_env=env2key)

@staticmethod
def default_compute_execution_params(
    worker_options: str,  # ray worker configuration
    actor_options: str,  # ray actor resource requirements
) -> str:
    """
    This is the most simplistic transform execution parameters computation.

    The number of actors is the largest count that fits into 85% of the cluster
    CPUs and 85% of the cluster memory and, when actors request GPUs, into the
    cluster GPUs. If not even one actor fits, a warning is logged and the
    process exits with status 1.

    :param worker_options: string form of a dictionary configuring ray workers;
        the keys "replicas", "cpu", "memory" and (optionally) "gpu" are read
    :param actor_options: string form of a dictionary with actor request
        requirements; the optional keys "num_cpus" (default 0.5), "memory"
        (default GB) and "num_gpus" (default 0) are read
    :return: number of actors, converted to a string
    """
    import sys

    from data_processing.utils import GB, get_logger
    from workflow_support.runtime_utils import KFPUtils

    logger = get_logger(__name__)

    # convert input; single quotes are replaced so that Python-style dictionary
    # strings are accepted by the JSON loader
    w_options = KFPUtils.load_from_json(worker_options.replace("'", '"'))
    a_options = KFPUtils.load_from_json(actor_options.replace("'", '"'))
    # Compute available cluster resources
    cluster_cpu = w_options["replicas"] * w_options["cpu"]
    cluster_mem = w_options["replicas"] * w_options["memory"]
    cluster_gpu = w_options["replicas"] * w_options.get("gpu", 0.0)
    logger.info(f"Cluster available CPUs {cluster_cpu}, Memory {cluster_mem}, GPUs {cluster_gpu}")
    # Per-actor requirements, resolved once so that the computation and the
    # shortage message below always report the same values
    actor_cpu = a_options.get("num_cpus", 0.5)
    actor_mem = a_options.get("memory", GB)
    actor_gpu = a_options.get("num_gpus", 0)
    # compute number of actors, keeping 15% of cpu and memory in reserve
    n_actors_cpu = int(cluster_cpu * 0.85 / actor_cpu)
    # actor memory is scaled by GB before the division, so worker memory is
    # presumably expressed in GB - TODO confirm against callers
    n_actors_memory = int(cluster_mem * 0.85 / (actor_mem / GB))
    n_actors = min(n_actors_cpu, n_actors_memory)
    # Check if we need gpu calculations as well
    if actor_gpu > 0:
        n_actors_gpu = int(cluster_gpu / actor_gpu)
        n_actors = min(n_actors, n_actors_gpu)
    logger.info(f"Number of actors - {n_actors}")
    if n_actors < 1:
        logger.warning(
            f"Not enough cpu/gpu/memory to run transform, "
            f"required cpu {actor_cpu}, available {cluster_cpu}, "
            f"required memory {actor_mem}, available {cluster_mem}, "
            f"required gpu {actor_gpu}, available {cluster_gpu}"
        )
        sys.exit(1)

    return str(n_actors)
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,50 @@ def load_from_json(js: str) -> dict[str, Any]:
except Exception as e:
logger.warning(f"Failed to load parameters {js} with error {e}")
sys.exit(1)

@staticmethod
def default_compute_execution_params(
    worker_options: str,  # ray worker configuration
    actor_options: str,  # ray actor resource requirements
) -> str:
    """
    This is the most simplistic transform execution parameters computation.

    The result is the minimum of the actor counts allowed by each resource:
    85% of cluster CPUs, 85% of cluster memory and, only when actors request
    GPUs, the cluster GPUs. When the result is below one, a warning describing
    the required and available resources is logged and the process exits with
    status 1.

    :param worker_options: configuration of ray workers as a dictionary string;
        reads "replicas", "cpu", "memory" and the optional "gpu"
    :param actor_options: actor request requirements as a dictionary string;
        reads the optional "num_cpus" (default 0.5), "memory" (default GB)
        and "num_gpus" (default 0)
    :return: number of actors, as a string
    """
    import sys

    from data_processing.utils import GB, get_logger
    from workflow_support.runtime_utils import KFPUtils

    logger = get_logger(__name__)

    # convert input; quotes are normalized so single-quoted dictionary strings
    # can be parsed as JSON
    w_options = KFPUtils.load_from_json(worker_options.replace("'", '"'))
    a_options = KFPUtils.load_from_json(actor_options.replace("'", '"'))
    # Compute available cluster resources
    replicas = w_options["replicas"]
    cluster_cpu = replicas * w_options["cpu"]
    cluster_mem = replicas * w_options["memory"]
    cluster_gpu = replicas * w_options.get("gpu", 0.0)
    logger.info(f"Cluster available CPUs {cluster_cpu}, Memory {cluster_mem}, GPUs {cluster_gpu}")
    # Resolve the per-actor requirements once; the same values feed both the
    # computation and the shortage message, so they cannot drift apart
    actor_cpu = a_options.get("num_cpus", 0.5)
    actor_mem = a_options.get("memory", GB)
    actor_gpu = a_options.get("num_gpus", 0)
    # compute number of actors, leaving 15% of cpu and memory unallocated
    n_actors_cpu = int(cluster_cpu * 0.85 / actor_cpu)
    # actor memory is divided by GB here, so worker memory is presumably
    # given in GB - TODO confirm against callers
    n_actors_memory = int(cluster_mem * 0.85 / (actor_mem / GB))
    n_actors = min(n_actors_cpu, n_actors_memory)
    # Check if we need gpu calculations as well
    if actor_gpu > 0:
        n_actors_gpu = int(cluster_gpu / actor_gpu)
        n_actors = min(n_actors, n_actors_gpu)
    logger.info(f"Number of actors - {n_actors}")
    if n_actors < 1:
        logger.warning(
            f"Not enough cpu/gpu/memory to run transform, "
            f"required cpu {actor_cpu}, available {cluster_cpu}, "
            f"required memory {actor_mem}, available {cluster_mem}, "
            f"required gpu {actor_gpu}, available {cluster_gpu}"
        )
        sys.exit(1)

    return str(n_actors)
4 changes: 2 additions & 2 deletions transforms/universal/noop/kfp_ray/v2/noop_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@
# a result, instead of creating a component we are creating it in place here.
@dsl.component(base_image=base_kfp_image)
def compute_exec_params(worker_options: str, actor_options: str) -> str:
    """
    Compute the transform execution parameters for this workflow.

    Delegates to KFPUtils.default_compute_execution_params and returns its
    result unchanged.

    :param worker_options: configuration of ray workers, passed through as is
    :param actor_options: actor request requirements, passed through as is
    :return: the string produced by KFPUtils.default_compute_execution_params
    """
    # Imported inside the function body rather than at module level; the only
    # helper used is the one hosted by KFPUtils, so no ComponentUtils import
    # is needed.
    from workflow_support.runtime_utils import KFPUtils

    return KFPUtils.default_compute_execution_params(worker_options, actor_options)


# create Ray cluster
Expand Down

0 comments on commit 47c4c68

Please sign in to comment.