Skip to content

Commit

Permalink
Update the code to throw system exception upon IOError in free_objects
Browse files Browse the repository at this point in the history
Signed-off-by: Mengjin Yan <mengjinyan3@gmail.com>
  • Loading branch information
MengjinYan committed Nov 8, 2024
1 parent 9d57b29 commit 2029d36
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 5 deletions.
9 changes: 6 additions & 3 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -3829,10 +3829,13 @@ cdef class CoreWorker:
def free_objects(self, object_refs, c_bool local_only):
cdef:
c_vector[CObjectID] free_ids = ObjectRefsToVector(object_refs)

with nogil:
check_status(CCoreWorkerProcess.GetCoreWorker().Delete(
free_ids, local_only))
status = CCoreWorkerProcess.GetCoreWorker().Delete(free_ids, local_only)
if status.IsIOError():
check_status(CRayStatus.UnexpectedSystemExit(status.ToString()))
else:
check_status(status)

def get_local_ongoing_lineage_reconstruction_tasks(self):
cdef:
Expand Down
9 changes: 7 additions & 2 deletions src/ray/raylet_client/raylet_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,7 @@ Status raylet::RayletConnection::AtomicRequestReply(MessageType request_type,
}

void raylet::RayletConnection::ShutdownIfLocalRayletDisconnected(const Status &status) {
if ((!status.ok() && IsRayletFailed(RayConfig::instance().RAYLET_PID())) ||
status.IsIOError()) {
if (!status.ok() && IsRayletFailed(RayConfig::instance().RAYLET_PID())) {
RAY_LOG(WARNING) << "The connection is failed because the local raylet has been "
"dead or is unreachable. Terminate the process. Status: "
<< status;
Expand Down Expand Up @@ -184,6 +183,12 @@ Status raylet::RayletClient::Disconnect(
auto status = conn_->WriteMessage(MessageType::DisconnectClient, &fbb);
// Don't be too strict for disconnection errors.
// Just create logs and prevent it from crash.
// TODO (myan): In the current implementation, if raylet is already terminated in the
// "WriteMessage" function above, the worker process will exit early in the function
// and will not reach here. However, the code path here is shared between graceful
// shutdown and force termination. We need to make sure the above early exit
// shouldn't happen during the graceful shutdown scenario and there shouldn't be any
// leak if early exit is triggered
if (!status.ok()) {
RAY_LOG(WARNING)
<< status.ToString()
Expand Down

0 comments on commit 2029d36

Please sign in to comment.