Skip to content

Commit

Permalink
ARROW-16799: [C++] Create a self-pipe abstraction (#13354)
Browse files Browse the repository at this point in the history
Also create a FileDescriptor RAII wrapper to automate the chore of closing file descriptors.

Authored-by: Antoine Pitrou <antoine@python.org>
Signed-off-by: Yibo Cai <yibo.cai@arm.com>
  • Loading branch information
pitrou committed Jun 10, 2022
1 parent 452f11c commit 1de30af
Show file tree
Hide file tree
Showing 13 changed files with 795 additions and 346 deletions.
7 changes: 4 additions & 3 deletions cpp/src/arrow/filesystem/localfs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -439,10 +439,11 @@ Result<std::shared_ptr<io::OutputStream>> OpenOutputStreamGeneric(const std::str
ARROW_ASSIGN_OR_RAISE(auto fn, PlatformFilename::FromString(path));
const bool write_only = true;
ARROW_ASSIGN_OR_RAISE(
int fd, ::arrow::internal::FileOpenWritable(fn, write_only, truncate, append));
auto maybe_stream = io::FileOutputStream::Open(fd);
auto fd, ::arrow::internal::FileOpenWritable(fn, write_only, truncate, append));
int raw_fd = fd.Detach();
auto maybe_stream = io::FileOutputStream::Open(raw_fd);
if (!maybe_stream.ok()) {
ARROW_UNUSED(::arrow::internal::FileClose(fd));
ARROW_UNUSED(::arrow::internal::FileClose(raw_fd));
}
return maybe_stream;
}
Expand Down
6 changes: 3 additions & 3 deletions cpp/src/arrow/filesystem/localfs_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ namespace arrow {
namespace fs {
namespace internal {

using ::arrow::internal::FileDescriptor;
using ::arrow::internal::PlatformFilename;
using ::arrow::internal::TemporaryDir;

Expand Down Expand Up @@ -237,9 +238,8 @@ class TestLocalFS : public LocalFSTestMixin {

void CheckConcreteFile(const std::string& path, int64_t expected_size) {
ASSERT_OK_AND_ASSIGN(auto fn, PlatformFilename::FromString(path));
ASSERT_OK_AND_ASSIGN(int fd, ::arrow::internal::FileOpenReadable(fn));
auto result = ::arrow::internal::FileGetSize(fd);
ASSERT_OK(::arrow::internal::FileClose(fd));
ASSERT_OK_AND_ASSIGN(FileDescriptor fd, ::arrow::internal::FileOpenReadable(fn));
auto result = ::arrow::internal::FileGetSize(fd.fd());
ASSERT_OK_AND_ASSIGN(int64_t size, result);
ASSERT_EQ(size, expected_size);
}
Expand Down
89 changes: 19 additions & 70 deletions cpp/src/arrow/flight/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,6 @@

#include "arrow/flight/server.h"

#ifdef _WIN32
#include "arrow/util/windows_compatibility.h"

#include <io.h>
#else
#include <fcntl.h>
#include <unistd.h>
#endif
#include <atomic>
#include <cerrno>
#include <chrono>
Expand All @@ -52,22 +44,16 @@

namespace arrow {
namespace flight {

namespace {
#if (ATOMIC_INT_LOCK_FREE != 2 || ATOMIC_POINTER_LOCK_FREE != 2)
#error "atomic ints and atomic pointers not always lock-free!"
#endif

using ::arrow::internal::SelfPipe;
using ::arrow::internal::SetSignalHandler;
using ::arrow::internal::SignalHandler;

#ifdef WIN32
#define PIPE_WRITE _write
#define PIPE_READ _read
#else
#define PIPE_WRITE write
#define PIPE_READ read
#endif

/// RAII guard that manages a self-pipe and a thread that listens on
/// the self-pipe, shutting down the server when a signal handler
/// writes to the pipe.
Expand All @@ -80,51 +66,22 @@ class ServerSignalHandler {
///
/// \return the fd of the write side of the pipe.
template <typename Fn>
arrow::Result<int> Init(Fn handler) {
ARROW_ASSIGN_OR_RAISE(auto pipe, arrow::internal::CreatePipe());
#ifndef WIN32
// Make write end nonblocking
int flags = fcntl(pipe.wfd, F_GETFL);
if (flags == -1) {
RETURN_NOT_OK(arrow::internal::FileClose(pipe.rfd));
RETURN_NOT_OK(arrow::internal::FileClose(pipe.wfd));
return arrow::internal::IOErrorFromErrno(
errno, "Could not initialize self-pipe to wait for signals");
}
flags |= O_NONBLOCK;
if (fcntl(pipe.wfd, F_SETFL, flags) == -1) {
RETURN_NOT_OK(arrow::internal::FileClose(pipe.rfd));
RETURN_NOT_OK(arrow::internal::FileClose(pipe.wfd));
return arrow::internal::IOErrorFromErrno(
errno, "Could not initialize self-pipe to wait for signals");
}
#endif
self_pipe_ = pipe;
handle_signals_ = std::thread(handler, self_pipe_.rfd);
return self_pipe_.wfd;
arrow::Result<std::shared_ptr<SelfPipe>> Init(Fn handler) {
ARROW_ASSIGN_OR_RAISE(self_pipe_, SelfPipe::Make(/*signal_safe=*/true));
handle_signals_ = std::thread(handler, self_pipe_);
return self_pipe_;
}

Status Shutdown() {
if (self_pipe_.rfd == 0) {
// Already closed
return Status::OK();
}
if (PIPE_WRITE(self_pipe_.wfd, "0", 1) < 0 && errno != EAGAIN &&
errno != EWOULDBLOCK && errno != EINTR) {
return arrow::internal::IOErrorFromErrno(errno, "Could not unblock signal thread");
}
RETURN_NOT_OK(self_pipe_->Shutdown());
handle_signals_.join();
RETURN_NOT_OK(arrow::internal::FileClose(self_pipe_.rfd));
RETURN_NOT_OK(arrow::internal::FileClose(self_pipe_.wfd));
self_pipe_.rfd = 0;
self_pipe_.wfd = 0;
return Status::OK();
}

~ServerSignalHandler() { ARROW_CHECK_OK(Shutdown()); }

private:
arrow::internal::Pipe self_pipe_;
std::shared_ptr<SelfPipe> self_pipe_;
std::thread handle_signals_;
};
} // namespace
Expand All @@ -140,7 +97,7 @@ struct FlightServerBase::Impl {
static std::atomic<Impl*> running_instance_;
// We'll use the self-pipe trick to notify a thread from the signal
// handler. The thread will then shut down the server.
int self_pipe_wfd_;
std::shared_ptr<SelfPipe> self_pipe_;

// Signal handling
std::vector<int> signals_;
Expand All @@ -156,24 +113,17 @@ struct FlightServerBase::Impl {

void DoHandleSignal(int signum) {
got_signal_ = signum;
int saved_errno = errno;
if (PIPE_WRITE(self_pipe_wfd_, "0", 1) < 0) {
// Can't do much here, though, pipe is nonblocking so hopefully this doesn't happen
ARROW_LOG(WARNING) << "FlightServerBase: failed to handle signal " << signum
<< " errno: " << errno;
}
errno = saved_errno;

// Send dummy payload over self-pipe
self_pipe_->Send(/*payload=*/0);
}

static void WaitForSignals(int fd) {
// Wait for a signal handler to write to the pipe
int8_t buf[1];
while (PIPE_READ(fd, /*buf=*/buf, /*count=*/1) == -1) {
if (errno == EINTR) {
continue;
}
ARROW_CHECK_OK(arrow::internal::IOErrorFromErrno(
errno, "Error while waiting for shutdown signal"));
static void WaitForSignals(std::shared_ptr<SelfPipe> self_pipe) {
// Wait for a signal handler to wake up the pipe
auto st = self_pipe->Wait().status();
// Status::Invalid means the pipe was shutdown without any wakeup
if (!st.ok() && !st.IsInvalid()) {
ARROW_LOG(FATAL) << "Failed to wait on self-pipe: " << st.ToString();
}
auto instance = running_instance_.load();
if (instance != nullptr) {
Expand Down Expand Up @@ -232,8 +182,7 @@ Status FlightServerBase::Serve() {
impl_->running_instance_ = impl_.get();

ServerSignalHandler signal_handler;
ARROW_ASSIGN_OR_RAISE(impl_->self_pipe_wfd_,
signal_handler.Init(&Impl::WaitForSignals));
ARROW_ASSIGN_OR_RAISE(impl_->self_pipe_, signal_handler.Init(&Impl::WaitForSignals));
// Override existing signal handlers with our own handler so as to stop the server.
for (size_t i = 0; i < impl_->signals_.size(); ++i) {
int signum = impl_->signals_[i];
Expand Down
62 changes: 20 additions & 42 deletions cpp/src/arrow/io/file.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,13 @@

namespace arrow {

using internal::FileDescriptor;
using internal::IOErrorFromErrno;

namespace io {

class OSFile {
public:
OSFile() : fd_(-1), is_open_(false), size_(-1), need_seeking_(false) {}

~OSFile() {}

// Note: only one of the Open* methods below may be called on a given instance

Status OpenWritable(const std::string& path, bool truncate, bool append,
Expand All @@ -76,11 +73,10 @@ class OSFile {

ARROW_ASSIGN_OR_RAISE(fd_, ::arrow::internal::FileOpenWritable(file_name_, write_only,
truncate, append));
is_open_ = true;
mode_ = write_only ? FileMode::WRITE : FileMode::READWRITE;

if (!truncate) {
ARROW_ASSIGN_OR_RAISE(size_, ::arrow::internal::FileGetSize(fd_));
ARROW_ASSIGN_OR_RAISE(size_, ::arrow::internal::FileGetSize(fd_.fd()));
} else {
size_ = 0;
}
Expand All @@ -98,55 +94,42 @@ class OSFile {
size_ = -1;
}
RETURN_NOT_OK(SetFileName(fd));
is_open_ = true;
mode_ = FileMode::WRITE;
fd_ = fd;
fd_ = FileDescriptor(fd);
return Status::OK();
}

Status OpenReadable(const std::string& path) {
RETURN_NOT_OK(SetFileName(path));

ARROW_ASSIGN_OR_RAISE(fd_, ::arrow::internal::FileOpenReadable(file_name_));
ARROW_ASSIGN_OR_RAISE(size_, ::arrow::internal::FileGetSize(fd_));
ARROW_ASSIGN_OR_RAISE(size_, ::arrow::internal::FileGetSize(fd_.fd()));

is_open_ = true;
mode_ = FileMode::READ;
return Status::OK();
}

Status OpenReadable(int fd) {
ARROW_ASSIGN_OR_RAISE(size_, ::arrow::internal::FileGetSize(fd));
RETURN_NOT_OK(SetFileName(fd));
is_open_ = true;
mode_ = FileMode::READ;
fd_ = fd;
fd_ = FileDescriptor(fd);
return Status::OK();
}

Status CheckClosed() const {
if (!is_open_) {
if (fd_.closed()) {
return Status::Invalid("Invalid operation on closed file");
}
return Status::OK();
}

Status Close() {
if (is_open_) {
// Even if closing fails, the fd will likely be closed (perhaps it's
// already closed).
is_open_ = false;
int fd = fd_;
fd_ = -1;
RETURN_NOT_OK(::arrow::internal::FileClose(fd));
}
return Status::OK();
}
Status Close() { return fd_.Close(); }

Result<int64_t> Read(int64_t nbytes, void* out) {
RETURN_NOT_OK(CheckClosed());
RETURN_NOT_OK(CheckPositioned());
return ::arrow::internal::FileRead(fd_, reinterpret_cast<uint8_t*>(out), nbytes);
return ::arrow::internal::FileRead(fd_.fd(), reinterpret_cast<uint8_t*>(out), nbytes);
}

Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) {
Expand All @@ -155,16 +138,16 @@ class OSFile {
// ReadAt() leaves the file position undefined, so require that we seek
// before calling Read() or Write().
need_seeking_.store(true);
return ::arrow::internal::FileReadAt(fd_, reinterpret_cast<uint8_t*>(out), position,
nbytes);
return ::arrow::internal::FileReadAt(fd_.fd(), reinterpret_cast<uint8_t*>(out),
position, nbytes);
}

Status Seek(int64_t pos) {
RETURN_NOT_OK(CheckClosed());
if (pos < 0) {
return Status::Invalid("Invalid position");
}
Status st = ::arrow::internal::FileSeek(fd_, pos);
Status st = ::arrow::internal::FileSeek(fd_.fd(), pos);
if (st.ok()) {
need_seeking_.store(false);
}
Expand All @@ -173,7 +156,7 @@ class OSFile {

Result<int64_t> Tell() const {
RETURN_NOT_OK(CheckClosed());
return ::arrow::internal::FileTell(fd_);
return ::arrow::internal::FileTell(fd_.fd());
}

Status Write(const void* data, int64_t length) {
Expand All @@ -184,13 +167,13 @@ class OSFile {
if (length < 0) {
return Status::IOError("Length must be non-negative");
}
return ::arrow::internal::FileWrite(fd_, reinterpret_cast<const uint8_t*>(data),
return ::arrow::internal::FileWrite(fd_.fd(), reinterpret_cast<const uint8_t*>(data),
length);
}

int fd() const { return fd_; }
int fd() const { return fd_.fd(); }

bool is_open() const { return is_open_; }
bool is_open() const { return !fd_.closed(); }

int64_t size() const { return size_; }

Expand Down Expand Up @@ -221,16 +204,11 @@ class OSFile {
::arrow::internal::PlatformFilename file_name_;

std::mutex lock_;

// File descriptor
int fd_;

FileDescriptor fd_;
FileMode::type mode_;

bool is_open_;
int64_t size_;
int64_t size_{-1};
// Whether ReadAt made the file position non-deterministic.
std::atomic<bool> need_seeking_;
std::atomic<bool> need_seeking_{false};
};

// ----------------------------------------------------------------------
Expand Down Expand Up @@ -287,7 +265,7 @@ class ReadableFile::ReadableFileImpl : public OSFile {
for (const auto& range : ranges) {
RETURN_NOT_OK(internal::ValidateRange(range.offset, range.length));
#if defined(POSIX_FADV_WILLNEED)
int ret = posix_fadvise(fd_, range.offset, range.length, POSIX_FADV_WILLNEED);
int ret = posix_fadvise(fd_.fd(), range.offset, range.length, POSIX_FADV_WILLNEED);
if (ret) {
RETURN_NOT_OK(report_error(ret, "posix_fadvise failed"));
}
Expand All @@ -296,7 +274,7 @@ class ReadableFile::ReadableFileImpl : public OSFile {
off_t ra_offset;
int ra_count;
} radvisory{range.offset, static_cast<int>(range.length)};
if (radvisory.ra_count > 0 && fcntl(fd_, F_RDADVISE, &radvisory) == -1) {
if (radvisory.ra_count > 0 && fcntl(fd_.fd(), F_RDADVISE, &radvisory) == -1) {
RETURN_NOT_OK(report_error(errno, "fcntl(fd, F_RDADVISE, ...) failed"));
}
#else
Expand Down
Loading

0 comments on commit 1de30af

Please sign in to comment.