From 2029d367a60c0e66b1a6c61ff50489f56a646a17 Mon Sep 17 00:00:00 2001 From: Mengjin Yan Date: Fri, 8 Nov 2024 14:43:11 -0800 Subject: [PATCH] Update the code to throw system exception upon IOError in free_objects Signed-off-by: Mengjin Yan --- python/ray/_raylet.pyx | 9 ++++++--- src/ray/raylet_client/raylet_client.cc | 9 +++++++-- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index fa49a88c691f..0b0136162ea8 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -3829,10 +3829,13 @@ cdef class CoreWorker: def free_objects(self, object_refs, c_bool local_only): cdef: c_vector[CObjectID] free_ids = ObjectRefsToVector(object_refs) - + with nogil: - check_status(CCoreWorkerProcess.GetCoreWorker().Delete( - free_ids, local_only)) + status = CCoreWorkerProcess.GetCoreWorker().Delete(free_ids, local_only) + if status.IsIOError(): + check_status(CRayStatus.UnexpectedSystemExit(status.ToString())) + else: + check_status(status) def get_local_ongoing_lineage_reconstruction_tasks(self): cdef: diff --git a/src/ray/raylet_client/raylet_client.cc b/src/ray/raylet_client/raylet_client.cc index e2ac08d5d363..35a391720788 100644 --- a/src/ray/raylet_client/raylet_client.cc +++ b/src/ray/raylet_client/raylet_client.cc @@ -85,8 +85,7 @@ Status raylet::RayletConnection::AtomicRequestReply(MessageType request_type, } void raylet::RayletConnection::ShutdownIfLocalRayletDisconnected(const Status &status) { - if ((!status.ok() && IsRayletFailed(RayConfig::instance().RAYLET_PID())) || - status.IsIOError()) { + if (!status.ok() && IsRayletFailed(RayConfig::instance().RAYLET_PID())) { RAY_LOG(WARNING) << "The connection is failed because the local raylet has been " "dead or is unreachable. Terminate the process. Status: " << status; @@ -184,6 +183,12 @@ Status raylet::RayletClient::Disconnect( auto status = conn_->WriteMessage(MessageType::DisconnectClient, &fbb); // Don't be too strict for disconnection errors. 
// Just create logs and prevent it from crash. + // TODO (myan): In the current implementation, if the raylet has already terminated, + // the "WriteMessage" call above makes the worker process exit early inside that + // function, so execution never reaches here. However, this code path is shared by + // graceful shutdown and forced termination. We need to make sure the early exit + // does not happen during graceful shutdown and that nothing is leaked when the + // early exit is triggered. if (!status.ok()) { RAY_LOG(WARNING) << status.ToString()