Merge branch 'master' into dedicated-kv-ioctx
rynewang committed Sep 19, 2024
2 parents 91d6f4f + 5f69744 commit aadcd14
Showing 55 changed files with 1,204 additions and 796 deletions.
3 changes: 3 additions & 0 deletions .buildkite/linux_aarch64.rayci.yml
@@ -26,6 +26,7 @@ steps:
- "3.9"
- "3.10"
- "3.11"
- "3.12"
cuda:
- "11.7.1-cudnn8"
- "11.8.0-cudnn8"
@@ -47,6 +48,7 @@ steps:
- "3.9"
- "3.10"
- "3.11"
- "3.12"
instance_type: builder-arm64
env:
PYTHON_VERSION: "{{matrix}}"
@@ -91,6 +93,7 @@ steps:
- "3.9"
- "3.10"
- "3.11"
- "3.12"

- label: ":ray: core: wheel-aarch64 tests"
tags: linux_wheels
@@ -47,17 +47,21 @@ To understand the following content better, you should understand the difference
* `jobId` (Optional): Defines the submission ID for the Ray job. If not provided, KubeRay generates one automatically. See {ref}`Ray Jobs CLI API Reference <ray-job-submission-cli-ref>` for more details about the submission ID.
* `metadata` (Optional): See {ref}`Ray Jobs CLI API Reference <ray-job-submission-cli-ref>` for more details about the `--metadata-json` option.
* `entrypointNumCpus` / `entrypointNumGpus` / `entrypointResources` (Optional): See {ref}`Ray Jobs CLI API Reference <ray-job-submission-cli-ref>` for more details.
* `backoffLimit` (Optional, added in version 1.2.0): Specifies the number of retries before marking this RayJob as failed. Each retry creates a new RayCluster. The default value is 0.
* Submission configuration
* `submissionMode` (Optional): Specifies how RayJob submits the Ray job to the RayCluster. In "K8sJobMode", the KubeRay operator creates a submitter Kubernetes Job to submit the Ray job. In "HTTPMode", the KubeRay operator sends a request to the RayCluster to create a Ray job. The default value is "K8sJobMode". A client-side sketch of what such a submission amounts to appears after this list.
* `submitterPodTemplate` (Optional): Defines the Pod template for the submitter Kubernetes Job. This field is only effective when `submissionMode` is "K8sJobMode".
* `RAY_DASHBOARD_ADDRESS` - The KubeRay operator injects this environment variable into the submitter Pod. The value is `$HEAD_SERVICE:$DASHBOARD_PORT`.
* `RAY_JOB_SUBMISSION_ID` - The KubeRay operator injects this environment variable into the submitter Pod. The value is the `RayJob.Status.JobId` of the RayJob.
* Example: `ray job submit --address=http://$RAY_DASHBOARD_ADDRESS --submission-id=$RAY_JOB_SUBMISSION_ID ...`
* See [ray-job.sample.yaml](https://github.com/ray-project/kuberay/blob/master/ray-operator/config/samples/ray-job.sample.yaml) for more details.
* `submitterConfig` (Optional): Additional configurations for the submitter Kubernetes Job.
* `backoffLimit` (Optional, added in version 1.2.0): The number of retries before marking the submitter Job as failed. The default value is 2.
* Automatic resource cleanup
* `shutdownAfterJobFinishes` (Optional): Determines whether to recycle the RayCluster after the Ray job finishes. The default value is false.
* `ttlSecondsAfterFinished` (Optional): Only works if `shutdownAfterJobFinishes` is true. The KubeRay operator deletes the RayCluster and the submitter Kubernetes Job `ttlSecondsAfterFinished` seconds after the Ray job finishes. The default value is 0.
* `activeDeadlineSeconds` (Optional): If the RayJob doesn't transition the `JobDeploymentStatus` to `Complete` or `Failed` within `activeDeadlineSeconds`, the KubeRay operator transitions the `JobDeploymentStatus` to `Failed`, citing `DeadlineExceeded` as the reason.
* `DELETE_RAYJOB_CR_AFTER_JOB_FINISHES` (Optional, added in version 1.2.0): Set this environment variable for the KubeRay operator, not the RayJob resource. If you set this environment variable to true, the RayJob custom resource itself is deleted if you also set `shutdownAfterJobFinishes` to true. Note that KubeRay deletes all resources created by the RayJob, including the Kubernetes Job.
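As a rough, non-authoritative sketch of what a submission amounts to on the client side, the snippet below calls the Ray Job Submission Python SDK directly; the dashboard address, entrypoint, submission ID, and metadata are placeholder assumptions, not values taken from the samples above.

```python
# Illustrative sketch only: the KubeRay operator performs the equivalent of this
# call for you. All literal values below are placeholders.
from ray.job_submission import JobSubmissionClient

client = JobSubmissionClient("http://raycluster-sample-head-svc:8265")  # $RAY_DASHBOARD_ADDRESS
client.submit_job(
    entrypoint="python my_script.py",      # corresponds to the RayJob entrypoint
    submission_id="rayjob-sample-abc123",  # corresponds to jobId / RAY_JOB_SUBMISSION_ID
    metadata={"team": "example"},          # corresponds to metadata
    entrypoint_num_cpus=1,                 # corresponds to entrypointNumCpus
)
```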

## Example: Run a simple Ray job with RayJob

14 changes: 12 additions & 2 deletions doc/source/cluster/kubernetes/user-guides/kuberay-gcs-ft.md
@@ -136,12 +136,22 @@ export REDIS_POD=$(kubectl get pods --selector=app=redis -o custom-columns=POD:m
kubectl exec -it $REDIS_POD -- redis-cli -a "5241590000000000"

# Step 6.4: Check the keys in Redis.
# Note: the schema changed in Ray 2.38.0. Previously, Ray used a single HASH table;
# now it uses multiple HASH tables with a common prefix.

KEYS *
# [Example output]:
# 1) "864b004c-6305-42e3-ac46-adfa8eb6f752"
# 1) "RAY864b004c-6305-42e3-ac46-adfa8eb6f752@INTERNAL_CONFIG"
# 2) "RAY864b004c-6305-42e3-ac46-adfa8eb6f752@KV"
# 3) "RAY864b004c-6305-42e3-ac46-adfa8eb6f752@NODE"
# [Example output Before Ray 2.38.0]:
# 2) "864b004c-6305-42e3-ac46-adfa8eb6f752"
#

# Step 6.5: Check the value of the key.
HGETALL 864b004c-6305-42e3-ac46-adfa8eb6f752
HGETALL RAY864b004c-6305-42e3-ac46-adfa8eb6f752@NODE
# Before Ray 2.38.0:
# HGETALL 864b004c-6305-42e3-ac46-adfa8eb6f752
```
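The same check can be scripted. Below is a minimal sketch using the `redis-py` client to list a cluster's hashes by their key prefix; it assumes the Ray 2.38.0+ layout described above, and the host, password, and storage namespace are placeholders.

```python
# Sketch only: list the GCS hashes for one cluster by key prefix (Ray 2.38.0+ layout).
# The connection details and namespace below are placeholders.
import redis

r = redis.Redis(host="redis", port=6379, password="5241590000000000")
namespace = "864b004c-6305-42e3-ac46-adfa8eb6f752"
for key in r.scan_iter(match=f"RAY{namespace}@*"):
    print(key.decode(), "->", r.hlen(key), "fields")
```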

In [ray-cluster.external-redis.yaml](https://github.com/ray-project/kuberay/blob/v1.0.0/ray-operator/config/samples/ray-cluster.external-redis.yaml), the `ray.io/external-storage-namespace` annotation isn't set for the RayCluster.
10 changes: 10 additions & 0 deletions python/ray/_private/collections_utils.py
@@ -0,0 +1,10 @@
from typing import List, Any


def split(items: List[Any], chunk_size: int):
"""Splits provided list into chunks of given size"""

assert chunk_size > 0, "Chunk size has to be > 0"

for i in range(0, len(items), chunk_size):
yield items[i : i + chunk_size]
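A brief usage sketch of the new helper; the call below is illustrative and not part of the change itself.

```python
# Sketch: split() is a generator, so materialize it to inspect the chunks.
from ray._private.collections_utils import split

batches = list(split(list(range(7)), chunk_size=3))
assert batches == [[0, 1, 2], [3, 4, 5], [6]]
```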
10 changes: 6 additions & 4 deletions python/ray/_private/gcs_utils.py
@@ -125,7 +125,7 @@ def cleanup_redis_storage(
storage_namespace: The namespace of the storage to be deleted.
"""

from ray._raylet import del_key_from_storage # type: ignore
from ray._raylet import del_key_prefix_from_storage # type: ignore

if not isinstance(host, str):
raise ValueError("Host must be a string")
@@ -142,6 +142,8 @@ if not isinstance(storage_namespace, str):
if not isinstance(storage_namespace, str):
raise ValueError("storage namespace must be a string")

# Right now, GCS store all data into a hash set key by storage_namespace.
# So we only need to delete the specific key to cleanup the cluster.
return del_key_from_storage(host, port, password, use_ssl, storage_namespace)
# Right now, GCS stores all data into multiple hashes with keys prefixed by
# storage_namespace. So we only need to delete the specific key prefix to cleanup
# the cluster.
# Note this deletes all keys with prefix `RAY{key_prefix}@`, not `{key_prefix}`.
return del_key_prefix_from_storage(host, port, password, use_ssl, storage_namespace)
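For orientation, here is a hedged usage sketch of the cleanup helper shown above; the connection parameters and storage namespace are placeholders. With this change, it removes every hash whose key starts with `RAY<storage_namespace>@`.

```python
# Sketch only: wipe one cluster's GCS state from external Redis.
# Host, password, and namespace are placeholders.
from ray._private.gcs_utils import cleanup_redis_storage

cleanup_redis_storage(
    host="redis.default.svc.cluster.local",
    port=6379,
    password="5241590000000000",
    use_ssl=False,
    storage_namespace="864b004c-6305-42e3-ac46-adfa8eb6f752",
)
```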
5 changes: 3 additions & 2 deletions python/ray/_private/worker.py
@@ -2636,8 +2636,9 @@ def get(
Raises:
GetTimeoutError: A GetTimeoutError is raised if a timeout is set and
the get takes longer than timeout to return.
Exception: An exception is raised if the task that created the object
or that created one of the objects raised an exception.
Exception: An exception is raised immediately if any task that created
the object or that created one of the objects raised an exception,
without waiting for the remaining ones to finish.
"""
worker = global_worker
worker.check_connected()
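A small, illustrative sketch of the behavior the updated docstring describes (the task bodies are made up): `ray.get` over a list raises the first task error as soon as it is available, instead of waiting for the remaining objects.

```python
# Sketch: ray.get raises the failed task's exception right away,
# without waiting for the slow task to finish.
import time

import ray
from ray.exceptions import RayTaskError

ray.init()

@ray.remote
def fails():
    raise ValueError("boom")

@ray.remote
def slow():
    time.sleep(60)
    return 1

try:
    ray.get([fails.remote(), slow.remote()])
except RayTaskError as err:
    print("Raised before slow() completed:", err)
```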
11 changes: 8 additions & 3 deletions python/ray/_raylet.pyx
@@ -161,7 +161,10 @@ from ray.includes.libcoreworker cimport (

from ray.includes.ray_config cimport RayConfig
from ray.includes.global_state_accessor cimport CGlobalStateAccessor
from ray.includes.global_state_accessor cimport RedisDelKeySync, RedisGetKeySync
from ray.includes.global_state_accessor cimport (
RedisDelKeyPrefixSync,
RedisGetKeySync
)
from ray.includes.optional cimport (
optional, nullopt
)
@@ -5176,8 +5179,10 @@ cdef void async_callback(shared_ptr[CRayObject] obj,
cpython.Py_DECREF(user_callback)


def del_key_from_storage(host, port, password, use_ssl, key):
return RedisDelKeySync(host, port, password, use_ssl, key)
# Note this deletes keys with prefix `RAY{key_prefix}@`
# Example: with key_prefix = `default`, we remove all `RAYdefault@...` keys.
def del_key_prefix_from_storage(host, port, password, use_ssl, key_prefix):
return RedisDelKeyPrefixSync(host, port, password, use_ssl, key_prefix)


def get_session_key_from_storage(host, port, password, use_ssl, config, key):
7 changes: 0 additions & 7 deletions python/ray/dag/compiled_dag_node.py
@@ -669,9 +669,6 @@ def __init__(
# Mapping from the actor handle to the node ID that the actor is on.
self.actor_to_node_id: Dict["ray.actor.ActorHandle", str] = {}

# Type hints specified by the user for DAG (intermediate) outputs.
self._type_hints = []

# This is set to true when type hint of `transport="nccl"`` is used
self._use_default_nccl_group = False
# This is set to the specified custom nccl group
@@ -744,7 +741,6 @@ def _preprocess(self) -> None:

self.input_task_idx, self.output_task_idx = None, None
self.actor_task_count.clear()
self._type_hints.clear()

nccl_actors: Set["ray.actor.ActorHandle"] = set()

@@ -950,9 +946,6 @@ def _preprocess(self) -> None:
# Add all readers to the NCCL group.
nccl_actors.add(downstream_actor_handle)

if dag_node.type_hint is not None:
self._type_hints.append(dag_node.type_hint)

# If there were type hints indicating transport via NCCL, initialize
# the NCCL group on the participating actors.
nccl_actors = list(nccl_actors)
5 changes: 0 additions & 5 deletions python/ray/dashboard/agent.py
@@ -7,7 +7,6 @@
import pathlib
import signal
import sys
from concurrent.futures import ThreadPoolExecutor

import ray
import ray._private.ray_constants as ray_constants
@@ -49,10 +48,6 @@ def __init__(
# Public attributes are accessible for all agent modules.
self.ip = node_ip_address
self.minimal = minimal
self.thread_pool_executor = ThreadPoolExecutor(
max_workers=dashboard_consts.RAY_AGENT_THREAD_POOL_MAX_WORKERS,
thread_name_prefix="dashboard_agent_tpe",
)

assert gcs_address is not None
self.gcs_address = gcs_address
8 changes: 1 addition & 7 deletions python/ray/dashboard/consts.py
@@ -26,7 +26,7 @@
"RAY_DASHBOARD_STATS_PURGING_INTERVAL", 60 * 10
)
RAY_DASHBOARD_STATS_UPDATING_INTERVAL = env_integer(
"RAY_DASHBOARD_STATS_UPDATING_INTERVAL", 2
"RAY_DASHBOARD_STATS_UPDATING_INTERVAL", 15
)
DASHBOARD_RPC_ADDRESS = "dashboard_rpc"
DASHBOARD_RPC_PORT = env_integer("RAY_DASHBOARD_RPC_PORT", 0)
@@ -49,12 +49,6 @@
# Example: "your.module.ray_cluster_activity_hook".
RAY_CLUSTER_ACTIVITY_HOOK = "RAY_CLUSTER_ACTIVITY_HOOK"

# Works in the thread pool should not starve the main thread loop, so we default to 1.
RAY_DASHBOARD_THREAD_POOL_MAX_WORKERS = env_integer(
"RAY_DASHBOARD_THREAD_POOL_MAX_WORKERS", 1
)
RAY_AGENT_THREAD_POOL_MAX_WORKERS = env_integer("RAY_AGENT_THREAD_POOL_MAX_WORKERS", 1)

# The number of candidate agents
CANDIDATE_AGENT_NUMBER = max(env_integer("CANDIDATE_AGENT_NUMBER", 1), 1)
# when head receive JobSubmitRequest, maybe not any agent is available,
8 changes: 8 additions & 0 deletions python/ray/dashboard/dashboard_metrics.py
@@ -75,6 +75,14 @@ def __init__(self, registry: Optional[CollectorRegistry] = None):
namespace="ray",
registry=self.registry,
)
self.metrics_event_loop_tasks = Gauge(
"dashboard_event_loop_tasks",
"Number of tasks currently pending in the event loop's queue.",
tuple(COMPONENT_METRICS_TAG_KEYS),
unit="tasks",
namespace="ray",
registry=self.registry,
)
self.metrics_event_loop_lag = Gauge(
"dashboard_event_loop_lag",
"Event loop lag in seconds.",
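A generic sketch of how a gauge like the new `dashboard_event_loop_tasks` metric can be fed; this is an assumption about one reasonable wiring, not the dashboard's actual code, and the label set is simplified.

```python
# Generic sketch (assumed wiring, not the dashboard's): sample the number of
# tasks currently scheduled on the running event loop and export it as a gauge.
import asyncio
from prometheus_client import CollectorRegistry, Gauge

registry = CollectorRegistry()
event_loop_tasks = Gauge(
    "dashboard_event_loop_tasks",
    "Number of tasks currently pending in the event loop's queue.",
    ["Component"],
    unit="tasks",
    namespace="ray",
    registry=registry,
)

async def sample_once() -> None:
    # asyncio.all_tasks() returns the tasks scheduled on the currently running loop.
    event_loop_tasks.labels(Component="dashboard").set(len(asyncio.all_tasks()))

asyncio.run(sample_once())
```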
70 changes: 41 additions & 29 deletions python/ray/dashboard/datacenter.py
@@ -1,4 +1,3 @@
import asyncio
import logging
from typing import Any, List, Optional

@@ -72,46 +71,55 @@ async def organize(cls, thread_pool_executor):
to make sure it's on the main event loop thread. To avoid blocking the main
event loop, we yield after each node processed.
"""
loop = get_or_create_event_loop()

node_workers = {}
core_worker_stats = {}
# nodes may change during process, so we create a copy of keys().

# NOTE: We copy the keys of `DataSource.nodes` to make sure
# it doesn't change during the iteration (since it's being updated
# from another async task)
for node_id in list(DataSource.nodes.keys()):
node_physical_stats = DataSource.node_physical_stats.get(node_id, {})
node_stats = DataSource.node_stats.get(node_id, {})
# Offloads the blocking operation to a thread pool executor. This also
# yields to the event loop.
workers = await get_or_create_event_loop().run_in_executor(
workers = await loop.run_in_executor(
thread_pool_executor,
cls.merge_workers_for_node,
cls._extract_workers_for_node,
node_physical_stats,
node_stats,
)

for worker in workers:
for stats in worker.get("coreWorkerStats", []):
worker_id = stats["workerId"]
core_worker_stats[worker_id] = stats

node_workers[node_id] = workers

DataSource.node_workers.reset(node_workers)
DataSource.core_worker_stats.reset(core_worker_stats)

@classmethod
def merge_workers_for_node(cls, node_physical_stats, node_stats):
def _extract_workers_for_node(cls, node_physical_stats, node_stats):
workers = []
# Merge coreWorkerStats (node stats) to workers (node physical stats)
pid_to_worker_stats = {}
pid_to_language = {}
pid_to_job_id = {}
pids_on_node = set()

for core_worker_stats in node_stats.get("coreWorkersStats", []):
pid = core_worker_stats["pid"]
pids_on_node.add(pid)

pid_to_worker_stats[pid] = core_worker_stats
pid_to_language[pid] = core_worker_stats["language"]
pid_to_job_id[pid] = core_worker_stats["jobId"]

for worker in node_physical_stats.get("workers", []):
worker = dict(worker)
pid = worker["pid"]

core_worker_stats = pid_to_worker_stats.get(pid)
# Empty list means core worker stats is not available.
worker["coreWorkerStats"] = [core_worker_stats] if core_worker_stats else []
@@ -121,6 +129,7 @@ def merge_workers_for_node(cls, node_physical_stats, node_stats):
worker["jobId"] = pid_to_job_id.get(pid, dashboard_consts.DEFAULT_JOB_ID)

workers.append(worker)

return workers

@classmethod
@@ -156,10 +165,14 @@ async def get_node_info(cls, node_id, get_summary=False):
)

if not get_summary:
actor_table_entries = DataSource.node_actors.get(node_id, {})

# Merge actors to node physical stats
node_info["actors"] = await DataOrganizer._get_all_actors(
DataSource.node_actors.get(node_id, {})
)
node_info["actors"] = {
actor_id: await DataOrganizer._get_actor_info(actor_table_entry)
for actor_id, actor_table_entry in actor_table_entries.items()
}

# Update workers to node physical stats
node_info["workers"] = DataSource.node_workers.get(node_id, [])

@@ -168,6 +181,8 @@
@classmethod
async def get_all_node_summary(cls):
return [
# NOTE: We're intentionally awaiting in a loop to avoid spinning up
# an excessive number of concurrent tasks for large clusters
await DataOrganizer.get_node_info(node_id, get_summary=True)
for node_id in DataSource.nodes.keys()
]
@@ -209,28 +224,25 @@ def _create_agent_info(node_id: str):
return {node_id: _create_agent_info(node_id) for node_id in target_node_ids}

@classmethod
async def get_all_actors(cls):
return await cls._get_all_actors(DataSource.actors)
async def get_actor_infos(cls, actor_ids: Optional[List[str]] = None):
target_actor_table_entries: dict[str, Optional[dict]]
if actor_ids is not None:
target_actor_table_entries = {
actor_id: DataSource.actors.get(actor_id) for actor_id in actor_ids
}
else:
target_actor_table_entries = DataSource.actors

@staticmethod
async def _get_all_actors(actors):
result = {}
for index, (actor_id, actor) in enumerate(actors.items()):
result[actor_id] = await DataOrganizer._get_actor(actor)
# There can be thousands of actors including dead ones. Processing
# them all can take many seconds, which blocks all other requests
# to the dashboard. The ideal solution might be to implement
# pagination. For now, use a workaround to yield to the event loop
# periodically, so other request handlers have a chance to run and
# avoid long latencies.
if index % 1000 == 0 and index > 0:
# Canonical way to yield to the event loop:
# https://github.com/python/asyncio/issues/284
await asyncio.sleep(0)
return result
return {
actor_id: await DataOrganizer._get_actor_info(actor_table_entry)
for actor_id, actor_table_entry in target_actor_table_entries.items()
}

@staticmethod
async def _get_actor(actor):
async def _get_actor_info(actor):
if actor is None:
return None

actor = dict(actor)
worker_id = actor["address"]["workerId"]
core_worker_stats = DataSource.core_worker_stats.get(worker_id, {})
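To summarize the event-loop hygiene this file's changes describe — snapshotting dict keys before iterating, offloading blocking merges to a thread pool, and deliberately awaiting node summaries one at a time — here is a generic, self-contained asyncio sketch. It illustrates the pattern only and is not the dashboard's actual code.

```python
# Generic sketch of the patterns used above (not the dashboard's code):
# iterate over a snapshot of the keys, offload the CPU-bound merge to a thread
# pool so the event loop stays responsive, and await one node at a time to
# keep the number of in-flight tasks small.
import asyncio
from concurrent.futures import ThreadPoolExecutor

def merge_stats(physical_stats: dict, node_stats: dict) -> dict:
    # Placeholder for the blocking, CPU-bound merge work.
    return {**physical_stats, **node_stats}

async def organize(nodes: dict, executor: ThreadPoolExecutor) -> dict:
    loop = asyncio.get_running_loop()
    merged = {}
    for node_id in list(nodes.keys()):  # snapshot: nodes may be mutated elsewhere
        physical, stats = nodes[node_id]
        merged[node_id] = await loop.run_in_executor(
            executor, merge_stats, physical, stats
        )
    return merged

async def main() -> None:
    nodes = {"node-1": ({"cpu": 8}, {"workers": 3})}
    with ThreadPoolExecutor(max_workers=1) as pool:
        print(await organize(nodes, pool))

asyncio.run(main())
```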