Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core][dashboard] Update nodes on delta. #47367

Merged
merged 17 commits into from
Aug 29, 2024
95 changes: 58 additions & 37 deletions python/ray/_private/gcs_pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
from grpc.experimental import aio as aiogrpc

import ray._private.gcs_utils as gcs_utils
import ray._private.logging_utils as logging_utils
from ray.core.generated import gcs_service_pb2_grpc
from ray.core.generated import gcs_service_pb2
from ray.core.generated import gcs_pb2
from ray.core.generated import common_pb2
from ray.core.generated import pubsub_pb2

Expand Down Expand Up @@ -90,39 +90,6 @@ def _should_terminate_polling(e: grpc.RpcError) -> None:
return True
return False

@staticmethod
def _pop_error_info(queue):
    """Pop one error-info message off *queue*.

    Returns:
        A (key_id, error_info_message) pair, or (None, None) when the
        queue is empty.
    """
    if not queue:
        return None, None
    message = queue.popleft()
    return message.key_id, message.error_info_message

@staticmethod
def _pop_log_batch(queue):
    """Pop one log-batch message off *queue* and convert it to a dict.

    Returns:
        The log batch as a dict (via logging_utils), or None when the
        queue is empty.
    """
    if not queue:
        return None
    message = queue.popleft()
    return logging_utils.log_batch_proto_to_dict(message.log_batch_message)

@staticmethod
def _pop_resource_usage(queue):
    """Pop one resource-usage message off *queue*.

    Returns:
        A (node key string, resource-usage JSON string) pair, or
        (None, None) when the queue is empty.
    """
    if not queue:
        return None, None
    message = queue.popleft()
    node_key = message.key_id.decode()
    return node_key, message.node_resource_usage_message.json

@staticmethod
def _pop_actors(queue, batch_size=100):
    """Drain up to *batch_size* actor messages from *queue*.

    Returns:
        A list of (binary actor ID, actor message) tuples; empty when
        the queue has nothing to pop.
    """
    batch = []
    # Stop when the queue is exhausted or the batch limit is reached.
    while queue and len(batch) < batch_size:
        message = queue.popleft()
        batch.append((message.key_id, message.actor_message))
    return batch


class GcsAioPublisher(_PublisherBase):
"""Publisher to GCS. Uses async io."""
Expand Down Expand Up @@ -222,7 +189,7 @@ async def _poll(self, timeout=None) -> None:
self._max_processed_sequence_id = 0
for msg in poll.result().pub_messages:
if msg.sequence_id <= self._max_processed_sequence_id:
logger.warn(f"Ignoring out of order message {msg}")
logger.warning(f"Ignoring out of order message {msg}")
continue
self._max_processed_sequence_id = msg.sequence_id
self._queue.append(msg)
Expand Down Expand Up @@ -266,6 +233,13 @@ async def poll(self, timeout=None) -> Tuple[bytes, str]:
await self._poll(timeout=timeout)
return self._pop_resource_usage(self._queue)

@staticmethod
def _pop_resource_usage(queue):
    """Pop one resource-usage message off *queue*.

    Returns:
        A (decoded node key, resource-usage JSON string) pair, or
        (None, None) when the queue is empty.
    """
    if not queue:
        return None, None
    message = queue.popleft()
    return message.key_id.decode(), message.node_resource_usage_message.json


class GcsAioActorSubscriber(_AioSubscriber):
def __init__(
Expand All @@ -280,11 +254,58 @@ def __init__(
def queue_size(self):
return len(self._queue)

async def poll(
    self, batch_size, timeout=None
) -> List[Tuple[bytes, gcs_pb2.ActorTableData]]:
    """Poll the GCS actor channel for new actor messages.

    Args:
        batch_size: Maximum number of messages returned per call.
        timeout: Optional timeout (seconds) forwarded to the underlying poll.

    Returns:
        A list of tuples of binary actor ID and actor table data.
    """
    await self._poll(timeout=timeout)
    batch = self._pop_actors(self._queue, batch_size=batch_size)
    return batch

@staticmethod
def _pop_actors(queue, batch_size):
    """Drain up to *batch_size* actor messages from *queue*.

    Returns:
        A list of (binary actor ID, actor message) tuples; empty when
        the queue has nothing to pop.
    """
    batch = []
    # Stop when the queue is exhausted or the batch limit is reached.
    while queue and len(batch) < batch_size:
        message = queue.popleft()
        batch.append((message.key_id, message.actor_message))
    return batch


class GcsAioNodeInfoSubscriber(_AioSubscriber):
    """Async subscriber for node-info updates on the GCS pubsub channel."""

    def __init__(
        self,
        worker_id: bytes = None,
        address: str = None,
        channel: grpc.Channel = None,
    ):
        super().__init__(pubsub_pb2.GCS_NODE_INFO_CHANNEL, worker_id, address, channel)

    async def poll(
        self, batch_size, timeout=None
    ) -> List[Tuple[bytes, gcs_pb2.GcsNodeInfo]]:
        """Polls for new node info messages.

        Args:
            batch_size: Maximum number of messages returned per call.
            timeout: Optional timeout (seconds) forwarded to the underlying
                poll.

        Returns:
            A list of tuples of (node_id, GcsNodeInfo).
        """
        await self._poll(timeout=timeout)
        return self._pop_node_infos(self._queue, batch_size=batch_size)

    @staticmethod
    def _pop_node_infos(queue, batch_size):
        # Drain up to batch_size messages; each yields (node id, node info).
        batch = []
        while queue and len(batch) < batch_size:
            message = queue.popleft()
            batch.append((message.key_id, message.node_info_message))
        return batch
20 changes: 15 additions & 5 deletions python/ray/dashboard/modules/actor/actor_head.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,21 @@ def __init__(self, dashboard_head):
self.accumulative_event_processing_s = 0

async def _update_actors(self):
"""
Processes actor info. First gets all actors from GCS, then subscribes to
actor updates. For each actor update, updates DataSource.node_actors and
DataSource.actors.

To prevent Time-of-check to time-of-use issue [1], the get-all-actor-info
happens after the subscription. That is, an update between get-all-actor-info
and the subscription is not missed.
# [1] https://en.wikipedia.org/wiki/Time-of-check_to_time-of-use
"""
# Receive actors from channel.
gcs_addr = self._dashboard_head.gcs_address
subscriber = GcsAioActorSubscriber(address=gcs_addr)
await subscriber.subscribe()

# Get all actor info.
while True:
try:
Expand Down Expand Up @@ -198,11 +213,6 @@ def process_actor_data_from_pubsub(actor_id, actor_table_data):
node_actors[actor_id] = actor_table_data
DataSource.node_actors[node_id] = node_actors

# Receive actors from channel.
gcs_addr = self._dashboard_head.gcs_address
subscriber = GcsAioActorSubscriber(address=gcs_addr)
await subscriber.subscribe()

while True:
try:
published = await subscriber.poll(batch_size=200)
Expand Down
21 changes: 11 additions & 10 deletions python/ray/dashboard/modules/node/node_consts.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
from ray._private.ray_constants import env_float, env_integer
from ray._private.ray_constants import env_integer

NODE_STATS_UPDATE_INTERVAL_SECONDS = env_integer(
"NODE_STATS_UPDATE_INTERVAL_SECONDS", 5
)
UPDATE_NODES_INTERVAL_SECONDS = env_integer("UPDATE_NODES_INTERVAL_SECONDS", 5)
# Until the head node is registered,
# the API server is doing more frequent update
# with this interval.
FREQUENTY_UPDATE_NODES_INTERVAL_SECONDS = env_float(
"FREQUENTY_UPDATE_NODES_INTERVAL_SECONDS", 0.1
RAY_DASHBOARD_HEAD_NODE_REGISTRATION_TIMEOUT = env_integer(
"RAY_DASHBOARD_HEAD_NODE_REGISTRATION_TIMEOUT", 10
)
# If the head node is not updated within
# this timeout, it will stop frequent update.
FREQUENT_UPDATE_TIMEOUT_SECONDS = env_integer("FREQUENT_UPDATE_TIMEOUT_SECONDS", 10)
MAX_COUNT_OF_GCS_RPC_ERROR = 10
# This is consistent with gcs_node_manager.cc
MAX_DEAD_NODES_TO_CACHE = env_integer("RAY_maximum_gcs_dead_node_cached_count", 1000)
RAY_DASHBOARD_NODE_SUBSCRIBER_POLL_SIZE = env_integer(
"RAY_DASHBOARD_NODE_SUBSCRIBER_POLL_SIZE", 200
)
RAY_DASHBOARD_AGENT_POLL_INTERVAL_S = env_integer(
"RAY_DASHBOARD_AGENT_POLL_INTERVAL_S", 1
)
Loading
Loading