
Fix the calculation of the desired ray actors.
Signed-off-by: Revital Sur <eres@il.ibm.com>
Co-authored-by: Boris Lublinsky <blublinsky@ibm.com>
revit13 and Boris Lublinsky committed Oct 2, 2024
1 parent 00d1fea commit e49af82
Showing 5 changed files with 8 additions and 7 deletions.
@@ -111,7 +111,7 @@ def operator() -> ActorHandle:

     cls_name = clazz.__class__.__name__.replace('ActorClass(', '').replace(')','')
     actors = [operator() for _ in range(n_actors)]
-    for i in range(60):
+    for i in range(120):
         time.sleep(1)
         alive = list_actors(filters=[("class_name", "=", cls_name), ("state", "=", "ALIVE")])
         if len(actors) == len(alive):
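The loop above polls Ray's actor state once per second until every requested actor reports ALIVE; the commit doubles the wait from 60 to 120 seconds. For readers outside the repository, a minimal self-contained sketch of the same polling pattern (the MyActor class and the example timeout are illustrative assumptions, not part of this commit):

import time

import ray
from ray.util.state import list_actors


@ray.remote(num_cpus=0.5)
class MyActor:
    # hypothetical actor class, present only to make the sketch runnable
    def ready(self) -> bool:
        return True


def wait_for_alive(actors: list, cls_name: str, timeout_s: int = 120) -> bool:
    # poll once per second until all created actors of cls_name report ALIVE
    for _ in range(timeout_s):
        time.sleep(1)
        alive = list_actors(filters=[("class_name", "=", cls_name), ("state", "=", "ALIVE")])
        if len(alive) == len(actors):
            return True
    return False


if __name__ == "__main__":
    ray.init()
    handles = [MyActor.remote() for _ in range(4)]
    print(wait_for_alive(handles, "MyActor", timeout_s=120))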
@@ -138,7 +138,7 @@ def default_compute_execution_params(
     cluster_gpu = w_options["replicas"] * w_options.get("gpu", 0.0)
     logger.info(f"Cluster available CPUs {cluster_cpu}, Memory {cluster_mem}, GPUs {cluster_gpu}")
     # compute number of actors
-    n_actors_cpu = int(cluster_cpu * 0.85 / a_options.get("num_cpus", 0.5))
+    n_actors_cpu = int((cluster_cpu - 1) * 0.7 / a_options.get("num_cpus", 0.5))
     n_actors_memory = int(cluster_mem * 0.85 / (a_options.get("memory", GB) / GB))
     n_actors = min(n_actors_cpu, n_actors_memory)
     # Check if we need gpu calculations as well
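The new formula reserves one CPU (presumably for Ray system processes) and lowers the CPU utilization factor from 0.85 to 0.7, so it requests noticeably fewer actors for the same cluster. A small worked example under assumed numbers, a 16-CPU cluster and the 0.5-CPU per-actor default; neither value comes from the commit:

cluster_cpu = 16      # assumed total CPUs across the Ray workers
actor_cpus = 0.5      # default num_cpus per actor in this function

old_estimate = int(cluster_cpu * 0.85 / actor_cpus)        # int(27.2) -> 27 actors
new_estimate = int((cluster_cpu - 1) * 0.7 / actor_cpus)   # int(21.0) -> 21 actors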
@@ -10,6 +10,7 @@
 # limitations under the License.
 ################################################################################
 
+
 def ededup_compute_execution_params(
     worker_options: dict,  # ray worker configuration
     actor_options: dict,  # actor's resource requirements
@@ -94,9 +94,9 @@ def ededup_compute_execution_params(
         )
         sys.exit(1)
     # Define number of workers
-    n_workers = int((0.85 * cluster_cpu - required_hash_cpu) / actor_cpu)
+    n_workers = int((0.85 * (cluster_cpu - 1) - required_hash_cpu) / actor_cpu)
     print(f"Number of workers - {n_workers}")
-    if n_workers < 2:
+    if n_workers < 0:
         print(f"Cluster is too small - estimated number of workers {n_workers}")
         sys.exit(1)
     # Limit amount of workers and processors to prevent S3 saturation
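The ededup sizing gets the same one-CPU reservation, and the guard is relaxed so that only a negative worker count aborts the run. Another illustrative calculation under assumed values (16 cluster CPUs, 1 CPU claimed by the hash actors, 0.75 CPUs per worker; all numbers are made up for the example):

cluster_cpu = 16
required_hash_cpu = 1.0   # assumed CPUs claimed by the hash actors
actor_cpu = 0.75          # assumed CPUs per ededup worker

n_workers = int((0.85 * (cluster_cpu - 1) - required_hash_cpu) / actor_cpu)  # int(15.66...) -> 15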
transforms/universal/fdedup/kfp_ray/fdedup_wf.py (2 changes: 1 addition & 1 deletion)
@@ -82,7 +82,7 @@ def fdedup(
     data_max_files: int = -1,
     data_num_samples: int = -1,
     # orchestrator
-    runtime_actor_options: dict = {"num_cpus": 0.8},
+    runtime_actor_options: dict = {"num_cpus": 0.7},
     runtime_pipeline_id: str = "pipeline_id",
     runtime_code_location: dict = {'github': 'github', 'commit_hash': '12345', 'path': 'path'},
     # columns used
@@ -140,7 +140,7 @@ def _false_negative_probability(ths: float, b: int, r: int) -> float:
     cluster_cpu = worker_options["replicas"] * worker_options["cpu"]
     cluster_memory = worker_options["replicas"] * worker_options["memory"]
     print(f"Cluster available CPUs {cluster_cpu}, Memory {cluster_memory}")
-    cluster_cpu *= 0.85
+    cluster_cpu -= 1
     cluster_memory *= 0.85
     # get actor requirements
     actor_cpu = actor_options["num_cpus"]
@@ -172,7 +172,7 @@ def _false_negative_probability(ths: float, b: int, r: int) -> float:
     n_preprocessors = int(
         (0.85 * cluster_cpu - b_actors * bucket_cpu - m_actors * mhash_cpu - d_actors * doc_cpu) / actor_cpu
     )
-    if n_preprocessors < 0:
+    if n_preprocessors <= 0:
         print(f"Not enough CPUs to run fuzzy de duping, computed number of workers is {n_preprocessors}")
         print(f"Required bucket actors {b_actors}, minhash actors {m_actors}, document actors {d_actors}")
         print("Try to increase the size of the cluster")
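Putting the fdedup changes together: the CPU budget now subtracts one CPU instead of scaling by 0.85, and a preprocessor count of zero is rejected along with negative values. A worked example with assumed actor counts and per-actor CPU requirements (none of these numbers come from the commit):

cluster_cpu = 16.0
cluster_cpu -= 1            # reserve one CPU rather than scaling by 0.85

actor_cpu = 0.7             # matches the new runtime_actor_options default
b_actors, bucket_cpu = 2, 0.5   # assumed bucket actors
m_actors, mhash_cpu = 2, 0.5    # assumed minhash actors
d_actors, doc_cpu = 2, 0.5      # assumed document actors

n_preprocessors = int(
    (0.85 * cluster_cpu - b_actors * bucket_cpu - m_actors * mhash_cpu - d_actors * doc_cpu) / actor_cpu
)
# 0.85 * 15 = 12.75; 12.75 - 3.0 = 9.75; 9.75 / 0.7 = 13.9... -> 13 preprocessors
if n_preprocessors <= 0:
    raise SystemExit("cluster too small for fuzzy dedup")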
