Skip to content

Commit

Permalink
comments 1
Browse files Browse the repository at this point in the history
Signed-off-by: Eric Liang <ekhliang@gmail.com>
  • Loading branch information
ericl committed Dec 15, 2023
1 parent 13e8bd7 commit b2e493b
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 41 deletions.
2 changes: 1 addition & 1 deletion python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -3518,7 +3518,7 @@ cdef class CoreWorker:
def experimental_mutable_object_put_serialized(self, serialized_object,
ObjectRef object_ref,
num_readers,
try_wait=False
try_wait: bool=False
):
cdef:
CObjectID c_object_id = object_ref.native()
Expand Down
10 changes: 5 additions & 5 deletions python/ray/dag/compiled_dag_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def do_cancel_compiled_task(self):
cause=TaskCancelledError(),
)
for channel in self._input_channels:
channel.set_error(e)
channel.unblock_readers_with_error(e)


@DeveloperAPI
Expand Down Expand Up @@ -405,8 +405,8 @@ def teardown(self):
logger.info(f"Cancelling compiled worker on actor: {actor}")
try:
ray.get(actor.__ray_call__.remote(do_cancel_compiled_task))
except Exception as e:
logger.info(f"Error cancelling worker task: {e}")
except Exception:
logger.exception("Error cancelling worker task")
pass
logger.info("Teardown complete")

Expand All @@ -418,9 +418,9 @@ def run(self):
return
if isinstance(outer.dag_output_channels, list):
for output_channel in outer.dag_output_channels:
output_channel.set_error(e)
output_channel.unblock_readers_with_error(e)
else:
outer.dag_output_channels.set_error(e)
outer.dag_output_channels.unblock_readers_with_error(e)
self.teardown()

monitor = Monitor()
Expand Down
11 changes: 8 additions & 3 deletions python/ray/experimental/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,10 @@ def end_read(self):
[self._base_ref]
)

def set_error(self, e: Exception) -> None:
def unblock_readers_with_error(self, e: Exception) -> None:
"""
Shutdown the channel with the specified error object. New readers will see
the error raised when they try to read from the channel.
If readers are blocked on the channel, shut it down by writing an error
object to the channel.
Does not block.
Expand All @@ -174,13 +174,18 @@ def set_error(self, e: Exception) -> None:
logger.debug(f"Writing error to channel: {self._base_ref}: {e}")
serialized_exc = self._worker.get_serialization_context().serialize(e)
try:
# Write an error if a reader is blocked. If a value is already available,
# no need to write anything.
self._worker.core_worker.experimental_mutable_object_put_serialized(
serialized_exc,
self._base_ref,
num_readers=1,
try_wait=True,
)
except Exception as e:
# If we get a write acquire failed error, that's expected since it means
# no reader is currently blocked for this channel.
# Raise other types of errors encountered.
if not _is_write_acquire_failed_error(e):
logger.exception("Error setting error on channel")
raise
Expand Down
15 changes: 5 additions & 10 deletions src/ray/object_manager/common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,11 @@ void PrintPlasmaObjectHeader(const PlasmaObjectHeader *header) {
<< "metadata_size: " << header->metadata_size << "\n";
}

bool PlasmaObjectHeader::WriteAcquire(int64_t write_version,
uint64_t write_data_size,
bool PlasmaObjectHeader::WriteAcquire(uint64_t write_data_size,
uint64_t write_metadata_size,
int64_t write_num_readers,
bool try_wait) {
auto write_version = version + 1;
RAY_LOG(DEBUG) << "WriteAcquire. version: " << write_version << ", data size "
<< write_data_size << ", metadata size " << write_metadata_size
<< ", num readers: " << write_num_readers << ", try_wait: " << try_wait;
Expand Down Expand Up @@ -91,17 +91,12 @@ bool PlasmaObjectHeader::WriteAcquire(int64_t write_version,
return true;
}

void PlasmaObjectHeader::WriteRelease(int64_t write_version) {
RAY_LOG(DEBUG) << "WriteRelease Waiting. version: " << write_version;
void PlasmaObjectHeader::WriteRelease() {
RAY_LOG(DEBUG) << "WriteRelease Waiting. version: " << version;
RAY_CHECK(pthread_mutex_lock(&wr_mut) == 0);
RAY_LOG(DEBUG) << "WriteRelease " << write_version;
RAY_LOG(DEBUG) << "WriteRelease " << version;
PrintPlasmaObjectHeader(this);

RAY_CHECK(version == write_version)
<< "Write version " << write_version << " no longer matches current version "
<< version << ". Are you sure this is the only writer?";

version = write_version;
is_sealed = true;
RAY_CHECK(num_readers != 0) << num_readers;
num_read_acquires_remaining = num_readers;
Expand Down
12 changes: 3 additions & 9 deletions src/ray/object_manager/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,27 +94,21 @@ struct PlasmaObjectHeader {
uint64_t metadata_size = 0;

/// Blocks until all readers for the previous write have ReadRelease'd the
/// value. Protects against concurrent writers. Caller must pass consecutive
/// versions on each new write, starting with write_version=1.
/// value. Protects against concurrent writers.
///
/// \param write_version The new version for write.
/// \param data_size The new data size of the object.
/// \param metadata_size The new metadata size of the object.
/// \param num_readers The number of readers for the object.
/// \param try_wait Whether to fail the acquire if this would block.
/// \return true if the acquire was successful.
bool WriteAcquire(int64_t write_version,
uint64_t data_size,
bool WriteAcquire(uint64_t data_size,
uint64_t metadata_size,
int64_t num_readers,
bool try_wait);

/// Call after completing a write to signal that readers may read.
/// num_readers should be set before calling this.
///
/// \param write_version The new version for write. This must match the
/// version previously passed to WriteAcquire.
void WriteRelease(int64_t write_version);
void WriteRelease();

// Blocks until the given version is ready to read. Returns false if the
// maximum number of readers have already read the requested version.
Expand Down
18 changes: 5 additions & 13 deletions src/ray/object_manager/plasma/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,6 @@ struct ObjectInUseEntry {
/// objects, ReadRelease resets this to false, and ReadAcquire resets to
/// true.
bool read_acquired = false;
/// The last version that we wrote. To write again, we must pass a newer
/// version than this.
int64_t next_version_to_write = 1;
};

class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Impl> {
Expand Down Expand Up @@ -447,10 +444,7 @@ Status PlasmaClient::Impl::ExperimentalMutableObjectWriteAcquire(
") is larger than allocated buffer size " +
std::to_string(entry->object.allocated_size));
}
// TODO(ekl) should we just only track the version in the plasma header?
entry->next_version_to_write = plasma_header->version + 1;
if (!plasma_header->WriteAcquire(entry->next_version_to_write,
data_size,
if (!plasma_header->WriteAcquire(data_size,
metadata_size,
num_readers,
try_wait)) {
Expand Down Expand Up @@ -492,10 +486,7 @@ Status PlasmaClient::Impl::ExperimentalMutableObjectWriteRelease(

entry->is_sealed = true;
auto plasma_header = GetPlasmaObjectHeader(entry->object);
plasma_header->WriteRelease(
/*write_version=*/entry->next_version_to_write);
// The next Write must pass a higher version.
entry->next_version_to_write++;
plasma_header->WriteRelease();
#endif
return Status::OK();
}
Expand Down Expand Up @@ -757,8 +748,9 @@ Status PlasmaClient::Impl::EnsureGetAcquired(

int64_t version_read = 0;

// Need to unlock the client mutex since ReadAcquire() is blocking.
// TODO(ekl) is this entirely thread-safe?
// Need to unlock the client mutex since ReadAcquire() is blocking. This is
// thread-safe since the plasma object cannot be deallocated while there is a
// reader active.
client_mutex_.unlock();
bool success =
plasma_header->ReadAcquire(object_entry->next_version_to_read, &version_read);
Expand Down

0 comments on commit b2e493b

Please sign in to comment.