From 9b1db158679effa177a6ff3a6db26bd05cc8305e Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Mon, 13 Nov 2023 15:34:12 -0800 Subject: [PATCH 1/3] make ProcessFD non copyable. Signed-off-by: Ruiyang Wang --- src/ray/raylet/node_manager.cc | 2 +- src/ray/raylet/worker_pool.cc | 1 + src/ray/util/process.cc | 56 ++++++++-------------------------- 3 files changed, 14 insertions(+), 45 deletions(-) diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 7d4a40b50c24..cb99b7d040e4 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1297,7 +1297,7 @@ void NodeManager::ProcessRegisterClientRequestMessage( worker_type == rpc::WorkerType::RESTORE_WORKER) { RAY_CHECK(job_id.IsNil()); } - auto worker = std::dynamic_pointer_cast( + auto worker = std::static_pointer_cast( std::make_shared(job_id, runtime_env_hash, worker_id, diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index eb114c0b0501..13eb623ee3bb 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -153,6 +153,7 @@ WorkerPool::~WorkerPool() { procs_to_kill.insert(worker_process.second.proc); } } + // TODO: do something for this copy for (Process proc : procs_to_kill) { proc.Kill(); // NOTE: Avoid calling Wait() here. It fails with ECHILD, as SIGCHLD is disabled. diff --git a/src/ray/util/process.cc b/src/ray/util/process.cc index 34be370198d4..de29aadd6b08 100644 --- a/src/ray/util/process.cc +++ b/src/ray/util/process.cc @@ -88,11 +88,12 @@ class ProcessFD { ~ProcessFD(); ProcessFD(); ProcessFD(pid_t pid, intptr_t fd = -1); - ProcessFD(const ProcessFD &other); ProcessFD(ProcessFD &&other); - ProcessFD &operator=(const ProcessFD &other); ProcessFD &operator=(ProcessFD &&other); - intptr_t CloneFD() const; + + ProcessFD(const ProcessFD &other) = delete; + ProcessFD &operator=(const ProcessFD &other) = delete; + void CloseFD(); intptr_t GetFD() const; pid_t GetId() const; @@ -217,17 +218,7 @@ class ProcessFD { } }; -ProcessFD::~ProcessFD() { - if (fd_ != -1) { - bool success; -#ifdef _WIN32 - success = !!CloseHandle(reinterpret_cast(fd_)); -#else - success = close(static_cast(fd_)) == 0; -#endif - RAY_CHECK(success) << "error " << errno << " closing process " << pid_ << " FD"; - } -} +ProcessFD::~ProcessFD() { CloseFD(); } ProcessFD::ProcessFD() : pid_(-1), fd_(-1) {} @@ -277,18 +268,8 @@ ProcessFD::ProcessFD(pid_t pid, intptr_t fd) : pid_(pid), fd_(fd) { } } -ProcessFD::ProcessFD(const ProcessFD &other) : ProcessFD(other.pid_, other.CloneFD()) {} - ProcessFD::ProcessFD(ProcessFD &&other) : ProcessFD() { *this = std::move(other); } -ProcessFD &ProcessFD::operator=(const ProcessFD &other) { - if (this != &other) { - // Construct a copy, then call the move constructor - *this = static_cast(other); - } - return *this; -} - ProcessFD &ProcessFD::operator=(ProcessFD &&other) { if (this != &other) { // We use swap() to make sure the argument is actually moved from @@ -299,32 +280,19 @@ ProcessFD &ProcessFD::operator=(ProcessFD &&other) { return *this; } -intptr_t ProcessFD::CloneFD() const { - intptr_t fd; +void ProcessFD::CloseFD() { if (fd_ != -1) { + bool success; #ifdef _WIN32 - HANDLE handle; - BOOL inheritable = FALSE; - fd = DuplicateHandle(GetCurrentProcess(), - reinterpret_cast(fd_), - GetCurrentProcess(), - &handle, - 0, - inheritable, - DUPLICATE_SAME_ACCESS) - ? reinterpret_cast(handle) - : -1; + success = !!CloseHandle(reinterpret_cast(fd_)); #else - fd = dup(static_cast(fd_)); + success = close(static_cast(fd_)) == 0; #endif - RAY_DCHECK(fd != -1); - } else { - fd = -1; + RAY_CHECK(success) << "error " << errno << " closing process " << pid_ << " FD"; } - return fd; -} -void ProcessFD::CloseFD() { fd_ = -1; } + fd_ = -1; +} intptr_t ProcessFD::GetFD() const { return fd_; } From c2622f8cc9f48117df68c6e406a4576d5d31229e Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Mon, 13 Nov 2023 16:23:49 -0800 Subject: [PATCH 2/3] also make [Client,Server]Connection non copyable Signed-off-by: Ruiyang Wang --- src/ray/common/client_connection.h | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/ray/common/client_connection.h b/src/ray/common/client_connection.h index 89d30fbbcdbc..ec77edd79248 100644 --- a/src/ray/common/client_connection.h +++ b/src/ray/common/client_connection.h @@ -44,6 +44,9 @@ Status ConnectSocketRetry(local_stream_socket &socket, /// can be used to write messages synchronously to the server. class ServerConnection : public std::enable_shared_from_this { public: + ServerConnection(const ServerConnection &) = delete; + ServerConnection &operator=(const ServerConnection &) = delete; + /// ServerConnection destructor. virtual ~ServerConnection(); @@ -187,6 +190,9 @@ class ClientConnection : public ServerConnection { public: using std::enable_shared_from_this::shared_from_this; + ClientConnection(const ClientConnection &) = delete; + ClientConnection &operator=(const ClientConnection &) = delete; + /// Allocate a new node client connection. /// /// \param new_client_handler A reference to the client handler. From e31e7613777b9510897927da97e91d86971b1800 Mon Sep 17 00:00:00 2001 From: Ruiyang Wang Date: Mon, 13 Nov 2023 16:26:58 -0800 Subject: [PATCH 3/3] remove the todo Signed-off-by: Ruiyang Wang --- src/ray/raylet/worker_pool.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index 13eb623ee3bb..eb114c0b0501 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -153,7 +153,6 @@ WorkerPool::~WorkerPool() { procs_to_kill.insert(worker_process.second.proc); } } - // TODO: do something for this copy for (Process proc : procs_to_kill) { proc.Kill(); // NOTE: Avoid calling Wait() here. It fails with ECHILD, as SIGCHLD is disabled.