Skip to content

Commit

Permalink
[core] make ProcessFD, [Client,Server]Connection non copyable. (#41106)
Browse files Browse the repository at this point in the history
These classes carry system resources (FDs) and are not meant to be copied (and dup'd) so to avoid accidental copy we delete the copy constructors.

also removed a dynamic_pointer_cast since it's an upcast so static suffices.
  • Loading branch information
rynewang authored Nov 17, 2023
1 parent ca29fec commit 8209893
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 45 deletions.
6 changes: 6 additions & 0 deletions src/ray/common/client_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ Status ConnectSocketRetry(local_stream_socket &socket,
/// can be used to write messages synchronously to the server.
class ServerConnection : public std::enable_shared_from_this<ServerConnection> {
public:
ServerConnection(const ServerConnection &) = delete;
ServerConnection &operator=(const ServerConnection &) = delete;

/// ServerConnection destructor.
virtual ~ServerConnection();

Expand Down Expand Up @@ -187,6 +190,9 @@ class ClientConnection : public ServerConnection {
public:
using std::enable_shared_from_this<ServerConnection>::shared_from_this;

ClientConnection(const ClientConnection &) = delete;
ClientConnection &operator=(const ClientConnection &) = delete;

/// Allocate a new node client connection.
///
/// \param new_client_handler A reference to the client handler.
Expand Down
2 changes: 1 addition & 1 deletion src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1297,7 +1297,7 @@ void NodeManager::ProcessRegisterClientRequestMessage(
worker_type == rpc::WorkerType::RESTORE_WORKER) {
RAY_CHECK(job_id.IsNil());
}
auto worker = std::dynamic_pointer_cast<WorkerInterface>(
auto worker = std::static_pointer_cast<WorkerInterface>(
std::make_shared<Worker>(job_id,
runtime_env_hash,
worker_id,
Expand Down
56 changes: 12 additions & 44 deletions src/ray/util/process.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,12 @@ class ProcessFD {
~ProcessFD();
ProcessFD();
ProcessFD(pid_t pid, intptr_t fd = -1);
ProcessFD(const ProcessFD &other);
ProcessFD(ProcessFD &&other);
ProcessFD &operator=(const ProcessFD &other);
ProcessFD &operator=(ProcessFD &&other);
intptr_t CloneFD() const;

ProcessFD(const ProcessFD &other) = delete;
ProcessFD &operator=(const ProcessFD &other) = delete;

void CloseFD();
intptr_t GetFD() const;
pid_t GetId() const;
Expand Down Expand Up @@ -217,17 +218,7 @@ class ProcessFD {
}
};

ProcessFD::~ProcessFD() {
if (fd_ != -1) {
bool success;
#ifdef _WIN32
success = !!CloseHandle(reinterpret_cast<HANDLE>(fd_));
#else
success = close(static_cast<int>(fd_)) == 0;
#endif
RAY_CHECK(success) << "error " << errno << " closing process " << pid_ << " FD";
}
}
ProcessFD::~ProcessFD() { CloseFD(); }

ProcessFD::ProcessFD() : pid_(-1), fd_(-1) {}

Expand Down Expand Up @@ -277,18 +268,8 @@ ProcessFD::ProcessFD(pid_t pid, intptr_t fd) : pid_(pid), fd_(fd) {
}
}

ProcessFD::ProcessFD(const ProcessFD &other) : ProcessFD(other.pid_, other.CloneFD()) {}

ProcessFD::ProcessFD(ProcessFD &&other) : ProcessFD() { *this = std::move(other); }

ProcessFD &ProcessFD::operator=(const ProcessFD &other) {
if (this != &other) {
// Construct a copy, then call the move constructor
*this = static_cast<ProcessFD>(other);
}
return *this;
}

ProcessFD &ProcessFD::operator=(ProcessFD &&other) {
if (this != &other) {
// We use swap() to make sure the argument is actually moved from
Expand All @@ -299,32 +280,19 @@ ProcessFD &ProcessFD::operator=(ProcessFD &&other) {
return *this;
}

intptr_t ProcessFD::CloneFD() const {
intptr_t fd;
void ProcessFD::CloseFD() {
if (fd_ != -1) {
bool success;
#ifdef _WIN32
HANDLE handle;
BOOL inheritable = FALSE;
fd = DuplicateHandle(GetCurrentProcess(),
reinterpret_cast<HANDLE>(fd_),
GetCurrentProcess(),
&handle,
0,
inheritable,
DUPLICATE_SAME_ACCESS)
? reinterpret_cast<intptr_t>(handle)
: -1;
success = !!CloseHandle(reinterpret_cast<HANDLE>(fd_));
#else
fd = dup(static_cast<int>(fd_));
success = close(static_cast<int>(fd_)) == 0;
#endif
RAY_DCHECK(fd != -1);
} else {
fd = -1;
RAY_CHECK(success) << "error " << errno << " closing process " << pid_ << " FD";
}
return fd;
}

void ProcessFD::CloseFD() { fd_ = -1; }
fd_ = -1;
}

intptr_t ProcessFD::GetFD() const { return fd_; }

Expand Down

0 comments on commit 8209893

Please sign in to comment.