diff --git a/python/ray/_private/worker.py b/python/ray/_private/worker.py index dae746e3c1ed..24a71d7b87ec 100644 --- a/python/ray/_private/worker.py +++ b/python/ray/_private/worker.py @@ -860,10 +860,16 @@ def get_objects( debugger_breakpoint = metadata_fields[1][ len(ray_constants.OBJECT_METADATA_DEBUG_PREFIX) : ] - return ( - self.deserialize_objects(data_metadata_pairs, object_refs), - debugger_breakpoint, - ) + values = self.deserialize_objects(data_metadata_pairs, object_refs) + for i, value in enumerate(values): + if isinstance(value, RayError): + if isinstance(value, ray.exceptions.ObjectLostError): + global_worker.core_worker.dump_object_store_memory_usage() + if isinstance(value, RayTaskError): + raise value.as_instanceof_cause() + else: + raise value + return values, debugger_breakpoint def main_loop(self): """The main loop a worker runs to receive and execute tasks.""" diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 7b32ce1a3363..7d83797cd723 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -3517,7 +3517,7 @@ cdef class CoreWorker: def experimental_mutable_object_put_serialized(self, serialized_object, ObjectRef object_ref, - num_readers, + num_readers ): cdef: CObjectID c_object_id = object_ref.native() @@ -3542,6 +3542,13 @@ cdef class CoreWorker: c_object_id, )) + def experimental_mutable_object_set_error(self, ObjectRef object_ref): + cdef: + CObjectID c_object_id = object_ref.native() + + check_status(CCoreWorkerProcess.GetCoreWorker() + .ExperimentalMutableObjectSetError(c_object_id)) + def experimental_mutable_object_read_release(self, object_refs): """ For experimental.channel.Channel. diff --git a/python/ray/dag/compiled_dag_node.py b/python/ray/dag/compiled_dag_node.py index 4e86b5686f53..7c217bc7fa99 100644 --- a/python/ray/dag/compiled_dag_node.py +++ b/python/ray/dag/compiled_dag_node.py @@ -1,24 +1,22 @@ -import logging -from typing import Any, Dict, List, Tuple, Union, Optional - from collections import defaultdict +from typing import Any, Dict, List, Tuple, Union, Optional +import logging +import threading +import traceback import ray -import ray.experimental.channel as ray_channel +from ray.exceptions import RayTaskError +from ray.experimental.channel import Channel from ray.util.annotations import DeveloperAPI MAX_BUFFER_SIZE = int(100 * 1e6) # 100MB -ChannelType = "ray.experimental.channel.Channel" - logger = logging.getLogger(__name__) @DeveloperAPI -def do_allocate_channel( - self, buffer_size_bytes: int, num_readers: int = 1 -) -> ChannelType: +def do_allocate_channel(self, buffer_size_bytes: int, num_readers: int = 1) -> Channel: """Generic actor method to allocate an output channel. Args: @@ -28,14 +26,14 @@ def do_allocate_channel( Returns: The allocated channel. """ - self._output_channel = ray_channel.Channel(buffer_size_bytes, num_readers) + self._output_channel = Channel(buffer_size_bytes, num_readers) return self._output_channel @DeveloperAPI def do_exec_compiled_task( self, - inputs: List[Union[Any, ChannelType]], + inputs: List[Union[Any, Channel]], actor_method_name: str, ) -> None: """Generic actor method to begin executing a compiled DAG. This runs an @@ -51,14 +49,17 @@ def do_exec_compiled_task( actor_method_name: The name of the actual actor method to execute in the loop. """ + self._dag_cancelled = False + try: + self._input_channels = [i for i in inputs if isinstance(i, Channel)] method = getattr(self, actor_method_name) resolved_inputs = [] input_channel_idxs = [] # Add placeholders for input channels. for idx, inp in enumerate(inputs): - if isinstance(inp, ray_channel.Channel): + if isinstance(inp, Channel): input_channel_idxs.append((idx, inp)) resolved_inputs.append(None) else: @@ -68,17 +69,42 @@ def do_exec_compiled_task( for idx, channel in input_channel_idxs: resolved_inputs[idx] = channel.begin_read() - output_val = method(*resolved_inputs) + try: + output_val = method(*resolved_inputs) + except Exception as exc: + backtrace = ray._private.utils.format_error_message( + "".join( + traceback.format_exception(type(exc), exc, exc.__traceback__) + ), + task_exception=True, + ) + wrapped = RayTaskError( + function_name="do_exec_compiled_task", + traceback_str=backtrace, + cause=exc, + ) + self._output_channel.write(wrapped) + else: + if self._dag_cancelled: + raise RuntimeError("DAG execution cancelled") + self._output_channel.write(output_val) - self._output_channel.write(output_val) for _, channel in input_channel_idxs: channel.end_read() - except Exception as e: - logging.warn(f"Compiled DAG task aborted with exception: {e}") + except Exception: + logging.exception("Compiled DAG task exited with exception") raise +@DeveloperAPI +def do_cancel_compiled_task(self): + self._dag_cancelled = True + for channel in self._input_channels: + channel.close() + self._output_channel.close() + + @DeveloperAPI class CompiledTask: """Wraps the normal Ray DAGNode with some metadata.""" @@ -147,11 +173,13 @@ def __init__(self, buffer_size_bytes: Optional[int]): self.actor_task_count: Dict["ray._raylet.ActorID", int] = defaultdict(int) # Cached attributes that are set during compilation. - self.dag_input_channel: Optional[ChannelType] = None - self.dag_output_channels: Optional[ChannelType] = None + self.dag_input_channel: Optional[Channel] = None + self.dag_output_channels: Optional[Channel] = None # ObjectRef for each worker's task. The task is an infinite loop that # repeatedly executes the method specified in the DAG. self.worker_task_refs: List["ray.ObjectRef"] = [] + # Set of actors present in the DAG. + self.actor_refs = set() def _add_node(self, node: "ray.dag.DAGNode") -> None: idx = self.counter @@ -254,7 +282,7 @@ def _preprocess(self) -> None: def _get_or_compile( self, - ) -> Tuple[ChannelType, Union[ChannelType, List[ChannelType]]]: + ) -> Tuple[Channel, Union[Channel, List[Channel]]]: """Compile an execution path. This allocates channels for adjacent tasks to send/receive values. An infinite task is submitted to each actor in the DAG that repeatedly receives from input channel(s) and @@ -276,7 +304,6 @@ def _get_or_compile( if self.dag_input_channel is not None: assert self.dag_output_channels is not None - # Driver should ray.put on input, ray.get/release on output return ( self.dag_input_channel, self.dag_output_channels, @@ -303,8 +330,9 @@ def _get_or_compile( num_readers=task.num_readers, ) ) + self.actor_refs.add(task.dag_node._get_actor_handle()) elif isinstance(task.dag_node, InputNode): - task.output_channel = ray_channel.Channel( + task.output_channel = Channel( buffer_size_bytes=self._buffer_size_bytes, num_readers=task.num_readers, ) @@ -345,7 +373,7 @@ def _get_or_compile( # Assign the task with the correct input and output buffers. worker_fn = task.dag_node._get_remote_method("__ray_call__") self.worker_task_refs.append( - worker_fn.remote( + worker_fn.options(concurrency_group="_ray_system").remote( do_exec_compiled_task, resolved_args, task.dag_node.get_method_name(), @@ -373,13 +401,60 @@ def _get_or_compile( self.dag_output_channels = self.dag_output_channels[0] # Driver should ray.put on input, ray.get/release on output - return (self.dag_input_channel, self.dag_output_channels) + self._monitor = self._monitor_failures() + return (self.dag_input_channel, self.dag_output_channels, self._monitor) + + def _monitor_failures(self): + outer = self + + class Monitor(threading.Thread): + def __init__(self): + super().__init__(daemon=True) + self.in_teardown = False + + def teardown(self): + if self.in_teardown: + return + logger.info("Tearing down compiled DAG") + self.in_teardown = True + for actor in outer.actor_refs: + logger.info(f"Cancelling compiled worker on actor: {actor}") + try: + ray.get(actor.__ray_call__.remote(do_cancel_compiled_task)) + except Exception: + logger.exception("Error cancelling worker task") + pass + logger.info("Waiting for worker tasks to exit") + for ref in outer.worker_task_refs: + try: + ray.get(ref) + except Exception: + pass + logger.info("Teardown complete") + + def run(self): + try: + ray.get(outer.worker_task_refs) + except Exception as e: + logger.debug(f"Handling exception from worker tasks: {e}") + if self.in_teardown: + return + if isinstance(outer.dag_output_channels, list): + for output_channel in outer.dag_output_channels: + output_channel.close() + else: + outer.dag_output_channels.close() + self.teardown() + + monitor = Monitor() + monitor.start() + return monitor def execute( self, *args, **kwargs, - ) -> Union[ChannelType, List[ChannelType]]: + ) -> Union[Channel, List[Channel]]: """Execute this DAG using the compiled execution path. Args: @@ -400,6 +475,10 @@ def execute( input_channel.write(args[0]) return output_channels + def teardown(self): + """Teardown and cancel all worker tasks for this DAG.""" + self._monitor.teardown() + @DeveloperAPI def build_compiled_dag_from_ray_dag( diff --git a/python/ray/dag/tests/test_accelerated_dag.py b/python/ray/dag/tests/test_accelerated_dag.py index b293ee894026..49fd5e09230e 100644 --- a/python/ray/dag/tests/test_accelerated_dag.py +++ b/python/ray/dag/tests/test_accelerated_dag.py @@ -1,15 +1,17 @@ # coding: utf-8 import logging import os +import random import sys +import time import pytest import ray import ray.cluster_utils +from ray.exceptions import RaySystemError from ray.dag import InputNode, MultiOutputNode from ray.tests.conftest import * # noqa -from ray._private.test_utils import wait_for_condition logger = logging.getLogger(__name__) @@ -20,12 +22,21 @@ @ray.remote class Actor: - def __init__(self, init_value): + def __init__(self, init_value, fail_after=None, sys_exit=False): print("__init__ PID", os.getpid()) self.i = init_value + self.fail_after = fail_after + self.sys_exit = sys_exit def inc(self, x): self.i += x + if self.fail_after and self.i > self.fail_after: + # Randomize the failures to better cover multi actor scenarios. + if random.random() > 0.5: + if self.sys_exit: + os._exit(1) + else: + raise ValueError("injected fault") return self.i def append_to(self, lst): @@ -37,6 +48,10 @@ def inc_two(self, x, y): self.i += y return self.i + def sleep(self, x): + time.sleep(x) + return x + def test_basic(ray_start_regular): a = Actor.remote(0) @@ -52,6 +67,10 @@ def test_basic(ray_start_regular): assert result == i + 1 output_channel.end_read() + # Note: must teardown before starting a new Ray session, otherwise you'll get + # a segfault from the dangling monitor thread upon the new Ray init. + compiled_dag.teardown() + def test_regular_args(ray_start_regular): # Test passing regular args to .bind in addition to DAGNode args. @@ -68,6 +87,8 @@ def test_regular_args(ray_start_regular): assert result == (i + 1) * 3 output_channel.end_read() + compiled_dag.teardown() + @pytest.mark.parametrize("num_actors", [1, 4]) def test_scatter_gather_dag(ray_start_regular, num_actors): @@ -86,6 +107,8 @@ def test_scatter_gather_dag(ray_start_regular, num_actors): for chan in output_channels: chan.end_read() + compiled_dag.teardown() + @pytest.mark.parametrize("num_actors", [1, 4]) def test_chain_dag(ray_start_regular, num_actors): @@ -104,6 +127,8 @@ def test_chain_dag(ray_start_regular, num_actors): assert result == list(range(num_actors)) output_channel.end_read() + compiled_dag.teardown() + def test_dag_exception(ray_start_regular, capsys): a = Actor.remote(0) @@ -111,10 +136,17 @@ def test_dag_exception(ray_start_regular, capsys): dag = a.inc.bind(inp) compiled_dag = dag.experimental_compile() - compiled_dag.execute("hello") - wait_for_condition( - lambda: "Compiled DAG task aborted with exception" in capsys.readouterr().err - ) + output_channel = compiled_dag.execute("hello") + with pytest.raises(TypeError): + output_channel.begin_read() + output_channel.end_read() + + # Can do it multiple times. + output_channel = compiled_dag.execute("hello") + with pytest.raises(TypeError): + output_channel.begin_read() + + compiled_dag.teardown() def test_dag_errors(ray_start_regular): @@ -176,6 +208,85 @@ def f(x): dag.experimental_compile() +def test_dag_fault_tolerance(ray_start_regular_shared): + actors = [Actor.remote(0, fail_after=100, sys_exit=False) for _ in range(4)] + with InputNode() as i: + out = [a.inc.bind(i) for a in actors] + dag = MultiOutputNode(out) + + compiled_dag = dag.experimental_compile() + + for i in range(99): + output_channels = compiled_dag.execute(1) + # TODO(swang): Replace with fake ObjectRef. + results = [chan.begin_read() for chan in output_channels] + assert results == [i + 1] * 4 + for chan in output_channels: + chan.end_read() + + with pytest.raises(ValueError): + for i in range(99): + output_channels = compiled_dag.execute(1) + for chan in output_channels: + chan.begin_read() + for chan in output_channels: + chan.end_read() + + compiled_dag.teardown() + + +def test_dag_fault_tolerance_sys_exit(ray_start_regular_shared): + actors = [Actor.remote(0, fail_after=100, sys_exit=True) for _ in range(4)] + with InputNode() as i: + out = [a.inc.bind(i) for a in actors] + dag = MultiOutputNode(out) + + compiled_dag = dag.experimental_compile() + + for i in range(99): + output_channels = compiled_dag.execute(1) + # TODO(swang): Replace with fake ObjectRef. + results = [chan.begin_read() for chan in output_channels] + assert results == [i + 1] * 4 + for chan in output_channels: + chan.end_read() + + with pytest.raises(RaySystemError, match="channel closed"): + for i in range(99): + output_channels = compiled_dag.execute(1) + for chan in output_channels: + chan.begin_read() + for chan in output_channels: + chan.end_read() + + +def test_dag_teardown_while_running(ray_start_regular_shared): + a = Actor.remote(0) + + with InputNode() as inp: + dag = a.sleep.bind(inp) + + compiled_dag = dag.experimental_compile() + chan = compiled_dag.execute(3) # 3-second slow task running async + compiled_dag.teardown() + try: + chan.begin_read() # Sanity check the channel doesn't block. + except Exception: + pass + + # Check we can still use the actor after first DAG teardown. + with InputNode() as inp: + dag = a.sleep.bind(inp) + + compiled_dag = dag.experimental_compile() + chan = compiled_dag.execute(0.1) + result = chan.begin_read() + assert result == 0.1 + chan.end_read() + + compiled_dag.teardown() + + if __name__ == "__main__": if os.environ.get("PARALLEL_CI"): sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__])) diff --git a/python/ray/experimental/channel.py b/python/ray/experimental/channel.py index d8443f6a9db1..42a781aafe64 100644 --- a/python/ray/experimental/channel.py +++ b/python/ray/experimental/channel.py @@ -159,3 +159,13 @@ def end_read(self): self._worker.core_worker.experimental_mutable_object_read_release( [self._base_ref] ) + + def close(self) -> None: + """ + Close this channel by setting the error bit on the object. + + Does not block. Any existing values in the channel may be lost after the + channel is closed. + """ + logger.debug(f"Setting error bit on channel: {self._base_ref}") + self._worker.core_worker.experimental_mutable_object_set_error(self._base_ref) diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index de5dd651d557..5168eb4efa3f 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -252,6 +252,8 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: shared_ptr[CBuffer] *data) CRayStatus ExperimentalMutableObjectWriteRelease( const CObjectID &object_id) + CRayStatus ExperimentalMutableObjectSetError( + const CObjectID &object_id) CRayStatus SealOwned(const CObjectID &object_id, c_bool pin_object, const unique_ptr[CAddress] &owner_address) CRayStatus SealExisting(const CObjectID &object_id, c_bool pin_object, diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index eeb961dfa66a..6e8e11d36602 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -1350,6 +1350,10 @@ Status CoreWorker::ExperimentalMutableObjectWriteRelease(const ObjectID &object_ return plasma_store_provider_->ExperimentalMutableObjectWriteRelease(object_id); } +Status CoreWorker::ExperimentalMutableObjectSetError(const ObjectID &object_id) { + return plasma_store_provider_->ExperimentalMutableObjectSetError(object_id); +} + Status CoreWorker::SealOwned(const ObjectID &object_id, bool pin_object, const std::unique_ptr &owner_address) { diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index e945555e550a..6b7542081e47 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -708,6 +708,12 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// \param[in] object_id The ID of the object. Status ExperimentalMutableObjectWriteRelease(const ObjectID &object_id); + /// Experimental method for mutable objects. Sets the error bit, causing all + /// future readers and writers to raise an error on acquire. + /// + /// \param[in] object_id The ID of the object. + Status ExperimentalMutableObjectSetError(const ObjectID &object_id); + /// Experimental method for mutable objects. Releases the objects, allowing them /// to be written again. If the caller did not previously Get the objects, /// then this first blocks until the latest value is available to read, then diff --git a/src/ray/core_worker/store_provider/plasma_store_provider.cc b/src/ray/core_worker/store_provider/plasma_store_provider.cc index d4c97b9e0ef0..25e45d0b6417 100644 --- a/src/ray/core_worker/store_provider/plasma_store_provider.cc +++ b/src/ray/core_worker/store_provider/plasma_store_provider.cc @@ -128,6 +128,11 @@ Status CoreWorkerPlasmaStoreProvider::ExperimentalMutableObjectWriteRelease( return store_client_.ExperimentalMutableObjectWriteRelease(object_id); } +Status CoreWorkerPlasmaStoreProvider::ExperimentalMutableObjectSetError( + const ObjectID &object_id) { + return store_client_.ExperimentalMutableObjectSetError(object_id); +} + Status CoreWorkerPlasmaStoreProvider::Create(const std::shared_ptr &metadata, const size_t data_size, const ObjectID &object_id, diff --git a/src/ray/core_worker/store_provider/plasma_store_provider.h b/src/ray/core_worker/store_provider/plasma_store_provider.h index 365665562382..60c7a34ddaa9 100644 --- a/src/ray/core_worker/store_provider/plasma_store_provider.h +++ b/src/ray/core_worker/store_provider/plasma_store_provider.h @@ -207,6 +207,12 @@ class CoreWorkerPlasmaStoreProvider { /// \param[in] object_id The ID of the object. Status ExperimentalMutableObjectWriteRelease(const ObjectID &object_id); + /// Experimental method for mutable objects. Sets the error bit, causing all + /// future readers and writers to raise an error on acquire. + /// + /// \param[in] object_id The ID of the object. + Status ExperimentalMutableObjectSetError(const ObjectID &object_id); + /// Experimental method for mutable objects. Releases the objects, allowing them /// to be written again. If the caller did not previously Get the objects, /// then this first blocks until the latest value is available to read, then diff --git a/src/ray/object_manager/common.cc b/src/ray/object_manager/common.cc index 0ba2b37a13bb..2cfa276dfe21 100644 --- a/src/ray/object_manager/common.cc +++ b/src/ray/object_manager/common.cc @@ -26,19 +26,12 @@ void PlasmaObjectHeader::Init() { pthread_mutex_init(&wr_mut, &mutex_attr); sem_init(&rw_semaphore, PTHREAD_PROCESS_SHARED, 1); - - // Condition is shared between writer and readers. - pthread_condattr_t cond_attr; - pthread_condattr_init(&cond_attr); - pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED); - pthread_cond_init(&cond, &cond_attr); #endif } void PlasmaObjectHeader::Destroy() { #ifdef __linux__ RAY_CHECK(pthread_mutex_destroy(&wr_mut) == 0); - RAY_CHECK(pthread_cond_destroy(&cond) == 0); RAY_CHECK(sem_destroy(&rw_semaphore) == 0); #endif } @@ -57,25 +50,43 @@ void PrintPlasmaObjectHeader(const PlasmaObjectHeader *header) { << "metadata_size: " << header->metadata_size << "\n"; } -void PlasmaObjectHeader::WriteAcquire(int64_t write_version, - uint64_t write_data_size, - uint64_t write_metadata_size, - int64_t write_num_readers) { - RAY_LOG(DEBUG) << "WriteAcquire. version: " << write_version << ", data size " - << write_data_size << ", metadata size " << write_metadata_size - << ", num readers: " << write_num_readers; - sem_wait(&rw_semaphore); - RAY_CHECK(pthread_mutex_lock(&wr_mut) == 0); +Status PlasmaObjectHeader::TryAcquireWriterMutex() { + // Try to acquire the lock, checking every 1s for the error bit. + struct timespec ts; + do { + if (has_error) { + return Status::IOError("channel closed"); + } + clock_gettime(CLOCK_REALTIME, &ts); + ts.tv_sec += 1; + } while (pthread_mutex_timedlock(&wr_mut, &ts)); + + return Status::OK(); +} + +Status PlasmaObjectHeader::WriteAcquire(uint64_t write_data_size, + uint64_t write_metadata_size, + int64_t write_num_readers) { + RAY_LOG(DEBUG) << "WriteAcquire. data size " << write_data_size << ", metadata size " + << write_metadata_size << ", num readers: " << write_num_readers; + + // Try to acquire the semaphore, checking every 1s for the error bit. + struct timespec ts; + do { + if (has_error) { + return Status::IOError("channel closed"); + } + clock_gettime(CLOCK_REALTIME, &ts); + ts.tv_sec += 1; + } while (sem_timedwait(&rw_semaphore, &ts)); + + RAY_RETURN_NOT_OK(TryAcquireWriterMutex()); PrintPlasmaObjectHeader(this); RAY_CHECK(num_read_acquires_remaining == 0); RAY_CHECK(num_read_releases_remaining == 0); - RAY_CHECK(write_version == version + 1) - << "Write version " << write_version - << " is more than 1 greater than current version " << version - << ". Are you sure this is the only writer?"; - version = write_version; + version++; is_sealed = false; data_size = write_data_size; metadata_size = write_metadata_size; @@ -84,19 +95,15 @@ void PlasmaObjectHeader::WriteAcquire(int64_t write_version, RAY_LOG(DEBUG) << "WriteAcquire done"; PrintPlasmaObjectHeader(this); RAY_CHECK(pthread_mutex_unlock(&wr_mut) == 0); + return Status::OK(); } -void PlasmaObjectHeader::WriteRelease(int64_t write_version) { - RAY_LOG(DEBUG) << "WriteRelease Waiting. version: " << write_version; - RAY_CHECK(pthread_mutex_lock(&wr_mut) == 0); - RAY_LOG(DEBUG) << "WriteRelease " << write_version; +Status PlasmaObjectHeader::WriteRelease() { + RAY_LOG(DEBUG) << "WriteRelease Waiting. version: " << version; + RAY_RETURN_NOT_OK(TryAcquireWriterMutex()); + RAY_LOG(DEBUG) << "WriteRelease " << version; PrintPlasmaObjectHeader(this); - RAY_CHECK(version == write_version) - << "Write version " << write_version << " no longer matches current version " - << version << ". Are you sure this is the only writer?"; - - version = write_version; is_sealed = true; RAY_CHECK(num_readers != 0) << num_readers; num_read_acquires_remaining = num_readers; @@ -105,19 +112,25 @@ void PlasmaObjectHeader::WriteRelease(int64_t write_version) { RAY_LOG(DEBUG) << "WriteRelease done, num_readers: " << num_readers; PrintPlasmaObjectHeader(this); RAY_CHECK(pthread_mutex_unlock(&wr_mut) == 0); - // Signal to all readers. - RAY_CHECK(pthread_cond_broadcast(&cond) == 0); + return Status::OK(); } -bool PlasmaObjectHeader::ReadAcquire(int64_t version_to_read, int64_t *version_read) { +Status PlasmaObjectHeader::ReadAcquire(int64_t version_to_read, int64_t *version_read) { RAY_LOG(DEBUG) << "ReadAcquire waiting version " << version_to_read; - RAY_CHECK(pthread_mutex_lock(&wr_mut) == 0); + RAY_RETURN_NOT_OK(TryAcquireWriterMutex()); RAY_LOG(DEBUG) << "ReadAcquire " << version_to_read; PrintPlasmaObjectHeader(this); // Wait for the requested version (or a more recent one) to be sealed. + int tries = 0; while (version < version_to_read || !is_sealed) { - RAY_CHECK(pthread_cond_wait(&cond, &wr_mut) == 0); + RAY_CHECK(pthread_mutex_unlock(&wr_mut) == 0); + // Lower values than 100k seem to start impacting perf compared + // to mutex. + if (tries++ > 100000) { + std::this_thread::yield(); // Too many tries, yield thread. + } + RAY_RETURN_NOT_OK(TryAcquireWriterMutex()); } bool success = false; @@ -145,15 +158,17 @@ bool PlasmaObjectHeader::ReadAcquire(int64_t version_to_read, int64_t *version_r PrintPlasmaObjectHeader(this); RAY_CHECK(pthread_mutex_unlock(&wr_mut) == 0); - // Signal to other readers that they may read. - RAY_CHECK(pthread_cond_signal(&cond) == 0); - return success; + if (!success) { + return Status::Invalid( + "Reader missed a value. Are you sure there are num_readers many readers?"); + } + return Status::OK(); } -void PlasmaObjectHeader::ReadRelease(int64_t read_version) { +Status PlasmaObjectHeader::ReadRelease(int64_t read_version) { bool all_readers_done = false; RAY_LOG(DEBUG) << "ReadRelease Waiting" << read_version; - RAY_CHECK(pthread_mutex_lock(&wr_mut) == 0); + RAY_RETURN_NOT_OK(TryAcquireWriterMutex()); PrintPlasmaObjectHeader(this); RAY_LOG(DEBUG) << "ReadRelease " << read_version << " version is currently " << version; @@ -174,6 +189,7 @@ void PlasmaObjectHeader::ReadRelease(int64_t read_version) { if (all_readers_done) { sem_post(&rw_semaphore); } + return Status::OK(); } #endif diff --git a/src/ray/object_manager/common.h b/src/ray/object_manager/common.h index 793b46394a43..ecc13e25e5d3 100644 --- a/src/ray/object_manager/common.h +++ b/src/ray/object_manager/common.h @@ -55,8 +55,6 @@ struct PlasmaObjectHeader { // Protects all following state, used to signal from writer to readers. pthread_mutex_t wr_mut; - // Used to signal to readers when the writer is done writing a new version. - pthread_cond_t cond; // The object version. For immutable objects, this gets incremented to 1 on // the first write and then should never be modified. For mutable objects, // each new write must increment the version before releasing to readers. @@ -67,6 +65,9 @@ struct PlasmaObjectHeader { // has been WriteRelease'd. A reader may read the actual object value if // is_sealed=true and num_read_acquires_remaining != 0. bool is_sealed = false; + // Set to indicate an error was encountered computing the next version of + // the mutable object. Lockless access allowed. + volatile bool has_error = false; // The total number of reads allowed before the writer can write again. This // value should be set by the writer before releasing to readers. // For immutable objects, this is set to -1 and infinite reads are allowed. @@ -92,26 +93,18 @@ struct PlasmaObjectHeader { // different data/metadata size. uint64_t data_size = 0; uint64_t metadata_size = 0; - /// Blocks until all readers for the previous write have ReadRelease'd the - /// value. Protects against concurrent writers. Caller must pass consecutive - /// versions on each new write, starting with write_version=1. + /// value. Protects against concurrent writers. /// - /// \param write_version The new version for write. /// \param data_size The new data size of the object. /// \param metadata_size The new metadata size of the object. /// \param num_readers The number of readers for the object. - void WriteAcquire(int64_t write_version, - uint64_t data_size, - uint64_t metadata_size, - int64_t num_readers); + /// \return if the acquire was successful. + Status WriteAcquire(uint64_t data_size, uint64_t metadata_size, int64_t num_readers); /// Call after completing a write to signal that readers may read. /// num_readers should be set before calling this. - /// - /// \param write_version The new version for write. This must match the - /// version previously passed to WriteAcquire. - void WriteRelease(int64_t write_version); + Status WriteRelease(); // Blocks until the given version is ready to read. Returns false if the // maximum number of readers have already read the requested version. @@ -119,16 +112,16 @@ struct PlasmaObjectHeader { // \param[in] read_version The version to read. // \param[out] version_read For normal immutable objects, this will be set to // 0. Otherwise, the current version. - // \return success Whether the correct version was read and there were still + // \return Whether the correct version was read and there were still // reads remaining. - bool ReadAcquire(int64_t version_to_read, int64_t *version_read); + Status ReadAcquire(int64_t version_to_read, int64_t *version_read); // Finishes the read. If all reads are done, signals to the writer. This is // not necessary to call for objects that have num_readers=-1. /// /// \param read_version This must match the version previously passed in /// ReadAcquire. - void ReadRelease(int64_t read_version); + Status ReadRelease(int64_t read_version); #endif /// Setup synchronization primitives. @@ -136,6 +129,18 @@ struct PlasmaObjectHeader { /// Destroy synchronization primitives. void Destroy(); + + /// Helper method to acquire the writer mutex while aborting if the + /// error bit is set. + /// \return if the mutex was acquired successfully. + Status TryAcquireWriterMutex(); + + /// Set the error bit. This is a non-blocking method. + void SetErrorUnlocked() { +#ifdef __linux__ + has_error = true; +#endif + } }; /// A struct that includes info about the object. diff --git a/src/ray/object_manager/plasma/client.cc b/src/ray/object_manager/plasma/client.cc index 1af147d4f1e2..0aa5da14dbf5 100644 --- a/src/ray/object_manager/plasma/client.cc +++ b/src/ray/object_manager/plasma/client.cc @@ -110,9 +110,6 @@ struct ObjectInUseEntry { /// objects, ReadRelease resets this to false, and ReadAcquire resets to /// true. bool read_acquired = false; - /// The last version that we wrote. To write again, we must pass a newer - /// version than this. - int64_t next_version_to_write = 1; }; class PlasmaClient::Impl : public std::enable_shared_from_this { @@ -164,6 +161,8 @@ class PlasmaClient::Impl : public std::enable_shared_from_this &object_ids, int64_t timeout_ms, std::vector *object_buffers, @@ -445,8 +444,7 @@ Status PlasmaClient::Impl::ExperimentalMutableObjectWriteAcquire( ") is larger than allocated buffer size " + std::to_string(entry->object.allocated_size)); } - plasma_header->WriteAcquire( - entry->next_version_to_write, data_size, metadata_size, num_readers); + RAY_RETURN_NOT_OK(plasma_header->WriteAcquire(data_size, metadata_size, num_readers)); // Prepare the data buffer and return to the client instead of sending // the IPC to object store. @@ -474,10 +472,6 @@ Status PlasmaClient::Impl::ExperimentalMutableObjectWriteRelease( return Status::Invalid( "Plasma buffer for mutable object not in scope. Are you sure you're the writer?"); } - if (!object_entry->second->is_writer) { - return Status::Invalid( - "Mutable objects can only be written by the original creator process."); - } RAY_CHECK(object_entry != objects_in_use_.end()); auto &entry = object_entry->second; @@ -487,10 +481,25 @@ Status PlasmaClient::Impl::ExperimentalMutableObjectWriteRelease( entry->is_sealed = true; auto plasma_header = GetPlasmaObjectHeader(entry->object); - plasma_header->WriteRelease( - /*write_version=*/entry->next_version_to_write); - // The next Write must pass a higher version. - entry->next_version_to_write++; + RAY_RETURN_NOT_OK(plasma_header->WriteRelease()); +#endif + return Status::OK(); +} + +Status PlasmaClient::Impl::ExperimentalMutableObjectSetError(const ObjectID &object_id) { +#ifdef __linux__ + std::unique_lock guard(client_mutex_); + auto object_entry = objects_in_use_.find(object_id); + if (object_entry == objects_in_use_.end()) { + return Status::Invalid( + "Plasma buffer for mutable object not in scope. Are you sure you're the writer?"); + } + RAY_CHECK(object_entry != objects_in_use_.end()); + + auto &entry = object_entry->second; + RAY_CHECK(entry->object.is_experimental_mutable_object); + auto plasma_header = GetPlasmaObjectHeader(entry->object); + plasma_header->SetErrorUnlocked(); #endif return Status::OK(); } @@ -751,12 +760,14 @@ Status PlasmaClient::Impl::EnsureGetAcquired( } int64_t version_read = 0; - bool success = + + // Need to unlock the client mutex since ReadAcquire() is blocking. This is + // thread-safe since mutable plasma object are never deallocated. + client_mutex_.unlock(); + Status status = plasma_header->ReadAcquire(object_entry->next_version_to_read, &version_read); - if (!success) { - return Status::Invalid( - "Reader missed a value. Are you sure there are num_readers many readers?"); - } + client_mutex_.lock(); + RAY_RETURN_NOT_OK(status); object_entry->read_acquired = true; RAY_CHECK(version_read > 0); @@ -798,7 +809,7 @@ Status PlasmaClient::Impl::ExperimentalMutableObjectReadRelease( RAY_RETURN_NOT_OK(EnsureGetAcquired(entry)); RAY_LOG(DEBUG) << "Release shared object " << object_id; auto plasma_header = GetPlasmaObjectHeader(entry->object); - plasma_header->ReadRelease(entry->next_version_to_read); + RAY_RETURN_NOT_OK(plasma_header->ReadRelease(entry->next_version_to_read)); // The next read needs to read at least this version. entry->next_version_to_read++; entry->read_acquired = false; @@ -1071,6 +1082,10 @@ Status PlasmaClient::ExperimentalMutableObjectWriteRelease(const ObjectID &objec return impl_->ExperimentalMutableObjectWriteRelease(object_id); } +Status PlasmaClient::ExperimentalMutableObjectSetError(const ObjectID &object_id) { + return impl_->ExperimentalMutableObjectSetError(object_id); +} + Status PlasmaClient::CreateAndSpillIfNeeded(const ObjectID &object_id, const ray::rpc::Address &owner_address, bool is_experimental_mutable_object, diff --git a/src/ray/object_manager/plasma/client.h b/src/ray/object_manager/plasma/client.h index d50d1b8c5de9..fe7e0ac48ad8 100644 --- a/src/ray/object_manager/plasma/client.h +++ b/src/ray/object_manager/plasma/client.h @@ -94,7 +94,6 @@ class PlasmaClientInterface { /// \param[in] metadata_size The number of bytes to copy from the metadata /// pointer. /// \param[in] num_readers The number of readers that must read and release - /// the object before the caller can write again. /// \param[out] data The mutable object buffer in plasma that can be written to. virtual Status ExperimentalMutableObjectWriteAcquire(const ObjectID &object_id, int64_t data_size, @@ -110,6 +109,12 @@ class PlasmaClientInterface { /// \param[in] object_id The ID of the object. virtual Status ExperimentalMutableObjectWriteRelease(const ObjectID &object_id) = 0; + /// Experimental method for mutable objects. Sets the error bit, causing all + /// future readers and writers to raise an error on acquire. + /// + /// \param[in] object_id The ID of the object. + virtual Status ExperimentalMutableObjectSetError(const ObjectID &object_id) = 0; + /// Experimental method for mutable objects. Releases the objects, allowing them /// to be written again. If the caller did not previously Get the objects, /// then this first blocks until the latest value is available to read, then @@ -247,6 +252,8 @@ class PlasmaClient : public PlasmaClientInterface { Status ExperimentalMutableObjectWriteRelease(const ObjectID &object_id); + Status ExperimentalMutableObjectSetError(const ObjectID &object_id); + Status ExperimentalMutableObjectReadRelease(const ObjectID &object_id); /// Create an object in the Plasma Store. Any metadata for this object must be diff --git a/src/ray/object_manager/test/object_buffer_pool_test.cc b/src/ray/object_manager/test/object_buffer_pool_test.cc index 249f8bda3b3a..72314a6b6e88 100644 --- a/src/ray/object_manager/test/object_buffer_pool_test.cc +++ b/src/ray/object_manager/test/object_buffer_pool_test.cc @@ -62,6 +62,8 @@ class MockPlasmaClient : public plasma::PlasmaClientInterface { MOCK_METHOD1(ExperimentalMutableObjectWriteRelease, ray::Status(const ObjectID &object_id)); + MOCK_METHOD1(ExperimentalMutableObjectSetError, ray::Status(const ObjectID &object_id)); + MOCK_METHOD1(ExperimentalMutableObjectReadRelease, ray::Status(const ObjectID &object_id));