Skip to content

Commit

Permalink
[core] make ProcessFD, [Client,Server]Connection non copyable. (ray-p…
Browse files Browse the repository at this point in the history
…roject#41106)

These classes carry system resources (FDs) and are not meant to be copied (and dup'd) so to avoid accidental copy we delete the copy constructors.

also removed a dynamic_pointer_cast since it's an upcast so static suffices.
  • Loading branch information
rynewang authored and ujjawal-khare committed Nov 29, 2023
1 parent f608186 commit e23b4c3
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 45 deletions.
6 changes: 6 additions & 0 deletions src/ray/common/client_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ Status ConnectSocketRetry(local_stream_socket &socket,
/// can be used to write messages synchronously to the server.
class ServerConnection : public std::enable_shared_from_this<ServerConnection> {
public:
ServerConnection(const ServerConnection &) = delete;
ServerConnection &operator=(const ServerConnection &) = delete;

/// ServerConnection destructor.
virtual ~ServerConnection();

Expand Down Expand Up @@ -187,6 +190,9 @@ class ClientConnection : public ServerConnection {
public:
using std::enable_shared_from_this<ServerConnection>::shared_from_this;

ClientConnection(const ClientConnection &) = delete;
ClientConnection &operator=(const ClientConnection &) = delete;

/// Allocate a new node client connection.
///
/// \param new_client_handler A reference to the client handler.
Expand Down
2 changes: 1 addition & 1 deletion src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1297,7 +1297,7 @@ void NodeManager::ProcessRegisterClientRequestMessage(
worker_type == rpc::WorkerType::RESTORE_WORKER) {
RAY_CHECK(job_id.IsNil());
}
auto worker = std::dynamic_pointer_cast<WorkerInterface>(
auto worker = std::static_pointer_cast<WorkerInterface>(
std::make_shared<Worker>(job_id,
runtime_env_hash,
worker_id,
Expand Down
56 changes: 12 additions & 44 deletions src/ray/util/process.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,12 @@ class ProcessFD {
~ProcessFD();
ProcessFD();
ProcessFD(pid_t pid, intptr_t fd = -1);
ProcessFD(const ProcessFD &other);
ProcessFD(ProcessFD &&other);
ProcessFD &operator=(const ProcessFD &other);
ProcessFD &operator=(ProcessFD &&other);
intptr_t CloneFD() const;

ProcessFD(const ProcessFD &other) = delete;
ProcessFD &operator=(const ProcessFD &other) = delete;

void CloseFD();
intptr_t GetFD() const;
pid_t GetId() const;
Expand Down Expand Up @@ -217,17 +218,7 @@ class ProcessFD {
}
};

ProcessFD::~ProcessFD() {
if (fd_ != -1) {
bool success;
#ifdef _WIN32
success = !!CloseHandle(reinterpret_cast<HANDLE>(fd_));
#else
success = close(static_cast<int>(fd_)) == 0;
#endif
RAY_CHECK(success) << "error " << errno << " closing process " << pid_ << " FD";
}
}
ProcessFD::~ProcessFD() { CloseFD(); }

ProcessFD::ProcessFD() : pid_(-1), fd_(-1) {}

Expand Down Expand Up @@ -277,18 +268,8 @@ ProcessFD::ProcessFD(pid_t pid, intptr_t fd) : pid_(pid), fd_(fd) {
}
}

ProcessFD::ProcessFD(const ProcessFD &other) : ProcessFD(other.pid_, other.CloneFD()) {}

ProcessFD::ProcessFD(ProcessFD &&other) : ProcessFD() { *this = std::move(other); }

ProcessFD &ProcessFD::operator=(const ProcessFD &other) {
if (this != &other) {
// Construct a copy, then call the move constructor
*this = static_cast<ProcessFD>(other);
}
return *this;
}

ProcessFD &ProcessFD::operator=(ProcessFD &&other) {
if (this != &other) {
// We use swap() to make sure the argument is actually moved from
Expand All @@ -299,32 +280,19 @@ ProcessFD &ProcessFD::operator=(ProcessFD &&other) {
return *this;
}

intptr_t ProcessFD::CloneFD() const {
intptr_t fd;
void ProcessFD::CloseFD() {
if (fd_ != -1) {
bool success;
#ifdef _WIN32
HANDLE handle;
BOOL inheritable = FALSE;
fd = DuplicateHandle(GetCurrentProcess(),
reinterpret_cast<HANDLE>(fd_),
GetCurrentProcess(),
&handle,
0,
inheritable,
DUPLICATE_SAME_ACCESS)
? reinterpret_cast<intptr_t>(handle)
: -1;
success = !!CloseHandle(reinterpret_cast<HANDLE>(fd_));
#else
fd = dup(static_cast<int>(fd_));
success = close(static_cast<int>(fd_)) == 0;
#endif
RAY_DCHECK(fd != -1);
} else {
fd = -1;
RAY_CHECK(success) << "error " << errno << " closing process " << pid_ << " FD";
}
return fd;
}

void ProcessFD::CloseFD() { fd_ = -1; }
fd_ = -1;
}

intptr_t ProcessFD::GetFD() const { return fd_; }

Expand Down

0 comments on commit e23b4c3

Please sign in to comment.