diff --git a/data-processing-lib/ray/src/data_processing_ray/runtime/ray/ray_utils.py b/data-processing-lib/ray/src/data_processing_ray/runtime/ray/ray_utils.py
index c7362ef5e..5225508fb 100644
--- a/data-processing-lib/ray/src/data_processing_ray/runtime/ray/ray_utils.py
+++ b/data-processing-lib/ray/src/data_processing_ray/runtime/ray/ray_utils.py
@@ -111,7 +111,7 @@ def operator() -> ActorHandle:
         cls_name = clazz.__class__.__name__.replace('ActorClass(', '').replace(')','')
         actors = [operator() for _ in range(n_actors)]
-        for i in range(60):
+        for i in range(120):
             time.sleep(1)
             alive = list_actors(filters=[("class_name", "=", cls_name), ("state", "=", "ALIVE")])
             if len(actors) == len(alive):
diff --git a/kfp/kfp_support_lib/shared_workflow_support/src/runtime_utils/kfp_utils.py b/kfp/kfp_support_lib/shared_workflow_support/src/runtime_utils/kfp_utils.py
index 73b6a5cd4..7fa76453f 100644
--- a/kfp/kfp_support_lib/shared_workflow_support/src/runtime_utils/kfp_utils.py
+++ b/kfp/kfp_support_lib/shared_workflow_support/src/runtime_utils/kfp_utils.py
@@ -138,7 +138,7 @@ def default_compute_execution_params(
     cluster_gpu = w_options["replicas"] * w_options.get("gpu", 0.0)
     logger.info(f"Cluster available CPUs {cluster_cpu}, Memory {cluster_mem}, GPUs {cluster_gpu}")
     # compute number of actors
-    n_actors_cpu = int(cluster_cpu * 0.85 / a_options.get("num_cpus", 0.5))
+    n_actors_cpu = int((cluster_cpu - 1) * 0.7 / a_options.get("num_cpus", 0.5))
     n_actors_memory = int(cluster_mem * 0.85 / (a_options.get("memory", GB) / GB))
     n_actors = min(n_actors_cpu, n_actors_memory)
     # Check if we need gpu calculations as well
diff --git a/transforms/universal/ededup/kfp_ray/src/ededup_compute_execution_params.py b/transforms/universal/ededup/kfp_ray/src/ededup_compute_execution_params.py
index a20a2e030..99bcb2fd3 100644
--- a/transforms/universal/ededup/kfp_ray/src/ededup_compute_execution_params.py
+++ b/transforms/universal/ededup/kfp_ray/src/ededup_compute_execution_params.py
@@ -10,6 +10,7 @@
 # limitations under the License.
 ################################################################################
+
 
 def ededup_compute_execution_params(
     worker_options: dict,  # ray worker configuration
     actor_options: dict,  # actor's resource requirements
@@ -94,9 +95,9 @@ def ededup_compute_execution_params(
         )
         sys.exit(1)
     # Define number of workers
-    n_workers = int((0.85 * cluster_cpu - required_hash_cpu) / actor_cpu)
+    n_workers = int((0.85 * (cluster_cpu - 1) - required_hash_cpu) / actor_cpu)
     print(f"Number of workers - {n_workers}")
-    if n_workers < 2:
+    if n_workers < 0:
         print(f"Cluster is too small - estimated number of workers {n_workers}")
         sys.exit(1)
     # Limit amount of workers and processors to prevent S3 saturation
diff --git a/transforms/universal/fdedup/kfp_ray/fdedup_wf.py b/transforms/universal/fdedup/kfp_ray/fdedup_wf.py
index bb2cc3194..3156ab6f1 100644
--- a/transforms/universal/fdedup/kfp_ray/fdedup_wf.py
+++ b/transforms/universal/fdedup/kfp_ray/fdedup_wf.py
@@ -82,7 +82,7 @@ def fdedup(
     data_max_files: int = -1,
     data_num_samples: int = -1,
     # orchestrator
-    runtime_actor_options: dict = {"num_cpus": 0.8},
+    runtime_actor_options: dict = {"num_cpus": 0.7},
     runtime_pipeline_id: str = "pipeline_id",
     runtime_code_location: dict = {'github': 'github', 'commit_hash': '12345', 'path': 'path'},
     # columns used
diff --git a/transforms/universal/fdedup/kfp_ray/src/fdedup_compute_execution_params.py b/transforms/universal/fdedup/kfp_ray/src/fdedup_compute_execution_params.py
index ebcecadb9..726200339 100644
--- a/transforms/universal/fdedup/kfp_ray/src/fdedup_compute_execution_params.py
+++ b/transforms/universal/fdedup/kfp_ray/src/fdedup_compute_execution_params.py
@@ -140,7 +140,7 @@ def _false_negative_probability(ths: float, b: int, r: int) -> float:
     cluster_cpu = worker_options["replicas"] * worker_options["cpu"]
     cluster_memory = worker_options["replicas"] * worker_options["memory"]
     print(f"Cluster available CPUs {cluster_cpu}, Memory {cluster_memory}")
-    cluster_cpu *= 0.85
+    cluster_cpu -= 1
     cluster_memory *= 0.85
     # get actor requirements
     actor_cpu = actor_options["num_cpus"]
@@ -172,7 +172,7 @@ def _false_negative_probability(ths: float, b: int, r: int) -> float:
     n_preprocessors = int(
         (0.85 * cluster_cpu - b_actors * bucket_cpu - m_actors * mhash_cpu - d_actors * doc_cpu) / actor_cpu
     )
-    if n_preprocessors < 0:
+    if n_preprocessors <= 0:
         print(f"Not enough CPUs to run fuzzy de duping, computed number of workers is {n_preprocessors}")
         print(f"Required bucket actors {b_actors}, minhash actors {m_actors}, document actors {d_actors}")
         print("Try to increase the size of the cluster")
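
Note for reviewers: the recurring change across these files is to reserve one CPU for Ray head/system processes (`cluster_cpu - 1`) before applying a utilization factor, instead of scaling the raw cluster total. A minimal sketch of the effect on the `default_compute_execution_params` formula follows; the cluster size and per-actor CPU request below are assumed values for illustration only, not taken from this patch.

# Old vs. new CPU-based actor count from default_compute_execution_params.
# cluster_cpu and actor_num_cpus are assumptions for illustration.
cluster_cpu = 16          # assumed total worker CPUs (replicas * cpu)
actor_num_cpus = 0.75     # assumed per-actor num_cpus request

old_count = int(cluster_cpu * 0.85 / actor_num_cpus)         # -> 18
new_count = int((cluster_cpu - 1) * 0.7 / actor_num_cpus)    # -> 14
print(f"old: {old_count} actors, new: {new_count} actors")

Under these assumed numbers the new formula schedules 14 actors instead of 18, leaving headroom so worker creation is less likely to stall waiting on CPUs (which is also why the `ray_utils.py` startup wait doubles from 60 to 120 seconds).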