[core][dashboard] Update nodes on delta. (ray-project#47367)
Like actor_head.py, we now update DataSource.nodes on delta. It subscribes to
node deltas and then queries all node infos (see the TOCTOU note below). Each
delta updates (see the sketch after this list):

1. DataSource.nodes[node_id]
2. DataSource.agents[node_id]
3. a warning generated after
RAY_DASHBOARD_HEAD_NODE_REGISTRATION_TIMEOUT = 10s
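
As a rough illustration only (not the dashboard code itself), a delta handler
along these lines could maintain the two tables; the module-level dicts stand in
for DataSource.nodes / DataSource.agents, and the exact warning condition (head
node still missing) is an assumption:

    import asyncio
    import logging
    from typing import Dict, Tuple

    logger = logging.getLogger(__name__)

    RAY_DASHBOARD_HEAD_NODE_REGISTRATION_TIMEOUT = 10  # seconds

    # Stand-ins for DataSource.nodes / DataSource.agents.
    nodes: Dict[str, dict] = {}
    agents: Dict[str, Tuple[int, int]] = {}

    async def warn_if_head_node_not_registered() -> None:
        # Assumption: the warning fires when no head node has shown up in time.
        await asyncio.sleep(RAY_DASHBOARD_HEAD_NODE_REGISTRATION_TIMEOUT)
        if not any(n.get("isHeadNode") for n in nodes.values()):  # assumed field name
            logger.warning(
                "Head node not registered after %s seconds.",
                RAY_DASHBOARD_HEAD_NODE_REGISTRATION_TIMEOUT,
            )

    def apply_node_delta(node_id: str, node_info: dict) -> None:
        """Apply one published node delta to the in-memory tables."""
        if node_info.get("state") == "DEAD":
            nodes.pop(node_id, None)   # (1) drop the node record
            agents.pop(node_id, None)  # (2) and its agent entry
        else:
            nodes[node_id] = node_info  # (1) upsert the node record
            # (2) agents[node_id] is filled in by the per-node poller sketched below.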

Note on (2) agents: the agent info is read from the internal KV store, and it is
not available until agent.py has been spawned and has written its own port to
the internal KV. So we create an async task per node that polls for this port
every 1s (sketched below).
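
A sketch of that per-node polling task, reusing the nodes/agents stand-ins from
the sketch above; the KV key layout and the port payload format are assumptions:

    import asyncio
    import json
    from typing import Awaitable, Callable, Optional

    async def poll_agent_port(
        node_id: str,
        kv_get: Callable[[str], Awaitable[Optional[bytes]]],  # injected internal-KV getter
        interval_s: float = 1.0,
    ) -> None:
        """Poll internal KV until the agent on this node has published its ports."""
        key = f"DASHBOARD_AGENT_PORT_PREFIX:{node_id}"  # assumed key layout
        while node_id in nodes:              # stop once the node is removed
            raw = await kv_get(key)          # None until agent.py writes its entry
            if raw is not None:
                http_port, grpc_port = json.loads(raw)  # assumed payload shape
                agents[node_id] = (http_port, grpc_port)
                return
            await asyncio.sleep(interval_s)  # not registered yet; retry in ~1s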

It turns out that the get-all-then-subscribe code has a TOCTOU problem, so
actor_head.py was also updated to first subscribe and then get all actors.
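
To make the ordering concrete, here is a toy version of the subscribe-then-get-all
pattern; subscriber and get_all are placeholders, not the real GCS client API:

    from typing import AsyncIterator, Awaitable, Callable, Dict, List, Tuple

    async def stream_states(
        subscriber,  # placeholder object with .subscribe() and .poll() coroutines
        get_all: Callable[[], Awaitable[Dict[str, dict]]],
    ) -> AsyncIterator[Tuple[str, dict]]:
        """Yield (id, info) for the initial snapshot, then for every later delta."""
        # Subscribe BEFORE taking the snapshot: an update that races with
        # get_all() is then either already in the snapshot or still queued on
        # the subscription, so applying both (last write wins) cannot drop it.
        await subscriber.subscribe()
        for item_id, item in (await get_all()).items():
            yield item_id, item
        while True:
            batch: List[Tuple[str, dict]] = await subscriber.poll()
            for item_id, item in batch:
                yield item_id, item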

Signed-off-by: Ruiyang Wang <rywang014@gmail.com>
Signed-off-by: ujjawal-khare <ujjawal.khare@dream11.com>
rynewang authored and ujjawal-khare committed Oct 15, 2024
1 parent 6d93b2c commit 02015ce
Showing 3 changed files with 67 additions and 50 deletions.
49 changes: 41 additions & 8 deletions python/ray/dashboard/modules/actor/actor_head.py
@@ -181,16 +181,16 @@ async def _update_actors(self):
Processes actor info. First gets all actors from GCS, then subscribes to
actor updates. For each actor update, updates DataSource.node_actors and
DataSource.actors.
"""
# To prevent Time-of-check to time-of-use issue [1], the get-all-actor-info
# happens after the subscription. That is, an update between get-all-actor-info
# and the subscription is not missed.
#
To prevent Time-of-check to time-of-use issue [1], the get-all-actor-info
happens after the subscription. That is, an update between get-all-actor-info
and the subscription is not missed.
# [1] https://en.wikipedia.org/wiki/Time-of-check_to_time-of-use
"""
# Receive actors from channel.
gcs_addr = self._dashboard_head.gcs_address
-        actor_channel_subscriber = GcsAioActorSubscriber(address=gcs_addr)
-        await actor_channel_subscriber.subscribe()
+        subscriber = GcsAioActorSubscriber(address=gcs_addr)
+        await subscriber.subscribe()

# Get all actor info.
while True:
@@ -222,7 +222,40 @@ async def _update_actors(self):
actor_consts.RETRY_GET_ALL_ACTOR_INFO_INTERVAL_SECONDS
)

# Pull incremental updates from the GCS channel
state_keys = (
"state",
"address",
"numRestarts",
"timestamp",
"pid",
"exitDetail",
"startTime",
"endTime",
"reprName",
)

def process_actor_data_from_pubsub(actor_id, actor_table_data):
actor_table_data = actor_table_data_to_dict(actor_table_data)
# If the actor is not newly registered but updated, we only update
# state-related fields.
if actor_table_data["state"] != "DEPENDENCIES_UNREADY":
actors = DataSource.actors[actor_id]
for k in state_keys:
if k in actor_table_data:
actors[k] = actor_table_data[k]
actor_table_data = actors
actor_id = actor_table_data["actorId"]
node_id = actor_table_data["address"]["rayletId"]
if actor_table_data["state"] == "DEAD":
self.dead_actors_queue.append(actor_id)
# Update actors.
DataSource.actors[actor_id] = actor_table_data
# Update node actors (only when node_id is not Nil).
if node_id != actor_consts.NIL_NODE_ID:
node_actors = DataSource.node_actors.get(node_id, {})
node_actors[actor_id] = actor_table_data
DataSource.node_actors[node_id] = node_actors

while True:
try:
updated_actor_table_entries = await self._poll_updated_actor_table_data(
65 changes: 26 additions & 39 deletions python/ray/dashboard/modules/node/node_head.py
@@ -5,9 +5,8 @@
import os
import time
from collections import deque
-from concurrent.futures import ThreadPoolExecutor
 from itertools import chain
-from typing import AsyncGenerator, Dict, Iterable, List, Optional
+from typing import AsyncGenerator, Dict, List, Tuple

import aiohttp.web
import grpc
@@ -18,13 +17,8 @@
import ray.dashboard.utils as dashboard_utils
from ray import NodeID
from ray._private import ray_constants
-from ray._private.collections_utils import split
 from ray._private.gcs_pubsub import GcsAioNodeInfoSubscriber
-from ray._private.ray_constants import (
-    DEBUG_AUTOSCALING_ERROR,
-    DEBUG_AUTOSCALING_STATUS,
-    env_integer,
-)
+from ray._private.ray_constants import DEBUG_AUTOSCALING_ERROR, DEBUG_AUTOSCALING_STATUS
from ray._private.utils import get_or_create_event_loop
from ray.autoscaler._private.util import (
LoadMetricsSummary,
@@ -50,20 +44,24 @@
routes = dashboard_optional_utils.DashboardHeadRouteTable


# NOTE: Executor in this head is intentionally constrained to just 1 thread by
# default to limit its concurrency, therefore reducing potential for
# GIL contention
RAY_DASHBOARD_NODE_HEAD_TPE_MAX_WORKERS = env_integer(
"RAY_DASHBOARD_NODE_HEAD_TPE_MAX_WORKERS", 1
)


def _gcs_node_info_to_dict(message: gcs_pb2.GcsNodeInfo) -> dict:
return dashboard_utils.message_to_dict(
message, {"nodeId"}, always_print_fields_with_no_presence=True
)


def _map_batch_node_info_to_dict(
messages: Dict[NodeID, gcs_pb2.GcsNodeInfo]
) -> List[dict]:
return [_gcs_node_info_to_dict(message) for message in messages.values()]


def _list_gcs_node_info_to_dict(
messages: List[Tuple[bytes, gcs_pb2.GcsNodeInfo]]
) -> List[dict]:
return [_gcs_node_info_to_dict(node_info) for _, node_info in messages]


def node_stats_to_dict(message):
decode_keys = {
"actorId",
@@ -213,38 +211,27 @@ async def _subscribe_for_node_updates(self) -> AsyncGenerator[dict, None]:
# it happens after the subscription. That is, an update between
# get-all-node-info and the subscription is not missed.
# [1] https://en.wikipedia.org/wiki/Time-of-check_to_time-of-use
-        all_node_info = await self._get_all_node_info_client(timeout=None)
-
-        def _convert_to_dict(messages: Iterable[gcs_pb2.GcsNodeInfo]) -> List[dict]:
-            return [_gcs_node_info_to_dict(m) for m in messages]
+        all_node_info = await self.get_all_node_info(timeout=None)

-        all_node_infos = await get_or_create_event_loop().run_in_executor(
-            self._executor,
-            _convert_to_dict,
-            all_node_info.values(),
+        all_node_dicts = await get_or_create_event_loop().run_in_executor(
+            self._dashboard_head._thread_pool_executor,
+            _map_batch_node_info_to_dict,
+            all_node_info,
         )

-        for node in all_node_infos:
+        for node in all_node_dicts:
yield node

while True:
try:
-                node_id_updated_info_tuples = await subscriber.poll(
+                published = await subscriber.poll(
                     batch_size=node_consts.RAY_DASHBOARD_NODE_SUBSCRIBER_POLL_SIZE
                 )

-                if node_id_updated_info_tuples:
-                    _, updated_infos_proto = zip(*node_id_updated_info_tuples)
-                else:
-                    updated_infos_proto = []
-
-                updated_infos = await get_or_create_event_loop().run_in_executor(
-                    self._executor,
-                    _convert_to_dict,
-                    updated_infos_proto,
+                updated_dicts = await get_or_create_event_loop().run_in_executor(
+                    self._dashboard_head._thread_pool_executor,
+                    _list_gcs_node_info_to_dict,
+                    published,
                 )

-                for node in updated_infos:
+                for node in updated_dicts:
yield node
except Exception:
logger.exception("Failed handling updated nodes.")
3 changes: 0 additions & 3 deletions python/ray/dashboard/modules/node/tests/test_node.py
@@ -17,7 +17,6 @@
wait_until_server_available,
)
from ray.cluster_utils import Cluster
from ray.dashboard.consts import RAY_DASHBOARD_STATS_UPDATING_INTERVAL
from ray.dashboard.tests.conftest import * # noqa

logger = logging.getLogger(__name__)
@@ -248,8 +247,6 @@ def verify():
node_to_remove = worker_nodes.pop(node_index)
cluster.remove_node(node_to_remove)

assert success


if __name__ == "__main__":
sys.exit(pytest.main(["-v", __file__]))
