From 2ad93e7f98fe8b5afeef3983e1d37678fe1176b5 Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Fri, 30 Aug 2024 23:22:46 -0700 Subject: [PATCH 01/12] . --- .../tests/experimental/test_multi_node_dag.py | 32 +++++++++++++++++++ .../channel/shared_memory_channel.py | 13 -------- src/ray/core_worker/core_worker.cc | 3 +- 3 files changed, 33 insertions(+), 15 deletions(-) diff --git a/python/ray/dag/tests/experimental/test_multi_node_dag.py b/python/ray/dag/tests/experimental/test_multi_node_dag.py index 24931e0e412f..32dd9a2b1097 100644 --- a/python/ray/dag/tests/experimental/test_multi_node_dag.py +++ b/python/ray/dag/tests/experimental/test_multi_node_dag.py @@ -5,6 +5,7 @@ import time import pytest from ray.dag import InputNode, MultiOutputNode +import ray.remote_function from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy from ray.tests.conftest import * # noqa @@ -206,6 +207,37 @@ def execute_model(self, val): compiled_dag.teardown() +def test_multi_reader_on_multi_node(ray_start_cluster): + cluster = ray_start_cluster + head = cluster.add_node(num_cpus=1) + ray.init(address=cluster.address) + worker_1 = cluster.add_node(num_cpus=1) + worker_2 = cluster.add_node(num_cpus=1) + + @ray.remote(num_cpus=1) + class Actor: + def get_node_id(self): + return ray.get_runtime_context().get_node_id() + + def f(self, i): + return i + + # 3 actors spread to nodes. + actors = [Actor.remote() for _ in range(3)] + node_ids = set(ray.get([actor.get_node_id.remote() for actor in actors])) + # Make sure actors are spread to 3 nodes. + assert len(node_ids) == 3 + + with InputNode() as inp: + outputs = [] + for actor in actors: + outputs.append(actor.f.bind(inp)) + adag = MultiOutputNode(outputs) + + adag.experimental_compile() + print(ray.get(adag.execute(1))) + + if __name__ == "__main__": if os.environ.get("PARALLEL_CI"): sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__])) diff --git a/python/ray/experimental/channel/shared_memory_channel.py b/python/ray/experimental/channel/shared_memory_channel.py index 5dbee7859425..053fed4fb0b7 100644 --- a/python/ray/experimental/channel/shared_memory_channel.py +++ b/python/ray/experimental/channel/shared_memory_channel.py @@ -214,19 +214,6 @@ def __init__( self_actor = _get_self_actor() assert writer == self_actor - # For now, all readers must be on the same node. Note that the writer can - # still be on a different node than the readers though. - # - # Note that we only check this when the writer is creating the channel. - # Ideally, when each reader constructs its own instance of the channel, it - # would check this as well. However, this could result in deadlock as two - # readers attempt to execute a remote function on each other to get each - # other's node ID. We cannot use a separate concurrency group to execute the - # function because reader actors may not have been declared with an - # additional concurrency group beyond default. - # - # TODO(jhumphri): Allow different readers for the same channel to be on - # different nodes. prev_reader_node = None prev_reader = None for reader, node in reader_and_node_list: diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 972cf9dd07b8..7963d6e779ab 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -1518,8 +1518,7 @@ Status CoreWorker::ExperimentalRegisterMutableObjectReaderRemote( promise.get_future().wait(); } - // TODO(jhumphri): Support case where there are readers on multiple different nodes. 
- // Currently, this code only supports the case where all readers are on a single node. + // SANG-TODO { std::shared_ptr conn = core_worker_client_pool_->GetOrConnect(addr); From e1b9fef9821ca8e4f2a12da009d4fa8007369448 Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Wed, 4 Sep 2024 08:41:26 -0700 Subject: [PATCH 02/12] ip --- .../tests/experimental/test_multi_node_dag.py | 10 +- .../channel/shared_memory_channel.py | 157 ++++++++++-------- src/ray/core_worker/core_worker.cc | 2 +- .../experimental_mutable_object_provider.cc | 1 + 4 files changed, 97 insertions(+), 73 deletions(-) diff --git a/python/ray/dag/tests/experimental/test_multi_node_dag.py b/python/ray/dag/tests/experimental/test_multi_node_dag.py index 32dd9a2b1097..35d5ebc85eb0 100644 --- a/python/ray/dag/tests/experimental/test_multi_node_dag.py +++ b/python/ray/dag/tests/experimental/test_multi_node_dag.py @@ -209,7 +209,7 @@ def execute_model(self, val): def test_multi_reader_on_multi_node(ray_start_cluster): cluster = ray_start_cluster - head = cluster.add_node(num_cpus=1) + head = cluster.add_node(num_cpus=0) ray.init(address=cluster.address) worker_1 = cluster.add_node(num_cpus=1) worker_2 = cluster.add_node(num_cpus=1) @@ -220,13 +220,14 @@ def get_node_id(self): return ray.get_runtime_context().get_node_id() def f(self, i): + print(self.get_node_id()) return i - + # 3 actors spread to nodes. - actors = [Actor.remote() for _ in range(3)] + actors = [Actor.remote() for _ in range(2)] node_ids = set(ray.get([actor.get_node_id.remote() for actor in actors])) # Make sure actors are spread to 3 nodes. - assert len(node_ids) == 3 + assert len(node_ids) == 2 with InputNode() as inp: outputs = [] @@ -236,6 +237,7 @@ def f(self, i): adag.experimental_compile() print(ray.get(adag.execute(1))) + time.sleep(30) if __name__ == "__main__": diff --git a/python/ray/experimental/channel/shared_memory_channel.py b/python/ray/experimental/channel/shared_memory_channel.py index 053fed4fb0b7..80a45c315269 100644 --- a/python/ray/experimental/channel/shared_memory_channel.py +++ b/python/ray/experimental/channel/shared_memory_channel.py @@ -1,6 +1,7 @@ import io import logging import time +from collections import defaultdict from typing import Any, Dict, List, Optional, Set, Tuple, Union import ray @@ -82,6 +83,7 @@ class _ResizeChannel: """ def __init__(self, reader_ref: "ray.ObjectRef"): + # SANG-TODO self._reader_ref = reader_ref @@ -156,9 +158,9 @@ def __init__( reader_and_node_list: List[Tuple["ray.actor.ActorHandle", str]], typ: Optional[Union[int, SharedMemoryType]] = None, _writer_node_id: Optional["ray.NodeID"] = None, - _reader_node_id: Optional["ray.NodeID"] = None, + _reader_node_ids: Optional[Set["ray.NodeID"]] = None, _writer_ref: Optional["ray.ObjectRef"] = None, - _reader_ref: Optional["ray.ObjectRef"] = None, + _reader_refs: Optional[Dict[str, "ray.ObjectRef"]] = None, _writer_registered: bool = False, _reader_registered: bool = False, ): @@ -203,6 +205,11 @@ def __init__( self._writer_registered = _writer_registered self._reader_registered = _reader_registered + self._reader_node_ids = _reader_node_ids + # node_id -> reader ref. There's only 1 reader ref per node because + # it is shared by all actors on that node. + self._reader_refs: Dict[str, "ray.ObjectRef"] = _reader_refs or {} + self._node_id_to_readers: Dict[str, "ray.actor.ActorHandle"] = defaultdict(list) if _writer_ref is None: # We are the writer. 
Check that the passed handle matches the @@ -214,66 +221,71 @@ def __init__( self_actor = _get_self_actor() assert writer == self_actor - prev_reader_node = None - prev_reader = None - for reader, node in reader_and_node_list: - if prev_reader_node is None: - prev_reader_node = node - elif prev_reader_node != node: - raise ValueError( - f"All reader actors must be on the same node. Actor " - f"{prev_reader} is on node {prev_reader_node} while actor " - f"{reader} is on node {node}." - ) - prev_reader = reader + for reader, node_id in reader_and_node_list: + self._reader_node_ids.add(node_id) self._writer_node_id = ( ray.runtime_context.get_runtime_context().get_node_id() ) self._writer_ref = _create_channel_ref(self, typ.buffer_size_bytes) - self._reader_node_id = prev_reader_node - self._create_reader_ref(reader_and_node_list, typ.buffer_size_bytes) + self._create_reader_refs(reader_and_node_list, typ.buffer_size_bytes) - assert self._reader_ref is not None + assert len(self._reader_refs) > 0 else: assert ( _writer_node_id is not None ), "_writer_node_id must also be passed to the constructor when " "_writer_ref is." assert ( - _reader_ref is not None - ), "_reader_ref must also be passed to the constructor when _writer_ref is." + _reader_refs is not None + ), "_reader_refs must also be passed to the constructor when _writer_ref is." self._writer_ref = _writer_ref self._writer_node_id = _writer_node_id - self._reader_node_id = _reader_node_id - self._reader_ref = _reader_ref - - self._num_readers = len(self._reader_and_node_list) - if self.is_remote(): - # Even though there may be multiple readers on a remote node, we set - # `self._num_readers` to 1 here. On this local node, only the IO thread in - # the mutable object provider will read the mutable object. The IO thread - # will then send a gRPC with the mutable object contents to the remote node - # where the readers are. - self._num_readers = 1 - - def _create_reader_ref( + self._reader_node_ids = _reader_node_ids + self._reader_refs = _reader_refs + + node_id_to_readers: Dict[str, "ray.actor.ActorHandle"] = defaultdict(list) + for reader, node_id in self._reader_and_node_list: + node_id_to_readers[node_id].append(reader) + + assert self._num_readers == 0 + # Find num_readers. We have local readers and 1 reader per remote node + # which listens to mutable object being changed and push the object to + # remote nodes. + for node_id, readers in node_id_to_readers.items(): + if self.is_local_node(node_id): + self._num_readers += len(readers) + else: + self._num_readers += 1 + + def _create_reader_refs( self, reader_and_node_list: List[Tuple["ray.actor.ActorHandle", str]], buffer_size_bytes: int, ): # TODO(jhumphri): Free the current reader ref once the reference to it is # destroyed below. - reader = reader_and_node_list[0][0] - if self.is_remote(): - fn = reader.__ray_call__ - self._reader_ref = ray.get( - fn.remote(_create_channel_ref, buffer_size_bytes) - ) - else: - self._reader_ref = self._writer_ref + + node_id_to_readers: Dict[str, "ray.actor.ActorHandle"] = defaultdict(list) + for reader, node_id in reader_and_node_list: + node_id_to_readers[node_id].append(reader) + + for node_id, readers in node_id_to_readers.items(): + if not self.is_local_node(node_id): + # Find 1 reader in a remote node to create a reference that's + # shared by all readers. 
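One reader per remote node is chosen as the allocator for that node's shared
buffer; every other reader on the same node reuses it, and the writer's own
buffer doubles as the local node's copy. A standalone sketch of just this
selection step, with illustrative names (`pick_node_allocators` is not a
helper in the patch):

    from collections import defaultdict

    def pick_node_allocators(reader_and_node_list, local_node_id):
        # Group reader actors by the node they live on.
        node_id_to_readers = defaultdict(list)
        for reader, node_id in reader_and_node_list:
            node_id_to_readers[node_id].append(reader)
        # Any one reader per remote node can allocate the shared buffer;
        # the writer's buffer already serves the local node.
        return {
            node_id: readers[0]
            for node_id, readers in node_id_to_readers.items()
            if node_id != local_node_id
        }

The hunk continues below with exactly this choice (`reader = readers[0]`).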
+ reader = readers[0] + fn = reader.__ray_call__ + self._reader_refs[node_id] = ( + ray.get(fn.remote(_create_channel_ref, buffer_size_bytes)), + reader._actor_id, + ) + else: + self._reader_refs[node_id] = (self._writer_ref, self._writer._actor_id) + + assert len(self._reader_refs) == len(node_id_to_readers) # We need to register the new writer_ref. self._writer_registered = False @@ -283,9 +295,6 @@ def _create_reader_ref( def is_local_node(node_id): return ray.runtime_context.get_runtime_context().get_node_id() == node_id - def is_remote(self): - return self._writer_node_id != self._reader_node_id - def ensure_registered_as_writer(self) -> None: if self._writer_registered: return @@ -296,27 +305,32 @@ def ensure_registered_as_writer(self) -> None: "the writer is on." ) - assert ( - self._reader_ref - ), "`self._reader_ref` must be not be None when registering a writer, because " - "it should have been initialized in the constructor." - self._worker.core_worker.experimental_channel_register_writer( - self._writer_ref, - self._reader_ref, - self._writer_node_id, - self._reader_node_id, - self._reader_and_node_list[0][0]._actor_id, - len(self._reader_and_node_list), - ) + node_id_to_readers: Dict[str, "ray.actor.ActorHandle"] = defaultdict(list) + for reader, node_id in self._reader_and_node_list: + node_id_to_readers[node_id].append(reader) + + for reader_node_id, reader_ref_and_reader_id in self._reader_refs.items(): + reader_ref, reader_id = reader_ref_and_reader_id + self._worker.core_worker.experimental_channel_register_writer( + self._writer_ref, + reader_ref, + self._writer_node_id, + reader_node_id, + reader_id, + len(node_id_to_readers[reader_node_id]), + ) self._writer_registered = True def ensure_registered_as_reader(self) -> None: + assert not self._writer_registered if self._reader_registered: return - self._worker.core_worker.experimental_channel_register_reader( - self._reader_ref, - ) + for reader_node_id, reader_ref_and_reader_id in self._reader_refs.items(): + if self.is_local_node(reader_node_id): + self._worker.core_worker.experimental_channel_register_reader( + reader_ref_and_reader_id[0], + ) self._reader_registered = True @staticmethod @@ -325,9 +339,9 @@ def _deserialize_reader_channel( reader_and_node_list: List[Tuple["ray.actor.ActorHandle", str]], typ: int, writer_node_id, - reader_node_id, + reader_node_ids, writer_ref: "ray.ObjectRef", - reader_ref: "ray.ObjectRef", + reader_refs: "ray.ObjectRef", writer_registered: bool, reader_registered: bool, ) -> "Channel": @@ -336,31 +350,31 @@ def _deserialize_reader_channel( reader_and_node_list, typ, _writer_node_id=writer_node_id, - _reader_node_id=reader_node_id, + _reader_node_ids=reader_node_ids, _writer_ref=writer_ref, - _reader_ref=reader_ref, + _reader_refs=reader_refs, _writer_registered=writer_registered, _reader_registered=reader_registered, ) return chan def __reduce__(self): - assert self._reader_ref is not None + assert self._reader_refs is not None return self._deserialize_reader_channel, ( self._writer, self._reader_and_node_list, self._typ, self._writer_node_id, - self._reader_node_id, + self._reader_node_ids, self._writer_ref, - self._reader_ref, + self._reader_refs, self._writer_registered, self._reader_registered, ) def __str__(self) -> str: return ( - f"Channel(_reader_ref={self._reader_ref}, _writer_ref={self._writer_ref})" + f"Channel(_reader_refs={self._reader_refs}, _writer_ref={self._writer_ref})" ) def _resize_channel_if_needed(self, serialized_value: str, timeout_ms: int): @@ -376,7 +390,8 @@ 
def _resize_channel_if_needed(self, serialized_value: str, timeout_ms: int): prev_writer_ref = self._writer_ref self._writer_ref = _create_channel_ref(self, self._typ.buffer_size_bytes) - self._create_reader_ref( + # SANG-TODO add tests. Should only handle reader that's relevant. + self._create_reader_refs( self._reader_and_node_list, self._typ.buffer_size_bytes ) @@ -462,8 +477,14 @@ def close(self) -> None: reader_ref. """ self._worker.core_worker.experimental_channel_set_error(self._writer_ref) - if self.is_local_node(self._reader_node_id): + is_local_node_reader = False + + for node_id in self._reader_node_id: + if self.is_local_node(node_id): + is_local_node_reader = True + if is_local_node_reader: self.ensure_registered_as_reader() + self._worker.core_worker.experimental_channel_set_error(self._reader_ref) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 7963d6e779ab..9668b97589a6 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -1501,6 +1501,7 @@ Status CoreWorker::ExperimentalRegisterMutableObjectReaderRemote( rpc::Address addr; { std::promise promise; + // SANG-TODO Should introduce the timeout or pass it. RAY_CHECK(gcs_client_->Actors() .AsyncGet(reader_actor, [&addr, &promise]( @@ -1518,7 +1519,6 @@ Status CoreWorker::ExperimentalRegisterMutableObjectReaderRemote( promise.get_future().wait(); } - // SANG-TODO { std::shared_ptr conn = core_worker_client_pool_->GetOrConnect(addr); diff --git a/src/ray/core_worker/experimental_mutable_object_provider.cc b/src/ray/core_worker/experimental_mutable_object_provider.cc index f0d089379f2b..4433cf4bf7ab 100644 --- a/src/ray/core_worker/experimental_mutable_object_provider.cc +++ b/src/ray/core_worker/experimental_mutable_object_provider.cc @@ -49,6 +49,7 @@ void MutableObjectProvider::RegisterWriterChannel(const ObjectID &object_id, // `object` is now a nullptr. } + // SANG-TODO Share a thread instead of having it per node. if (node_id) { // Start a thread that repeatedly listens for values on this object and then sends // them via RPC to the remote reader. From 5ea646b635f5ad69f69c268fb6dc0926abae98c7 Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Wed, 4 Sep 2024 08:54:40 -0700 Subject: [PATCH 03/12] . --- .../experimental/channel/shared_memory_channel.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/python/ray/experimental/channel/shared_memory_channel.py b/python/ray/experimental/channel/shared_memory_channel.py index 80a45c315269..4c7f627d85e7 100644 --- a/python/ray/experimental/channel/shared_memory_channel.py +++ b/python/ray/experimental/channel/shared_memory_channel.py @@ -205,11 +205,12 @@ def __init__( self._writer_registered = _writer_registered self._reader_registered = _reader_registered - self._reader_node_ids = _reader_node_ids + self._reader_node_ids = _reader_node_ids or set() # node_id -> reader ref. There's only 1 reader ref per node because # it is shared by all actors on that node. - self._reader_refs: Dict[str, "ray.ObjectRef"] = _reader_refs or {} + self._reader_refs: Dict[str, Tuple["ray.ObjectRef", ray.ActorID]] = _reader_refs or {} self._node_id_to_readers: Dict[str, "ray.actor.ActorHandle"] = defaultdict(list) + self._num_readers = 0 if _writer_ref is None: # We are the writer. 
Check that the passed handle matches the @@ -250,16 +251,20 @@ def __init__( for reader, node_id in self._reader_and_node_list: node_id_to_readers[node_id].append(reader) - assert self._num_readers == 0 # Find num_readers. We have local readers and 1 reader per remote node # which listens to mutable object being changed and push the object to # remote nodes. + assert self._num_readers == 0 for node_id, readers in node_id_to_readers.items(): if self.is_local_node(node_id): self._num_readers += len(readers) else: self._num_readers += 1 + self._local_reader_ref = None + for node_id, reader_ref_and_reader_id in self._reader_refs.items(): + + def _create_reader_refs( self, reader_and_node_list: List[Tuple["ray.actor.ActorHandle", str]], @@ -322,7 +327,6 @@ def ensure_registered_as_writer(self) -> None: self._writer_registered = True def ensure_registered_as_reader(self) -> None: - assert not self._writer_registered if self._reader_registered: return @@ -457,6 +461,7 @@ def read(self, timeout: Optional[float] = None) -> Any: [self._reader_ref], timeout=timeout, return_exceptions=True )[0][0] + # SANG-TODO if isinstance(ret, _ResizeChannel): self._reader_ref = ret._reader_ref # We need to register the new reader_ref. From 9bd2e7e97f7228e94c61e4b0dc62b8028cc5c8a0 Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Fri, 6 Sep 2024 01:23:18 -0700 Subject: [PATCH 04/12] working --- python/ray/_raylet.pyx | 58 +++-- python/ray/dag/compiled_dag_node.py | 1 - .../experimental/test_accelerated_dag.py | 54 ----- .../tests/experimental/test_multi_node_dag.py | 226 +++++++++++------- .../channel/shared_memory_channel.py | 156 +++++++----- python/ray/includes/libcoreworker.pxd | 10 +- src/ray/core_worker/core_worker.cc | 70 +++--- src/ray/core_worker/core_worker.h | 21 +- .../experimental_mutable_object_manager.cc | 2 +- .../experimental_mutable_object_provider.cc | 119 +++++---- .../experimental_mutable_object_provider.h | 14 +- .../transport/actor_task_submitter.cc | 17 ++ .../transport/actor_task_submitter.h | 3 + src/ray/object_manager/common.cc | 6 +- src/ray/object_manager/common.h | 4 +- .../plasma/test/mutable_object_test.cc | 3 +- src/ray/raylet_client/raylet_client.cc | 2 +- 17 files changed, 446 insertions(+), 320 deletions(-) diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index f466a1b815cb..06065734b928 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -3687,37 +3687,45 @@ cdef class CoreWorker: def experimental_channel_register_writer(self, ObjectRef writer_ref, - ObjectRef reader_ref, writer_node, - reader_node, - ActorID reader, - int64_t num_readers): + remote_reader_refs, + remote_reader_nodes, + remote_readers, + remote_num_readers_per_node): cdef: CObjectID c_writer_ref = writer_ref.native() - CObjectID c_reader_ref = reader_ref.native() - CNodeID c_reader_node = CNodeID.FromHex(reader_node) - CNodeID *c_reader_node_id = NULL - CActorID c_reader_actor = reader.native() - - if num_readers == 0: - return - if writer_node != reader_node: - c_reader_node_id = &c_reader_node + CNodeID c_writer_node = CNodeID.FromHex(writer_node) + c_vector[CObjectID] c_remote_reader_refs + c_vector[CNodeID] c_remote_reader_nodes + c_vector[CActorID] c_remote_readers + c_vector[int64_t] c_remote_num_readers + + for ref, node, reader, num_readers in zip( + remote_reader_refs, + remote_reader_nodes, + remote_readers, + remote_num_readers_per_node): + c_remote_reader_refs.push_back((ref).native()) + c_remote_reader_nodes.push_back(CNodeID.FromHex(node)) + 
c_remote_readers.push_back((reader).native()) + assert num_readers != 0 + c_remote_num_readers.push_back(num_readers) with nogil: check_status(CCoreWorkerProcess.GetCoreWorker() - .ExperimentalRegisterMutableObjectWriter(c_writer_ref, - c_reader_node_id, - )) - if writer_node != reader_node: - with nogil: - check_status( - CCoreWorkerProcess.GetCoreWorker() - .ExperimentalRegisterMutableObjectReaderRemote(c_writer_ref, - c_reader_actor, - num_readers, - c_reader_ref - )) + .ExperimentalRegisterMutableObjectWriter( + c_writer_ref, + c_writer_node, + c_remote_reader_nodes, + )) + check_status( + CCoreWorkerProcess.GetCoreWorker() + .ExperimentalRegisterMutableObjectReaderRemote( + c_writer_ref, + c_remote_readers, + c_remote_num_readers, + c_remote_reader_refs + )) def experimental_channel_register_reader(self, ObjectRef object_ref): cdef: diff --git a/python/ray/dag/compiled_dag_node.py b/python/ray/dag/compiled_dag_node.py index 68025c802a09..46fa23263b11 100644 --- a/python/ray/dag/compiled_dag_node.py +++ b/python/ray/dag/compiled_dag_node.py @@ -1604,7 +1604,6 @@ def teardown(self, wait: bool): return logger.info("Tearing down compiled DAG") - outer._dag_submitter.close() outer._dag_output_fetcher.close() diff --git a/python/ray/dag/tests/experimental/test_accelerated_dag.py b/python/ray/dag/tests/experimental/test_accelerated_dag.py index 812a6b359129..a4e57f74fd60 100644 --- a/python/ray/dag/tests/experimental/test_accelerated_dag.py +++ b/python/ray/dag/tests/experimental/test_accelerated_dag.py @@ -14,7 +14,6 @@ import pytest from ray.exceptions import RayChannelError, RayChannelTimeoutError -from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy import ray import ray._private import ray.cluster_utils @@ -1448,59 +1447,6 @@ def test_driver_and_actor_as_readers(ray_start_cluster): dag.experimental_compile() -def test_payload_large(ray_start_cluster): - cluster = ray_start_cluster - # This node is for the driver (including the CompiledDAG.DAGDriverProxyActor). - first_node_handle = cluster.add_node(num_cpus=1) - # This node is for the reader. - second_node_handle = cluster.add_node(num_cpus=1) - ray.init(address=cluster.address) - cluster.wait_for_nodes() - - nodes = [first_node_handle.node_id, second_node_handle.node_id] - # We want to check that there are two nodes. Thus, we convert `nodes` to a set and - # then back to a list to remove duplicates. Then we check that the length of `nodes` - # is 2. - nodes = list(set(nodes)) - assert len(nodes) == 2 - - def create_actor(node): - return Actor.options( - scheduling_strategy=NodeAffinitySchedulingStrategy(node, soft=False) - ).remote(0) - - def get_node_id(self): - return ray.get_runtime_context().get_node_id() - - driver_node = get_node_id(None) - nodes.remove(driver_node) - - a = create_actor(nodes[0]) - a_node = ray.get(a.__ray_call__.remote(get_node_id)) - assert a_node == nodes[0] - # Check that the driver and actor are on different nodes. - assert driver_node != a_node - - with InputNode() as i: - dag = a.echo.bind(i) - - compiled_dag = dag.experimental_compile() - - # Ray sets the gRPC payload max size to 512 MiB. We choose a size in this test that - # is a bit larger. - size = 1024 * 1024 * 600 - val = b"x" * size - - for i in range(3): - ref = compiled_dag.execute(val) - result = ray.get(ref) - assert result == val - - # Note: must teardown before starting a new Ray session, otherwise you'll get - # a segfault from the dangling monitor thread upon the new Ray init. 
- compiled_dag.teardown() - - @ray.remote class TestWorker: def add_one(self, value): diff --git a/python/ray/dag/tests/experimental/test_multi_node_dag.py b/python/ray/dag/tests/experimental/test_multi_node_dag.py index 35d5ebc85eb0..124514684711 100644 --- a/python/ray/dag/tests/experimental/test_multi_node_dag.py +++ b/python/ray/dag/tests/experimental/test_multi_node_dag.py @@ -7,7 +7,7 @@ from ray.dag import InputNode, MultiOutputNode import ray.remote_function from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy -from ray.tests.conftest import * # noqa +from ray.tests.conftest import wait_for_condition # noqa if sys.platform != "linux" and sys.platform != "darwin": pytest.skip("Skipping, requires Linux or Mac.", allow_module_level=True) @@ -32,6 +32,7 @@ def _fail_if_needed(self): raise RuntimeError("injected fault") def inc(self, x): + print(self.i) self.i += x self.count += 1 self._fail_if_needed() @@ -43,6 +44,7 @@ def double_and_inc(self, x): return self.i def echo(self, x): + print("Echo run!") self.count += 1 self._fail_if_needed() return x @@ -69,100 +71,78 @@ def test_readers_on_different_nodes(ray_start_cluster): cluster = ray_start_cluster # This node is for the driver (including the CompiledDAG.DAGDriverProxyActor) and # one of the readers. - first_node_handle = cluster.add_node(num_cpus=2) - # This node is for the other reader. - second_node_handle = cluster.add_node(num_cpus=1) + cluster.add_node(num_cpus=1) ray.init(address=cluster.address) + # This node is for the other reader. + cluster.add_node(num_cpus=1) + cluster.add_node(num_cpus=1) cluster.wait_for_nodes() + # Wait until nodes actually start, otherwise the code below will fail. + wait_for_condition(lambda: len(ray.nodes()) == 3) - nodes = [first_node_handle.node_id, second_node_handle.node_id] - # We want to check that the readers are on different nodes. Thus, we convert `nodes` - # to a set and then back to a list to remove duplicates. Then we check that the - # length of `nodes` is 2. - nodes = list(set(nodes)) - assert len(nodes) == 2 - - def create_actor(node): - return Actor.options( - scheduling_strategy=NodeAffinitySchedulingStrategy(node, soft=False) - ).remote(0) - - a = create_actor(nodes[0]) - b = create_actor(nodes[1]) - actors = [a, b] + a = Actor.options(num_cpus=1).remote(0) + b = Actor.options(num_cpus=1).remote(0) + c = Actor.options(num_cpus=1).remote(0) + actors = [a, b, c] def _get_node_id(self) -> "ray.NodeID": return ray.get_runtime_context().get_node_id() nodes_check = ray.get([act.__ray_call__.remote(_get_node_id) for act in actors]) - a_node = nodes_check[0] - b_node = nodes_check[1] - assert a_node != b_node + assert len(set(nodes_check)) == 3 with InputNode() as inp: x = a.inc.bind(inp) y = b.inc.bind(inp) - dag = MultiOutputNode([x, y]) + z = c.inc.bind(inp) + dag = MultiOutputNode([x, y, z]) + + adag = dag.experimental_compile() + + for i in range(1, 10): + assert ray.get(adag.execute(1)) == [i, i, i] - with pytest.raises( - ValueError, - match="All reader actors must be on the same node.*", - ): - dag.experimental_compile() + adag.teardown() def test_bunch_readers_on_different_nodes(ray_start_cluster): cluster = ray_start_cluster - # This node is for the driver (including the CompiledDAG.DAGDriverProxyActor) and - # two of the readers. - first_node_handle = cluster.add_node(num_cpus=3) - # This node is for the other two readers. 
- second_node_handle = cluster.add_node(num_cpus=2) + ACTORS_PER_NODE = 2 + NUM_REMOTE_NODES = 2 + cluster.add_node(num_cpus=ACTORS_PER_NODE) ray.init(address=cluster.address) + # This node is for the other two readers. + for _ in range(NUM_REMOTE_NODES): + cluster.add_node(num_cpus=ACTORS_PER_NODE) cluster.wait_for_nodes() - nodes = [first_node_handle.node_id, second_node_handle.node_id] - # We want to check that the readers are on different nodes. Thus, we convert `nodes` - # to a set and then back to a list to remove duplicates. Then we check that the - # length of `nodes` is 2. - nodes = list(set(nodes)) - assert len(nodes) == 2 + wait_for_condition(lambda: len(ray.nodes()) == NUM_REMOTE_NODES + 1) - def create_actor(node): - return Actor.options( - scheduling_strategy=NodeAffinitySchedulingStrategy(node, soft=False) - ).remote(0) - - a = create_actor(nodes[0]) - b = create_actor(nodes[0]) - c = create_actor(nodes[1]) - d = create_actor(nodes[1]) - actors = [a, b, c, d] + actors = [ + Actor.options(num_cpus=1).remote(0) + for _ in range(ACTORS_PER_NODE * (NUM_REMOTE_NODES + 1)) + ] def _get_node_id(self) -> "ray.NodeID": return ray.get_runtime_context().get_node_id() nodes_check = ray.get([act.__ray_call__.remote(_get_node_id) for act in actors]) - a_node = nodes_check[0] - b_node = nodes_check[1] - c_node = nodes_check[2] - d_node = nodes_check[3] - assert a_node == b_node - assert b_node != c_node - assert c_node == d_node + assert len(set(nodes_check)) == NUM_REMOTE_NODES + 1 with InputNode() as inp: - w = a.inc.bind(inp) - x = b.inc.bind(inp) - y = c.inc.bind(inp) - z = d.inc.bind(inp) - dag = MultiOutputNode([w, x, y, z]) + outputs = [] + for actor in actors: + outputs.append(actor.inc.bind(inp)) + dag = MultiOutputNode(outputs) + + adag = dag.experimental_compile() + + for i in range(1, 10): + assert ray.get(adag.execute(1)) == [ + i for _ in range(ACTORS_PER_NODE * (NUM_REMOTE_NODES + 1)) + ] - with pytest.raises( - ValueError, - match="All reader actors must be on the same node.*", - ): - dag.experimental_compile() + adag.teardown() def test_pp(ray_start_cluster): @@ -207,37 +187,109 @@ def execute_model(self, val): compiled_dag.teardown() -def test_multi_reader_on_multi_node(ray_start_cluster): +def test_payload_large(ray_start_cluster): cluster = ray_start_cluster - head = cluster.add_node(num_cpus=0) + # This node is for the driver (including the CompiledDAG.DAGDriverProxyActor). + first_node_handle = cluster.add_node(num_cpus=1) + # This node is for the reader. + second_node_handle = cluster.add_node(num_cpus=1) ray.init(address=cluster.address) - worker_1 = cluster.add_node(num_cpus=1) - worker_2 = cluster.add_node(num_cpus=1) + cluster.wait_for_nodes() - @ray.remote(num_cpus=1) - class Actor: - def get_node_id(self): - return ray.get_runtime_context().get_node_id() + nodes = [first_node_handle.node_id, second_node_handle.node_id] + # We want to check that there are two nodes. Thus, we convert `nodes` to a set and + # then back to a list to remove duplicates. Then we check that the length of `nodes` + # is 2. 
+ nodes = list(set(nodes)) + assert len(nodes) == 2 + + def create_actor(node): + return Actor.options( + scheduling_strategy=NodeAffinitySchedulingStrategy(node, soft=False) + ).remote(0) + + def get_node_id(self): + return ray.get_runtime_context().get_node_id() + + driver_node = get_node_id(None) + nodes.remove(driver_node) + + a = create_actor(nodes[0]) + a_node = ray.get(a.__ray_call__.remote(get_node_id)) + assert a_node == nodes[0] + # Check that the driver and actor are on different nodes. + assert driver_node != a_node + + with InputNode() as i: + dag = a.echo.bind(i) + + compiled_dag = dag.experimental_compile() + + # Ray sets the gRPC payload max size to 512 MiB. We choose a size in this test that + # is a bit larger. + size = 1024 * 1024 * 600 + val = b"x" * size + + for i in range(3): + ref = compiled_dag.execute(val) + result = ray.get(ref) + assert result == val + + # Note: must teardown before starting a new Ray session, otherwise you'll get + # a segfault from the dangling monitor thread upon the new Ray init. + compiled_dag.teardown() - def f(self, i): - print(self.get_node_id()) - return i - # 3 actors spread to nodes. - actors = [Actor.remote() for _ in range(2)] - node_ids = set(ray.get([actor.get_node_id.remote() for actor in actors])) - # Make sure actors are spread to 3 nodes. - assert len(node_ids) == 2 +@pytest.mark.parametrize("num_actors", [1, 4]) +@pytest.mark.parametrize("num_nodes", [1, 4]) +def test_multi_node_multi_reader_large_payload( + ray_start_cluster, num_actors, num_nodes, monkeypatch +): + # Set max grpc size to 5mb. + GRPC_MAX_SIZE = 1024 * 1024 * 5 + monkeypatch.setenv("RAY_max_grpc_message_size", str(GRPC_MAX_SIZE)) + cluster = ray_start_cluster + ACTORS_PER_NODE = num_actors + NUM_REMOTE_NODES = num_nodes + cluster.add_node(num_cpus=ACTORS_PER_NODE) + ray.init(address=cluster.address) + # This node is for the other two readers. + for _ in range(NUM_REMOTE_NODES): + cluster.add_node(num_cpus=ACTORS_PER_NODE) + cluster.wait_for_nodes() + + wait_for_condition(lambda: len(ray.nodes()) == NUM_REMOTE_NODES + 1) + + actors = [ + Actor.options(num_cpus=1).remote(0) + for _ in range(ACTORS_PER_NODE * (NUM_REMOTE_NODES + 1)) + ] + + def _get_node_id(self) -> "ray.NodeID": + return ray.get_runtime_context().get_node_id() + + nodes_check = ray.get([act.__ray_call__.remote(_get_node_id) for act in actors]) + assert len(set(nodes_check)) == NUM_REMOTE_NODES + 1 with InputNode() as inp: outputs = [] for actor in actors: - outputs.append(actor.f.bind(inp)) - adag = MultiOutputNode(outputs) + outputs.append(actor.echo.bind(inp)) + dag = MultiOutputNode(outputs) - adag.experimental_compile() - print(ray.get(adag.execute(1))) - time.sleep(30) + compiled_dag = dag.experimental_compile() + + # Set the object size a little bigger than the gRPC size so that + # it triggers chunking and resizing. + size = GRPC_MAX_SIZE + (1024 * 1024 * 2) + val = b"x" * size + + for _ in range(3): + ref = compiled_dag.execute(val) + result = ray.get(ref) + assert result == [val for _ in range(ACTORS_PER_NODE * (NUM_REMOTE_NODES + 1))] + + compiled_dag.teardown() if __name__ == "__main__": diff --git a/python/ray/experimental/channel/shared_memory_channel.py b/python/ray/experimental/channel/shared_memory_channel.py index 4c7f627d85e7..e2301b020f35 100644 --- a/python/ray/experimental/channel/shared_memory_channel.py +++ b/python/ray/experimental/channel/shared_memory_channel.py @@ -17,7 +17,8 @@ # entry/init points. 
logger = logging.getLogger(__name__) -DEFAULT_MAX_BUFFER_SIZE = int(100 * 1e6) # 100 mB +# SANG-TODO this is a bug. +DEFAULT_MAX_BUFFER_SIZE = int(1e6) # 100 mB # The min buffer size must be large enough to at least fit an instance of the # _ResizeChannel class along with any metadata. MIN_BUFFER_SIZE = int(1000) # 1000 bytes @@ -82,9 +83,11 @@ class _ResizeChannel: resize its own backing store. The class instance is sent through the channel. """ - def __init__(self, reader_ref: "ray.ObjectRef"): - # SANG-TODO - self._reader_ref = reader_ref + def __init__( + self, + _reader_refs: Dict[str, "ray.ObjectRef"] = None, + ): + self._reader_refs = _reader_refs class SharedMemoryType(ChannelOutputType): @@ -206,11 +209,19 @@ def __init__( self._writer_registered = _writer_registered self._reader_registered = _reader_registered self._reader_node_ids = _reader_node_ids or set() - # node_id -> reader ref. There's only 1 reader ref per node because - # it is shared by all actors on that node. - self._reader_refs: Dict[str, Tuple["ray.ObjectRef", ray.ActorID]] = _reader_refs or {} + # node_id -> reader references. Each node should have only 1 reader reference + # that's shared by all readers. + self._reader_refs: Dict[str, Tuple["ray.ObjectRef", ray.ActorID]] = ( + _reader_refs or {} + ) + + # Node ID -> a list of reader actors. self._node_id_to_readers: Dict[str, "ray.actor.ActorHandle"] = defaultdict(list) - self._num_readers = 0 + for reader, node_id in self._reader_and_node_list: + self._node_id_to_readers[node_id].append(reader) + + # Number of readers in a local node. + self._num_local_readers = 0 if _writer_ref is None: # We are the writer. Check that the passed handle matches the @@ -231,39 +242,46 @@ def __init__( self._writer_ref = _create_channel_ref(self, typ.buffer_size_bytes) self._create_reader_refs(reader_and_node_list, typ.buffer_size_bytes) - - assert len(self._reader_refs) > 0 else: assert ( _writer_node_id is not None ), "_writer_node_id must also be passed to the constructor when " "_writer_ref is." - assert ( - _reader_refs is not None - ), "_reader_refs must also be passed to the constructor when _writer_ref is." + assert _reader_refs is not None, ( + "_reader_refs must also be passed to the constructor " + "when _writer_ref is." + ) self._writer_ref = _writer_ref self._writer_node_id = _writer_node_id self._reader_node_ids = _reader_node_ids self._reader_refs = _reader_refs - node_id_to_readers: Dict[str, "ray.actor.ActorHandle"] = defaultdict(list) - for reader, node_id in self._reader_and_node_list: - node_id_to_readers[node_id].append(reader) - - # Find num_readers. We have local readers and 1 reader per remote node - # which listens to mutable object being changed and push the object to - # remote nodes. - assert self._num_readers == 0 - for node_id, readers in node_id_to_readers.items(): + assert self._num_local_readers == 0 + remote_node_exists = False + for node_id, readers in self._node_id_to_readers.items(): if self.is_local_node(node_id): - self._num_readers += len(readers) + self._num_local_readers += len(readers) else: - self._num_readers += 1 + remote_node_exists = True + # If remote node exists, we have 1 additional reader that listens + # to object changes and push them to remote nodes. 
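Concretely, each write is consumed on the writer's node by every co-located
reader plus at most one extra consumer: the IO thread that forwards the value
to all remote nodes. A small worked example with assumed placement (2 readers
on the writer's node, the rest behind 2 remote nodes):

    local_readers = 2   # readers co-located with the writer
    remote_nodes = 2    # nodes hosting the remaining readers
    # One forwarder reads locally on behalf of all remote nodes, no matter
    # how many remote nodes or remote readers there are.
    num_local_readers = local_readers + (1 if remote_nodes > 0 else 0)
    assert num_local_readers == 3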
+ if remote_node_exists: + self._num_local_readers += 1 + # There must be at least 1 local reader + assert self._num_local_readers > 0 + + self._local_reader_ref: Optional["ray.ObjectRef"] = self._get_local_reader_ref( + self._reader_refs + ) - self._local_reader_ref = None - for node_id, reader_ref_and_reader_id in self._reader_refs.items(): - + def _get_local_reader_ref( + self, reader_refs: Dict[str, Tuple["ray.ObjectRef", ray.ActorID]] + ) -> Optional["ray.ObjectRef"]: + for reader_node_id, reader_ref_and_reader_id in reader_refs.items(): + if self.is_local_node(reader_node_id): + return reader_ref_and_reader_id[0] + return None def _create_reader_refs( self, @@ -273,11 +291,7 @@ def _create_reader_refs( # TODO(jhumphri): Free the current reader ref once the reference to it is # destroyed below. - node_id_to_readers: Dict[str, "ray.actor.ActorHandle"] = defaultdict(list) - for reader, node_id in reader_and_node_list: - node_id_to_readers[node_id].append(reader) - - for node_id, readers in node_id_to_readers.items(): + for node_id, readers in self._node_id_to_readers.items(): if not self.is_local_node(node_id): # Find 1 reader in a remote node to create a reference that's # shared by all readers. @@ -288,9 +302,11 @@ def _create_reader_refs( reader._actor_id, ) else: - self._reader_refs[node_id] = (self._writer_ref, self._writer._actor_id) - - assert len(self._reader_refs) == len(node_id_to_readers) + writer_id = ray.ActorID.nil() + if self._writer is not None: + writer_id = self._writer._actor_id + self._reader_refs[node_id] = (self._writer_ref, writer_id) + assert len(self._reader_refs) == len(self._node_id_to_readers) # We need to register the new writer_ref. self._writer_registered = False @@ -310,20 +326,36 @@ def ensure_registered_as_writer(self) -> None: "the writer is on." 
) - node_id_to_readers: Dict[str, "ray.actor.ActorHandle"] = defaultdict(list) - for reader, node_id in self._reader_and_node_list: - node_id_to_readers[node_id].append(reader) - + remote_reader_refs: List["ray.ObjectRef"] = [] + remote_reader_node_ids: List[str] = [] + remote_reader_ids: List[str] = [] + remote_num_readers_per_node: List[int] = [] for reader_node_id, reader_ref_and_reader_id in self._reader_refs.items(): + if self.is_local_node(reader_node_id): + continue reader_ref, reader_id = reader_ref_and_reader_id - self._worker.core_worker.experimental_channel_register_writer( - self._writer_ref, - reader_ref, - self._writer_node_id, - reader_node_id, - reader_id, - len(node_id_to_readers[reader_node_id]), + remote_reader_refs.append(reader_ref) + remote_reader_ids.append(reader_id) + remote_reader_node_ids.append(reader_node_id) + remote_num_readers_per_node.append( + len(self._node_id_to_readers[reader_node_id]) ) + + assert ( + len(remote_reader_node_ids) + == len(remote_reader_ids) + == len(remote_reader_refs) + == len(remote_num_readers_per_node) + ) + + self._worker.core_worker.experimental_channel_register_writer( + self._writer_ref, + self._writer_node_id, + remote_reader_refs, + remote_reader_node_ids, + remote_reader_ids, + remote_num_readers_per_node, + ) self._writer_registered = True def ensure_registered_as_reader(self) -> None: @@ -332,8 +364,9 @@ def ensure_registered_as_reader(self) -> None: for reader_node_id, reader_ref_and_reader_id in self._reader_refs.items(): if self.is_local_node(reader_node_id): + reader_ref = reader_ref_and_reader_id[0] self._worker.core_worker.experimental_channel_register_reader( - reader_ref_and_reader_id[0], + reader_ref, ) self._reader_registered = True @@ -391,26 +424,30 @@ def _resize_channel_if_needed(self, serialized_value: str, timeout_ms: int): self._typ.buffer_size_bytes = size # TODO(jhumphri): Free the current writer ref once the reference to it is # destroyed below. + # TODO(sang): Support different policies such as 2X buffer size. prev_writer_ref = self._writer_ref self._writer_ref = _create_channel_ref(self, self._typ.buffer_size_bytes) - - # SANG-TODO add tests. Should only handle reader that's relevant. self._create_reader_refs( self._reader_and_node_list, self._typ.buffer_size_bytes ) + self._local_reader_ref = self._get_local_reader_ref(self._reader_refs) # Write a special message to the channel so that the readers know to # stop using the current reader_ref. - special_message = _ResizeChannel(self._reader_ref) + special_message = _ResizeChannel(self._reader_refs) special_message_serialized = ( self._worker.get_serialization_context().serialize(special_message) ) self._worker.core_worker.experimental_channel_put_serialized( special_message_serialized, prev_writer_ref, - self._num_readers, + self._num_local_readers, timeout_ms, ) + # # Clean the previous ref since it won't be used. 
+ # self._worker.core_worker.experimental_channel_set_error( + # prev_writer_ref + # ) def write(self, value: Any, timeout: Optional[float] = None) -> None: self.ensure_registered_as_writer() @@ -446,7 +483,7 @@ def write(self, value: Any, timeout: Optional[float] = None) -> None: self._worker.core_worker.experimental_channel_put_serialized( serialized_value, self._writer_ref, - self._num_readers, + self._num_local_readers, timeout_ms, ) @@ -458,12 +495,12 @@ def read(self, timeout: Optional[float] = None) -> Any: start_time = time.monotonic() ret = self._worker.get_objects( - [self._reader_ref], timeout=timeout, return_exceptions=True + [self._local_reader_ref], timeout=timeout, return_exceptions=True )[0][0] - # SANG-TODO if isinstance(ret, _ResizeChannel): - self._reader_ref = ret._reader_ref + self._reader_refs = ret._reader_refs + self._local_reader_ref = self._get_local_reader_ref(self._reader_refs) # We need to register the new reader_ref. self._reader_registered = False self.ensure_registered_as_reader() @@ -471,7 +508,7 @@ def read(self, timeout: Optional[float] = None) -> Any: timeout -= time.monotonic() - start_time timeout = max(timeout, 0) ret = self._worker.get_objects( - [self._reader_ref], timeout=timeout, return_exceptions=True + [self._local_reader_ref], timeout=timeout, return_exceptions=True )[0][0] return ret @@ -484,13 +521,16 @@ def close(self) -> None: self._worker.core_worker.experimental_channel_set_error(self._writer_ref) is_local_node_reader = False - for node_id in self._reader_node_id: + for node_id in self._reader_node_ids: if self.is_local_node(node_id): is_local_node_reader = True if is_local_node_reader: self.ensure_registered_as_reader() - self._worker.core_worker.experimental_channel_set_error(self._reader_ref) + for reader_ref_and_reader in self._reader_refs.values(): + self._worker.core_worker.experimental_channel_set_error( + reader_ref_and_reader[0] + ) @PublicAPI(stability="alpha") diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index 235ae16d34b5..6ac8f067ceab 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -265,11 +265,15 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: CRayStatus ExperimentalChannelSetError( const CObjectID &object_id) CRayStatus ExperimentalRegisterMutableObjectWriter( - const CObjectID &object_id, const CNodeID *node_id) + const CObjectID &writer_object_id, + const CNodeID &writer_node_id, + const c_vector[CNodeID] &remote_reader_node_ids) CRayStatus ExperimentalRegisterMutableObjectReader(const CObjectID &object_id) CRayStatus ExperimentalRegisterMutableObjectReaderRemote( - const CObjectID &object_id, const CActorID &reader_actor, - int64_t num_readers, const CObjectID &reader_ref) + const CObjectID &object_id, + const c_vector[CActorID] &remote_reader_actors, + c_vector[int64_t] remote_num_readers, + const c_vector[CObjectID] &remote_reader_refs) CRayStatus SealOwned(const CObjectID &object_id, c_bool pin_object, const unique_ptr[CAddress] &owner_address) CRayStatus SealExisting(const CObjectID &object_id, c_bool pin_object, diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 9668b97589a6..ffe1503ab78d 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -1487,58 +1487,64 @@ Status CoreWorker::ExperimentalChannelReadRelease( return experimental_mutable_object_provider_->ReadRelease(object_ids[0]); } -Status 
CoreWorker::ExperimentalRegisterMutableObjectWriter(const ObjectID &object_id, - const NodeID *node_id) { - experimental_mutable_object_provider_->RegisterWriterChannel(object_id, node_id); +Status CoreWorker::ExperimentalRegisterMutableObjectWriter( + const ObjectID &writer_object_id, + const NodeID &writer_node_id, + const std::vector &remote_reader_node_ids) { + experimental_mutable_object_provider_->RegisterWriterChannel( + writer_object_id, writer_node_id, remote_reader_node_ids); return Status::OK(); } Status CoreWorker::ExperimentalRegisterMutableObjectReaderRemote( const ObjectID &writer_object_id, - const ActorID &reader_actor, - int64_t num_readers, - const ObjectID &reader_object_id) { - rpc::Address addr; - { - std::promise promise; - // SANG-TODO Should introduce the timeout or pass it. - RAY_CHECK(gcs_client_->Actors() - .AsyncGet(reader_actor, - [&addr, &promise]( - Status status, - const std::optional &result) { - RAY_CHECK(result); - if (result) { - addr.set_ip_address(result->address().ip_address()); - addr.set_port(result->address().port()); - addr.set_worker_id(result->address().worker_id()); - } - promise.set_value(); - }) - .ok()); - promise.get_future().wait(); + const std::vector &remote_reader_actors, + std::vector remote_num_readers, + const std::vector &remote_reader_object_ids) { + if (remote_reader_actors.size() == 0) { + return Status::OK(); } - { + std::vector addrs; + for (const auto &actor_id : remote_reader_actors) { + const auto &addr = actor_task_submitter_->GetActorAddress(actor_id); + // It can happen if an actor is not created yet. We assume the API is called only when + // an actor is alive, which is true now. + RAY_CHECK(addr.has_value()); + addrs.push_back(*addr); + } + + std::shared_ptr num_replied = std::make_shared(0); + size_t num_requests = addrs.size(); + RAY_CHECK_EQ(addrs.size(), remote_reader_object_ids.size()); + std::promise promise; + for (auto i = 0; i < addrs.size(); i++) { + const auto &addr = addrs[i]; + const auto &reader_object_id = remote_reader_object_ids[i]; + const auto &num_reader = remote_num_readers[i]; + std::shared_ptr conn = core_worker_client_pool_->GetOrConnect(addr); rpc::RegisterMutableObjectReaderRequest req; req.set_writer_object_id(writer_object_id.Binary()); - req.set_num_readers(num_readers); + req.set_num_readers(num_reader); req.set_reader_object_id(reader_object_id.Binary()); rpc::RegisterMutableObjectReaderReply reply; - std::promise promise; + // TODO(sang): Add timeout. conn->RegisterMutableObjectReader( req, - [&promise](const Status &status, - const rpc::RegisterMutableObjectReaderReply &reply) { + [&promise, num_replied, num_requests, addr]( + const Status &status, const rpc::RegisterMutableObjectReaderReply &reply) { RAY_CHECK(status.ok()); - promise.set_value(); + *num_replied += 1; + if (*num_replied == num_requests) { + promise.set_value(); + } }); - promise.get_future().wait(); } + promise.get_future().wait(); return Status::OK(); } diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index e281c7879df9..993e92b7e6d7 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -736,13 +736,20 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// Experimental method for mutable objects. Registers a writer channel. /// + /// The API is not idempotent. + /// + /// SANG-TODO update. /// \param[in] object_id The ID of the object. /// \param[in] node_id If non-NULL, sends each write to the readers on node `node_id`. 
- Status ExperimentalRegisterMutableObjectWriter(const ObjectID &object_id, - const NodeID *node_id); + Status ExperimentalRegisterMutableObjectWriter( + const ObjectID &writer_object_id, + const NodeID &writer_node_id, + const std::vector &remote_reader_node_ids); /// Experimental method for mutable objects. Registers a reader channel. /// + /// The API is not idempotent. + /// /// \param[in] object_id The ID of the object. Status ExperimentalRegisterMutableObjectReader(const ObjectID &object_id); @@ -750,15 +757,17 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// that is written to on this node to the corresponding mutable object that is read on /// the node that `reader_actor` is on. /// + /// SANG-TODO update docstring. /// \param[in] writer_object_id The ID of the object that is written on this node. /// \param[in] reader_actor The actor that reads the object. /// \param[in] num_readers The total number of readers. /// \param[in] reader_object_id The ID of the corresponding object that is read on the /// remote node. - Status ExperimentalRegisterMutableObjectReaderRemote(const ObjectID &writer_object_id, - const ActorID &reader_actor, - int64_t num_readers, - const ObjectID &reader_object_id); + Status ExperimentalRegisterMutableObjectReaderRemote( + const ObjectID &writer_object_id, + const std::vector &remote_reader_actors, + std::vector remote_num_readers, + const std::vector &remote_reader_object_ids); /// Get a list of objects from the object store. Objects that failed to be retrieved /// will be returned as nullptrs. diff --git a/src/ray/core_worker/experimental_mutable_object_manager.cc b/src/ray/core_worker/experimental_mutable_object_manager.cc index 4e8aa20190ed..ef2120a80a1c 100644 --- a/src/ray/core_worker/experimental_mutable_object_manager.cc +++ b/src/ray/core_worker/experimental_mutable_object_manager.cc @@ -331,7 +331,7 @@ Status MutableObjectManager::ReadAcquire(const ObjectID &object_id, channel->reading = true; int64_t version_read = 0; Status s = object->header->ReadAcquire( - sem, channel->next_version_to_read, version_read, timeout_point); + object_id, sem, channel->next_version_to_read, version_read, timeout_point); if (!s.ok()) { RAY_LOG(DEBUG) << "ReadAcquire error was set, returning " << object_id; // Failed because the error bit was set on the mutable object. diff --git a/src/ray/core_worker/experimental_mutable_object_provider.cc b/src/ray/core_worker/experimental_mutable_object_provider.cc index 4433cf4bf7ab..9229a084fcd3 100644 --- a/src/ray/core_worker/experimental_mutable_object_provider.cc +++ b/src/ray/core_worker/experimental_mutable_object_provider.cc @@ -39,41 +39,58 @@ MutableObjectProvider::~MutableObjectProvider() { } } -void MutableObjectProvider::RegisterWriterChannel(const ObjectID &object_id, - const NodeID *node_id) { +void MutableObjectProvider::RegisterWriterChannel( + const ObjectID &writer_object_id, + const NodeID &writer_node_id, + const std::vector &remote_reader_node_ids) { { - std::unique_ptr object; - RAY_CHECK_OK(plasma_->GetExperimentalMutableObject(object_id, &object)); - RAY_CHECK_OK( - object_manager_->RegisterChannel(object_id, std::move(object), /*reader=*/false)); + std::unique_ptr writer_object; + RAY_CHECK_OK(plasma_->GetExperimentalMutableObject(writer_object_id, &writer_object)); + RAY_CHECK_OK(object_manager_->RegisterChannel( + writer_object_id, std::move(writer_object), /*reader=*/false)); // `object` is now a nullptr. } - // SANG-TODO Share a thread instead of having it per node. 
- if (node_id) { - // Start a thread that repeatedly listens for values on this object and then sends - // them via RPC to the remote reader. - io_contexts_.push_back(std::make_unique()); - instrumented_io_context &io_context = *io_contexts_.back(); - io_works_.push_back( - std::make_unique< - boost::asio::executor_work_guard>( - io_context.get_executor())); + if (remote_reader_node_ids.size() == 0) { + return; + } + + std::shared_ptr>> + remote_readers = + std::make_shared>>(); + // TODO(sang): Currently, these attributes are not cleaned up. + // Start a thread that repeatedly listens for values on this object and then sends + // them via RPC to the remote reader. + io_contexts_.push_back(std::make_unique()); + instrumented_io_context &io_context = *io_contexts_.back(); + io_works_.push_back( + std::make_unique< + boost::asio::executor_work_guard>( + io_context.get_executor())); + + // Find remote readers. + for (const auto &node_id : remote_reader_node_ids) { client_call_managers_.push_back(std::make_unique(io_context)); std::shared_ptr reader = - raylet_client_factory_(*node_id, *client_call_managers_.back()); + raylet_client_factory_(node_id, *client_call_managers_.back()); RAY_CHECK(reader); - // TODO(jhumphri): Extend this to support multiple channels. Currently, we must have - // one thread per channel because the thread blocks on the channel semaphore. - - io_context.post( - [this, &io_context, object_id, reader]() { - PollWriterClosure(io_context, object_id, reader); - }, - "experimental::MutableObjectProvider.PollWriter"); - io_threads_.push_back(std::make_unique( - &MutableObjectProvider::RunIOContext, this, std::ref(io_context))); + remote_readers->push_back(reader); } + + // TODO(jhumphri): Extend this to support multiple channels. Currently, we must have + // one thread per channel because the thread blocks on the channel semaphore. + // TODO(sang): We currently create a thread per object id. It is not scalable. + // We should instead just use a pool of threads. + io_context.post( + [this, + &io_context, + writer_object_id, + remote_readers = std::move(remote_readers)]() { + PollWriterClosure(io_context, writer_object_id, remote_readers); + }, + "experimental::MutableObjectProvider.PollWriter"); + io_threads_.push_back(std::make_unique( + &MutableObjectProvider::RunIOContext, this, std::ref(io_context))); } void MutableObjectProvider::RegisterReaderChannel(const ObjectID &object_id) { @@ -124,7 +141,7 @@ void MutableObjectProvider::HandlePushMutableObject( tmp_written_so_far = written_so_far_[writer_object_id]; written_so_far_[writer_object_id] += chunk_size; if (written_so_far_[writer_object_id] == total_size) { - written_so_far_[writer_object_id] = 0; + written_so_far_.erase(written_so_far_.find(writer_object_id)); } } @@ -200,12 +217,14 @@ Status MutableObjectProvider::GetChannelStatus(const ObjectID &object_id, void MutableObjectProvider::PollWriterClosure( instrumented_io_context &io_context, - const ObjectID &object_id, - std::shared_ptr reader) { + const ObjectID &writer_object_id, + std::shared_ptr>> + remote_readers) { + // NOTE: There's only 1 PollWriterClosure at any time in a single thread. std::shared_ptr object; // The corresponding ReadRelease() will be automatically called when // `object` goes out of scope. 
- Status status = object_manager_->ReadAcquire(object_id, object); + Status status = object_manager_->ReadAcquire(writer_object_id, object); // Check if the thread returned from ReadAcquire() because the process is exiting, not // because there is something to read. if (status.code() == StatusCode::ChannelError) { @@ -217,19 +236,31 @@ void MutableObjectProvider::PollWriterClosure( RAY_CHECK(object->GetData()); RAY_CHECK(object->GetMetadata()); - reader->PushMutableObject( - object_id, - object->GetData()->Size(), - object->GetMetadata()->Size(), - object->GetData()->Data(), - [this, &io_context, object_id, reader](const Status &status, - const rpc::PushMutableObjectReply &reply) { - io_context.post( - [this, &io_context, object_id, reader]() { - PollWriterClosure(io_context, object_id, reader); - }, - "experimental::MutableObjectProvider.PollWriter"); - }); + std::shared_ptr num_replied = std::make_shared(0); + for (const auto &reader : *remote_readers) { + reader->PushMutableObject( + writer_object_id, + object->GetData()->Size(), + object->GetMetadata()->Size(), + object->GetData()->Data(), + [this, &io_context, writer_object_id, remote_readers, num_replied]( + const Status &status, const rpc::PushMutableObjectReply &reply) { + *num_replied += 1; + if (!status.ok()) { + RAY_LOG(ERROR) + << "Failed to transfer object to a remote node for an object id " + << writer_object_id << ". It can cause hang."; + } + + if (*num_replied == remote_readers->size()) { + io_context.post( + [this, &io_context, writer_object_id, remote_readers]() { + PollWriterClosure(io_context, writer_object_id, remote_readers); + }, + "experimental::MutableObjectProvider.PollWriter"); + } + }); + } } void MutableObjectProvider::RunIOContext(instrumented_io_context &io_context) { diff --git a/src/ray/core_worker/experimental_mutable_object_provider.h b/src/ray/core_worker/experimental_mutable_object_provider.h index ddf248f3994e..a923adadec23 100644 --- a/src/ray/core_worker/experimental_mutable_object_provider.h +++ b/src/ray/core_worker/experimental_mutable_object_provider.h @@ -44,9 +44,12 @@ class MutableObjectProvider { /// Registers a writer channel for `object_id` on this node. On each write to this /// channel, the write will be sent via RPC to node `node_id`. + /// SANG-TODO change it. /// \param[in] object_id The ID of the object. /// \param[in] node_id The ID of the node to write to. - void RegisterWriterChannel(const ObjectID &object_id, const NodeID *node_id); + void RegisterWriterChannel(const ObjectID &writer_object_id, + const NodeID &writer_node_id, + const std::vector &remote_reader_node_ids); /// Handles an RPC request from another note to register a mutable object on this node. /// The remote node writes the object and this node reads the object. This node is @@ -149,9 +152,12 @@ class MutableObjectProvider { // Listens for local changes to `object_id` and sends the changes to remote nodes via // the network. - void PollWriterClosure(instrumented_io_context &io_context, - const ObjectID &object_id, - std::shared_ptr reader); + // SANG-TODO update the docstring. + void PollWriterClosure( + instrumented_io_context &io_context, + const ObjectID &writer_object_id, + std::shared_ptr>> + remote_readers); // Kicks off `io_context`. 
  void RunIOContext(instrumented_io_context &io_context);
diff --git a/src/ray/core_worker/transport/actor_task_submitter.cc b/src/ray/core_worker/transport/actor_task_submitter.cc
index c4bda9007ca1..ccda65a625c8 100644
--- a/src/ray/core_worker/transport/actor_task_submitter.cc
+++ b/src/ray/core_worker/transport/actor_task_submitter.cc
@@ -701,6 +701,23 @@ bool ActorTaskSubmitter::IsActorAlive(const ActorID &actor_id) const {
   return (iter != client_queues_.end() && iter->second.rpc_client);
 }
 
+std::optional<rpc::Address> ActorTaskSubmitter::GetActorAddress(
+    const ActorID &actor_id) const {
+  absl::MutexLock lock(&mu_);
+
+  auto iter = client_queues_.find(actor_id);
+  if (iter == client_queues_.end()) {
+    return std::nullopt;
+  }
+
+  const auto &rpc_client = iter->second.rpc_client;
+  if (rpc_client == nullptr) {
+    return std::nullopt;
+  }
+
+  return iter->second.rpc_client->Addr();
+}
+
 bool ActorTaskSubmitter::PendingTasksFull(const ActorID &actor_id) const {
   absl::MutexLock lock(&mu_);
   auto it = client_queues_.find(actor_id);
diff --git a/src/ray/core_worker/transport/actor_task_submitter.h b/src/ray/core_worker/transport/actor_task_submitter.h
index b5d470730ac4..e5d801e79cce 100644
--- a/src/ray/core_worker/transport/actor_task_submitter.h
+++ b/src/ray/core_worker/transport/actor_task_submitter.h
@@ -183,6 +183,9 @@ class ActorTaskSubmitter : public ActorTaskSubmitterInterface {
   /// \return Whether this actor is alive.
   bool IsActorAlive(const ActorID &actor_id) const;
 
+  /// SANG-TODO update docstring.
+  std::optional<rpc::Address> GetActorAddress(const ActorID &actor_id) const;
+
   /// Get the local actor state. nullopt if the state is unknown.
   std::optional<rpc::ActorTableData::ActorState> GetLocalActorState(
       const ActorID &actor_id) const;
diff --git a/src/ray/object_manager/common.cc b/src/ray/object_manager/common.cc
index 1d8664351d05..c67f18a227a6 100644
--- a/src/ray/object_manager/common.cc
+++ b/src/ray/object_manager/common.cc
@@ -162,6 +162,7 @@ Status PlasmaObjectHeader::WriteRelease(Semaphores &sem) {
 }
 
 Status PlasmaObjectHeader::ReadAcquire(
+    const ObjectID &object_id,
     Semaphores &sem,
     int64_t version_to_read,
     int64_t &version_read,
@@ -177,8 +178,8 @@ Status PlasmaObjectHeader::ReadAcquire(
       sched_yield();
       // We need to get the desired version before timeout
       if (timeout_point && std::chrono::steady_clock::now() >= *timeout_point) {
-        return Status::ChannelTimeoutError(
-            "Timed out waiting for object available to read.");
+        return Status::ChannelTimeoutError(absl::StrCat(
+            "Timed out waiting for object available to read. ObjectID: ", object_id.Hex()));
       }
       RAY_RETURN_NOT_OK(TryToAcquireSemaphore(sem.header_sem, timeout_point));
     }
@@ -255,6 +256,7 @@ Status PlasmaObjectHeader::WriteRelease(Semaphores &sem) {
 }
 
 Status PlasmaObjectHeader::ReadAcquire(
+    const ObjectID &object_id,
     Semaphores &sem,
     int64_t version_to_read,
     int64_t &version_read,
diff --git a/src/ray/object_manager/common.h b/src/ray/object_manager/common.h
index b3b5f791b55d..1f9fa91dd4b1 100644
--- a/src/ray/object_manager/common.h
+++ b/src/ray/object_manager/common.h
@@ -152,6 +152,7 @@ struct PlasmaObjectHeader {
   /// Blocks until the given version is ready to read. Returns false if the
   /// maximum number of readers have already read the requested version.
   ///
+  /// \param[in] object_id The ID of the object to read; included in error messages.
   /// \param[in] sem The semaphores for this channel.
   /// \param[in] read_version The version to read.
/// \param[out] version_read For normal immutable objects, this will be set to
@@ -163,7 +164,8 @@ struct PlasmaObjectHeader {
   /// TimedOut immediately without blocking.
   /// \return Whether the correct version was read and there were still
   /// reads remaining.
-  Status ReadAcquire(Semaphores &sem,
+  Status ReadAcquire(const ObjectID &object_id,
+                     Semaphores &sem,
                      int64_t version_to_read,
                      int64_t &version_read,
                      const std::unique_ptr<std::chrono::steady_clock::time_point>
diff --git a/src/ray/object_manager/plasma/test/mutable_object_test.cc b/src/ray/object_manager/plasma/test/mutable_object_test.cc
index ecdbc2edfa7c..68d12f320e51 100644
--- a/src/ray/object_manager/plasma/test/mutable_object_test.cc
+++ b/src/ray/object_manager/plasma/test/mutable_object_test.cc
@@ -79,7 +79,8 @@ void Read(PlasmaObjectHeader *header,
   int64_t version_to_read = 1;
   for (size_t i = 0; i < num_reads; i++) {
     int64_t version_read = 0;
-    if (!header->ReadAcquire(sem, version_to_read, version_read).ok()) {
+    if (!header->ReadAcquire(ObjectID::FromRandom(), sem, version_to_read, version_read)
+             .ok()) {
       data_results.push_back("error");
       metadata_results.push_back("error");
       return;
diff --git a/src/ray/raylet_client/raylet_client.cc b/src/ray/raylet_client/raylet_client.cc
index f1ecb239e55d..938f20fab80e 100644
--- a/src/ray/raylet_client/raylet_client.cc
+++ b/src/ray/raylet_client/raylet_client.cc
@@ -426,7 +426,7 @@ void raylet::RayletClient::PushMutableObject(
     const ray::rpc::ClientCallback<ray::rpc::PushMutableObjectReply> &callback) {
   // Ray sets the gRPC max payload size to ~512 MiB. We set the max chunk size to a
   // slightly lower value to allow extra padding just in case.
-  static constexpr uint64_t kMaxGrpcPayloadSize = 1024 * 1024 * 500;  // 500 MiB.
+  uint64_t kMaxGrpcPayloadSize = RayConfig::instance().max_grpc_message_size() * 0.98;
   uint64_t total_size = data_size + metadata_size;
   uint64_t total_num_chunks = total_size / kMaxGrpcPayloadSize;
   // If `total_size` is not a multiple of `kMaxGrpcPayloadSize`, then we need to send an

From d947ccb178ef40fe761ee7e40f42eebbd2a3e828 Mon Sep 17 00:00:00 2001
From: SangBin Cho
Date: Fri, 6 Sep 2024 01:36:04 -0700
Subject: [PATCH 05/12] done

---
 python/ray/_raylet.pyx                        |  3 ---
 .../tests/experimental/test_multi_node_dag.py |  4 ++--
 .../channel/shared_memory_channel.py          |  6 +++---
 python/ray/includes/libcoreworker.pxd         |  1 -
 src/ray/core_worker/core_worker.cc            |  8 +++-----
 src/ray/core_worker/core_worker.h             | 20 +++++++++----------
 .../experimental_mutable_object_provider.cc   |  4 +---
 .../experimental_mutable_object_provider.h    | 14 +++++++------
 .../test/mutable_object_provider_test.cc      | 10 +++++-----
 .../transport/actor_task_submitter.h          |  3 ++-
 10 files changed, 34 insertions(+), 39 deletions(-)

diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx
index 06065734b928..64017481d12c 100644
--- a/python/ray/_raylet.pyx
+++ b/python/ray/_raylet.pyx
@@ -3687,14 +3687,12 @@ cdef class CoreWorker:

     def experimental_channel_register_writer(self,
                                              ObjectRef writer_ref,
-                                             writer_node,
                                              remote_reader_refs,
                                              remote_reader_nodes,
                                              remote_readers,
                                              remote_num_readers_per_node):
         cdef:
            CObjectID c_writer_ref = writer_ref.native()
-           CNodeID c_writer_node = CNodeID.FromHex(writer_node)
            c_vector[CObjectID] c_remote_reader_refs
            c_vector[CNodeID] c_remote_reader_nodes
            c_vector[CActorID] c_remote_readers
@@ -3715,7 +3713,6 @@
            check_status(CCoreWorkerProcess.GetCoreWorker()
                         .ExperimentalRegisterMutableObjectWriter(
                             c_writer_ref,
-                            c_writer_node,
                             c_remote_reader_nodes,
                         ))
        check_status(
diff --git
a/python/ray/dag/tests/experimental/test_multi_node_dag.py b/python/ray/dag/tests/experimental/test_multi_node_dag.py index 124514684711..8c1758dcc0f1 100644 --- a/python/ray/dag/tests/experimental/test_multi_node_dag.py +++ b/python/ray/dag/tests/experimental/test_multi_node_dag.py @@ -7,7 +7,8 @@ from ray.dag import InputNode, MultiOutputNode import ray.remote_function from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy -from ray.tests.conftest import wait_for_condition # noqa +from ray.tests.conftest import * # noqa +from ray.tests.conftest import wait_for_condition if sys.platform != "linux" and sys.platform != "darwin": pytest.skip("Skipping, requires Linux or Mac.", allow_module_level=True) @@ -44,7 +45,6 @@ def double_and_inc(self, x): return self.i def echo(self, x): - print("Echo run!") self.count += 1 self._fail_if_needed() return x diff --git a/python/ray/experimental/channel/shared_memory_channel.py b/python/ray/experimental/channel/shared_memory_channel.py index e2301b020f35..3eb349aedb5c 100644 --- a/python/ray/experimental/channel/shared_memory_channel.py +++ b/python/ray/experimental/channel/shared_memory_channel.py @@ -17,7 +17,6 @@ # entry/init points. logger = logging.getLogger(__name__) -# SANG-TODO this is a bug. DEFAULT_MAX_BUFFER_SIZE = int(1e6) # 100 mB # The min buffer size must be large enough to at least fit an instance of the # _ResizeChannel class along with any metadata. @@ -350,7 +349,6 @@ def ensure_registered_as_writer(self) -> None: self._worker.core_worker.experimental_channel_register_writer( self._writer_ref, - self._writer_node_id, remote_reader_refs, remote_reader_node_ids, remote_reader_ids, @@ -444,7 +442,9 @@ def _resize_channel_if_needed(self, serialized_value: str, timeout_ms: int): self._num_local_readers, timeout_ms, ) - # # Clean the previous ref since it won't be used. + # TODO(sang): Clean the previous ref that won't be used. + # Right now, if we just close it here, it will not work because + # of race conditions. 
            # self._worker.core_worker.experimental_channel_set_error(
            #     prev_writer_ref
            # )
diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd
index 6ac8f067ceab..995603d55ed6 100644
--- a/python/ray/includes/libcoreworker.pxd
+++ b/python/ray/includes/libcoreworker.pxd
@@ -266,7 +266,6 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
            const CObjectID &object_id)
        CRayStatus ExperimentalRegisterMutableObjectWriter(
            const CObjectID &writer_object_id,
-           const CNodeID &writer_node_id,
            const c_vector[CNodeID] &remote_reader_node_ids)
        CRayStatus ExperimentalRegisterMutableObjectReader(const CObjectID &object_id)
        CRayStatus ExperimentalRegisterMutableObjectReaderRemote(
diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc
index ffe1503ab78d..f9ddb3d98f66 100644
--- a/src/ray/core_worker/core_worker.cc
+++ b/src/ray/core_worker/core_worker.cc
@@ -1488,11 +1488,9 @@
 }
 
 Status CoreWorker::ExperimentalRegisterMutableObjectWriter(
-    const ObjectID &writer_object_id,
-    const NodeID &writer_node_id,
-    const std::vector<NodeID> &remote_reader_node_ids) {
-  experimental_mutable_object_provider_->RegisterWriterChannel(
-      writer_object_id, writer_node_id, remote_reader_node_ids);
+    const ObjectID &writer_object_id, const std::vector<NodeID> &remote_reader_node_ids) {
+  experimental_mutable_object_provider_->RegisterWriterChannel(writer_object_id,
+                                                               remote_reader_node_ids);
   return Status::OK();
 }
diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h
index 993e92b7e6d7..f6036484b02a 100644
--- a/src/ray/core_worker/core_worker.h
+++ b/src/ray/core_worker/core_worker.h
@@ -738,12 +738,10 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
   ///
   /// The API is not idempotent.
   ///
-  /// SANG-TODO update.
-  /// \param[in] object_id The ID of the object.
-  /// \param[in] node_id If non-NULL, sends each write to the readers on node `node_id`.
+  /// \param[in] writer_object_id The ID of the object.
+  /// \param[in] remote_reader_node_ids The list of the remote readers' node IDs.
   Status ExperimentalRegisterMutableObjectWriter(
       const ObjectID &writer_object_id,
-      const NodeID &writer_node_id,
       const std::vector<NodeID> &remote_reader_node_ids);
 
   /// Experimental method for mutable objects. Registers a reader channel.
@@ -755,14 +753,16 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
 
   /// Experimental method for mutable objects. Registers a mapping from a mutable object
   /// that is written to on this node to the corresponding mutable object that is read on
-  /// the node that `reader_actor` is on.
+  /// the node that `remote_reader_actors` is on.
+  ///
+  /// The API assumes `remote_reader_actors`, `remote_num_readers`, and
+  /// `remote_reader_object_ids` have the same order and length.
   ///
-  /// SANG-TODO update docstring.
   /// \param[in] writer_object_id The ID of the object that is written on this node.
-  /// \param[in] reader_actor The actor that reads the object.
-  /// \param[in] num_readers The total number of readers.
-  /// \param[in] reader_object_id The ID of the corresponding object that is read on the
-  /// remote node.
+  /// \param[in] remote_reader_actors The list of actors that read the object on remote
+  /// nodes. \param[in] remote_num_readers A list of the total number of readers per
+  /// remote node. \param[in] remote_reader_object_ids A list of IDs of the corresponding
+  /// objects that are read on the remote nodes.
   Status ExperimentalRegisterMutableObjectReaderRemote(
       const ObjectID &writer_object_id,
       const std::vector<ActorID> &remote_reader_actors,
diff --git a/src/ray/core_worker/experimental_mutable_object_provider.cc b/src/ray/core_worker/experimental_mutable_object_provider.cc
index 9229a084fcd3..9dea315d957a 100644
--- a/src/ray/core_worker/experimental_mutable_object_provider.cc
+++ b/src/ray/core_worker/experimental_mutable_object_provider.cc
@@ -40,9 +40,7 @@ MutableObjectProvider::~MutableObjectProvider() {
 }
 
 void MutableObjectProvider::RegisterWriterChannel(
-    const ObjectID &writer_object_id,
-    const NodeID &writer_node_id,
-    const std::vector<NodeID> &remote_reader_node_ids) {
+    const ObjectID &writer_object_id, const std::vector<NodeID> &remote_reader_node_ids) {
   {
     std::unique_ptr<plasma::MutableObject> writer_object;
     RAY_CHECK_OK(plasma_->GetExperimentalMutableObject(writer_object_id, &writer_object));
diff --git a/src/ray/core_worker/experimental_mutable_object_provider.h b/src/ray/core_worker/experimental_mutable_object_provider.h
index a923adadec23..c0fcd86bd040 100644
--- a/src/ray/core_worker/experimental_mutable_object_provider.h
+++ b/src/ray/core_worker/experimental_mutable_object_provider.h
@@ -44,11 +44,10 @@ class MutableObjectProvider {
 
   /// Registers a writer channel for `object_id` on this node. On each write to this
   /// channel, the write will be sent via RPC to node `node_id`.
-  /// SANG-TODO change it.
+  ///
   /// \param[in] object_id The ID of the object.
-  /// \param[in] node_id The ID of the node to write to.
+  /// \param[in] remote_reader_node_ids The list of the remote readers' node IDs.
   void RegisterWriterChannel(const ObjectID &writer_object_id,
-                             const NodeID &writer_node_id,
                              const std::vector<NodeID> &remote_reader_node_ids);
 
   /// Handles an RPC request from another node to register a mutable object on this node.
@@ -150,9 +149,12 @@ class MutableObjectProvider {
     ObjectID local_object_id;
   };
 
-  // Listens for local changes to `object_id` and sends the changes to remote nodes via
-  // the network.
-  // SANG-TODO update the docstring.
+  /// Listens for local changes to `object_id` and sends the changes to remote nodes via
+  /// the network.
+  ///
+  /// \param[in] io_context The IO context.
+  /// \param[in] writer_object_id The object ID of the writer.
+  /// \param[in] remote_readers A list of remote reader clients.
  void PollWriterClosure(
      instrumented_io_context &io_context,
      const ObjectID &writer_object_id,
      std::shared_ptr<std::vector<std::shared_ptr<MutableObjectReaderInterface>>>
          remote_readers);
diff --git a/src/ray/core_worker/test/mutable_object_provider_test.cc b/src/ray/core_worker/test/mutable_object_provider_test.cc
index faaaed59a737..761b9d893eaf 100644
--- a/src/ray/core_worker/test/mutable_object_provider_test.cc
+++ b/src/ray/core_worker/test/mutable_object_provider_test.cc
@@ -153,7 +153,7 @@ TEST(MutableObjectProvider, RegisterWriterChannel) {
   MutableObjectProvider provider(
       plasma, /*factory=*/absl::bind_front(GetTestInterface, interface));
-  provider.RegisterWriterChannel(object_id, &node_id);
+  provider.RegisterWriterChannel(object_id, {&node_id});
 
   std::shared_ptr<Buffer> data;
   EXPECT_EQ(provider
@@ -179,7 +179,7 @@ TEST(MutableObjectProvider, MutableObjectBufferReadRelease) {
   auto plasma = std::make_shared<TestPlasma>();
   MutableObjectProvider provider(plasma, /*factory=*/nullptr);
 
-  provider.RegisterWriterChannel(object_id, nullptr);
+  provider.RegisterWriterChannel(object_id, {});
 
   std::shared_ptr<Buffer> data;
   EXPECT_EQ(provider
@@ -239,7 +239,7 @@ TEST(MutableObjectProvider, MutableObjectBufferSetError) {
   auto plasma = std::make_shared<TestPlasma>();
   MutableObjectProvider provider(plasma, /*factory=*/nullptr);
 
-  provider.RegisterWriterChannel(object_id, nullptr);
+  provider.RegisterWriterChannel(object_id, {});
 
   std::shared_ptr<Buffer> data;
   EXPECT_EQ(provider
@@ -294,7 +294,7 @@ TEST(MutableObjectProvider, MutableObjectBufferSetErrorBeforeWriteRelease) {
   auto plasma = std::make_shared<TestPlasma>();
   MutableObjectProvider provider(plasma, /*factory=*/nullptr);
 
-  provider.RegisterWriterChannel(object_id, nullptr);
+  provider.RegisterWriterChannel(object_id, {});
 
   std::shared_ptr<Buffer> data;
   EXPECT_EQ(provider
@@ -349,7 +349,7 @@ TEST(MutableObjectProvider, MutableObjectBufferSetErrorBeforeReadRelease) {
   auto plasma = std::make_shared<TestPlasma>();
   MutableObjectProvider provider(plasma, /*factory=*/nullptr);
 
-  provider.RegisterWriterChannel(object_id, nullptr);
+  provider.RegisterWriterChannel(object_id, {});
 
   std::shared_ptr<Buffer> data;
   EXPECT_EQ(provider
diff --git a/src/ray/core_worker/transport/actor_task_submitter.h b/src/ray/core_worker/transport/actor_task_submitter.h
index e5d801e79cce..6355e62a4781 100644
--- a/src/ray/core_worker/transport/actor_task_submitter.h
+++ b/src/ray/core_worker/transport/actor_task_submitter.h
@@ -183,7 +183,8 @@ class ActorTaskSubmitter : public ActorTaskSubmitterInterface {
   /// \return Whether this actor is alive.
   bool IsActorAlive(const ActorID &actor_id) const;
 
-  /// SANG-TODO update docstring.
+  /// Gets the address of the given actor.
+  /// Returns nullopt if the actor's address has not been reported.
   std::optional<rpc::Address> GetActorAddress(const ActorID &actor_id) const;
 
   /// Get the local actor state. nullopt if the state is unknown.

From d7f9433e5403bceeda2d92b7d0a9d484c3528807 Mon Sep 17 00:00:00 2001
From: SangBin Cho
Date: Mon, 9 Sep 2024 00:09:32 -0700
Subject: [PATCH 06/12] Addressed code review.
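The review feedback centered on the per-node reader bookkeeping: readers are
grouped by their node ID, and a single reader buffer reference is created per
node and shared by every reader actor on that node. A minimal sketch of the
grouping step (the helper name below is illustrative, not the actual channel
API):

    from collections import defaultdict

    def group_readers_by_node(reader_and_node_list):
        # node_id -> all reader actors on that node. Only one buffer
        # reference is later created per key; every reader on that node
        # shares it.
        node_id_to_readers = defaultdict(list)
        for reader, node_id in reader_and_node_list:
            node_id_to_readers[node_id].append(reader)
        return node_id_to_readers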
---
 .../tests/experimental/test_multi_node_dag.py | 18 ++--
 .../channel/shared_memory_channel.py          | 85 +++++++++++--------
 2 files changed, 59 insertions(+), 44 deletions(-)

diff --git a/python/ray/dag/tests/experimental/test_multi_node_dag.py b/python/ray/dag/tests/experimental/test_multi_node_dag.py
index 8c1758dcc0f1..c58b4ee46615 100644
--- a/python/ray/dag/tests/experimental/test_multi_node_dag.py
+++ b/python/ray/dag/tests/experimental/test_multi_node_dag.py
@@ -33,7 +33,6 @@ def _fail_if_needed(self):
            raise RuntimeError("injected fault")

    def inc(self, x):
-        print(self.i)
        self.i += x
        self.count += 1
        self._fail_if_needed()
@@ -73,7 +72,7 @@ def test_readers_on_different_nodes(ray_start_cluster):
    # one of the readers.
    cluster.add_node(num_cpus=1)
    ray.init(address=cluster.address)
-    # This node is for the other reader.
+    # 2 more nodes for other readers.
    cluster.add_node(num_cpus=1)
    cluster.add_node(num_cpus=1)
    cluster.wait_for_nodes()
@@ -88,8 +87,8 @@ def test_readers_on_different_nodes(ray_start_cluster):
    def _get_node_id(self) -> "ray.NodeID":
        return ray.get_runtime_context().get_node_id()

-    nodes_check = ray.get([act.__ray_call__.remote(_get_node_id) for act in actors])
-    assert len(set(nodes_check)) == 3
+    node_ids = ray.get([act.__ray_call__.remote(_get_node_id) for act in actors])
+    assert len(set(node_ids)) == 3

    with InputNode() as inp:
        x = a.inc.bind(inp)
@@ -109,9 +108,10 @@ def test_bunch_readers_on_different_nodes(ray_start_cluster):
    cluster = ray_start_cluster
    ACTORS_PER_NODE = 2
    NUM_REMOTE_NODES = 2
+    # The driver node.
    cluster.add_node(num_cpus=ACTORS_PER_NODE)
    ray.init(address=cluster.address)
-    # This node is for the other two readers.
+    # Additional nodes for readers spread across multiple nodes.
    for _ in range(NUM_REMOTE_NODES):
        cluster.add_node(num_cpus=ACTORS_PER_NODE)
    cluster.wait_for_nodes()
@@ -126,8 +126,8 @@ def test_bunch_readers_on_different_nodes(ray_start_cluster):
    def _get_node_id(self) -> "ray.NodeID":
        return ray.get_runtime_context().get_node_id()

-    nodes_check = ray.get([act.__ray_call__.remote(_get_node_id) for act in actors])
-    assert len(set(nodes_check)) == NUM_REMOTE_NODES + 1
+    node_ids = ray.get([act.__ray_call__.remote(_get_node_id) for act in actors])
+    assert len(set(node_ids)) == NUM_REMOTE_NODES + 1

    with InputNode() as inp:
        outputs = []
@@ -268,8 +268,8 @@ def test_multi_node_multi_reader_large_payload(
    def _get_node_id(self) -> "ray.NodeID":
        return ray.get_runtime_context().get_node_id()

-    nodes_check = ray.get([act.__ray_call__.remote(_get_node_id) for act in actors])
-    assert len(set(nodes_check)) == NUM_REMOTE_NODES + 1
+    node_ids = ray.get([act.__ray_call__.remote(_get_node_id) for act in actors])
+    assert len(set(node_ids)) == NUM_REMOTE_NODES + 1

    with InputNode() as inp:
        outputs = []
diff --git a/python/ray/experimental/channel/shared_memory_channel.py b/python/ray/experimental/channel/shared_memory_channel.py
index 3eb349aedb5c..bb316c2095f4 100644
--- a/python/ray/experimental/channel/shared_memory_channel.py
+++ b/python/ray/experimental/channel/shared_memory_channel.py
@@ -1,7 +1,7 @@
 import io
 import logging
 import time
-from collections import defaultdict
+from collections import defaultdict, namedtuple
 from typing import Any, Dict, List, Optional, Set, Tuple, Union

 import ray
@@ -74,6 +74,10 @@ def _get_self_actor() -> Optional["ray.actor.ActorHandle"]:
     return None


+# A tuple of object reference and its corresponding actor that holds it.
+ReaderInfo = namedtuple("ReaderInfo", ["reader_ref", "reader_id"]) + + class _ResizeChannel: """ When a channel must be resized, the channel backing store must be resized on both @@ -84,9 +88,15 @@ class _ResizeChannel: def __init__( self, - _reader_refs: Dict[str, "ray.ObjectRef"] = None, + _node_id_to_reader_info: Dict[str, ReaderInfo] = None, ): - self._reader_refs = _reader_refs + """ + Args: + _node_id_to_reader_info: node_id -> reader info. + Each node should have only 1 reader actor and corresponding reference. + # that's shared by all readers. + """ + self._node_id_to_reader_info = _node_id_to_reader_info class SharedMemoryType(ChannelOutputType): @@ -162,7 +172,7 @@ def __init__( _writer_node_id: Optional["ray.NodeID"] = None, _reader_node_ids: Optional[Set["ray.NodeID"]] = None, _writer_ref: Optional["ray.ObjectRef"] = None, - _reader_refs: Optional[Dict[str, "ray.ObjectRef"]] = None, + _node_id_to_reader_info: Optional[Dict[str, ReaderInfo]] = None, _writer_registered: bool = False, _reader_registered: bool = False, ): @@ -210,8 +220,8 @@ def __init__( self._reader_node_ids = _reader_node_ids or set() # node_id -> reader references. Each node should have only 1 reader reference # that's shared by all readers. - self._reader_refs: Dict[str, Tuple["ray.ObjectRef", ray.ActorID]] = ( - _reader_refs or {} + self._node_id_to_reader_info: Dict[str, ReaderInfo] = ( + _node_id_to_reader_info or {} ) # Node ID -> a list of reader actors. @@ -246,15 +256,15 @@ def __init__( _writer_node_id is not None ), "_writer_node_id must also be passed to the constructor when " "_writer_ref is." - assert _reader_refs is not None, ( - "_reader_refs must also be passed to the constructor " + assert _node_id_to_reader_info is not None, ( + "_node_id_to_reader_info must also be passed to the constructor " "when _writer_ref is." ) self._writer_ref = _writer_ref self._writer_node_id = _writer_node_id self._reader_node_ids = _reader_node_ids - self._reader_refs = _reader_refs + self._node_id_to_reader_info = _node_id_to_reader_info assert self._num_local_readers == 0 remote_node_exists = False @@ -271,20 +281,19 @@ def __init__( assert self._num_local_readers > 0 self._local_reader_ref: Optional["ray.ObjectRef"] = self._get_local_reader_ref( - self._reader_refs + self._node_id_to_reader_info ) def _get_local_reader_ref( - self, reader_refs: Dict[str, Tuple["ray.ObjectRef", ray.ActorID]] + self, node_id_to_reader_info: Dict[str, ReaderInfo] ) -> Optional["ray.ObjectRef"]: - for reader_node_id, reader_ref_and_reader_id in reader_refs.items(): + for reader_node_id, reader_info in node_id_to_reader_info.items(): if self.is_local_node(reader_node_id): - return reader_ref_and_reader_id[0] + return reader_info.reader_ref return None def _create_reader_refs( self, - reader_and_node_list: List[Tuple["ray.actor.ActorHandle", str]], buffer_size_bytes: int, ): # TODO(jhumphri): Free the current reader ref once the reference to it is @@ -296,16 +305,20 @@ def _create_reader_refs( # shared by all readers. 
reader = readers[0] fn = reader.__ray_call__ - self._reader_refs[node_id] = ( - ray.get(fn.remote(_create_channel_ref, buffer_size_bytes)), - reader._actor_id, + self._node_id_to_reader_info[node_id] = ReaderInfo( + reader_ref=ray.get( + fn.remote(_create_channel_ref, buffer_size_bytes) + ), + reader_id=reader._actor_id, ) else: writer_id = ray.ActorID.nil() if self._writer is not None: writer_id = self._writer._actor_id - self._reader_refs[node_id] = (self._writer_ref, writer_id) - assert len(self._reader_refs) == len(self._node_id_to_readers) + self._node_id_to_reader_info[node_id] = ReaderInfo( + reader_ref=self._writer_ref, reader_id=writer_id + ) + assert len(self._node_id_to_reader_info) == len(self._node_id_to_readers) # We need to register the new writer_ref. self._writer_registered = False @@ -360,11 +373,10 @@ def ensure_registered_as_reader(self) -> None: if self._reader_registered: return - for reader_node_id, reader_ref_and_reader_id in self._reader_refs.items(): + for reader_node_id, reader_info in self._node_id_to_reader_info.items(): if self.is_local_node(reader_node_id): - reader_ref = reader_ref_and_reader_id[0] self._worker.core_worker.experimental_channel_register_reader( - reader_ref, + reader_info.reader_ref, ) self._reader_registered = True @@ -376,7 +388,7 @@ def _deserialize_reader_channel( writer_node_id, reader_node_ids, writer_ref: "ray.ObjectRef", - reader_refs: "ray.ObjectRef", + node_id_to_reader_info: "ray.ObjectRef", writer_registered: bool, reader_registered: bool, ) -> "Channel": @@ -387,14 +399,14 @@ def _deserialize_reader_channel( _writer_node_id=writer_node_id, _reader_node_ids=reader_node_ids, _writer_ref=writer_ref, - _reader_refs=reader_refs, + _node_id_to_reader_info=node_id_to_reader_info, _writer_registered=writer_registered, _reader_registered=reader_registered, ) return chan def __reduce__(self): - assert self._reader_refs is not None + assert self._node_id_to_reader_info is not None return self._deserialize_reader_channel, ( self._writer, self._reader_and_node_list, @@ -402,14 +414,15 @@ def __reduce__(self): self._writer_node_id, self._reader_node_ids, self._writer_ref, - self._reader_refs, + self._node_id_to_reader_info, self._writer_registered, self._reader_registered, ) def __str__(self) -> str: return ( - f"Channel(_reader_refs={self._reader_refs}, _writer_ref={self._writer_ref})" + f"Channel(_node_id_to_reader_info={self._node_id_to_reader_info}, " + f"_writer_ref={self._writer_ref})" ) def _resize_channel_if_needed(self, serialized_value: str, timeout_ms: int): @@ -425,14 +438,14 @@ def _resize_channel_if_needed(self, serialized_value: str, timeout_ms: int): # TODO(sang): Support different policies such as 2X buffer size. prev_writer_ref = self._writer_ref self._writer_ref = _create_channel_ref(self, self._typ.buffer_size_bytes) - self._create_reader_refs( - self._reader_and_node_list, self._typ.buffer_size_bytes + self._create_reader_refs(self._typ.buffer_size_bytes) + self._local_reader_ref = self._get_local_reader_ref( + self._node_id_to_reader_info ) - self._local_reader_ref = self._get_local_reader_ref(self._reader_refs) # Write a special message to the channel so that the readers know to # stop using the current reader_ref. 
- special_message = _ResizeChannel(self._reader_refs) + special_message = _ResizeChannel(self._node_id_to_reader_info) special_message_serialized = ( self._worker.get_serialization_context().serialize(special_message) ) @@ -499,8 +512,10 @@ def read(self, timeout: Optional[float] = None) -> Any: )[0][0] if isinstance(ret, _ResizeChannel): - self._reader_refs = ret._reader_refs - self._local_reader_ref = self._get_local_reader_ref(self._reader_refs) + self._node_id_to_reader_info = ret._node_id_to_reader_info + self._local_reader_ref = self._get_local_reader_ref( + self._node_id_to_reader_info + ) # We need to register the new reader_ref. self._reader_registered = False self.ensure_registered_as_reader() @@ -527,9 +542,9 @@ def close(self) -> None: if is_local_node_reader: self.ensure_registered_as_reader() - for reader_ref_and_reader in self._reader_refs.values(): + for reader_info in self._node_id_to_reader_info.values(): self._worker.core_worker.experimental_channel_set_error( - reader_ref_and_reader[0] + reader_info.reader_ref ) From e3dac1d7f0a520c3828199d16869f7eb2f08a6de Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Mon, 9 Sep 2024 00:29:11 -0700 Subject: [PATCH 07/12] working now. --- .../channel/shared_memory_channel.py | 6 +-- python/ray/tests/test_channel.py | 50 +++++++++++-------- src/ray/core_worker/core_worker.cc | 2 +- .../test/mutable_object_provider_test.cc | 2 +- 4 files changed, 35 insertions(+), 25 deletions(-) diff --git a/python/ray/experimental/channel/shared_memory_channel.py b/python/ray/experimental/channel/shared_memory_channel.py index bb316c2095f4..7ca0c4c9956c 100644 --- a/python/ray/experimental/channel/shared_memory_channel.py +++ b/python/ray/experimental/channel/shared_memory_channel.py @@ -250,7 +250,7 @@ def __init__( ) self._writer_ref = _create_channel_ref(self, typ.buffer_size_bytes) - self._create_reader_refs(reader_and_node_list, typ.buffer_size_bytes) + self._create_reader_refs(typ.buffer_size_bytes) else: assert ( _writer_node_id is not None @@ -342,10 +342,10 @@ def ensure_registered_as_writer(self) -> None: remote_reader_node_ids: List[str] = [] remote_reader_ids: List[str] = [] remote_num_readers_per_node: List[int] = [] - for reader_node_id, reader_ref_and_reader_id in self._reader_refs.items(): + for reader_node_id, reader_info in self._node_id_to_reader_info.items(): if self.is_local_node(reader_node_id): continue - reader_ref, reader_id = reader_ref_and_reader_id + reader_ref, reader_id = reader_info remote_reader_refs.append(reader_ref) remote_reader_ids.append(reader_id) remote_reader_node_ids.append(reader_node_id) diff --git a/python/ray/tests/test_channel.py b/python/ray/tests/test_channel.py index 97e119d87df5..bfb3d1e890f1 100644 --- a/python/ray/tests/test_channel.py +++ b/python/ray/tests/test_channel.py @@ -1221,6 +1221,10 @@ class Actor: def get_node_id(self): return ray.get_runtime_context().get_node_id() + def read(self, channel, val): + assert channel.read() == val + return val + def create_actor(node): return Actor.options( scheduling_strategy=NodeAffinitySchedulingStrategy(node, soft=False) @@ -1238,12 +1242,12 @@ def create_actor(node): driver_actor = create_driver_actor() driver_node = get_actor_node_id(driver_actor) - with pytest.raises( - ValueError, match="All reader actors must be on the same node.*" - ): - ray_channel.Channel( - None, [(driver_actor, driver_node), (a, a_node), (b, b_node)], 1000 - ) + ch = ray_channel.Channel( + None, [(driver_actor, driver_node), (a, a_node), (b, b_node)], 1000 + ) + val = 1 + 
ch.write(val)
+    assert ray.get([a.read.remote(ch, val) for a in actors]) == [val, val]


 @pytest.mark.skipif(
@@ -1272,6 +1276,10 @@ class Actor:
        def get_node_id(self):
            return ray.get_runtime_context().get_node_id()

+        def read(self, channel, val):
+            assert channel.read() == val
+            return val
+
    def create_actor(node):
        return Actor.options(
            scheduling_strategy=NodeAffinitySchedulingStrategy(node, soft=False)
        )
@@ -1295,20 +1303,22 @@ def create_actor(node):
    driver_actor = create_driver_actor()
    driver_node = get_actor_node_id(driver_actor)
-    with pytest.raises(
-        ValueError, match="All reader actors must be on the same node.*"
-    ):
-        ray_channel.Channel(
-            None,
-            [
-                (driver_actor, driver_node),
-                (a, a_node),
-                (b, b_node),
-                (c, c_node),
-                (d, d_node),
-            ],
-            1000,
-        )
+    ch = ray_channel.Channel(
+        None,
+        [
+            (driver_actor, driver_node),
+            (a, a_node),
+            (b, b_node),
+            (c, c_node),
+            (d, d_node),
+        ],
+        1000,
+    )
+    i = 1
+    ch.write(i)
+    assert ray.get([a.read.remote(ch, i) for a in actors]) == [
+        i for _ in range(len(actors))
+    ]


 if __name__ == "__main__":
diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc
index f9ddb3d98f66..38c54f68f355 100644
--- a/src/ray/core_worker/core_worker.cc
+++ b/src/ray/core_worker/core_worker.cc
@@ -1516,7 +1516,7 @@ Status CoreWorker::ExperimentalRegisterMutableObjectReaderRemote(
   size_t num_requests = addrs.size();
   RAY_CHECK_EQ(addrs.size(), remote_reader_object_ids.size());
   std::promise<void> promise;
-  for (auto i = 0; i < addrs.size(); i++) {
+  for (size_t i = 0; i < addrs.size(); i++) {
     const auto &addr = addrs[i];
     const auto &reader_object_id = remote_reader_object_ids[i];
     const auto &num_reader = remote_num_readers[i];
diff --git a/src/ray/core_worker/test/mutable_object_provider_test.cc b/src/ray/core_worker/test/mutable_object_provider_test.cc
index 761b9d893eaf..2b5fdd139597 100644
--- a/src/ray/core_worker/test/mutable_object_provider_test.cc
+++ b/src/ray/core_worker/test/mutable_object_provider_test.cc
@@ -153,7 +153,7 @@ TEST(MutableObjectProvider, RegisterWriterChannel) {
   MutableObjectProvider provider(
       plasma, /*factory=*/absl::bind_front(GetTestInterface, interface));
-  provider.RegisterWriterChannel(object_id, {&node_id});
+  provider.RegisterWriterChannel(object_id, {node_id});

   std::shared_ptr<Buffer> data;
   EXPECT_EQ(provider

From b07715cb14e79acf899580c475742e593490f749 Mon Sep 17 00:00:00 2001
From: SangBin Cho
Date: Mon, 9 Sep 2024 10:22:22 -0700
Subject: [PATCH 08/12] done

---
 .../channel/shared_memory_channel.py | 23 ++++++-------------
 1 file changed, 7 insertions(+), 16 deletions(-)

diff --git a/python/ray/experimental/channel/shared_memory_channel.py b/python/ray/experimental/channel/shared_memory_channel.py
index 7ca0c4c9956c..6b75498dcfb8 100644
--- a/python/ray/experimental/channel/shared_memory_channel.py
+++ b/python/ray/experimental/channel/shared_memory_channel.py
@@ -75,7 +75,7 @@ def _get_self_actor() -> Optional["ray.actor.ActorHandle"]:

 # A tuple of object reference and its corresponding actor that holds it.
-ReaderInfo = namedtuple("ReaderInfo", ["reader_ref", "reader_id"])
+ReaderInfo = namedtuple("ReaderInfo", ["reader_ref", "reader_actor_id"])


 class _ResizeChannel:
@@ -88,13 +88,13 @@ class _ResizeChannel:

    def __init__(
        self,
-        _node_id_to_reader_info: Dict[str, ReaderInfo] = None,
+        _node_id_to_reader_info: Dict[str, ReaderInfo],
    ):
        """
        Args:
            _node_id_to_reader_info: node_id -> reader info.
-                Each node should have only 1 reader actor and corresponding reference.
-                # that's shared by all readers.
+ Each node should have only 1 reader actor and corresponding reference + that's shared by all readers. """ self._node_id_to_reader_info = _node_id_to_reader_info @@ -170,7 +170,6 @@ def __init__( reader_and_node_list: List[Tuple["ray.actor.ActorHandle", str]], typ: Optional[Union[int, SharedMemoryType]] = None, _writer_node_id: Optional["ray.NodeID"] = None, - _reader_node_ids: Optional[Set["ray.NodeID"]] = None, _writer_ref: Optional["ray.ObjectRef"] = None, _node_id_to_reader_info: Optional[Dict[str, ReaderInfo]] = None, _writer_registered: bool = False, @@ -217,7 +216,6 @@ def __init__( self._writer_registered = _writer_registered self._reader_registered = _reader_registered - self._reader_node_ids = _reader_node_ids or set() # node_id -> reader references. Each node should have only 1 reader reference # that's shared by all readers. self._node_id_to_reader_info: Dict[str, ReaderInfo] = ( @@ -242,9 +240,6 @@ def __init__( self_actor = _get_self_actor() assert writer == self_actor - for reader, node_id in reader_and_node_list: - self._reader_node_ids.add(node_id) - self._writer_node_id = ( ray.runtime_context.get_runtime_context().get_node_id() ) @@ -263,7 +258,6 @@ def __init__( self._writer_ref = _writer_ref self._writer_node_id = _writer_node_id - self._reader_node_ids = _reader_node_ids self._node_id_to_reader_info = _node_id_to_reader_info assert self._num_local_readers == 0 @@ -309,14 +303,14 @@ def _create_reader_refs( reader_ref=ray.get( fn.remote(_create_channel_ref, buffer_size_bytes) ), - reader_id=reader._actor_id, + reader_actor_id=reader._actor_id, ) else: writer_id = ray.ActorID.nil() if self._writer is not None: writer_id = self._writer._actor_id self._node_id_to_reader_info[node_id] = ReaderInfo( - reader_ref=self._writer_ref, reader_id=writer_id + reader_ref=self._writer_ref, reader_actor_id=writer_id ) assert len(self._node_id_to_reader_info) == len(self._node_id_to_readers) @@ -386,7 +380,6 @@ def _deserialize_reader_channel( reader_and_node_list: List[Tuple["ray.actor.ActorHandle", str]], typ: int, writer_node_id, - reader_node_ids, writer_ref: "ray.ObjectRef", node_id_to_reader_info: "ray.ObjectRef", writer_registered: bool, @@ -397,7 +390,6 @@ def _deserialize_reader_channel( reader_and_node_list, typ, _writer_node_id=writer_node_id, - _reader_node_ids=reader_node_ids, _writer_ref=writer_ref, _node_id_to_reader_info=node_id_to_reader_info, _writer_registered=writer_registered, @@ -412,7 +404,6 @@ def __reduce__(self): self._reader_and_node_list, self._typ, self._writer_node_id, - self._reader_node_ids, self._writer_ref, self._node_id_to_reader_info, self._writer_registered, @@ -536,7 +527,7 @@ def close(self) -> None: self._worker.core_worker.experimental_channel_set_error(self._writer_ref) is_local_node_reader = False - for node_id in self._reader_node_ids: + for node_id in self._node_id_to_readers.keys(): if self.is_local_node(node_id): is_local_node_reader = True if is_local_node_reader: From 5b35173195bb1fa91a93795a23c4cdf9fe9f2e38 Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Mon, 9 Sep 2024 12:25:40 -0700 Subject: [PATCH 09/12] done --- python/ray/_raylet.pyx | 36 +++--- .../channel/shared_memory_channel.py | 116 ++++++++---------- python/ray/includes/libcoreworker.pxd | 12 +- src/ray/core_worker/core_worker.cc | 30 ++--- src/ray/core_worker/core_worker.h | 13 +- .../experimental_mutable_object_manager.h | 11 ++ 6 files changed, 101 insertions(+), 117 deletions(-) diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 
64017481d12c..b442e013c39a 100644
--- a/python/ray/_raylet.pyx
+++ b/python/ray/_raylet.pyx
@@ -157,6 +157,7 @@ from ray.includes.libcoreworker cimport (
    CFiberEvent,
    CActorHandle,
    CGeneratorBackpressureWaiter,
+    CReaderRefInfo,
 )

 from ray.includes.ray_config cimport RayConfig
@@ -3687,27 +3688,22 @@ cdef class CoreWorker:

    def experimental_channel_register_writer(self,
                                             ObjectRef writer_ref,
-                                             remote_reader_refs,
-                                             remote_reader_nodes,
-                                             remote_readers,
-                                             remote_num_readers_per_node):
+                                             remote_reader_ref_info):
        cdef:
            CObjectID c_writer_ref = writer_ref.native()
-            c_vector[CObjectID] c_remote_reader_refs
            c_vector[CNodeID] c_remote_reader_nodes
-            c_vector[CActorID] c_remote_readers
-            c_vector[int64_t] c_remote_num_readers
-
-        for ref, node, reader, num_readers in zip(
-                remote_reader_refs,
-                remote_reader_nodes,
-                remote_readers,
-                remote_num_readers_per_node):
-            c_remote_reader_refs.push_back((<ObjectRef>ref).native())
-            c_remote_reader_nodes.push_back(CNodeID.FromHex(node))
-            c_remote_readers.push_back((<ActorID>reader).native())
-            assert num_readers != 0
-            c_remote_num_readers.push_back(num_readers)
+            c_vector[CReaderRefInfo] c_remote_reader_ref_info
+            CReaderRefInfo c_reader_ref_info
+
+        for node_id, reader_ref_info in remote_reader_ref_info.items():
+            c_reader_ref_info = CReaderRefInfo()
+            c_reader_ref_info.reader_ref_id = (<ObjectRef>reader_ref_info.reader_ref).native()
+            c_reader_ref_info.owner_reader_actor_id = (<ActorID>reader_ref_info.ref_owner_actor_id).native()
+            num_reader_actors = reader_ref_info.num_reader_actors
+            assert num_reader_actors != 0
+            c_reader_ref_info.num_reader_actors = num_reader_actors
+            c_remote_reader_ref_info.push_back(c_reader_ref_info)
+            c_remote_reader_nodes.push_back(CNodeID.FromHex(node_id))

        with nogil:
            check_status(CCoreWorkerProcess.GetCoreWorker()
@@ -3719,9 +3715,7 @@ cdef class CoreWorker:
                CCoreWorkerProcess.GetCoreWorker()
                .ExperimentalRegisterMutableObjectReaderRemote(
                    c_writer_ref,
-                    c_remote_readers,
-                    c_remote_num_readers,
-                    c_remote_reader_refs
+                    c_remote_reader_ref_info,
                ))

    def experimental_channel_register_reader(self, ObjectRef object_ref):
diff --git a/python/ray/experimental/channel/shared_memory_channel.py b/python/ray/experimental/channel/shared_memory_channel.py
index 6b75498dcfb8..9b23bd7a9ca6 100644
--- a/python/ray/experimental/channel/shared_memory_channel.py
+++ b/python/ray/experimental/channel/shared_memory_channel.py
@@ -74,8 +74,13 @@ def _get_self_actor() -> Optional["ray.actor.ActorHandle"]:
     return None


-# A tuple of object reference and its corresponding actor that holds it.
-ReaderInfo = namedtuple("ReaderInfo", ["reader_ref", "reader_actor_id"])
+# aDAG maintains 1 reader object reference (also called buffer) per node.
+# reader_ref: The object reference.
+# ref_owner_actor_id: The actor that created the object reference.
+# num_reader_actors: The number of reader actors that read this object reference.
+ReaderRefInfo = namedtuple(
+    "ReaderRefInfo", ["reader_ref", "ref_owner_actor_id", "num_reader_actors"]
+)


 class _ResizeChannel:
@@ -88,13 +88,13 @@ class _ResizeChannel:

    def __init__(
        self,
-        _node_id_to_reader_info: Dict[str, ReaderInfo],
+        _node_id_to_reader_ref_info: Dict[str, ReaderRefInfo],
    ):
        """
        Args:
-            _node_id_to_reader_info: node_id -> reader info.
+            _node_id_to_reader_ref_info: A dict of node ID to ReaderRefInfo.
""" - self._node_id_to_reader_info = _node_id_to_reader_info + self._node_id_to_reader_ref_info = _node_id_to_reader_ref_info class SharedMemoryType(ChannelOutputType): @@ -171,7 +174,7 @@ def __init__( typ: Optional[Union[int, SharedMemoryType]] = None, _writer_node_id: Optional["ray.NodeID"] = None, _writer_ref: Optional["ray.ObjectRef"] = None, - _node_id_to_reader_info: Optional[Dict[str, ReaderInfo]] = None, + _node_id_to_reader_ref_info: Optional[Dict[str, ReaderRefInfo]] = None, _writer_registered: bool = False, _reader_registered: bool = False, ): @@ -216,10 +219,9 @@ def __init__( self._writer_registered = _writer_registered self._reader_registered = _reader_registered - # node_id -> reader references. Each node should have only 1 reader reference - # that's shared by all readers. - self._node_id_to_reader_info: Dict[str, ReaderInfo] = ( - _node_id_to_reader_info or {} + # A list of reader ref. + self._node_id_to_reader_ref_info: Dict[str, ReaderRefInfo] = ( + _node_id_to_reader_ref_info or {} ) # Node ID -> a list of reader actors. @@ -251,14 +253,14 @@ def __init__( _writer_node_id is not None ), "_writer_node_id must also be passed to the constructor when " "_writer_ref is." - assert _node_id_to_reader_info is not None, ( - "_node_id_to_reader_info must also be passed to the constructor " + assert _node_id_to_reader_ref_info is not None, ( + "_node_id_to_reader_ref_info must also be passed to the constructor " "when _writer_ref is." ) self._writer_ref = _writer_ref self._writer_node_id = _writer_node_id - self._node_id_to_reader_info = _node_id_to_reader_info + self._node_id_to_reader_ref_info = _node_id_to_reader_ref_info assert self._num_local_readers == 0 remote_node_exists = False @@ -275,15 +277,15 @@ def __init__( assert self._num_local_readers > 0 self._local_reader_ref: Optional["ray.ObjectRef"] = self._get_local_reader_ref( - self._node_id_to_reader_info + self._node_id_to_reader_ref_info ) def _get_local_reader_ref( - self, node_id_to_reader_info: Dict[str, ReaderInfo] + self, _node_id_to_reader_ref_info: Dict[str, ReaderRefInfo] ) -> Optional["ray.ObjectRef"]: - for reader_node_id, reader_info in node_id_to_reader_info.items(): - if self.is_local_node(reader_node_id): - return reader_info.reader_ref + for node_id, reader_ref_info in _node_id_to_reader_ref_info.items(): + if self.is_local_node(node_id): + return reader_ref_info.reader_ref return None def _create_reader_refs( @@ -296,23 +298,28 @@ def _create_reader_refs( for node_id, readers in self._node_id_to_readers.items(): if not self.is_local_node(node_id): # Find 1 reader in a remote node to create a reference that's - # shared by all readers. + # shared by all readers. When a new value is written to a reference, + # it is sent to this reference. 
reader = readers[0] fn = reader.__ray_call__ - self._node_id_to_reader_info[node_id] = ReaderInfo( + self._node_id_to_reader_ref_info[node_id] = ReaderRefInfo( reader_ref=ray.get( fn.remote(_create_channel_ref, buffer_size_bytes) ), - reader_actor_id=reader._actor_id, + ref_owner_actor_id=reader._actor_id, + num_reader_actors=len(readers), ) else: writer_id = ray.ActorID.nil() if self._writer is not None: writer_id = self._writer._actor_id - self._node_id_to_reader_info[node_id] = ReaderInfo( - reader_ref=self._writer_ref, reader_actor_id=writer_id + self._node_id_to_reader_ref_info[node_id] = ReaderRefInfo( + reader_ref=self._writer_ref, + ref_owner_actor_id=writer_id, + num_reader_actors=len(readers), ) - assert len(self._node_id_to_reader_info) == len(self._node_id_to_readers) + # There must be only 1 node reader reference per node. + assert len(self._node_id_to_reader_ref_info) == len(self._node_id_to_readers) # We need to register the new writer_ref. self._writer_registered = False @@ -332,34 +339,15 @@ def ensure_registered_as_writer(self) -> None: "the writer is on." ) - remote_reader_refs: List["ray.ObjectRef"] = [] - remote_reader_node_ids: List[str] = [] - remote_reader_ids: List[str] = [] - remote_num_readers_per_node: List[int] = [] - for reader_node_id, reader_info in self._node_id_to_reader_info.items(): - if self.is_local_node(reader_node_id): + remote_reader_ref_info: Dict[str, ReaderRefInfo] = {} + for node_id, reader_ref_info in self._node_id_to_reader_ref_info.items(): + if self.is_local_node(node_id): continue - reader_ref, reader_id = reader_info - remote_reader_refs.append(reader_ref) - remote_reader_ids.append(reader_id) - remote_reader_node_ids.append(reader_node_id) - remote_num_readers_per_node.append( - len(self._node_id_to_readers[reader_node_id]) - ) - - assert ( - len(remote_reader_node_ids) - == len(remote_reader_ids) - == len(remote_reader_refs) - == len(remote_num_readers_per_node) - ) + remote_reader_ref_info[node_id] = reader_ref_info self._worker.core_worker.experimental_channel_register_writer( self._writer_ref, - remote_reader_refs, - remote_reader_node_ids, - remote_reader_ids, - remote_num_readers_per_node, + remote_reader_ref_info, ) self._writer_registered = True @@ -367,10 +355,10 @@ def ensure_registered_as_reader(self) -> None: if self._reader_registered: return - for reader_node_id, reader_info in self._node_id_to_reader_info.items(): - if self.is_local_node(reader_node_id): + for node_id, reader_ref_info in self._node_id_to_reader_ref_info.items(): + if self.is_local_node(node_id): self._worker.core_worker.experimental_channel_register_reader( - reader_info.reader_ref, + reader_ref_info.reader_ref, ) self._reader_registered = True @@ -381,7 +369,7 @@ def _deserialize_reader_channel( typ: int, writer_node_id, writer_ref: "ray.ObjectRef", - node_id_to_reader_info: "ray.ObjectRef", + node_id_to_reader_ref_info: Dict[str, ReaderRefInfo], writer_registered: bool, reader_registered: bool, ) -> "Channel": @@ -391,28 +379,28 @@ def _deserialize_reader_channel( typ, _writer_node_id=writer_node_id, _writer_ref=writer_ref, - _node_id_to_reader_info=node_id_to_reader_info, + _node_id_to_reader_ref_info=node_id_to_reader_ref_info, _writer_registered=writer_registered, _reader_registered=reader_registered, ) return chan def __reduce__(self): - assert self._node_id_to_reader_info is not None + assert self._node_id_to_reader_ref_info is not None return self._deserialize_reader_channel, ( self._writer, self._reader_and_node_list, self._typ, 
self._writer_node_id, self._writer_ref, - self._node_id_to_reader_info, + self._node_id_to_reader_ref_info, self._writer_registered, self._reader_registered, ) def __str__(self) -> str: return ( - f"Channel(_node_id_to_reader_info={self._node_id_to_reader_info}, " + f"Channel(_node_id_to_reader_ref_info={self._node_id_to_reader_ref_info}, " f"_writer_ref={self._writer_ref})" ) @@ -431,12 +419,12 @@ def _resize_channel_if_needed(self, serialized_value: str, timeout_ms: int): self._writer_ref = _create_channel_ref(self, self._typ.buffer_size_bytes) self._create_reader_refs(self._typ.buffer_size_bytes) self._local_reader_ref = self._get_local_reader_ref( - self._node_id_to_reader_info + self._node_id_to_reader_ref_info ) # Write a special message to the channel so that the readers know to # stop using the current reader_ref. - special_message = _ResizeChannel(self._node_id_to_reader_info) + special_message = _ResizeChannel(self._node_id_to_reader_ref_info) special_message_serialized = ( self._worker.get_serialization_context().serialize(special_message) ) @@ -503,9 +491,9 @@ def read(self, timeout: Optional[float] = None) -> Any: )[0][0] if isinstance(ret, _ResizeChannel): - self._node_id_to_reader_info = ret._node_id_to_reader_info + self._node_id_to_reader_ref_info = ret._node_id_to_reader_ref_info self._local_reader_ref = self._get_local_reader_ref( - self._node_id_to_reader_info + self._node_id_to_reader_ref_info ) # We need to register the new reader_ref. self._reader_registered = False @@ -533,9 +521,9 @@ def close(self) -> None: if is_local_node_reader: self.ensure_registered_as_reader() - for reader_info in self._node_id_to_reader_info.values(): + for reader_ref_info in self._node_id_to_reader_ref_info.values(): self._worker.core_worker.experimental_channel_set_error( - reader_info.reader_ref + reader_ref_info.reader_ref ) diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index 995603d55ed6..67e95215c063 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -81,6 +81,14 @@ cdef extern from "ray/core_worker/fiber.h" nogil: void Wait() void Notify() +cdef extern from "ray/core_worker/experimental_mutable_object_manager.h" nogil: + cdef cppclass CReaderRefInfo "ray::experimental::ReaderRefInfo": + CReaderRefInfo() + CObjectID reader_ref_id; + CActorID owner_reader_actor_id; + int64_t num_reader_actors; + + cdef extern from "ray/core_worker/context.h" nogil: cdef cppclass CWorkerContext "ray::core::WorkerContext": c_bool CurrentActorIsAsync() @@ -270,9 +278,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: CRayStatus ExperimentalRegisterMutableObjectReader(const CObjectID &object_id) CRayStatus ExperimentalRegisterMutableObjectReaderRemote( const CObjectID &object_id, - const c_vector[CActorID] &remote_reader_actors, - c_vector[int64_t] remote_num_readers, - const c_vector[CObjectID] &remote_reader_refs) + const c_vector[CReaderRefInfo] &remote_reader_ref_info) CRayStatus SealOwned(const CObjectID &object_id, c_bool pin_object, const unique_ptr[CAddress] &owner_address) CRayStatus SealExisting(const CObjectID &object_id, c_bool pin_object, diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 38c54f68f355..f9f5329048bf 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -1496,33 +1496,25 @@ Status CoreWorker::ExperimentalRegisterMutableObjectWriter( Status CoreWorker::ExperimentalRegisterMutableObjectReaderRemote( 
    const ObjectID &writer_object_id,
-    const std::vector<ActorID> &remote_reader_actors,
-    std::vector<int64_t> remote_num_readers,
-    const std::vector<ObjectID> &remote_reader_object_ids) {
-  if (remote_reader_actors.size() == 0) {
+    const std::vector<experimental::ReaderRefInfo> &remote_reader_ref_info) {
+  if (remote_reader_ref_info.size() == 0) {
     return Status::OK();
   }
 
-  std::vector<rpc::Address> addrs;
-  for (const auto &actor_id : remote_reader_actors) {
-    const auto &addr = actor_task_submitter_->GetActorAddress(actor_id);
+  std::shared_ptr<size_t> num_replied = std::make_shared<size_t>(0);
+  size_t num_requests = remote_reader_ref_info.size();
+  std::promise<void> promise;
+  for (const auto &reader_ref_info : remote_reader_ref_info) {
+    const auto &owner_reader_actor_id = reader_ref_info.owner_reader_actor_id;
+    const auto &reader_object_id = reader_ref_info.reader_ref_id;
+    const auto &num_reader = reader_ref_info.num_reader_actors;
+    const auto &addr = actor_task_submitter_->GetActorAddress(owner_reader_actor_id);
     // This can happen if an actor is not created yet. We assume the API is called only
     // when an actor is alive, which is true now.
     RAY_CHECK(addr.has_value());
-    addrs.push_back(*addr);
-  }
-
-  std::shared_ptr<size_t> num_replied = std::make_shared<size_t>(0);
-  size_t num_requests = addrs.size();
-  RAY_CHECK_EQ(addrs.size(), remote_reader_object_ids.size());
-  std::promise<void> promise;
-  for (size_t i = 0; i < addrs.size(); i++) {
-    const auto &addr = addrs[i];
-    const auto &reader_object_id = remote_reader_object_ids[i];
-    const auto &num_reader = remote_num_readers[i];
     std::shared_ptr<rpc::CoreWorkerClientInterface> conn =
-        core_worker_client_pool_->GetOrConnect(addr);
+        core_worker_client_pool_->GetOrConnect(*addr);
 
     rpc::RegisterMutableObjectReaderRequest req;
     req.set_writer_object_id(writer_object_id.Binary());
diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h
index f6036484b02a..251b3cbc96bb 100644
--- a/src/ray/core_worker/core_worker.h
+++ b/src/ray/core_worker/core_worker.h
@@ -755,19 +755,12 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
   /// that is written to on this node to the corresponding mutable object that is read on
   /// the node that `remote_reader_actors` is on.
   ///
-  /// The API assumes `remote_reader_actors`, `remote_num_readers`, and
-  /// `remote_reader_object_ids` have the same order and length.
-  ///
   /// \param[in] writer_object_id The ID of the object that is written on this node.
-  /// \param[in] remote_reader_actors The list of actors that read the object on remote
-  /// nodes. \param[in] remote_num_readers A list of the total number of readers per
-  /// remote node. \param[in] remote_reader_object_ids A list of IDs of the corresponding
-  /// objects that are read on the remote nodes.
+  /// \param[in] remote_reader_ref_info The remote reader reference info. There is
+  /// one reader reference per node.
   Status ExperimentalRegisterMutableObjectReaderRemote(
       const ObjectID &writer_object_id,
-      const std::vector<ActorID> &remote_reader_actors,
-      std::vector<int64_t> remote_num_readers,
-      const std::vector<ObjectID> &remote_reader_object_ids);
+      const std::vector<experimental::ReaderRefInfo> &remote_reader_ref_info);
 
   /// Get a list of objects from the object store. Objects that failed to be retrieved
   /// will be returned as nullptrs.
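The registration path above fans out one RPC per remote reader reference and
blocks until every node has acknowledged; PollWriterClosure re-arms itself with
the same count-up-and-signal pattern once all readers have replied. A minimal
Python sketch of that completion barrier (the names are illustrative; each send
function is assumed to invoke the given callback on acknowledgment):

    import threading

    def broadcast_and_wait(send_fns):
        # One non-blocking send per remote node; block until every one of
        # them has been acknowledged.
        done = threading.Event()
        lock = threading.Lock()
        num_replied = 0

        def on_reply():
            nonlocal num_replied
            with lock:
                num_replied += 1
                if num_replied == len(send_fns):
                    done.set()

        for send in send_fns:
            send(on_reply)
        done.wait()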
diff --git a/src/ray/core_worker/experimental_mutable_object_manager.h b/src/ray/core_worker/experimental_mutable_object_manager.h
index dd24ffcac43d..2ddafebc7085 100644
--- a/src/ray/core_worker/experimental_mutable_object_manager.h
+++ b/src/ray/core_worker/experimental_mutable_object_manager.h
@@ -34,6 +34,17 @@ namespace ray {
 
 namespace experimental {
 
+struct ReaderRefInfo {
+  ReaderRefInfo() = default;
+
+  // The ObjectID of the reader reference.
+  ObjectID reader_ref_id;
+  // The actor id of the owner of the reference.
+  ActorID owner_reader_actor_id;
+  // The number of reader actors reading this buffer.
+  int64_t num_reader_actors;
+};
+
 class MutableObjectManager : public std::enable_shared_from_this<MutableObjectManager> {
  public:
   /// Buffer for a mutable object. This buffer wraps a shared memory buffer of

From 26ffedf6628856496578dcc4aa0baae437ae895e Mon Sep 17 00:00:00 2001
From: SangBin Cho
Date: Mon, 9 Sep 2024 12:26:23 -0700
Subject: [PATCH 10/12] .

---
 python/ray/_raylet.pyx                | 6 ++++--
 python/ray/includes/libcoreworker.pxd | 6 +++---
 2 files changed, 7 insertions(+), 5 deletions(-)

diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx
index b442e013c39a..1f5d29f0fd88 100644
--- a/python/ray/_raylet.pyx
+++ b/python/ray/_raylet.pyx
@@ -3697,8 +3697,10 @@ cdef class CoreWorker:

        for node_id, reader_ref_info in remote_reader_ref_info.items():
            c_reader_ref_info = CReaderRefInfo()
-            c_reader_ref_info.reader_ref_id = (<ObjectRef>reader_ref_info.reader_ref).native()
-            c_reader_ref_info.owner_reader_actor_id = (<ActorID>reader_ref_info.ref_owner_actor_id).native()
+            c_reader_ref_info.reader_ref_id = (
+                <ObjectRef>reader_ref_info.reader_ref).native()
+            c_reader_ref_info.owner_reader_actor_id = (
+                <ActorID>reader_ref_info.ref_owner_actor_id).native()
            num_reader_actors = reader_ref_info.num_reader_actors
            assert num_reader_actors != 0
            c_reader_ref_info.num_reader_actors = num_reader_actors
diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd
index 67e95215c063..242c5f10dd49 100644
--- a/python/ray/includes/libcoreworker.pxd
+++ b/python/ray/includes/libcoreworker.pxd
@@ -84,9 +84,9 @@ cdef extern from "ray/core_worker/fiber.h" nogil:
 cdef extern from "ray/core_worker/experimental_mutable_object_manager.h" nogil:
    cdef cppclass CReaderRefInfo "ray::experimental::ReaderRefInfo":
        CReaderRefInfo()
-        CObjectID reader_ref_id;
-        CActorID owner_reader_actor_id;
-        int64_t num_reader_actors;
+        CObjectID reader_ref_id
+        CActorID owner_reader_actor_id
+        int64_t num_reader_actors


 cdef extern from "ray/core_worker/context.h" nogil:

From 14a47f9617df9190d8b556f357432b250dec4002 Mon Sep 17 00:00:00 2001
From: SangBin Cho
Date: Mon, 9 Sep 2024 20:22:08 -0700
Subject: [PATCH 11/12] fixed

---
 python/ray/experimental/channel/shared_memory_channel.py | 3 ++-
 src/ray/raylet_client/raylet_client.cc                   | 3 +--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/python/ray/experimental/channel/shared_memory_channel.py b/python/ray/experimental/channel/shared_memory_channel.py
index d44253add5c7..44da4736a475 100644
--- a/python/ray/experimental/channel/shared_memory_channel.py
+++ b/python/ray/experimental/channel/shared_memory_channel.py
@@ -222,7 +222,8 @@ def __init__(
        self._writer_registered = _writer_registered
        self._reader_registered = _reader_registered

-        # A list of reader ref.
+        # NodeID -> ReaderRefInfo on that node. Note that there's only 1
+        # reader ref per node.
self._node_id_to_reader_ref_info: Dict[str, ReaderRefInfo] = ( _node_id_to_reader_ref_info or {} ) diff --git a/src/ray/raylet_client/raylet_client.cc b/src/ray/raylet_client/raylet_client.cc index 0227df764263..938f20fab80e 100644 --- a/src/ray/raylet_client/raylet_client.cc +++ b/src/ray/raylet_client/raylet_client.cc @@ -426,8 +426,7 @@ void raylet::RayletClient::PushMutableObject( const ray::rpc::ClientCallback &callback) { // Ray sets the gRPC max payload size to ~512 MiB. We set the max chunk size to a // slightly lower value to allow extra padding just in case. - uint64_t kMaxGrpcPayloadSize = - RayConfig::instance().max_grpc_message_size() * 0.98; + uint64_t kMaxGrpcPayloadSize = RayConfig::instance().max_grpc_message_size() * 0.98; uint64_t total_size = data_size + metadata_size; uint64_t total_num_chunks = total_size / kMaxGrpcPayloadSize; // If `total_size` is not a multiple of `kMaxGrpcPayloadSize`, then we need to send an From b6ed67e58a7bf8117328a25af498bda322e2318b Mon Sep 17 00:00:00 2001 From: SangBin Cho Date: Tue, 10 Sep 2024 08:06:02 -0700 Subject: [PATCH 12/12] . --- python/ray/dag/compiled_dag_node.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/ray/dag/compiled_dag_node.py b/python/ray/dag/compiled_dag_node.py index 9504350ed72c..9526a242f331 100644 --- a/python/ray/dag/compiled_dag_node.py +++ b/python/ray/dag/compiled_dag_node.py @@ -115,7 +115,6 @@ def do_exec_tasks( if done: break for operation in schedule: - print("SANG-TODO operation: ", operation) done = tasks[operation.exec_task_idx].exec_operation( self, operation.type )
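With the full series applied, a compiled DAG can broadcast each input to reader
actors spread across several nodes, using one shared buffer per node. A minimal
end-to-end sketch in the spirit of the tests above (it assumes a running
cluster with enough nodes and CPUs for the actors to spread out):

    import ray
    from ray.dag import InputNode, MultiOutputNode

    @ray.remote(num_cpus=1)
    class Reader:
        def echo(self, x):
            return x

    actors = [Reader.remote() for _ in range(3)]
    with InputNode() as inp:
        dag = MultiOutputNode([a.echo.bind(inp) for a in actors])

    compiled_dag = dag.experimental_compile()
    # The input is written once per node and read by every actor on it.
    assert ray.get(compiled_dag.execute(42)) == [42, 42, 42]
    compiled_dag.teardown()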