diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 9e0953a33b0a6..f3dee51e8ace5 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -156,6 +156,7 @@ from ray.includes.libcoreworker cimport ( CFiberEvent, CActorHandle, CGeneratorBackpressureWaiter, + CReaderRefInfo, ) from ray.includes.ray_config cimport RayConfig @@ -3645,37 +3646,37 @@ cdef class CoreWorker: def experimental_channel_register_writer(self, ObjectRef writer_ref, - ObjectRef reader_ref, - writer_node, - reader_node, - ActorID reader, - int64_t num_readers): + remote_reader_ref_info): cdef: CObjectID c_writer_ref = writer_ref.native() - CObjectID c_reader_ref = reader_ref.native() - CNodeID c_reader_node = CNodeID.FromHex(reader_node) - CNodeID *c_reader_node_id = NULL - CActorID c_reader_actor = reader.native() - - if num_readers == 0: - return - if writer_node != reader_node: - c_reader_node_id = &c_reader_node + c_vector[CNodeID] c_remote_reader_nodes + c_vector[CReaderRefInfo] c_remote_reader_ref_info + CReaderRefInfo c_reader_ref_info + + for node_id, reader_ref_info in remote_reader_ref_info.items(): + c_reader_ref_info = CReaderRefInfo() + c_reader_ref_info.reader_ref_id = ( + reader_ref_info.reader_ref).native() + c_reader_ref_info.owner_reader_actor_id = ( + reader_ref_info.ref_owner_actor_id).native() + num_reader_actors = reader_ref_info.num_reader_actors + assert num_reader_actors != 0 + c_reader_ref_info.num_reader_actors = num_reader_actors + c_remote_reader_ref_info.push_back(c_reader_ref_info) + c_remote_reader_nodes.push_back(CNodeID.FromHex(node_id)) with nogil: check_status(CCoreWorkerProcess.GetCoreWorker() - .ExperimentalRegisterMutableObjectWriter(c_writer_ref, - c_reader_node_id, - )) - if writer_node != reader_node: - with nogil: - check_status( - CCoreWorkerProcess.GetCoreWorker() - .ExperimentalRegisterMutableObjectReaderRemote(c_writer_ref, - c_reader_actor, - num_readers, - c_reader_ref - )) + .ExperimentalRegisterMutableObjectWriter( + c_writer_ref, + c_remote_reader_nodes, + )) + check_status( + CCoreWorkerProcess.GetCoreWorker() + .ExperimentalRegisterMutableObjectReaderRemote( + c_writer_ref, + c_remote_reader_ref_info, + )) def experimental_channel_register_reader(self, ObjectRef object_ref): cdef: diff --git a/python/ray/dag/compiled_dag_node.py b/python/ray/dag/compiled_dag_node.py index 9920318c9545c..9526a242f331c 100644 --- a/python/ray/dag/compiled_dag_node.py +++ b/python/ray/dag/compiled_dag_node.py @@ -115,7 +115,6 @@ def do_exec_tasks( if done: break for operation in schedule: - print("SANG-TODO operation: ", operation) done = tasks[operation.exec_task_idx].exec_operation( self, operation.type ) @@ -1838,7 +1837,6 @@ def teardown(self, wait: bool): return logger.info("Tearing down compiled DAG") - outer._dag_submitter.close() outer._dag_output_fetcher.close() diff --git a/python/ray/dag/tests/experimental/test_multi_node_dag.py b/python/ray/dag/tests/experimental/test_multi_node_dag.py index 52c3853b71c05..a627e0f96fba2 100644 --- a/python/ray/dag/tests/experimental/test_multi_node_dag.py +++ b/python/ray/dag/tests/experimental/test_multi_node_dag.py @@ -5,8 +5,10 @@ import time import pytest from ray.dag import InputNode, MultiOutputNode +import ray.remote_function from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy from ray.tests.conftest import * # noqa +from ray.tests.conftest import wait_for_condition if sys.platform != "linux" and sys.platform != "darwin": pytest.skip("Skipping, requires Linux or Mac.",
allow_module_level=True) @@ -69,100 +71,79 @@ def test_readers_on_different_nodes(ray_start_cluster): cluster = ray_start_cluster # This node is for the driver (including the CompiledDAG.DAGDriverProxyActor) and # one of the readers. - first_node_handle = cluster.add_node(num_cpus=2) - # This node is for the other reader. - second_node_handle = cluster.add_node(num_cpus=1) + cluster.add_node(num_cpus=1) ray.init(address=cluster.address) + # 2 more nodes for other readers. + cluster.add_node(num_cpus=1) + cluster.add_node(num_cpus=1) cluster.wait_for_nodes() + # Wait until nodes actually start, otherwise the code below will fail. + wait_for_condition(lambda: len(ray.nodes()) == 3) - nodes = [first_node_handle.node_id, second_node_handle.node_id] - # We want to check that the readers are on different nodes. Thus, we convert `nodes` - # to a set and then back to a list to remove duplicates. Then we check that the - # length of `nodes` is 2. - nodes = list(set(nodes)) - assert len(nodes) == 2 - - def create_actor(node): - return Actor.options( - scheduling_strategy=NodeAffinitySchedulingStrategy(node, soft=False) - ).remote(0) - - a = create_actor(nodes[0]) - b = create_actor(nodes[1]) - actors = [a, b] + a = Actor.options(num_cpus=1).remote(0) + b = Actor.options(num_cpus=1).remote(0) + c = Actor.options(num_cpus=1).remote(0) + actors = [a, b, c] def _get_node_id(self) -> "ray.NodeID": return ray.get_runtime_context().get_node_id() - nodes_check = ray.get([act.__ray_call__.remote(_get_node_id) for act in actors]) - a_node = nodes_check[0] - b_node = nodes_check[1] - assert a_node != b_node + node_ids = ray.get([act.__ray_call__.remote(_get_node_id) for act in actors]) + assert len(set(node_ids)) == 3 with InputNode() as inp: x = a.inc.bind(inp) y = b.inc.bind(inp) - dag = MultiOutputNode([x, y]) + z = c.inc.bind(inp) + dag = MultiOutputNode([x, y, z]) - with pytest.raises( - ValueError, - match="All reader actors must be on the same node.*", - ): - dag.experimental_compile() + adag = dag.experimental_compile() + + for i in range(1, 10): + assert ray.get(adag.execute(1)) == [i, i, i] + + adag.teardown() def test_bunch_readers_on_different_nodes(ray_start_cluster): cluster = ray_start_cluster - # This node is for the driver (including the CompiledDAG.DAGDriverProxyActor) and - # two of the readers. - first_node_handle = cluster.add_node(num_cpus=3) - # This node is for the other two readers. - second_node_handle = cluster.add_node(num_cpus=2) + ACTORS_PER_NODE = 2 + NUM_REMOTE_NODES = 2 + # Driver node. + cluster.add_node(num_cpus=ACTORS_PER_NODE) ray.init(address=cluster.address) + # Additional nodes for readers spread across multiple nodes. + for _ in range(NUM_REMOTE_NODES): + cluster.add_node(num_cpus=ACTORS_PER_NODE) cluster.wait_for_nodes() - nodes = [first_node_handle.node_id, second_node_handle.node_id] - # We want to check that the readers are on different nodes. Thus, we convert `nodes` - # to a set and then back to a list to remove duplicates. Then we check that the - # length of `nodes` is 2.
- nodes = list(set(nodes)) - assert len(nodes) == 2 - - def create_actor(node): - return Actor.options( - scheduling_strategy=NodeAffinitySchedulingStrategy(node, soft=False) - ).remote(0) + wait_for_condition(lambda: len(ray.nodes()) == NUM_REMOTE_NODES + 1) - a = create_actor(nodes[0]) - b = create_actor(nodes[0]) - c = create_actor(nodes[1]) - d = create_actor(nodes[1]) - actors = [a, b, c, d] + actors = [ + Actor.options(num_cpus=1).remote(0) + for _ in range(ACTORS_PER_NODE * (NUM_REMOTE_NODES + 1)) + ] def _get_node_id(self) -> "ray.NodeID": return ray.get_runtime_context().get_node_id() - nodes_check = ray.get([act.__ray_call__.remote(_get_node_id) for act in actors]) - a_node = nodes_check[0] - b_node = nodes_check[1] - c_node = nodes_check[2] - d_node = nodes_check[3] - assert a_node == b_node - assert b_node != c_node - assert c_node == d_node + node_ids = ray.get([act.__ray_call__.remote(_get_node_id) for act in actors]) + assert len(set(node_ids)) == NUM_REMOTE_NODES + 1 with InputNode() as inp: - w = a.inc.bind(inp) - x = b.inc.bind(inp) - y = c.inc.bind(inp) - z = d.inc.bind(inp) - dag = MultiOutputNode([w, x, y, z]) + outputs = [] + for actor in actors: + outputs.append(actor.inc.bind(inp)) + dag = MultiOutputNode(outputs) + + adag = dag.experimental_compile() + + for i in range(1, 10): + assert ray.get(adag.execute(1)) == [ + i for _ in range(ACTORS_PER_NODE * (NUM_REMOTE_NODES + 1)) + ] - with pytest.raises( - ValueError, - match="All reader actors must be on the same node.*", - ): - dag.experimental_compile() + adag.teardown() def test_pp(ray_start_cluster): @@ -247,13 +228,10 @@ def get_node_id(self): compiled_dag = dag.experimental_compile() - # Ray sets the gRPC payload max size to 512 MiB. We choose a size in this test that - # is a bit larger. size = GRPC_MAX_SIZE + (1024 * 1024 * 2) val = b"x" * size for i in range(3): - print(f"{i} iteration") ref = compiled_dag.execute(val) result = ray.get(ref) assert result == val @@ -263,6 +241,58 @@ def get_node_id(self): compiled_dag.teardown() +@pytest.mark.parametrize("num_actors", [1, 4]) +@pytest.mark.parametrize("num_nodes", [1, 4]) +def test_multi_node_multi_reader_large_payload( + ray_start_cluster, num_actors, num_nodes, monkeypatch +): + # Set the max gRPC message size to 5 MiB. + GRPC_MAX_SIZE = 1024 * 1024 * 5 + monkeypatch.setenv("RAY_max_grpc_message_size", str(GRPC_MAX_SIZE)) + cluster = ray_start_cluster + ACTORS_PER_NODE = num_actors + NUM_REMOTE_NODES = num_nodes + cluster.add_node(num_cpus=ACTORS_PER_NODE) + ray.init(address=cluster.address) + # Additional nodes for the remote readers. + for _ in range(NUM_REMOTE_NODES): + cluster.add_node(num_cpus=ACTORS_PER_NODE) + cluster.wait_for_nodes() + + wait_for_condition(lambda: len(ray.nodes()) == NUM_REMOTE_NODES + 1) + + actors = [ + Actor.options(num_cpus=1).remote(0) + for _ in range(ACTORS_PER_NODE * (NUM_REMOTE_NODES + 1)) + ] + + def _get_node_id(self) -> "ray.NodeID": + return ray.get_runtime_context().get_node_id() + + node_ids = ray.get([act.__ray_call__.remote(_get_node_id) for act in actors]) + assert len(set(node_ids)) == NUM_REMOTE_NODES + 1 + + with InputNode() as inp: + outputs = [] + for actor in actors: + outputs.append(actor.echo.bind(inp)) + dag = MultiOutputNode(outputs) + + compiled_dag = dag.experimental_compile() + + # Set the object size a little bigger than the gRPC max size so that + # it triggers chunking and resizing; the expected chunk count is sketched below.
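A quick check of the chunk arithmetic these test parameters exercise, assuming the 0.98 padding factor that raylet_client.cc (at the end of this diff) applies to the configured max gRPC message size; the variable names here are illustrative, not Ray APIs:

GRPC_MAX_SIZE = 1024 * 1024 * 5                  # matches RAY_max_grpc_message_size above
max_chunk = int(GRPC_MAX_SIZE * 0.98)            # per-chunk payload limit with padding
total_size = GRPC_MAX_SIZE + (1024 * 1024 * 2)   # the payload used below
num_chunks = total_size // max_chunk + (1 if total_size % max_chunk else 0)
assert num_chunks == 2  # a 7 MiB payload over ~4.9 MiB chunks -> 2 RPC chunks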
+ size = GRPC_MAX_SIZE + (1024 * 1024 * 2) + val = b"x" * size + + for _ in range(3): + ref = compiled_dag.execute(val) + result = ray.get(ref) + assert result == [val for _ in range(ACTORS_PER_NODE * (NUM_REMOTE_NODES + 1))] + + compiled_dag.teardown() + + if __name__ == "__main__": if os.environ.get("PARALLEL_CI"): sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__])) diff --git a/python/ray/experimental/channel/shared_memory_channel.py b/python/ray/experimental/channel/shared_memory_channel.py index b4435b73af4a1..44da4736a4755 100644 --- a/python/ray/experimental/channel/shared_memory_channel.py +++ b/python/ray/experimental/channel/shared_memory_channel.py @@ -1,6 +1,7 @@ import io import logging import time +from collections import defaultdict, namedtuple from typing import Any, Dict, List, Optional, Set, Tuple, Union import ray @@ -16,7 +17,7 @@ # entry/init points. logger = logging.getLogger(__name__) -DEFAULT_MAX_BUFFER_SIZE = int(100 * 1e6) # 100 mB +DEFAULT_MAX_BUFFER_SIZE = int(1e6) # 1 MB # The min buffer size must be large enough to at least fit an instance of the # _ResizeChannel class along with any metadata. MIN_BUFFER_SIZE = int(1000) # 1000 bytes @@ -73,6 +74,15 @@ def _get_self_actor() -> Optional["ray.actor.ActorHandle"]: return None +# aDAG maintains 1 reader object reference (also called buffer) per node. +# reader_ref: The object reference. +# ref_owner_actor_id: The actor that created the object reference. +# num_reader_actors: The number of reader actors that read this object reference. +ReaderRefInfo = namedtuple( + "ReaderRefInfo", ["reader_ref", "ref_owner_actor_id", "num_reader_actors"] ) + + class _ResizeChannel: """ When a channel must be resized, the channel backing store must be resized on both @@ -81,8 +91,15 @@ class _ResizeChannel: resize its own backing store. The class instance is sent through the channel. """ - def __init__(self, reader_ref: "ray.ObjectRef"): - self._reader_ref = reader_ref + def __init__( + self, + _node_id_to_reader_ref_info: Dict[str, ReaderRefInfo], + ): + """ + Args: + _node_id_to_reader_ref_info: A mapping from node ID to ReaderRefInfo. + """ + self._node_id_to_reader_ref_info = _node_id_to_reader_ref_info class SharedMemoryType(ChannelOutputType): @@ -159,9 +176,8 @@ def __init__( reader_and_node_list: List[Tuple["ray.actor.ActorHandle", str]], typ: Optional[Union[int, SharedMemoryType]] = None, _writer_node_id: Optional["ray.NodeID"] = None, - _reader_node_id: Optional["ray.NodeID"] = None, _writer_ref: Optional["ray.ObjectRef"] = None, - _reader_ref: Optional["ray.ObjectRef"] = None, + _node_id_to_reader_ref_info: Optional[Dict[str, ReaderRefInfo]] = None, _writer_registered: bool = False, _reader_registered: bool = False, ): @@ -206,6 +222,19 @@ def __init__( self._writer_registered = _writer_registered self._reader_registered = _reader_registered + # NodeID -> ReaderRefInfo on that node. Note that there's only 1 + # reader ref per node. + self._node_id_to_reader_ref_info: Dict[str, ReaderRefInfo] = ( + _node_id_to_reader_ref_info or {} + ) + + # Node ID -> a list of reader actors. + self._node_id_to_readers: Dict[str, List["ray.actor.ActorHandle"]] = defaultdict(list) + for reader, node_id in self._reader_and_node_list: + self._node_id_to_readers[node_id].append(reader) + + # Number of readers on the local node. + self._num_local_readers = 0 if _writer_ref is None: # We are the writer.
Check that the passed handle matches the @@ -217,79 +246,84 @@ def __init__( self_actor = _get_self_actor() assert writer == self_actor - # For now, all readers must be on the same node. Note that the writer can - # still be on a different node than the readers though. - # - # Note that we only check this when the writer is creating the channel. - # Ideally, when each reader constructs its own instance of the channel, it - # would check this as well. However, this could result in deadlock as two - # readers attempt to execute a remote function on each other to get each - # other's node ID. We cannot use a separate concurrency group to execute the - # function because reader actors may not have been declared with an - # additional concurrency group beyond default. - # - # TODO(jhumphri): Allow different readers for the same channel to be on - # different nodes. - prev_reader_node = None - prev_reader = None - for reader, node in reader_and_node_list: - if prev_reader_node is None: - prev_reader_node = node - elif prev_reader_node != node: - raise ValueError( - f"All reader actors must be on the same node. Actor " - f"{prev_reader} is on node {prev_reader_node} while actor " - f"{reader} is on node {node}." - ) - prev_reader = reader - self._writer_node_id = ( ray.runtime_context.get_runtime_context().get_node_id() ) self._writer_ref = _create_channel_ref(self, typ.buffer_size_bytes) - self._reader_node_id = prev_reader_node - self._create_reader_ref(reader_and_node_list, typ.buffer_size_bytes) - - assert self._reader_ref is not None + self._create_reader_refs(typ.buffer_size_bytes) else: assert ( _writer_node_id is not None ), "_writer_node_id must also be passed to the constructor when " "_writer_ref is." - assert ( - _reader_ref is not None - ), "_reader_ref must also be passed to the constructor when _writer_ref is." + assert _node_id_to_reader_ref_info is not None, ( + "_node_id_to_reader_ref_info must also be passed to the constructor " + "when _writer_ref is." + ) self._writer_ref = _writer_ref self._writer_node_id = _writer_node_id - self._reader_node_id = _reader_node_id - self._reader_ref = _reader_ref - - self._num_readers = len(self._reader_and_node_list) - if self.is_remote(): - # Even though there may be multiple readers on a remote node, we set - # `self._num_readers` to 1 here. On this local node, only the IO thread in - # the mutable object provider will read the mutable object. The IO thread - # will then send a gRPC with the mutable object contents to the remote node - # where the readers are. - self._num_readers = 1 - - def _create_reader_ref( + self._node_id_to_reader_ref_info = _node_id_to_reader_ref_info + + assert self._num_local_readers == 0 + remote_node_exists = False + for node_id, readers in self._node_id_to_readers.items(): + if self.is_local_node(node_id): + self._num_local_readers += len(readers) + else: + remote_node_exists = True + # If a remote node exists, we have 1 additional local reader (the IO thread) + # that listens for object changes and pushes them to remote nodes.
+ if remote_node_exists: + self._num_local_readers += 1 + # There must be at least 1 local reader. + assert self._num_local_readers > 0 + + self._local_reader_ref: Optional["ray.ObjectRef"] = self._get_local_reader_ref( + self._node_id_to_reader_ref_info + ) + + def _get_local_reader_ref( + self, _node_id_to_reader_ref_info: Dict[str, ReaderRefInfo] + ) -> Optional["ray.ObjectRef"]: + for node_id, reader_ref_info in _node_id_to_reader_ref_info.items(): + if self.is_local_node(node_id): + return reader_ref_info.reader_ref + return None + + def _create_reader_refs( self, - reader_and_node_list: List[Tuple["ray.actor.ActorHandle", str]], buffer_size_bytes: int, ): # TODO(jhumphri): Free the current reader ref once the reference to it is # destroyed below. - reader = reader_and_node_list[0][0] - if self.is_remote(): - fn = reader.__ray_call__ - self._reader_ref = ray.get( - fn.remote(_create_channel_ref, buffer_size_bytes) - ) - else: - self._reader_ref = self._writer_ref + + for node_id, readers in self._node_id_to_readers.items(): + if not self.is_local_node(node_id): + # Find 1 reader on the remote node to create a reference that's + # shared by all readers on that node. When a new value is written + # to the channel, it is sent to this reference. + reader = readers[0] + fn = reader.__ray_call__ + self._node_id_to_reader_ref_info[node_id] = ReaderRefInfo( + reader_ref=ray.get( + fn.remote(_create_channel_ref, buffer_size_bytes) + ), + ref_owner_actor_id=reader._actor_id, + num_reader_actors=len(readers), + ) + else: + writer_id = ray.ActorID.nil() + if self._writer is not None: + writer_id = self._writer._actor_id + self._node_id_to_reader_ref_info[node_id] = ReaderRefInfo( + reader_ref=self._writer_ref, + ref_owner_actor_id=writer_id, + num_reader_actors=len(readers), + ) + # There must be exactly 1 reader reference per node. + assert len(self._node_id_to_reader_ref_info) == len(self._node_id_to_readers) # We need to register the new writer_ref. self._writer_registered = False @@ -299,9 +333,6 @@ def _create_reader_ref( def is_local_node(node_id): return ray.runtime_context.get_runtime_context().get_node_id() == node_id - def is_remote(self): - return self._writer_node_id != self._reader_node_id - def ensure_registered_as_writer(self) -> None: if self._writer_registered: return @@ -312,17 +343,15 @@ def ensure_registered_as_writer(self) -> None: "the writer is on." ) - assert ( - self._reader_ref - ), "`self._reader_ref` must be not be None when registering a writer, because " - "it should have been initialized in the constructor."
+ remote_reader_ref_info: Dict[str, ReaderRefInfo] = {} + for node_id, reader_ref_info in self._node_id_to_reader_ref_info.items(): + if self.is_local_node(node_id): + continue + remote_reader_ref_info[node_id] = reader_ref_info + self._worker.core_worker.experimental_channel_register_writer( self._writer_ref, - self._reader_ref, - self._writer_node_id, - self._reader_node_id, - self._reader_and_node_list[0][0]._actor_id, - len(self._reader_and_node_list), + remote_reader_ref_info, ) self._writer_registered = True @@ -330,9 +359,11 @@ def ensure_registered_as_reader(self) -> None: if self._reader_registered: return - self._worker.core_worker.experimental_channel_register_reader( - self._reader_ref, - ) + for node_id, reader_ref_info in self._node_id_to_reader_ref_info.items(): + if self.is_local_node(node_id): + self._worker.core_worker.experimental_channel_register_reader( + reader_ref_info.reader_ref, + ) self._reader_registered = True @staticmethod @@ -341,9 +372,8 @@ def _deserialize_reader_channel( reader_and_node_list: List[Tuple["ray.actor.ActorHandle", str]], typ: int, writer_node_id, - reader_node_id, writer_ref: "ray.ObjectRef", - reader_ref: "ray.ObjectRef", + node_id_to_reader_ref_info: Dict[str, ReaderRefInfo], writer_registered: bool, reader_registered: bool, ) -> "Channel": @@ -352,31 +382,30 @@ def _deserialize_reader_channel( reader_and_node_list, typ, _writer_node_id=writer_node_id, - _reader_node_id=reader_node_id, _writer_ref=writer_ref, - _reader_ref=reader_ref, + _node_id_to_reader_ref_info=node_id_to_reader_ref_info, _writer_registered=writer_registered, _reader_registered=reader_registered, ) return chan def __reduce__(self): - assert self._reader_ref is not None + assert self._node_id_to_reader_ref_info is not None return self._deserialize_reader_channel, ( self._writer, self._reader_and_node_list, self._typ, self._writer_node_id, - self._reader_node_id, self._writer_ref, - self._reader_ref, + self._node_id_to_reader_ref_info, self._writer_registered, self._reader_registered, ) def __str__(self) -> str: return ( - f"Channel(_reader_ref={self._reader_ref}, _writer_ref={self._writer_ref})" + f"Channel(_node_id_to_reader_ref_info={self._node_id_to_reader_ref_info}, " + f"_writer_ref={self._writer_ref})" ) def _resize_channel_if_needed(self, serialized_value: str, timeout_ms: int): @@ -389,25 +418,32 @@ def _resize_channel_if_needed(self, serialized_value: str, timeout_ms: int): self._typ.buffer_size_bytes = size # TODO(jhumphri): Free the current writer ref once the reference to it is # destroyed below. + # TODO(sang): Support different policies such as 2X buffer size. prev_writer_ref = self._writer_ref self._writer_ref = _create_channel_ref(self, self._typ.buffer_size_bytes) - - self._create_reader_ref( - self._reader_and_node_list, self._typ.buffer_size_bytes + self._create_reader_refs(self._typ.buffer_size_bytes) + self._local_reader_ref = self._get_local_reader_ref( + self._node_id_to_reader_ref_info ) # Write a special message to the channel so that the readers know to # stop using the current reader_ref. - special_message = _ResizeChannel(self._reader_ref) + special_message = _ResizeChannel(self._node_id_to_reader_ref_info) special_message_serialized = ( self._worker.get_serialization_context().serialize(special_message) ) self._worker.core_worker.experimental_channel_put_serialized( special_message_serialized, prev_writer_ref, - self._num_readers, + self._num_local_readers, timeout_ms, ) + # TODO(sang): Clean the previous ref that won't be used. 
+ # Right now, if we just close it here, it will not work because + # of race conditions. + # self._worker.core_worker.experimental_channel_set_error( + # prev_writer_ref + # ) def write(self, value: Any, timeout: Optional[float] = None) -> None: self.ensure_registered_as_writer() @@ -443,7 +479,7 @@ def write(self, value: Any, timeout: Optional[float] = None) -> None: self._worker.core_worker.experimental_channel_put_serialized( serialized_value, self._writer_ref, - self._num_readers, + self._num_local_readers, timeout_ms, ) @@ -455,11 +491,14 @@ def read(self, timeout: Optional[float] = None) -> Any: start_time = time.monotonic() ret = self._worker.get_objects( - [self._reader_ref], timeout=timeout, return_exceptions=True + [self._local_reader_ref], timeout=timeout, return_exceptions=True )[0][0] if isinstance(ret, _ResizeChannel): - self._reader_ref = ret._reader_ref + self._node_id_to_reader_ref_info = ret._node_id_to_reader_ref_info + self._local_reader_ref = self._get_local_reader_ref( + self._node_id_to_reader_ref_info + ) # We need to register the new reader_ref. self._reader_registered = False self.ensure_registered_as_reader() @@ -467,7 +506,7 @@ def read(self, timeout: Optional[float] = None) -> Any: timeout -= time.monotonic() - start_time timeout = max(timeout, 0) ret = self._worker.get_objects( - [self._reader_ref], timeout=timeout, return_exceptions=True + [self._local_reader_ref], timeout=timeout, return_exceptions=True )[0][0] return ret @@ -478,9 +517,18 @@ def close(self) -> None: reader_ref. """ self._worker.core_worker.experimental_channel_set_error(self._writer_ref) - if self.is_local_node(self._reader_node_id): + is_local_node_reader = False + + for node_id in self._node_id_to_readers.keys(): + if self.is_local_node(node_id): + is_local_node_reader = True + if is_local_node_reader: self.ensure_registered_as_reader() - self._worker.core_worker.experimental_channel_set_error(self._reader_ref) + + for reader_ref_info in self._node_id_to_reader_ref_info.values(): + self._worker.core_worker.experimental_channel_set_error( + reader_ref_info.reader_ref + ) @DeveloperAPI diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index 235ae16d34b5b..242c5f10dd492 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -81,6 +81,14 @@ cdef extern from "ray/core_worker/fiber.h" nogil: void Wait() void Notify() +cdef extern from "ray/core_worker/experimental_mutable_object_manager.h" nogil: + cdef cppclass CReaderRefInfo "ray::experimental::ReaderRefInfo": + CReaderRefInfo() + CObjectID reader_ref_id + CActorID owner_reader_actor_id + int64_t num_reader_actors + + cdef extern from "ray/core_worker/context.h" nogil: cdef cppclass CWorkerContext "ray::core::WorkerContext": c_bool CurrentActorIsAsync() @@ -265,11 +273,12 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: CRayStatus ExperimentalChannelSetError( const CObjectID &object_id) CRayStatus ExperimentalRegisterMutableObjectWriter( - const CObjectID &object_id, const CNodeID *node_id) + const CObjectID &writer_object_id, + const c_vector[CNodeID] &remote_reader_node_ids) CRayStatus ExperimentalRegisterMutableObjectReader(const CObjectID &object_id) CRayStatus ExperimentalRegisterMutableObjectReaderRemote( - const CObjectID &object_id, const CActorID &reader_actor, - int64_t num_readers, const CObjectID &reader_ref) + const CObjectID &object_id, + const c_vector[CReaderRefInfo] &remote_reader_ref_info) CRayStatus SealOwned(const CObjectID 
&object_id, c_bool pin_object, const unique_ptr[CAddress] &owner_address) CRayStatus SealExisting(const CObjectID &object_id, c_bool pin_object, diff --git a/python/ray/tests/test_channel.py b/python/ray/tests/test_channel.py index d8812488c53b1..df3379df173d2 100644 --- a/python/ray/tests/test_channel.py +++ b/python/ray/tests/test_channel.py @@ -1222,6 +1222,10 @@ class Actor: def get_node_id(self): return ray.get_runtime_context().get_node_id() + def read(self, channel, val): + assert channel.read() == val + return val + def create_actor(node): return Actor.options( scheduling_strategy=NodeAffinitySchedulingStrategy(node, soft=False) ).remote(0) @@ -1239,12 +1243,12 @@ def create_actor(node): driver_actor = create_driver_actor() driver_node = get_actor_node_id(driver_actor) - with pytest.raises( - ValueError, match="All reader actors must be on the same node.*" - ): - ray_channel.Channel( - None, [(driver_actor, driver_node), (a, a_node), (b, b_node)], 1000 - ) + ch = ray_channel.Channel( + None, [(driver_actor, driver_node), (a, a_node), (b, b_node)], 1000 + ) + val = 1 + ch.write(val) + assert ray.get([a.read.remote(ch, val) for a in actors]) == [val, val] @pytest.mark.skipif( @@ -1273,6 +1277,10 @@ class Actor: def get_node_id(self): return ray.get_runtime_context().get_node_id() + def read(self, channel, val): + assert channel.read() == val + return val + def create_actor(node): return Actor.options( scheduling_strategy=NodeAffinitySchedulingStrategy(node, soft=False) ).remote(0) @@ -1296,20 +1304,22 @@ def create_actor(node): driver_actor = create_driver_actor() driver_node = get_actor_node_id(driver_actor) - with pytest.raises( - ValueError, match="All reader actors must be on the same node.*" - ): - ray_channel.Channel( - None, - [ - (driver_actor, driver_node), - (a, a_node), - (b, b_node), - (c, c_node), - (d, d_node), - ], - 1000, - ) + ch = ray_channel.Channel( + None, + [ + (driver_actor, driver_node), + (a, a_node), + (b, b_node), + (c, c_node), + (d, d_node), + ], + 1000, + ) + i = 1 + ch.write(i) + assert ray.get([a.read.remote(ch, i) for a in actors]) == [ + i for _ in range(len(actors)) + ] def test_buffered_channel(shutdown_only): diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 36f6daea238ca..2b62d334de541 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -1488,59 +1488,54 @@ Status CoreWorker::ExperimentalChannelReadRelease( return experimental_mutable_object_provider_->ReadRelease(object_ids[0]); } -Status CoreWorker::ExperimentalRegisterMutableObjectWriter(const ObjectID &object_id, - const NodeID *node_id) { - experimental_mutable_object_provider_->RegisterWriterChannel(object_id, node_id); +Status CoreWorker::ExperimentalRegisterMutableObjectWriter( + const ObjectID &writer_object_id, const std::vector<NodeID> &remote_reader_node_ids) { + experimental_mutable_object_provider_->RegisterWriterChannel(writer_object_id, + remote_reader_node_ids); return Status::OK(); } Status CoreWorker::ExperimentalRegisterMutableObjectReaderRemote( const ObjectID &writer_object_id, - const ActorID &reader_actor, - int64_t num_readers, - const ObjectID &reader_object_id) { - rpc::Address addr; - { - std::promise<void> promise; - RAY_CHECK(gcs_client_->Actors() - .AsyncGet(reader_actor, - [&addr, &promise]( - Status status, - const std::optional<rpc::ActorTableData> &result) { - RAY_CHECK(result); - if (result) { - addr.set_ip_address(result->address().ip_address()); - addr.set_port(result->address().port()); -
addr.set_worker_id(result->address().worker_id()); - } - promise.set_value(); - }) - .ok()); - promise.get_future().wait(); + const std::vector<experimental::ReaderRefInfo> &remote_reader_ref_info) { + if (remote_reader_ref_info.size() == 0) { + return Status::OK(); } - // TODO(jhumphri): Support case where there are readers on multiple different nodes. - // Currently, this code only supports the case where all readers are on a single node. - { + std::shared_ptr<size_t> num_replied = std::make_shared<size_t>(0); + size_t num_requests = remote_reader_ref_info.size(); + std::promise<void> promise; + for (const auto &reader_ref_info : remote_reader_ref_info) { + const auto &owner_reader_actor_id = reader_ref_info.owner_reader_actor_id; + const auto &reader_object_id = reader_ref_info.reader_ref_id; + const auto &num_reader = reader_ref_info.num_reader_actors; + const auto &addr = actor_task_submitter_->GetActorAddress(owner_reader_actor_id); + // This can happen if the actor has not been created yet. We assume the API is + // called only while the actor is alive, which is true today. + RAY_CHECK(addr.has_value()); + std::shared_ptr<rpc::CoreWorkerClientInterface> conn = - core_worker_client_pool_->GetOrConnect(addr); + core_worker_client_pool_->GetOrConnect(*addr); rpc::RegisterMutableObjectReaderRequest req; req.set_writer_object_id(writer_object_id.Binary()); - req.set_num_readers(num_readers); + req.set_num_readers(num_reader); req.set_reader_object_id(reader_object_id.Binary()); rpc::RegisterMutableObjectReaderReply reply; - std::promise<void> promise; + // TODO(sang): Add timeout. conn->RegisterMutableObjectReader( req, - [&promise](const Status &status, - const rpc::RegisterMutableObjectReaderReply &reply) { + [&promise, num_replied, num_requests, addr]( + const Status &status, const rpc::RegisterMutableObjectReaderReply &reply) { RAY_CHECK(status.ok()); - promise.set_value(); + *num_replied += 1; + if (*num_replied == num_requests) { + promise.set_value(); + } }); - promise.get_future().wait(); } + promise.get_future().wait(); return Status::OK(); } diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index a0c08bc4b7b00..7eb1dca838917 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -736,29 +736,31 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// Experimental method for mutable objects. Registers a writer channel. /// - /// \param[in] object_id The ID of the object. - /// \param[in] node_id If non-NULL, sends each write to the readers on node `node_id`. - Status ExperimentalRegisterMutableObjectWriter(const ObjectID &object_id, - const NodeID *node_id); + /// The API is not idempotent. + /// + /// \param[in] writer_object_id The ID of the object. + /// \param[in] remote_reader_node_ids The list of the remote readers' node IDs. + Status ExperimentalRegisterMutableObjectWriter( + const ObjectID &writer_object_id, + const std::vector<NodeID> &remote_reader_node_ids); /// Experimental method for mutable objects. Registers a reader channel. /// + /// The API is not idempotent. + /// + /// \param[in] object_id The ID of the object. Status ExperimentalRegisterMutableObjectReader(const ObjectID &object_id); /// Experimental method for mutable objects. Registers a mapping from a mutable object /// that is written to on this node to the corresponding mutable object that is read on - /// the node that `reader_actor` is on. + /// the nodes that the remote readers are on. /// /// \param[in] writer_object_id The ID of the object that is written on this node. - /// \param[in] reader_actor The actor that reads the object.
- /// \param[in] num_readers The total number of readers. - /// \param[in] reader_object_id The ID of the corresponding object that is read on the - /// remote node. - Status ExperimentalRegisterMutableObjectReaderRemote(const ObjectID &writer_object_id, - const ActorID &reader_actor, - int64_t num_readers, - const ObjectID &reader_object_id); + /// \param[in] remote_reader_ref_info The remote reader reference info. There's + /// 1 reader reference per node. + Status ExperimentalRegisterMutableObjectReaderRemote( + const ObjectID &writer_object_id, + const std::vector<experimental::ReaderRefInfo> &remote_reader_ref_info); /// Get a list of objects from the object store. Objects that failed to be retrieved /// will be returned as nullptrs. diff --git a/src/ray/core_worker/experimental_mutable_object_manager.cc b/src/ray/core_worker/experimental_mutable_object_manager.cc index 4e8aa20190edf..ef2120a80a1c8 100644 --- a/src/ray/core_worker/experimental_mutable_object_manager.cc +++ b/src/ray/core_worker/experimental_mutable_object_manager.cc @@ -331,7 +331,7 @@ Status MutableObjectManager::ReadAcquire(const ObjectID &object_id, channel->reading = true; int64_t version_read = 0; Status s = object->header->ReadAcquire( - sem, channel->next_version_to_read, version_read, timeout_point); + object_id, sem, channel->next_version_to_read, version_read, timeout_point); if (!s.ok()) { RAY_LOG(DEBUG) << "ReadAcquire error was set, returning " << object_id; // Failed because the error bit was set on the mutable object. diff --git a/src/ray/core_worker/experimental_mutable_object_manager.h b/src/ray/core_worker/experimental_mutable_object_manager.h index dd24ffcac43d7..2ddafebc70858 100644 --- a/src/ray/core_worker/experimental_mutable_object_manager.h +++ b/src/ray/core_worker/experimental_mutable_object_manager.h @@ -34,6 +34,17 @@ namespace ray { namespace experimental { +struct ReaderRefInfo { + ReaderRefInfo() = default; + + // The ObjectID of the reader reference. + ObjectID reader_ref_id; + // The actor id of the owner of the reference. + ActorID owner_reader_actor_id; + // The number of reader actors reading this buffer. + int64_t num_reader_actors; +}; + class MutableObjectManager : public std::enable_shared_from_this<MutableObjectManager> { public: /// Buffer for a mutable object. This buffer wraps a shared memory buffer of diff --git a/src/ray/core_worker/experimental_mutable_object_provider.cc b/src/ray/core_worker/experimental_mutable_object_provider.cc index dba3dc705b7c8..9dea315d957ad 100644 --- a/src/ray/core_worker/experimental_mutable_object_provider.cc +++ b/src/ray/core_worker/experimental_mutable_object_provider.cc @@ -39,40 +39,56 @@ MutableObjectProvider::~MutableObjectProvider() { } } -void MutableObjectProvider::RegisterWriterChannel(const ObjectID &object_id, - const NodeID *node_id) { +void MutableObjectProvider::RegisterWriterChannel( + const ObjectID &writer_object_id, const std::vector<NodeID> &remote_reader_node_ids) { { - std::unique_ptr<plasma::MutableObject> object; - RAY_CHECK_OK(plasma_->GetExperimentalMutableObject(object_id, &object)); - RAY_CHECK_OK( - object_manager_->RegisterChannel(object_id, std::move(object), /*reader=*/false)); + std::unique_ptr<plasma::MutableObject> writer_object; + RAY_CHECK_OK(plasma_->GetExperimentalMutableObject(writer_object_id, &writer_object)); + RAY_CHECK_OK(object_manager_->RegisterChannel( + writer_object_id, std::move(writer_object), /*reader=*/false)); // `object` is now a nullptr. } - if (node_id) { - // Start a thread that repeatedly listens for values on this object and then sends - // them via RPC to the remote reader.
- io_contexts_.push_back(std::make_unique<instrumented_io_context>()); - instrumented_io_context &io_context = *io_contexts_.back(); - io_works_.push_back( - std::make_unique< - boost::asio::executor_work_guard<boost::asio::io_context::executor_type>>( - io_context.get_executor())); + if (remote_reader_node_ids.size() == 0) { + return; + } + + std::shared_ptr<std::vector<std::shared_ptr<MutableObjectReaderInterface>>> + remote_readers = + std::make_shared<std::vector<std::shared_ptr<MutableObjectReaderInterface>>>(); + // TODO(sang): Currently, these attributes are not cleaned up. + // Start a thread that repeatedly listens for values on this object and then sends + // them via RPC to the remote reader. + io_contexts_.push_back(std::make_unique<instrumented_io_context>()); + instrumented_io_context &io_context = *io_contexts_.back(); + io_works_.push_back( + std::make_unique< + boost::asio::executor_work_guard<boost::asio::io_context::executor_type>>( + io_context.get_executor())); + + // Find remote readers. + for (const auto &node_id : remote_reader_node_ids) { client_call_managers_.push_back(std::make_unique<rpc::ClientCallManager>(io_context)); std::shared_ptr<MutableObjectReaderInterface> reader = - raylet_client_factory_(*node_id, *client_call_managers_.back()); + raylet_client_factory_(node_id, *client_call_managers_.back()); RAY_CHECK(reader); - // TODO(jhumphri): Extend this to support multiple channels. Currently, we must have - // one thread per channel because the thread blocks on the channel semaphore. - - io_context.post( - [this, &io_context, object_id, reader]() { - PollWriterClosure(io_context, object_id, reader); - }, - "experimental::MutableObjectProvider.PollWriter"); - io_threads_.push_back(std::make_unique<std::thread>( - &MutableObjectProvider::RunIOContext, this, std::ref(io_context))); + remote_readers->push_back(reader); } + + // TODO(jhumphri): Extend this to support multiple channels. Currently, we must have + // one thread per channel because the thread blocks on the channel semaphore. + // TODO(sang): We currently create a thread per object id. It is not scalable. + // We should instead just use a pool of threads. + io_context.post( + [this, + &io_context, + writer_object_id, + remote_readers = std::move(remote_readers)]() { + PollWriterClosure(io_context, writer_object_id, remote_readers); + }, + "experimental::MutableObjectProvider.PollWriter"); + io_threads_.push_back(std::make_unique<std::thread>( + &MutableObjectProvider::RunIOContext, this, std::ref(io_context))); } void MutableObjectProvider::RegisterReaderChannel(const ObjectID &object_id) { @@ -123,7 +139,7 @@ void MutableObjectProvider::HandlePushMutableObject( tmp_written_so_far = written_so_far_[writer_object_id]; written_so_far_[writer_object_id] += chunk_size; if (written_so_far_[writer_object_id] == total_size) { - written_so_far_[writer_object_id] = 0; + written_so_far_.erase(written_so_far_.find(writer_object_id)); } } @@ -199,12 +215,14 @@ Status MutableObjectProvider::GetChannelStatus(const ObjectID &object_id, void MutableObjectProvider::PollWriterClosure( instrumented_io_context &io_context, - const ObjectID &object_id, - std::shared_ptr<MutableObjectReaderInterface> reader) { + const ObjectID &writer_object_id, + std::shared_ptr<std::vector<std::shared_ptr<MutableObjectReaderInterface>>> + remote_readers) { + // NOTE: There's only 1 PollWriterClosure at any time in a single thread. std::shared_ptr<RayObject> object; // The corresponding ReadRelease() will be automatically called when // `object` goes out of scope. - Status status = object_manager_->ReadAcquire(object_id, object); + Status status = object_manager_->ReadAcquire(writer_object_id, object); // Check if the thread returned from ReadAcquire() because the process is exiting, not // because there is something to read.
if (status.code() == StatusCode::ChannelError) { @@ -216,20 +234,31 @@ void MutableObjectProvider::PollWriterClosure( RAY_CHECK(object->GetData()); RAY_CHECK(object->GetMetadata()); - RAY_LOG(ERROR) << "SANG-TODO Push mutable object! " << object_id; - reader->PushMutableObject( - object_id, - object->GetData()->Size(), - object->GetMetadata()->Size(), - object->GetData()->Data(), - [this, &io_context, object_id, reader](const Status &status, - const rpc::PushMutableObjectReply &reply) { - io_context.post( - [this, &io_context, object_id, reader]() { - PollWriterClosure(io_context, object_id, reader); - }, - "experimental::MutableObjectProvider.PollWriter"); - }); + std::shared_ptr<size_t> num_replied = std::make_shared<size_t>(0); + for (const auto &reader : *remote_readers) { + reader->PushMutableObject( + writer_object_id, + object->GetData()->Size(), + object->GetMetadata()->Size(), + object->GetData()->Data(), + [this, &io_context, writer_object_id, remote_readers, num_replied]( + const Status &status, const rpc::PushMutableObjectReply &reply) { + *num_replied += 1; + if (!status.ok()) { + RAY_LOG(ERROR) + << "Failed to transfer object to a remote node for object id " + << writer_object_id << ". This can cause a hang."; + } + + if (*num_replied == remote_readers->size()) { + io_context.post( + [this, &io_context, writer_object_id, remote_readers]() { + PollWriterClosure(io_context, writer_object_id, remote_readers); + }, + "experimental::MutableObjectProvider.PollWriter"); + } + }); + } } void MutableObjectProvider::RunIOContext(instrumented_io_context &io_context) { diff --git a/src/ray/core_worker/experimental_mutable_object_provider.h b/src/ray/core_worker/experimental_mutable_object_provider.h index ddf248f3994e3..c0fcd86bd0404 100644 --- a/src/ray/core_worker/experimental_mutable_object_provider.h +++ b/src/ray/core_worker/experimental_mutable_object_provider.h @@ -44,9 +44,11 @@ class MutableObjectProvider { /// Registers a writer channel for `writer_object_id` on this node. On each write to /// this channel, the write will be sent via RPC to the remote reader nodes. + /// /// \param[in] writer_object_id The ID of the object. - /// \param[in] node_id The ID of the node to write to. - void RegisterWriterChannel(const ObjectID &object_id, const NodeID *node_id); + /// \param[in] remote_reader_node_ids The list of the remote readers' node IDs. + void RegisterWriterChannel(const ObjectID &writer_object_id, + const std::vector<NodeID> &remote_reader_node_ids); /// Handles an RPC request from another node to register a mutable object on this node. /// The remote node writes the object and this node reads the object. This node is @@ -147,11 +149,17 @@ class MutableObjectProvider { ObjectID local_object_id; }; - // Listens for local changes to `object_id` and sends the changes to remote nodes via - // the network. - void PollWriterClosure(instrumented_io_context &io_context, - const ObjectID &object_id, - std::shared_ptr<MutableObjectReaderInterface> reader); + /// Listens for local changes to `writer_object_id` and sends the changes to remote + /// nodes via the network. + /// + /// \param[in] io_context The IO context. + /// \param[in] writer_object_id The object ID of the writer. + /// \param[in] remote_readers A list of remote reader clients. + void PollWriterClosure( + instrumented_io_context &io_context, + const ObjectID &writer_object_id, + std::shared_ptr<std::vector<std::shared_ptr<MutableObjectReaderInterface>>> + remote_readers); // Kicks off `io_context`.
void RunIOContext(instrumented_io_context &io_context); diff --git a/src/ray/core_worker/test/mutable_object_provider_test.cc b/src/ray/core_worker/test/mutable_object_provider_test.cc index faaaed59a737e..2b5fdd1395970 100644 --- a/src/ray/core_worker/test/mutable_object_provider_test.cc +++ b/src/ray/core_worker/test/mutable_object_provider_test.cc @@ -153,7 +153,7 @@ TEST(MutableObjectProvider, RegisterWriterChannel) { MutableObjectProvider provider( plasma, /*factory=*/absl::bind_front(GetTestInterface, interface)); - provider.RegisterWriterChannel(object_id, &node_id); + provider.RegisterWriterChannel(object_id, {node_id}); std::shared_ptr<Buffer> data; EXPECT_EQ(provider @@ -179,7 +179,7 @@ TEST(MutableObjectProvider, MutableObjectBufferReadRelease) { auto plasma = std::make_shared<TestPlasma>(); MutableObjectProvider provider(plasma, /*factory=*/nullptr); - provider.RegisterWriterChannel(object_id, nullptr); + provider.RegisterWriterChannel(object_id, {}); std::shared_ptr<Buffer> data; EXPECT_EQ(provider @@ -239,7 +239,7 @@ TEST(MutableObjectProvider, MutableObjectBufferSetError) { auto plasma = std::make_shared<TestPlasma>(); MutableObjectProvider provider(plasma, /*factory=*/nullptr); - provider.RegisterWriterChannel(object_id, nullptr); + provider.RegisterWriterChannel(object_id, {}); std::shared_ptr<Buffer> data; EXPECT_EQ(provider @@ -294,7 +294,7 @@ TEST(MutableObjectProvider, MutableObjectBufferSetErrorBeforeWriteRelease) { auto plasma = std::make_shared<TestPlasma>(); MutableObjectProvider provider(plasma, /*factory=*/nullptr); - provider.RegisterWriterChannel(object_id, nullptr); + provider.RegisterWriterChannel(object_id, {}); std::shared_ptr<Buffer> data; EXPECT_EQ(provider @@ -349,7 +349,7 @@ TEST(MutableObjectProvider, MutableObjectBufferSetErrorBeforeReadRelease) { auto plasma = std::make_shared<TestPlasma>(); MutableObjectProvider provider(plasma, /*factory=*/nullptr); - provider.RegisterWriterChannel(object_id, nullptr); + provider.RegisterWriterChannel(object_id, {}); std::shared_ptr<Buffer> data; EXPECT_EQ(provider diff --git a/src/ray/core_worker/transport/actor_task_submitter.cc b/src/ray/core_worker/transport/actor_task_submitter.cc index c3ce03d9ce1ab..babd1ba8dc6db 100644 --- a/src/ray/core_worker/transport/actor_task_submitter.cc +++ b/src/ray/core_worker/transport/actor_task_submitter.cc @@ -802,6 +802,23 @@ bool ActorTaskSubmitter::IsActorAlive(const ActorID &actor_id) const { return (iter != client_queues_.end() && iter->second.rpc_client); } +std::optional<rpc::Address> ActorTaskSubmitter::GetActorAddress( + const ActorID &actor_id) const { + absl::MutexLock lock(&mu_); + + auto iter = client_queues_.find(actor_id); + if (iter == client_queues_.end()) { + return std::nullopt; + } + + const auto &rpc_client = iter->second.rpc_client; + if (rpc_client == nullptr) { + return std::nullopt; + } + + return rpc_client->Addr(); +} + bool ActorTaskSubmitter::PendingTasksFull(const ActorID &actor_id) const { absl::MutexLock lock(&mu_); auto it = client_queues_.find(actor_id); diff --git a/src/ray/core_worker/transport/actor_task_submitter.h b/src/ray/core_worker/transport/actor_task_submitter.h index ebee8ebceaab7..18624e8988acf 100644 --- a/src/ray/core_worker/transport/actor_task_submitter.h +++ b/src/ray/core_worker/transport/actor_task_submitter.h @@ -191,6 +191,10 @@ class ActorTaskSubmitter : public ActorTaskSubmitterInterface { /// \return Whether this actor is alive. bool IsActorAlive(const ActorID &actor_id) const; + /// Get the address of the given actor. + /// Returns nullopt if the actor's address has not been reported yet.
+ std::optional<rpc::Address> GetActorAddress(const ActorID &actor_id) const; + /// Get the local actor state. nullopt if the state is unknown. std::optional<rpc::ActorTableData::ActorState> GetLocalActorState( const ActorID &actor_id) const; diff --git a/src/ray/object_manager/common.cc b/src/ray/object_manager/common.cc index 1d8664351d051..c67f18a227a62 100644 --- a/src/ray/object_manager/common.cc +++ b/src/ray/object_manager/common.cc @@ -162,6 +162,7 @@ Status PlasmaObjectHeader::WriteRelease(Semaphores &sem) { } Status PlasmaObjectHeader::ReadAcquire( + const ObjectID &object_id, Semaphores &sem, int64_t version_to_read, int64_t &version_read, @@ -177,8 +178,8 @@ Status PlasmaObjectHeader::ReadAcquire( sched_yield(); // We need to get the desired version before timeout if (timeout_point && std::chrono::steady_clock::now() >= *timeout_point) { - return Status::ChannelTimeoutError( - "Timed out waiting for object available to read."); + return Status::ChannelTimeoutError(absl::StrCat( + "Timed out waiting for the object to be available to read. ObjectID: ", object_id.Hex())); } RAY_RETURN_NOT_OK(TryToAcquireSemaphore(sem.header_sem, timeout_point)); } @@ -255,6 +256,7 @@ Status PlasmaObjectHeader::WriteRelease(Semaphores &sem) { } Status PlasmaObjectHeader::ReadAcquire( + const ObjectID &object_id, Semaphores &sem, int64_t version_to_read, int64_t &version_read, diff --git a/src/ray/object_manager/common.h b/src/ray/object_manager/common.h index b3b5f791b55d4..1f9fa91dd4b15 100644 --- a/src/ray/object_manager/common.h +++ b/src/ray/object_manager/common.h @@ -152,6 +152,7 @@ struct PlasmaObjectHeader { /// Blocks until the given version is ready to read. Returns false if the /// maximum number of readers have already read the requested version. /// + /// \param[in] object_id The ID of the object being read (included in error messages). /// \param[in] sem The semaphores for this channel. /// \param[in] read_version The version to read. /// \param[out] version_read For normal immutable objects, this will be set to @@ -163,7 +164,8 @@ struct PlasmaObjectHeader { /// TimedOut immediately without blocking. /// \return Whether the correct version was read and there were still /// reads remaining. - Status ReadAcquire(Semaphores &sem, + Status ReadAcquire(const ObjectID &object_id, + Semaphores &sem, int64_t version_to_read, int64_t &version_read, const std::unique_ptr<std::chrono::steady_clock::time_point> diff --git a/src/ray/object_manager/plasma/test/mutable_object_test.cc b/src/ray/object_manager/plasma/test/mutable_object_test.cc index ecdbc2edfa7ce..68d12f320e519 100644 --- a/src/ray/object_manager/plasma/test/mutable_object_test.cc +++ b/src/ray/object_manager/plasma/test/mutable_object_test.cc @@ -79,7 +79,8 @@ void Read(PlasmaObjectHeader *header, int64_t version_to_read = 1; for (size_t i = 0; i < num_reads; i++) { int64_t version_read = 0; - if (!header->ReadAcquire(sem, version_to_read, version_read).ok()) { + if (!header->ReadAcquire(ObjectID::FromRandom(), sem, version_to_read, version_read) + .ok()) { data_results.push_back("error"); metadata_results.push_back("error"); return; diff --git a/src/ray/raylet_client/raylet_client.cc b/src/ray/raylet_client/raylet_client.cc index dfc7ea1eb3fd2..938f20fab80e7 100644 --- a/src/ray/raylet_client/raylet_client.cc +++ b/src/ray/raylet_client/raylet_client.cc @@ -426,8 +426,7 @@ void raylet::RayletClient::PushMutableObject( const ray::rpc::ClientCallback<ray::rpc::PushMutableObjectReply> &callback) { // Ray sets the gRPC max payload size to ~512 MiB by default. We set the max chunk // size to a slightly lower value to allow extra padding just in case.
- uint64_t kMaxGrpcPayloadSize = - RayConfig::instance().max_grpc_message_size() * 0.98; // 500 MiB. + uint64_t kMaxGrpcPayloadSize = RayConfig::instance().max_grpc_message_size() * 0.98; uint64_t total_size = data_size + metadata_size; uint64_t total_num_chunks = total_size / kMaxGrpcPayloadSize; // If `total_size` is not a multiple of `kMaxGrpcPayloadSize`, then we need to send an
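For orientation, the chunking scheme that PushMutableObject implements can be sketched in Python as follows; the helper is illustrative only, not a Ray API:

# Sketch of the PushMutableObject chunking arithmetic, assuming the
# 0.98-padded max gRPC payload size computed above (illustrative, not Ray code).
def split_into_chunks(total_size: int, max_chunk: int):
    # Ceil-style chunk count: one extra chunk when there is a remainder.
    num_chunks = total_size // max_chunk
    if total_size % max_chunk:
        num_chunks += 1
    # Each chunk covers [offset, offset + size) of the object's bytes.
    return [
        (i * max_chunk, min(max_chunk, total_size - i * max_chunk))
        for i in range(num_chunks)
    ]

# A 7 MiB object with a ~4.9 MiB chunk limit is sent as 2 chunks.
assert len(split_into_chunks(7 * 1024 * 1024, int(5 * 1024 * 1024 * 0.98))) == 2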