diff --git a/python/ray/dag/compiled_dag_node.py b/python/ray/dag/compiled_dag_node.py
index 009a7fad5dbb..bd7c559ea89b 100644
--- a/python/ray/dag/compiled_dag_node.py
+++ b/python/ray/dag/compiled_dag_node.py
@@ -112,7 +112,6 @@ def do_exec_tasks(
             if done:
                 break
             for operation in schedule:
-                print("SANG-TODO operation: ", operation)
                 done = tasks[operation.exec_task_idx].exec_operation(
                     self, operation.type
                 )
diff --git a/python/ray/dag/tests/experimental/test_multi_node_dag.py b/python/ray/dag/tests/experimental/test_multi_node_dag.py
index 6e29a9def0e8..70fa16d44571 100644
--- a/python/ray/dag/tests/experimental/test_multi_node_dag.py
+++ b/python/ray/dag/tests/experimental/test_multi_node_dag.py
@@ -233,13 +233,10 @@ def get_node_id(self):
     compiled_dag = dag.experimental_compile()

-    # Ray sets the gRPC payload max size to 512 MiB. We choose a size in this test that
-    # is a bit larger.
     size = GRPC_MAX_SIZE + (1024 * 1024 * 2)
     val = b"x" * size

     for i in range(3):
-        print(f"{i} iteration")
         ref = compiled_dag.execute(val)
         result = ray.get(ref)
         assert result == val
@@ -249,6 +246,58 @@ def get_node_id(self):
     compiled_dag.teardown()


+@pytest.mark.parametrize("num_actors", [1, 4])
+@pytest.mark.parametrize("num_nodes", [1, 4])
+def test_multi_node_multi_reader_large_payload(
+    ray_start_cluster, num_actors, num_nodes, monkeypatch
+):
+    # Set the max gRPC message size to 5 MiB.
+    GRPC_MAX_SIZE = 1024 * 1024 * 5
+    monkeypatch.setenv("RAY_max_grpc_message_size", str(GRPC_MAX_SIZE))
+    cluster = ray_start_cluster
+    ACTORS_PER_NODE = num_actors
+    NUM_REMOTE_NODES = num_nodes
+    cluster.add_node(num_cpus=ACTORS_PER_NODE)
+    ray.init(address=cluster.address)
+    # Add remote nodes for the remaining readers.
+    for _ in range(NUM_REMOTE_NODES):
+        cluster.add_node(num_cpus=ACTORS_PER_NODE)
+    cluster.wait_for_nodes()
+
+    wait_for_condition(lambda: len(ray.nodes()) == NUM_REMOTE_NODES + 1)
+
+    actors = [
+        Actor.options(num_cpus=1).remote(0)
+        for _ in range(ACTORS_PER_NODE * (NUM_REMOTE_NODES + 1))
+    ]
+
+    def _get_node_id(self) -> "ray.NodeID":
+        return ray.get_runtime_context().get_node_id()
+
+    node_ids = ray.get([act.__ray_call__.remote(_get_node_id) for act in actors])
+    assert len(set(node_ids)) == NUM_REMOTE_NODES + 1
+
+    with InputNode() as inp:
+        outputs = []
+        for actor in actors:
+            outputs.append(actor.echo.bind(inp))
+        dag = MultiOutputNode(outputs)
+
+    compiled_dag = dag.experimental_compile()
+
+    # Set the object size a little bigger than the max gRPC payload size so
+    # that it triggers chunking and resizing.
+    size = GRPC_MAX_SIZE + (1024 * 1024 * 2)
+    val = b"x" * size
+
+    for _ in range(3):
+        ref = compiled_dag.execute(val)
+        result = ray.get(ref)
+        assert result == [val for _ in range(ACTORS_PER_NODE * (NUM_REMOTE_NODES + 1))]
+
+    compiled_dag.teardown()
+
+
 if __name__ == "__main__":
     if os.environ.get("PARALLEL_CI"):
         sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__]))
diff --git a/python/ray/experimental/channel/shared_memory_channel.py b/python/ray/experimental/channel/shared_memory_channel.py
index ba50e466a8d4..2225e2740ef5 100644
--- a/python/ray/experimental/channel/shared_memory_channel.py
+++ b/python/ray/experimental/channel/shared_memory_channel.py
@@ -631,102 +631,6 @@ def next_read_index(self):
         return self._next_read_index


-@DeveloperAPI
-class BufferedSharedMemoryChannel(ChannelInterface):
-    """A channel that can be read and written by Ray processes.
-
-    It creates `num_shm_buffers` number of buffers and allows buffered read and
-    write APIs.
-    I.e., read and write APIs are non-blocking as long as it can write to
-    next buffer or read from a next buffer. See `read` and `write` APIs for
-    more details.
-
-    Args:
-        writer: The actor that may write to the channel. None signifies the driver.
-        reader_and_node_list: A list of tuples, where each tuple contains a reader
-            actor handle and the node ID where the actor is located.
-        num_shm_buffers: Number of shared memory buffers to read/write.
-        typ: Type information about the values passed through the channel.
-            Either an integer representing the max buffer size in bytes
-            allowed, or a SharedMemoryType.
-    """
-
-    def __init__(
-        self,
-        writer: Optional[ray.actor.ActorHandle],
-        reader_and_node_list: List[Tuple["ray.actor.ActorHandle", str]],
-        num_shm_buffers: int,
-        typ: Optional[Union[int, SharedMemoryType]] = None,
-    ):
-        self._num_shm_buffers = num_shm_buffers
-        self._buffers = [
-            # We use Channel directly as a buffer implementation as
-            # channel only allows to have 1 shared memory buffer.
-            Channel(writer, reader_and_node_list, typ)
-            for _ in range(num_shm_buffers)
-        ]
-        # The next index to write from self._buffers.
-        self._next_write_index = 0
-        # The next index to read from self._buffers.
-        self._next_read_index = 0
-
-    def ensure_registered_as_writer(self):
-        """
-        Check whether the process is a valid writer. This method must be idempotent.
-        """
-        for buffer in self._buffers:
-            buffer.ensure_registered_as_writer()
-
-    def ensure_registered_as_reader(self):
-        """
-        Check whether the process is a valid reader. This method must be idempotent.
-        """
-        for buffer in self._buffers:
-            buffer.ensure_registered_as_reader()
-
-    def write(self, value: Any, timeout: Optional[float] = None) -> None:
-        """Write a value to a channel.
-
-        If the next buffer is available, it returns immediately. If the next
-        buffer is not read by downstream consumers, it blocks until a buffer is
-        available to write. If a buffer is not available within timeout, it raises
-        RayChannelTimeoutError.
-        """
-        # A single channel is not supposed to read and write at the same time.
-        assert self._next_read_index == 0
-        self._buffers[self._next_write_index].write(value, timeout)
-        self._next_write_index += 1
-        self._next_write_index %= self._num_shm_buffers
-
-    def read(self, timeout: Optional[float] = None) -> Any:
-        """Read a value from a channel.
-
-        If the next buffer is available, it returns immediately. If the next
-        buffer is not written by an upstream producer, it blocks until a buffer is
-        available to read. If a buffer is not available within timeout, it raises
-        RayChannelTimeoutError.
-        """
-        # A single channel is not supposed to read and write at the same time.
-        assert self._next_write_index == 0
-        output = self._buffers[self._next_read_index].read(timeout)
-        self._next_read_index += 1
-        self._next_read_index %= self._num_shm_buffers
-        return output
-
-    def close(self) -> None:
-        for buffer in self._buffers:
-            buffer.close()
-
-    @property
-    def next_write_index(self):
-        # Testing only
-        return self._next_write_index
-
-    @property
-    def next_read_index(self):
-        # Testing only
-        return self._next_read_index
-
-
 @PublicAPI(stability="alpha")
 class CompositeChannel(ChannelInterface):
     """
diff --git a/python/ray/tests/test_channel.py b/python/ray/tests/test_channel.py
index ff8a0238974f..d7f4cc0a26d4 100644
--- a/python/ray/tests/test_channel.py
+++ b/python/ray/tests/test_channel.py
@@ -1394,78 +1394,6 @@ def write(self, i, timeout=None) -> bool:
     )


-def test_buffered_channel(shutdown_only):
-    """Test buffered shared memory channel."""
-    BUFFER_SIZE = 5
-
-    @ray.remote(num_cpus=1)
-    class Actor:
-        def __init__(self):
-            self.write_index = 0
-
-        def setup(self, driver_actor):
-            self._channel = ray_channel.BufferedSharedMemoryChannel(
-                ray.get_runtime_context().current_actor,
-                [(driver_actor, get_actor_node_id(driver_actor))],
-                BUFFER_SIZE,
-                typ=1000,
-            )
-
-        def get_channel(self):
-            return self._channel
-
-        def write(self, i, timeout=None) -> bool:
-            """Write to a channel Return False if channel times out.
-            Return true otherwise.
-            """
-            self.write_index += 1
-            try:
-                self._channel.write(i, timeout)
-            except ray.exceptions.RayChannelTimeoutError:
-                return False
-            assert self._channel.next_write_index == self.write_index % BUFFER_SIZE
-            return True
-
-    a = Actor.remote()
-    ray.get(a.setup.remote(create_driver_actor()))
-    chan = ray.get(a.get_channel.remote())
-
-    print("Test basic.")
-    # Iterate more than buffer size to make sure it works over and over again.
-    read_idx = 0
-    for i in range(BUFFER_SIZE * 3):
-        read_idx += 1
-        assert ray.get(a.write.remote(i))
-        assert chan.read() == i
-        assert chan.next_read_index == read_idx % BUFFER_SIZE
-
-    print("Test Write timeout.")
-    # Test write timeout.
-    for i in range(BUFFER_SIZE):
-        # fill the buffer withtout read.
-        ray.get(a.write.remote(i))
-    # timeout because all the buffer is exhausted without being consumed.
-    assert ray.get(a.write.remote(1, timeout=1)) is False
-
-    print("Test Read timeout.")
-    # Test read timeout.
-    for i in range(BUFFER_SIZE):
-        # This reads all previous writes.
-        assert chan.read() == i
-    # This read times out because there's no new write, and the call blocks.
-    with pytest.raises(ray.exceptions.RayChannelTimeoutError):
-        chan.read(timeout=1)
-
-    print("Test serialization/deserialization works")
-    deserialized = pickle.loads(pickle.dumps(chan))
-    assert len(chan._buffers) == len(deserialized._buffers)
-    for i in range(len(chan._buffers)):
-        assert (
-            deserialized._buffers[i]._writer._actor_id
-            == chan._buffers[i]._writer._actor_id
-        )
-
-
 if __name__ == "__main__":
     if os.environ.get("PARALLEL_CI"):
         sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__]))
diff --git a/src/ray/core_worker/experimental_mutable_object_provider.cc b/src/ray/core_worker/experimental_mutable_object_provider.cc
index ce7310fc3ede..9dea315d957a 100644
--- a/src/ray/core_worker/experimental_mutable_object_provider.cc
+++ b/src/ray/core_worker/experimental_mutable_object_provider.cc
@@ -234,20 +234,31 @@ void MutableObjectProvider::PollWriterClosure(
   RAY_CHECK(object->GetData());
   RAY_CHECK(object->GetMetadata());

-  RAY_LOG(ERROR) << "SANG-TODO Push mutable object! " << object_id;
-  reader->PushMutableObject(
-      object_id,
-      object->GetData()->Size(),
-      object->GetMetadata()->Size(),
-      object->GetData()->Data(),
-      [this, &io_context, object_id, reader](const Status &status,
-                                             const rpc::PushMutableObjectReply &reply) {
-        io_context.post(
-            [this, &io_context, object_id, reader]() {
-              PollWriterClosure(io_context, object_id, reader);
-            },
-            "experimental::MutableObjectProvider.PollWriter");
-      });
+  std::shared_ptr<size_t> num_replied = std::make_shared<size_t>(0);
+  for (const auto &reader : *remote_readers) {
+    reader->PushMutableObject(
+        writer_object_id,
+        object->GetData()->Size(),
+        object->GetMetadata()->Size(),
+        object->GetData()->Data(),
+        [this, &io_context, writer_object_id, remote_readers, num_replied](
+            const Status &status, const rpc::PushMutableObjectReply &reply) {
+          *num_replied += 1;
+          if (!status.ok()) {
+            RAY_LOG(ERROR)
+                << "Failed to transfer object to a remote node for object id "
+                << writer_object_id << ". This can cause a hang.";
+          }
+
+          if (*num_replied == remote_readers->size()) {
+            io_context.post(
+                [this, &io_context, writer_object_id, remote_readers]() {
+                  PollWriterClosure(io_context, writer_object_id, remote_readers);
+                },
+                "experimental::MutableObjectProvider.PollWriter");
+          }
+        });
+  }
 }

 void MutableObjectProvider::RunIOContext(instrumented_io_context &io_context) {
diff --git a/src/ray/raylet_client/raylet_client.cc b/src/ray/raylet_client/raylet_client.cc
index dfc7ea1eb3fd..938f20fab80e 100644
--- a/src/ray/raylet_client/raylet_client.cc
+++ b/src/ray/raylet_client/raylet_client.cc
@@ -426,8 +426,7 @@ void raylet::RayletClient::PushMutableObject(
     const ray::rpc::ClientCallback<ray::rpc::PushMutableObjectReply> &callback) {
   // Ray sets the gRPC max payload size to ~512 MiB. We set the max chunk size to a
   // slightly lower value to allow extra padding just in case.
-  uint64_t kMaxGrpcPayloadSize =
-      RayConfig::instance().max_grpc_message_size() * 0.98;  // 500 MiB.
+  uint64_t kMaxGrpcPayloadSize = RayConfig::instance().max_grpc_message_size() * 0.98;
   uint64_t total_size = data_size + metadata_size;
   uint64_t total_num_chunks = total_size / kMaxGrpcPayloadSize;
   // If `total_size` is not a multiple of `kMaxGrpcPayloadSize`, then we need to send an