From 62f8d15c503f5ce870e56ee7f5a4bafa8ee095cd Mon Sep 17 00:00:00 2001 From: Gary Lvov Date: Mon, 28 Oct 2024 02:14:48 -0400 Subject: [PATCH] add heterogeneous cluster support --- docs/source/features/ray.rst | 6 +- .../google_cloud/kuberay.yaml.jinja | 2 +- .../ray/grok_cluster_with_kubectl.py | 22 ++- .../workflows/ray/isaac_ray_tune.py | 8 +- .../workflows/ray/isaac_ray_util.py | 138 +++++++++++------- source/standalone/workflows/ray/launch.py | 19 ++- .../workflows/ray/submit_isaac_ray_job.py | 2 +- .../workflows/ray/wrap_isaac_ray_resources.py | 49 +++---- 8 files changed, 133 insertions(+), 113 deletions(-) diff --git a/docs/source/features/ray.rst b/docs/source/features/ray.rst index 0d98eaa1d8..4ed881b11a 100644 --- a/docs/source/features/ray.rst +++ b/docs/source/features/ray.rst @@ -31,8 +31,8 @@ For clarity, this guide refers to the jobs one layer below the topmost aggregate Both resource-wrapped and tuning aggregate jobs dispatch individual jobs to a designated Ray cluster, which leverages the cluster's resources (e.g., a single workstation node or multiple nodes) to execute these jobs with workers in parallel and/or sequentially. By default, aggregate jobs use all \ -available resources on each available GPU-enabled node for each sub-job worker. This can be changed through -specifying the ``--num_workers_per_node`` argument, especially critical for parallel aggregate +available resources on each available GPU-enabled node for each sub-job worker. This can be changed through +specifying the ``--num_workers_per_node`` argument, especially critical for parallel aggregate job processing on local or virtual multi-GPU machines In resource-wrapped aggregate jobs, each sub-job and its @@ -134,7 +134,7 @@ any cloud provider should work if one configures the following: - A Kubernetes setup with available NVIDIA RTX (likely ``l4`` or ``l40``) GPU-passthrough node-pool resources, that has access to your container registry/storage bucket and has the Ray operator enabled with correct IAM permissions. This can be easily achieved with services such as Google GKE or AWS EKS, - provided that your account or organization has been granted a GPU-budget. It is recommended + provided that your account or organization has been granted a GPU-budget. 
It is recommended to use manual kubernetes services as opposed to "autopilot" services for cost-effective experimentation as this way clusters can be completely shut down when not in use, although this may installing the `Nvidia GPU Operator `_ diff --git a/source/standalone/workflows/ray/cluster_configs/google_cloud/kuberay.yaml.jinja b/source/standalone/workflows/ray/cluster_configs/google_cloud/kuberay.yaml.jinja index 9d31e10266..1c256a3240 100644 --- a/source/standalone/workflows/ray/cluster_configs/google_cloud/kuberay.yaml.jinja +++ b/source/standalone/workflows/ray/cluster_configs/google_cloud/kuberay.yaml.jinja @@ -1,4 +1,4 @@ -# Jinja is used for templating here as full helm setup is excessive for application +# Jinja is used for templating here as full helm setup is excessive for application apiVersion: ray.io/v1alpha1 kind: RayCluster metadata: diff --git a/source/standalone/workflows/ray/grok_cluster_with_kubectl.py b/source/standalone/workflows/ray/grok_cluster_with_kubectl.py index 7cb2eb2f3a..bbcce7f799 100644 --- a/source/standalone/workflows/ray/grok_cluster_with_kubectl.py +++ b/source/standalone/workflows/ray/grok_cluster_with_kubectl.py @@ -57,14 +57,14 @@ def check_clusters_running(pods: list, clusters: set) -> bool: return clusters_running -def get_ray_address(head_pod: str, namespace: str = "default", - ray_head_name: str = "head") -> str: +def get_ray_address(head_pod: str, namespace: str = "default", ray_head_name: str = "head") -> str: cmd = ["kubectl", "logs", head_pod, "-c", ray_head_name, "-n", namespace] try: output = subprocess.check_output(cmd).decode() except subprocess.CalledProcessError as e: - raise ValueError(f"Could not enter head container with cmd {cmd}: {e}" - "Perhaps try a different namespace or ray head name.") + raise ValueError( + f"Could not enter head container with cmd {cmd}: {e}Perhaps try a different namespace or ray head name." + ) match = re.search(r"RAY_ADDRESS='([^']+)'", output) if match: return match.group(1) @@ -72,7 +72,7 @@ def get_ray_address(head_pod: str, namespace: str = "default", return None -def process_cluster(cluster_info: dict, ray_head_name:str = "head") -> str: +def process_cluster(cluster_info: dict, ray_head_name: str = "head") -> str: cluster, pods, namespace = cluster_info head_pod = None for pod_name, status in pods: @@ -95,13 +95,9 @@ def process_cluster(cluster_info: dict, ray_head_name:str = "head") -> str: def main(): # Parse command-line arguments parser = argparse.ArgumentParser(description="Process Ray clusters and save their specifications.") - parser.add_argument( - "--prefix", default="isaacray", help="The prefix for the cluster names." 
- ) + parser.add_argument("--prefix", default="isaacray", help="The prefix for the cluster names.") parser.add_argument("--output", default="~/.cluster_config", help="The file to save cluster specifications.") - parser.add_argument("--ray_head_name", - default="head", - help="The metadata name for the ray head container") + parser.add_argument("--ray_head_name", default="head", help="The metadata name for the ray head container") args = parser.parse_args() CLUSTER_NAME_PREFIX = args.prefix @@ -150,8 +146,8 @@ def main(): with ThreadPoolExecutor() as executor: future_to_cluster = { - executor.submit(process_cluster, - info, args.ray_head_name): info[0] for info in cluster_infos} + executor.submit(process_cluster, info, args.ray_head_name): info[0] for info in cluster_infos + } for future in as_completed(future_to_cluster): cluster_name = future_to_cluster[future] try: diff --git a/source/standalone/workflows/ray/isaac_ray_tune.py b/source/standalone/workflows/ray/isaac_ray_tune.py index 4b7788ce52..103290ba35 100644 --- a/source/standalone/workflows/ray/isaac_ray_tune.py +++ b/source/standalone/workflows/ray/isaac_ray_tune.py @@ -146,10 +146,10 @@ def invoke_tuning_run( num_samples=num_samples, # Ensure args.num_samples is well-defined ), run_config=air.RunConfig( - name=f"IsaacRay-{cfg["runner_args"]["--task"]}-tune", - storage_path=storage_path, - verbose=1, - failure_config=air.FailureConfig(fail_fast=True) + name=f"IsaacRay-{cfg['runner_args']['--task']}-tune", + storage_path=storage_path, + verbose=1, + failure_config=air.FailureConfig(fail_fast=True), ), ) diff --git a/source/standalone/workflows/ray/isaac_ray_util.py b/source/standalone/workflows/ray/isaac_ray_util.py index 4e6398e566..cc51ff1383 100644 --- a/source/standalone/workflows/ray/isaac_ray_util.py +++ b/source/standalone/workflows/ray/isaac_ray_util.py @@ -2,15 +2,16 @@ # All rights reserved. # # SPDX-License-Identifier: BSD-3-Clause -import ray -from tensorboard.backend.event_processing.event_accumulator import EventAccumulator - import argparse import os import re import subprocess from datetime import datetime +import ray +from tensorboard.backend.event_processing.event_accumulator import EventAccumulator + + def load_tensorboard_logs(directory: str) -> dict: """From a tensorboard directory, get the latest scalar values. @@ -94,7 +95,6 @@ def execute_job( test_mode: bool = False, extract_experiment: bool = False, persistent_dir: str | None = None, - ) -> str | dict: """Issue a job (shell command). 
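For context on this file diff: the hunk above moves `import ray` and the `EventAccumulator` import below the standard-library imports, and the file's `load_tensorboard_logs` helper ("get the latest scalar values" from a TensorBoard directory) presumably builds on that `EventAccumulator` API. A minimal, self-contained sketch of the pattern — the helper name `latest_scalars` is illustrative, not part of the patch:

    # Sketch only: read the newest scalar value per tag from one TensorBoard run directory.
    from tensorboard.backend.event_processing.event_accumulator import EventAccumulator

    def latest_scalars(directory: str) -> dict[str, float]:
        accumulator = EventAccumulator(directory)
        accumulator.Reload()  # parse the event files found on disk
        latest = {}
        for tag in accumulator.Tags().get("scalars", []):
            events = accumulator.Scalars(tag)  # ordered ScalarEvent(wall_time, step, value) records
            if events:
                latest[tag] = events[-1].value  # last event is the most recent step
        return latest

Looking tags up via `.Tags()["scalars"]` rather than hard-coding metric names is what lets a utility like this report whatever scalars a given RL library happens to log.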
@@ -133,29 +133,43 @@ def execute_job( output = result.stdout.strip().split("\n") for gpu_info in output: name, memory_free, serial = gpu_info.split(", ") - result_details.append(f"{identifier_string}[INFO]: Name: {name}|Memory Available: {memory_free} MB|Serial Number {serial} \n") - + result_details.append( + f"{identifier_string}[INFO]: Name: {name}|Memory Available: {memory_free} MB|Serial Number" + f" {serial} \n" + ) + # Get GPU count from PyTorch num_gpus_detected = torch.cuda.device_count() result_details.append(f"{identifier_string}[INFO]: Detected GPUs from PyTorch: {num_gpus_detected} \n") - + # Check CUDA_VISIBLE_DEVICES and count the number of visible GPUs - cuda_visible_devices = os.environ.get("CUDA_VISIBLE_DEVICES", None) + cuda_visible_devices = os.environ.get("CUDA_VISIBLE_DEVICES") if cuda_visible_devices: visible_devices_count = len(cuda_visible_devices.split(",")) - result_details.append(f"{identifier_string}[INFO]: GPUs visible via CUDA_VISIBLE_DEVICES: {visible_devices_count} \n") + result_details.append( + f"{identifier_string}[INFO]: GPUs visible via CUDA_VISIBLE_DEVICES: {visible_devices_count} \n" + ) else: visible_devices_count = len(output) # All GPUs visible if CUDA_VISIBLE_DEVICES is not set - result_details.append(f"{identifier_string}[INFO]: CUDA_VISIBLE_DEVICES not set; all GPUs visible ({visible_devices_count}) \n") - + result_details.append( + f"{identifier_string}[INFO]: CUDA_VISIBLE_DEVICES not set; all GPUs visible" + f" ({visible_devices_count}) \n" + ) + # If PyTorch GPU count disagrees with nvidia-smi, reset CUDA_VISIBLE_DEVICES and rerun detection if num_gpus_detected != len(output): - result_details.append(f"{identifier_string}[WARNING]: PyTorch and nvidia-smi disagree on GPU count! Re-running with all GPUs visible. \n") + result_details.append( + f"{identifier_string}[WARNING]: PyTorch and nvidia-smi disagree on GPU count! Re-running with all" + " GPUs visible. \n" + ) result_details.append(f"{identifier_string}[INFO]: This shows that GPU resources were isolated.\n") os.environ["CUDA_VISIBLE_DEVICES"] = ",".join([str(i) for i in range(len(output))]) num_gpus_detected_after_reset = torch.cuda.device_count() - result_details.append(f"{identifier_string}[INFO]: After setting CUDA_VISIBLE_DEVICES, PyTorch detects {num_gpus_detected_after_reset} GPUs \n") - + result_details.append( + f"{identifier_string}[INFO]: After setting CUDA_VISIBLE_DEVICES, PyTorch detects" + f" {num_gpus_detected_after_reset} GPUs \n" + ) + except subprocess.CalledProcessError as e: print(f"Error calling nvidia-smi: {e.stderr}") result_details.append({"error": "Failed to retrieve GPU information"}) @@ -215,22 +229,24 @@ def execute_job( return " ".join(result_details) -def get_gpu_node_resources(total_resources: bool = False, - one_node_only: bool = False, - include_gb_ram: bool = False, - include_id: bool = False, - ray_address: str = "auto") -> list[dict]|dict: +def get_gpu_node_resources( + total_resources: bool = False, + one_node_only: bool = False, + include_gb_ram: bool = False, + include_id: bool = False, + ray_address: str = "auto", +) -> list[dict] | dict: """Get information about available GPU node resources. Args: total_resources: When true, return total available resources. Defaults to False. one_node_only: When true, return resources for a single node. Defaults to False. include_gb_ram: Set to true to convert MB to GB in result - include_id: Set to true to include node ID + include_id: Set to true to include node ID ray_address: The ray address to connect to. 
Returns: - Resource information for all nodes, sorted by descending GPU count, then descending CPU + Resource information for all nodes, sorted by descending GPU count, then descending CPU count, then descending RAM capacity, and finally by node ID in ascending order if available, or simply the resource for a single node if requested. """ @@ -260,10 +276,7 @@ def get_gpu_node_resources(total_resources: bool = False, total_cpus += cpus total_gpus += gpus total_memory += memory - node_resources = sorted(node_resources, key=lambda x: (-x["gpu"], - -x["cpu"], - -x["memory"], - x.get("id", ""))) + node_resources = sorted(node_resources, key=lambda x: (-x["gpu"], -x["cpu"], -x["memory"], x.get("id", ""))) if total_resources: # Return summed total resources @@ -274,10 +287,13 @@ def get_gpu_node_resources(total_resources: bool = False, return node_resources -def add_resource_arguments(arg_parser: argparse.ArgumentParser, - defaults: list | None = None, - cluster_create_defaults: bool =False,) -> argparse.ArgumentParser: - """Add resource arguments to a cluster; this is shared across both + +def add_resource_arguments( + arg_parser: argparse.ArgumentParser, + defaults: list | None = None, + cluster_create_defaults: bool = False, +) -> argparse.ArgumentParser: + """Add resource arguments to a cluster; this is shared across both wrapping resources and launching clusters. Args: @@ -292,41 +308,56 @@ def add_resource_arguments(arg_parser: argparse.ArgumentParser, defaults = [[1], [8], [16], [1]] else: defaults = [None, None, None, [1]] - arg_parser.add_argument("--gpu_per_worker", nargs='+', type=int, default=defaults[0], - help="Number of GPUs per worker node. Supply more than one for heterogeneous resources") - arg_parser.add_argument("--cpu_per_worker", nargs='+', type=int, default=defaults[1], - help="Number of CPUs per worker node. Supply more than one for heterogeneous resources") - arg_parser.add_argument("--ram_gb_per_worker", nargs='+', type=int, default=defaults[2], - help="RAM in GB per worker node. Supply more than one for heterogeneous resources.") - arg_parser.add_argument("--num_workers", - nargs='+', - type=int, - default=defaults[3], - help="Number of desired workers. Supply more than one for heterogeneous resources.") + arg_parser.add_argument( + "--gpu_per_worker", + nargs="+", + type=int, + default=defaults[0], + help="Number of GPUs per worker node. Supply more than one for heterogeneous resources", + ) + arg_parser.add_argument( + "--cpu_per_worker", + nargs="+", + type=int, + default=defaults[1], + help="Number of CPUs per worker node. Supply more than one for heterogeneous resources", + ) + arg_parser.add_argument( + "--ram_gb_per_worker", + nargs="+", + type=int, + default=defaults[2], + help="RAM in GB per worker node. Supply more than one for heterogeneous resources.", + ) + arg_parser.add_argument( + "--num_workers", + nargs="+", + type=int, + default=defaults[3], + help="Number of desired workers. Supply more than one for heterogeneous resources.", + ) return arg_parser -def fill_in_missing_resources(args: argparse.Namespace, - resources: dict | None = None, - cluster_creation_flag: bool = False, - policy: callable = max): - """ Normalize the lengths of resource lists based on the longest list provided. 
""" + +def fill_in_missing_resources( + args: argparse.Namespace, resources: dict | None = None, cluster_creation_flag: bool = False, policy: callable = max +): + """Normalize the lengths of resource lists based on the longest list provided.""" print("[INFO]: Filling in missing command line arguments with best guess...") if resources is None: resources = { - 'gpu_per_worker': args.gpu_per_worker, - 'cpu_per_worker': args.cpu_per_worker, - 'ram_gb_per_worker': args.ram_gb_per_worker, - 'num_workers': args.num_workers + "gpu_per_worker": args.gpu_per_worker, + "cpu_per_worker": args.cpu_per_worker, + "ram_gb_per_worker": args.ram_gb_per_worker, + "num_workers": args.num_workers, } if cluster_creation_flag: - cluster_creation_resources = { - 'worker_accelerator': args.worker_accelerator - } + cluster_creation_resources = {"worker_accelerator": args.worker_accelerator} resources.update(cluster_creation_resources) # Calculate the maximum length of any list max_length = max(len(v) for v in resources.values()) - print(f"[INFO]: Resource list lengths:") + print("[INFO]: Resource list lengths:") for key, value in resources.items(): print(f"[INFO] {key}: {len(value)} values {value}") @@ -351,6 +382,7 @@ def fill_in_missing_resources(args: argparse.Namespace, print("[INFO]: Done filling in command line arguments...\n\n") return args + def populate_isaac_ray_cfg_args(cfg: dict = {}) -> dict: """Small utility method to create empty fields if needed for a configuration.""" if "runner_args" not in cfg: diff --git a/source/standalone/workflows/ray/launch.py b/source/standalone/workflows/ray/launch.py index 3e3058cfe6..6e0fe6735e 100644 --- a/source/standalone/workflows/ray/launch.py +++ b/source/standalone/workflows/ray/launch.py @@ -8,9 +8,9 @@ import subprocess import yaml +import isaac_ray_util from jinja2 import Environment, FileSystemLoader from kubernetes import config -import isaac_ray_util """This script helps create one or more KubeRay clusters. @@ -27,7 +27,7 @@ --namespace --image \ --num_workers 8 --num_clusters 1 --worker_accelerator nvidia-l4 --gpu_per_worker 1 - # The following creates 1 GPUx1 nvidia l4 worker, 2 GPUx2 nvidia-tesla-t4 workers, + # The following creates 1 GPUx1 nvidia l4 worker, 2 GPUx2 nvidia-tesla-t4 workers, # and 2 GPUx4 nvidia-tesla-t4 GPU workers ./isaaclab.sh -p source/standalone/workflows/ray/launch.py --cluster_host google_cloud \ --namespace --image \ @@ -57,7 +57,7 @@ def apply_manifest(args: argparse.Namespace) -> None: # Convert args namespace to a dictionary template_params = vars(args) - + # Load and render the template template = jinja_env.get_template(template_file) file_contents = template.render(template_params) @@ -128,19 +128,16 @@ def parse_args() -> argparse.Namespace: nargs="+", type=str, default=["nvidia-l4"], - help="GPU accelerator name. Supply more than one for heterogenous resources.", + help="GPU accelerator name. Supply more than one for heterogeneous resources.", ) - arg_parser = isaac_ray_util.add_resource_arguments(arg_parser, - cluster_create_defaults=True) - + arg_parser = isaac_ray_util.add_resource_arguments(arg_parser, cluster_create_defaults=True) + arg_parser.add_argument( "--num_clusters", type=int, default=1, - help=( - "How many Ray Clusters to create." 
- ), + help="How many Ray Clusters to create.", ) arg_parser.add_argument( "--num_head_cpu", @@ -156,6 +153,7 @@ def parse_args() -> argparse.Namespace: args = arg_parser.parse_args() return isaac_ray_util.fill_in_missing_resources(args, cluster_creation_flag=True) + def main(): args = parse_args() @@ -169,5 +167,6 @@ def main(): args.name = default_name + "-" + str(i) apply_manifest(args) + if __name__ == "__main__": main() diff --git a/source/standalone/workflows/ray/submit_isaac_ray_job.py b/source/standalone/workflows/ray/submit_isaac_ray_job.py index a8f92e069a..47e3150d33 100644 --- a/source/standalone/workflows/ray/submit_isaac_ray_job.py +++ b/source/standalone/workflows/ray/submit_isaac_ray_job.py @@ -106,7 +106,7 @@ def submit_jobs_to_clusters(jobs: list[str], clusters: list[dict]) -> None: if len(jobs) < len(clusters): print("[INFO]: Less jobs than clusters, some clusters will not receive jobs") elif len(jobs) == len(clusters): - print(f"[INFO]: Exactly one job per cluster") + print("[INFO]: Exactly one job per cluster") else: print("[INFO]: More jobs than clusters, jobs submitted as clusters become available.") with ThreadPoolExecutor() as executor: diff --git a/source/standalone/workflows/ray/wrap_isaac_ray_resources.py b/source/standalone/workflows/ray/wrap_isaac_ray_resources.py index 4c377a5b8a..8ee5525118 100644 --- a/source/standalone/workflows/ray/wrap_isaac_ray_resources.py +++ b/source/standalone/workflows/ray/wrap_isaac_ray_resources.py @@ -9,7 +9,6 @@ import ray from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy - """ This script dispatches sub-job(s) (either individual jobs or tuning aggregate jobs) to worker(s) on GPU-enabled node(s) of a specific cluster as part of an resource-wrapped aggregate @@ -37,7 +36,7 @@ of sub-jobs that can be near-simultaneously submitted. This script is meant to be executed on a Ray cluster head node as an aggregate cluster job. -To submit aggregate cluster jobs such as this script to one or more remote clusters, +To submit aggregate cluster jobs such as this script to one or more remote clusters, see :file:`../submit_isaac_ray_job.py`. KubeRay clusters on Google GKE can be created with :file:`../launch.py` @@ -50,10 +49,7 @@ """ -def wrap_resources_to_jobs( - jobs: list[str], - args: argparse.Namespace -) -> None: +def wrap_resources_to_jobs(jobs: list[str], args: argparse.Namespace) -> None: """ Provided a list of jobs, dispatch jobs to one worker per available node, unless otherwise specified by resource constraints. 
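The dispatch loop reformatted in the next hunk splits each node's GPUs, CPUs, and RAM evenly across that node's requested workers and pins every worker to its node. A minimal standalone sketch of that Ray pattern, assuming a reachable cluster — `run_job` is a placeholder for `isaac_ray_util.remote_execute_job`, and one worker per node is assumed:

    # Sketch only: one pinned task per GPU node, each holding an even share of that node.
    import ray
    from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy

    @ray.remote
    def run_job(command: str) -> str:
        return f"ran: {command}"

    ray.init(address="auto")
    gpu_nodes = [n for n in ray.nodes() if n["Alive"] and n["Resources"].get("GPU", 0) > 0]
    workers_per_node = 1  # with N workers per node, each worker gets a 1/N slice below
    refs = []
    for node in gpu_nodes:
        refs.append(
            run_job.options(
                num_gpus=node["Resources"]["GPU"] / workers_per_node,
                num_cpus=node["Resources"]["CPU"] / workers_per_node,
                memory=int(node["Resources"]["memory"] / workers_per_node),  # bytes
                # soft=False: fail if the node is unavailable instead of drifting to another node
                scheduling_strategy=NodeAffinitySchedulingStrategy(node_id=node["NodeID"], soft=False),
            ).remote("nvidia-smi")
        )
    print(ray.get(refs))

Hard affinity plus fractional resource requests is what keeps heterogeneous nodes each running their own worker count instead of Ray packing every job onto the largest node.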
@@ -66,41 +62,40 @@ def wrap_resources_to_jobs( if not ray.is_initialized(): ray.init(address=args.ray_address, log_to_driver=True) job_results = [] - gpu_node_resources = isaac_ray_util.get_gpu_node_resources(include_id=True, - include_gb_ram=True) - + gpu_node_resources = isaac_ray_util.get_gpu_node_resources(include_id=True, include_gb_ram=True) + if any([args.gpu_per_worker, args.cpu_per_worker, args.ram_gb_per_worker]) and args.num_workers: raise ValueError("Either specify only num_workers or only granular resources(GPU,CPU,RAM_GB).") - + num_nodes = len(gpu_node_resources) # Populate arguments formatted_node_resources = { - "gpu_per_worker":[gpu_node_resources[i]["gpu"] for i in range(num_nodes)], - "cpu_per_worker":[gpu_node_resources[i]["cpu"] for i in range(num_nodes)], + "gpu_per_worker": [gpu_node_resources[i]["gpu"] for i in range(num_nodes)], + "cpu_per_worker": [gpu_node_resources[i]["cpu"] for i in range(num_nodes)], "ram_gb_per_worker": [gpu_node_resources[i]["ram_gb"] for i in range(num_nodes)], - "num_workers": args.num_workers # By default, 1 worker por node + "num_workers": args.num_workers, # By default, 1 worker por node } - args = isaac_ray_util.fill_in_missing_resources(args, - resources=formatted_node_resources, - policy=min) + args = isaac_ray_util.fill_in_missing_resources(args, resources=formatted_node_resources, policy=min) print(f"[INFO]: Number of GPU nodes found: {num_nodes}") if args.test: - jobs = ['nvidia-smi'] * num_nodes + jobs = ["nvidia-smi"] * num_nodes for i, job in enumerate(jobs): gpu_node = gpu_node_resources[i % num_nodes] print(f"[INFO]: Submitting job {i + 1} of {len(jobs)} with job '{job}' to node {gpu_node}") - print(f"[INFO]: Resource parameters: GPU: {args.gpu_per_worker[i]}" - f" CPU: {args.cpu_per_worker[i]} RAM {args.ram_gb_per_worker[i]}") + print( + f"[INFO]: Resource parameters: GPU: {args.gpu_per_worker[i]}" + f" CPU: {args.cpu_per_worker[i]} RAM {args.ram_gb_per_worker[i]}" + ) print(f"[INFO] For the node parameters, creating {args.num_workers[i]} workers") - num_gpus = args.gpu_per_worker[i]/args.num_workers[i] - num_cpus = args.cpu_per_worker[i]/args.num_workers[i] - memory = (args.ram_gb_per_worker[i] * 1024**3)/args.num_workers[i] + num_gpus = args.gpu_per_worker[i] / args.num_workers[i] + num_cpus = args.cpu_per_worker[i] / args.num_workers[i] + memory = (args.ram_gb_per_worker[i] * 1024**3) / args.num_workers[i] print(f"[INFO]: Requesting {num_gpus = } {num_cpus = } {memory = } id = {gpu_node['id']}") job = isaac_ray_util.remote_execute_job.options( - num_gpus=num_gpus, + num_gpus=num_gpus, num_cpus=num_cpus, memory=memory, - scheduling_strategy=NodeAffinitySchedulingStrategy(gpu_node["id"], soft=False) + scheduling_strategy=NodeAffinitySchedulingStrategy(gpu_node["id"], soft=False), ).remote(job, f"Job {i}", args.test) job_results.append(job) @@ -109,6 +104,7 @@ def wrap_resources_to_jobs( print(f"[INFO]: Job {i} result: {result}") print("[INFO]: All jobs completed.") + if __name__ == "__main__": parser = argparse.ArgumentParser(description="Submit multiple jobs with optional GPU testing.") parser = isaac_ray_util.add_resource_arguments(arg_parser=parser) @@ -127,7 +123,4 @@ def wrap_resources_to_jobs( else: formatted_jobs = [] print(f"[INFO]: Isaac Ray Wrapper received jobs {formatted_jobs = }") - wrap_resources_to_jobs( - jobs=formatted_jobs, - args=args - ) + wrap_resources_to_jobs(jobs=formatted_jobs, args=args)
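Stepping back, the heterogeneous support added here hinges on every resource flag (`--gpu_per_worker`, `--cpu_per_worker`, `--ram_gb_per_worker`, `--num_workers`, plus `--worker_accelerator` at cluster creation) accepting a list of values, with `fill_in_missing_resources` normalizing the shorter lists to the longest one. A toy illustration of that normalization idea — not the patched function itself, and the padding rule shown (filling with `policy` applied to the list's own values) is an assumption:

    from typing import Callable

    def normalize_resource_lists(resources: dict[str, list], policy: Callable = max) -> dict[str, list]:
        # Pad every list to the longest list's length, filling with `policy` over its own values.
        longest = max(len(values) for values in resources.values())
        padded = {}
        for key, values in resources.items():
            fill = policy(values)
            padded[key] = list(values) + [fill] * (longest - len(values))
        return padded

    # Three worker groups requested, but only one CPU and one RAM value supplied:
    print(normalize_resource_lists({
        "gpu_per_worker": [1, 2, 4],
        "cpu_per_worker": [8],
        "ram_gb_per_worker": [16],
        "num_workers": [1, 2, 2],
    }))
    # {'gpu_per_worker': [1, 2, 4], 'cpu_per_worker': [8, 8, 8],
    #  'ram_gb_per_worker': [16, 16, 16], 'num_workers': [1, 2, 2]}

The `policy=min` passed at the `wrap_isaac_ray_resources.py` call site (visible in the hunk above) caps requests at what the discovered nodes report, while `launch.py` keeps the default `max` when creating clusters.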