Commit

add heterogeneous cluster support
glvov-bdai committed Oct 28, 2024
1 parent 50a7e00 commit 62f8d15
Showing 8 changed files with 133 additions and 113 deletions.
6 changes: 3 additions & 3 deletions docs/source/features/ray.rst
@@ -31,8 +31,8 @@ For clarity, this guide refers to the jobs one layer below the topmost aggregate
Both resource-wrapped and tuning aggregate jobs dispatch individual jobs to a designated Ray
cluster, which leverages the cluster's resources (e.g., a single workstation node or multiple nodes)
to execute these jobs with workers in parallel and/or sequentially. By default, aggregate jobs use all
available resources on each available GPU-enabled node for each sub-job worker. This can be changed by
specifying the ``--num_workers_per_node`` argument, which is especially critical for parallel aggregate
job processing on local or virtual multi-GPU machines.

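A rough sketch of what splitting a node across several workers implies (the function and field names below are illustrative assumptions, not the actual Isaac Lab implementation):

def split_node_resources(node: dict, num_workers_per_node: int) -> dict:
    # Divide one GPU node's resources evenly across its sub-job workers.
    return {
        "gpu": node["gpu"] / num_workers_per_node,
        "cpu": node["cpu"] // num_workers_per_node,
        "memory_gb": node["memory_gb"] // num_workers_per_node,
    }

# With --num_workers_per_node 2 on a 4-GPU, 32-CPU, 128 GB node, each worker
# would receive roughly 2 GPUs, 16 CPUs, and 64 GB of RAM.
workers = [split_node_resources({"gpu": 4, "cpu": 32, "memory_gb": 128}, 2) for _ in range(2)]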
In resource-wrapped aggregate jobs, each sub-job and its
@@ -134,7 +134,7 @@ any cloud provider should work if one configures the following:
- A Kubernetes setup with available NVIDIA RTX (likely ``l4`` or ``l40``) GPU-passthrough node-pool resources,
that has access to your container registry/storage bucket and has the Ray operator enabled with correct IAM
permissions. This can be easily achieved with services such as Google GKE or AWS EKS,
provided that your account or organization has been granted a GPU budget. It is recommended
to use manually managed Kubernetes services as opposed to "autopilot" services for cost-effective
experimentation, as this way clusters can be completely shut down when not in use, although
this may require installing the `Nvidia GPU Operator <https://docs.nvidia.com/datacenter/cloud-native/gpu-operator/latest/google-gke.html>`_
@@ -1,4 +1,4 @@
# Jinja is used for templating here as full helm setup is excessive for application
apiVersion: ray.io/v1alpha1
kind: RayCluster
metadata:
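For context, the launch script further down in this commit renders this Jinja template before applying it. A minimal sketch of that rendering step, assuming a template directory and file name that are hypothetical rather than taken from the repository:

from jinja2 import Environment, FileSystemLoader

# Hypothetical template location and parameters; only the render() pattern matches launch.py.
jinja_env = Environment(loader=FileSystemLoader("cluster_configs"))
template = jinja_env.get_template("kuberay.yaml.jinja")
manifest = template.render({"name": "isaacray-0", "num_workers": [2], "gpu_per_worker": [1]})
print(manifest)  # YAML that could then be piped to `kubectl apply -f -`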
22 changes: 9 additions & 13 deletions source/standalone/workflows/ray/grok_cluster_with_kubectl.py
@@ -57,22 +57,22 @@ def check_clusters_running(pods: list, clusters: set) -> bool:
return clusters_running


def get_ray_address(head_pod: str, namespace: str = "default",
ray_head_name: str = "head") -> str:
def get_ray_address(head_pod: str, namespace: str = "default", ray_head_name: str = "head") -> str:
cmd = ["kubectl", "logs", head_pod, "-c", ray_head_name, "-n", namespace]
try:
output = subprocess.check_output(cmd).decode()
except subprocess.CalledProcessError as e:
raise ValueError(f"Could not enter head container with cmd {cmd}: {e}"
"Perhaps try a different namespace or ray head name.")
raise ValueError(
f"Could not enter head container with cmd {cmd}: {e}Perhaps try a different namespace or ray head name."
)
match = re.search(r"RAY_ADDRESS='([^']+)'", output)
if match:
return match.group(1)
else:
return None

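A minimal illustration of the RAY_ADDRESS extraction performed above, run against a made-up log line:

import re

sample_log = "... RAY_ADDRESS='10.0.0.12:6379' ..."  # hypothetical head-pod log output
match = re.search(r"RAY_ADDRESS='([^']+)'", sample_log)
assert match is not None
print(match.group(1))  # 10.0.0.12:6379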

def process_cluster(cluster_info: dict, ray_head_name:str = "head") -> str:
def process_cluster(cluster_info: dict, ray_head_name: str = "head") -> str:
cluster, pods, namespace = cluster_info
head_pod = None
for pod_name, status in pods:
@@ -95,13 +95,9 @@ def process_cluster(cluster_info: dict, ray_head_name:str = "head") -> str:
def main():
# Parse command-line arguments
parser = argparse.ArgumentParser(description="Process Ray clusters and save their specifications.")
parser.add_argument(
"--prefix", default="isaacray", help="The prefix for the cluster names."
)
parser.add_argument("--prefix", default="isaacray", help="The prefix for the cluster names.")
parser.add_argument("--output", default="~/.cluster_config", help="The file to save cluster specifications.")
parser.add_argument("--ray_head_name",
default="head",
help="The metadata name for the ray head container")
parser.add_argument("--ray_head_name", default="head", help="The metadata name for the ray head container")
args = parser.parse_args()

CLUSTER_NAME_PREFIX = args.prefix
@@ -150,8 +146,8 @@ def main():

with ThreadPoolExecutor() as executor:
future_to_cluster = {
executor.submit(process_cluster,
info, args.ray_head_name): info[0] for info in cluster_infos}
executor.submit(process_cluster, info, args.ray_head_name): info[0] for info in cluster_infos
}
for future in as_completed(future_to_cluster):
cluster_name = future_to_cluster[future]
try:
8 changes: 4 additions & 4 deletions source/standalone/workflows/ray/isaac_ray_tune.py
@@ -146,10 +146,10 @@ def invoke_tuning_run(
num_samples=num_samples, # Ensure args.num_samples is well-defined
),
run_config=air.RunConfig(
name=f"IsaacRay-{cfg["runner_args"]["--task"]}-tune",
storage_path=storage_path,
verbose=1,
failure_config=air.FailureConfig(fail_fast=True)
name=f"IsaacRay-{cfg['runner_args']['--task']}-tune",
storage_path=storage_path,
verbose=1,
failure_config=air.FailureConfig(fail_fast=True),
),
)

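The quoting change above is not only stylistic: reusing the outer quote character inside an f-string replacement field is a syntax error on Python versions before 3.12. A small illustration (the task name is made up):

cfg = {"runner_args": {"--task": "Isaac-Cartpole-v0"}}  # illustrative config
# name = f"IsaacRay-{cfg["runner_args"]["--task"]}-tune"   # SyntaxError before Python 3.12
name = f"IsaacRay-{cfg['runner_args']['--task']}-tune"     # valid on all supported versions
print(name)  # IsaacRay-Isaac-Cartpole-v0-tune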
138 changes: 85 additions & 53 deletions source/standalone/workflows/ray/isaac_ray_util.py
@@ -2,15 +2,16 @@
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
import ray
from tensorboard.backend.event_processing.event_accumulator import EventAccumulator

import argparse
import os
import re
import subprocess
from datetime import datetime

import ray
from tensorboard.backend.event_processing.event_accumulator import EventAccumulator


def load_tensorboard_logs(directory: str) -> dict:
"""From a tensorboard directory, get the latest scalar values.
@@ -94,7 +95,6 @@ def execute_job(
test_mode: bool = False,
extract_experiment: bool = False,
persistent_dir: str | None = None,

) -> str | dict:
"""Issue a job (shell command).
@@ -133,29 +133,43 @@
output = result.stdout.strip().split("\n")
for gpu_info in output:
name, memory_free, serial = gpu_info.split(", ")
result_details.append(f"{identifier_string}[INFO]: Name: {name}|Memory Available: {memory_free} MB|Serial Number {serial} \n")

result_details.append(
f"{identifier_string}[INFO]: Name: {name}|Memory Available: {memory_free} MB|Serial Number"
f" {serial} \n"
)

# Get GPU count from PyTorch
num_gpus_detected = torch.cuda.device_count()
result_details.append(f"{identifier_string}[INFO]: Detected GPUs from PyTorch: {num_gpus_detected} \n")

# Check CUDA_VISIBLE_DEVICES and count the number of visible GPUs
cuda_visible_devices = os.environ.get("CUDA_VISIBLE_DEVICES", None)
cuda_visible_devices = os.environ.get("CUDA_VISIBLE_DEVICES")
if cuda_visible_devices:
visible_devices_count = len(cuda_visible_devices.split(","))
result_details.append(f"{identifier_string}[INFO]: GPUs visible via CUDA_VISIBLE_DEVICES: {visible_devices_count} \n")
result_details.append(
f"{identifier_string}[INFO]: GPUs visible via CUDA_VISIBLE_DEVICES: {visible_devices_count} \n"
)
else:
visible_devices_count = len(output) # All GPUs visible if CUDA_VISIBLE_DEVICES is not set
result_details.append(f"{identifier_string}[INFO]: CUDA_VISIBLE_DEVICES not set; all GPUs visible ({visible_devices_count}) \n")

result_details.append(
f"{identifier_string}[INFO]: CUDA_VISIBLE_DEVICES not set; all GPUs visible"
f" ({visible_devices_count}) \n"
)

# If PyTorch GPU count disagrees with nvidia-smi, reset CUDA_VISIBLE_DEVICES and rerun detection
if num_gpus_detected != len(output):
result_details.append(f"{identifier_string}[WARNING]: PyTorch and nvidia-smi disagree on GPU count! Re-running with all GPUs visible. \n")
result_details.append(
f"{identifier_string}[WARNING]: PyTorch and nvidia-smi disagree on GPU count! Re-running with all"
" GPUs visible. \n"
)
result_details.append(f"{identifier_string}[INFO]: This shows that GPU resources were isolated.\n")
os.environ["CUDA_VISIBLE_DEVICES"] = ",".join([str(i) for i in range(len(output))])
num_gpus_detected_after_reset = torch.cuda.device_count()
result_details.append(f"{identifier_string}[INFO]: After setting CUDA_VISIBLE_DEVICES, PyTorch detects {num_gpus_detected_after_reset} GPUs \n")

result_details.append(
f"{identifier_string}[INFO]: After setting CUDA_VISIBLE_DEVICES, PyTorch detects"
f" {num_gpus_detected_after_reset} GPUs \n"
)

except subprocess.CalledProcessError as e:
print(f"Error calling nvidia-smi: {e.stderr}")
result_details.append({"error": "Failed to retrieve GPU information"})
@@ -215,22 +229,24 @@ def execute_job(
return " ".join(result_details)


def get_gpu_node_resources(total_resources: bool = False,
one_node_only: bool = False,
include_gb_ram: bool = False,
include_id: bool = False,
ray_address: str = "auto") -> list[dict]|dict:
def get_gpu_node_resources(
total_resources: bool = False,
one_node_only: bool = False,
include_gb_ram: bool = False,
include_id: bool = False,
ray_address: str = "auto",
) -> list[dict] | dict:
"""Get information about available GPU node resources.
Args:
total_resources: When true, return total available resources. Defaults to False.
one_node_only: When true, return resources for a single node. Defaults to False.
include_gb_ram: Set to true to convert MB to GB in result
include_id: Set to true to include node ID
ray_address: The ray address to connect to.
Returns:
Resource information for all nodes, sorted by descending GPU count, then descending CPU
count, then descending RAM capacity, and finally by node ID in ascending order if available,
or simply the resource for a single node if requested.
"""
@@ -260,10 +276,7 @@ def get_gpu_node_resources(total_resources: bool = False,
total_cpus += cpus
total_gpus += gpus
total_memory += memory
node_resources = sorted(node_resources, key=lambda x: (-x["gpu"],
-x["cpu"],
-x["memory"],
x.get("id", "")))
node_resources = sorted(node_resources, key=lambda x: (-x["gpu"], -x["cpu"], -x["memory"], x.get("id", "")))

if total_resources:
# Return summed total resources
Expand All @@ -274,10 +287,13 @@ def get_gpu_node_resources(total_resources: bool = False,

return node_resources

def add_resource_arguments(arg_parser: argparse.ArgumentParser,
defaults: list | None = None,
cluster_create_defaults: bool =False,) -> argparse.ArgumentParser:
"""Add resource arguments to a cluster; this is shared across both

def add_resource_arguments(
arg_parser: argparse.ArgumentParser,
defaults: list | None = None,
cluster_create_defaults: bool = False,
) -> argparse.ArgumentParser:
"""Add resource arguments to a cluster; this is shared across both
wrapping resources and launching clusters.
Args:
@@ -292,41 +308,56 @@ def add_resource_arguments(arg_parser: argparse.ArgumentParser,
defaults = [[1], [8], [16], [1]]
else:
defaults = [None, None, None, [1]]
arg_parser.add_argument("--gpu_per_worker", nargs='+', type=int, default=defaults[0],
help="Number of GPUs per worker node. Supply more than one for heterogeneous resources")
arg_parser.add_argument("--cpu_per_worker", nargs='+', type=int, default=defaults[1],
help="Number of CPUs per worker node. Supply more than one for heterogeneous resources")
arg_parser.add_argument("--ram_gb_per_worker", nargs='+', type=int, default=defaults[2],
help="RAM in GB per worker node. Supply more than one for heterogeneous resources.")
arg_parser.add_argument("--num_workers",
nargs='+',
type=int,
default=defaults[3],
help="Number of desired workers. Supply more than one for heterogeneous resources.")
arg_parser.add_argument(
"--gpu_per_worker",
nargs="+",
type=int,
default=defaults[0],
help="Number of GPUs per worker node. Supply more than one for heterogeneous resources",
)
arg_parser.add_argument(
"--cpu_per_worker",
nargs="+",
type=int,
default=defaults[1],
help="Number of CPUs per worker node. Supply more than one for heterogeneous resources",
)
arg_parser.add_argument(
"--ram_gb_per_worker",
nargs="+",
type=int,
default=defaults[2],
help="RAM in GB per worker node. Supply more than one for heterogeneous resources.",
)
arg_parser.add_argument(
"--num_workers",
nargs="+",
type=int,
default=defaults[3],
help="Number of desired workers. Supply more than one for heterogeneous resources.",
)
return arg_parser

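As a usage sketch, the heterogeneous flags added above parse into parallel lists, one entry per worker group (the argument values here are illustrative):

import argparse

parser = argparse.ArgumentParser()
parser = add_resource_arguments(parser, cluster_create_defaults=True)
args = parser.parse_args(["--gpu_per_worker", "1", "2", "--num_workers", "4", "2"])
# args.gpu_per_worker == [1, 2] and args.num_workers == [4, 2]:
# four 1-GPU workers plus two 2-GPU workers; omitted lists keep their defaults.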
def fill_in_missing_resources(args: argparse.Namespace,
resources: dict | None = None,
cluster_creation_flag: bool = False,
policy: callable = max):
""" Normalize the lengths of resource lists based on the longest list provided. """

def fill_in_missing_resources(
args: argparse.Namespace, resources: dict | None = None, cluster_creation_flag: bool = False, policy: callable = max
):
"""Normalize the lengths of resource lists based on the longest list provided."""
print("[INFO]: Filling in missing command line arguments with best guess...")
if resources is None:
resources = {
'gpu_per_worker': args.gpu_per_worker,
'cpu_per_worker': args.cpu_per_worker,
'ram_gb_per_worker': args.ram_gb_per_worker,
'num_workers': args.num_workers
"gpu_per_worker": args.gpu_per_worker,
"cpu_per_worker": args.cpu_per_worker,
"ram_gb_per_worker": args.ram_gb_per_worker,
"num_workers": args.num_workers,
}
if cluster_creation_flag:
cluster_creation_resources = {
'worker_accelerator': args.worker_accelerator
}
cluster_creation_resources = {"worker_accelerator": args.worker_accelerator}
resources.update(cluster_creation_resources)

# Calculate the maximum length of any list
max_length = max(len(v) for v in resources.values())
print(f"[INFO]: Resource list lengths:")
print("[INFO]: Resource list lengths:")
for key, value in resources.items():
print(f"[INFO] {key}: {len(value)} values {value}")

Expand All @@ -351,6 +382,7 @@ def fill_in_missing_resources(args: argparse.Namespace,
print("[INFO]: Done filling in command line arguments...\n\n")
return args

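The normalization described in the docstring can be pictured as padding every list up to the longest one using the supplied policy (``max`` by default). A self-contained sketch, not the function's exact body:

def normalize(resources: dict, policy=max) -> dict:
    # Pad shorter lists with a policy-chosen value so all lists share one length.
    longest = max(len(values) for values in resources.values())
    for values in resources.values():
        while len(values) < longest:
            values.append(policy(values))
    return resources

print(normalize({"gpu_per_worker": [1, 2], "num_workers": [4]}))
# {'gpu_per_worker': [1, 2], 'num_workers': [4, 4]}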

def populate_isaac_ray_cfg_args(cfg: dict = {}) -> dict:
"""Small utility method to create empty fields if needed for a configuration."""
if "runner_args" not in cfg:
19 changes: 9 additions & 10 deletions source/standalone/workflows/ray/launch.py
@@ -8,9 +8,9 @@
import subprocess
import yaml

import isaac_ray_util
from jinja2 import Environment, FileSystemLoader
from kubernetes import config
import isaac_ray_util

"""This script helps create one or more KubeRay clusters.
@@ -27,7 +27,7 @@
--namespace <NAMESPACE> --image <YOUR_ISAAC_RAY_IMAGE> \
--num_workers 8 --num_clusters 1 --worker_accelerator nvidia-l4 --gpu_per_worker 1
# The following creates 1 GPUx1 nvidia l4 worker, 2 GPUx2 nvidia-tesla-t4 workers,
# and 2 GPUx4 nvidia-tesla-t4 GPU workers
./isaaclab.sh -p source/standalone/workflows/ray/launch.py --cluster_host google_cloud \
--namespace <NAMESPACE> --image <YOUR_ISAAC_RAY_IMAGE> \
@@ -57,7 +57,7 @@ def apply_manifest(args: argparse.Namespace) -> None:

# Convert args namespace to a dictionary
template_params = vars(args)

# Load and render the template
template = jinja_env.get_template(template_file)
file_contents = template.render(template_params)
@@ -128,19 +128,16 @@ def parse_args() -> argparse.Namespace:
nargs="+",
type=str,
default=["nvidia-l4"],
help="GPU accelerator name. Supply more than one for heterogenous resources.",
help="GPU accelerator name. Supply more than one for heterogeneous resources.",
)

arg_parser = isaac_ray_util.add_resource_arguments(arg_parser,
cluster_create_defaults=True)

arg_parser = isaac_ray_util.add_resource_arguments(arg_parser, cluster_create_defaults=True)

arg_parser.add_argument(
"--num_clusters",
type=int,
default=1,
help=(
"How many Ray Clusters to create."
),
help="How many Ray Clusters to create.",
)
arg_parser.add_argument(
"--num_head_cpu",
Expand All @@ -156,6 +153,7 @@ def parse_args() -> argparse.Namespace:
args = arg_parser.parse_args()
return isaac_ray_util.fill_in_missing_resources(args, cluster_creation_flag=True)


def main():
args = parse_args()

@@ -169,5 +167,6 @@ def main():
args.name = default_name + "-" + str(i)
apply_manifest(args)


if __name__ == "__main__":
main()
