Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-387: [C++] Verify zero-copy Buffer slices from BufferReader retain reference to parent Buffer #266

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
208 changes: 194 additions & 14 deletions cpp/src/arrow/io/file.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@

#endif // _MSC_VER

// defines that
// defines that don't exist in MinGW
#if defined(__MINGW32__)
#define ARROW_WRITE_SHMODE S_IRUSR | S_IWUSR
#elif defined(_MSC_VER) // Visual Studio
Expand Down Expand Up @@ -174,7 +174,8 @@ static inline Status FileOpenReadable(const std::string& filename, int* fd) {
return CheckOpenResult(ret, errno_actual, filename.c_str(), filename.size());
}

static inline Status FileOpenWriteable(const std::string& filename, int* fd) {
static inline Status FileOpenWriteable(
const std::string& filename, bool write_only, bool truncate, int* fd) {
int ret;
errno_t errno_actual = 0;

Expand All @@ -186,13 +187,31 @@ static inline Status FileOpenWriteable(const std::string& filename, int* fd) {
memcpy(wpath.data(), filename.data(), filename.size());
memcpy(wpath.data() + nwchars, L"\0", sizeof(wchar_t));

errno_actual = _wsopen_s(fd, wpath.data(), _O_WRONLY | _O_CREAT | _O_BINARY | _O_TRUNC,
_SH_DENYNO, _S_IWRITE);
int oflag = _O_CREAT | _O_BINARY;

if (truncate) { oflag |= _O_TRUNC; }

if (write_only) {
oflag |= _O_WRONLY;
} else {
oflag |= _O_RDWR;
}

errno_actual = _wsopen_s(fd, wpath.data(), oflag, _SH_DENYNO, _S_IWRITE);
ret = *fd;

#else
ret = *fd =
open(filename.c_str(), O_WRONLY | O_CREAT | O_BINARY | O_TRUNC, ARROW_WRITE_SHMODE);
int oflag = O_CREAT | O_BINARY;

if (truncate) { oflag |= O_TRUNC; }

if (write_only) {
oflag |= O_WRONLY;
} else {
oflag |= O_RDWR;
}

ret = *fd = open(filename.c_str(), oflag, ARROW_WRITE_SHMODE);
#endif
return CheckOpenResult(ret, errno_actual, filename.c_str(), filename.size());
}
Expand Down Expand Up @@ -296,22 +315,27 @@ class OSFile {

~OSFile() {}

Status OpenWritable(const std::string& path) {
RETURN_NOT_OK(FileOpenWriteable(path, &fd_));
Status OpenWriteable(const std::string& path, bool append, bool write_only) {
RETURN_NOT_OK(FileOpenWriteable(path, write_only, !append, &fd_));
path_ = path;
is_open_ = true;
mode_ = write_only ? FileMode::READ : FileMode::READWRITE;

if (append) {
RETURN_NOT_OK(FileGetSize(fd_, &size_));
} else {
size_ = 0;
}
return Status::OK();
}

Status OpenReadable(const std::string& path) {
RETURN_NOT_OK(FileOpenReadable(path, &fd_));
RETURN_NOT_OK(FileGetSize(fd_, &size_));

// The position should be 0 after GetSize
// RETURN_NOT_OK(Seek(0));

path_ = path;
is_open_ = true;
mode_ = FileMode::READ;
return Status::OK();
}

Expand Down Expand Up @@ -346,12 +370,14 @@ class OSFile {

int64_t size() const { return size_; }

private:
protected:
std::string path_;

// File descriptor
int fd_;

FileMode::type mode_;

bool is_open_;
int64_t size_;
};
Expand Down Expand Up @@ -440,7 +466,9 @@ int ReadableFile::file_descriptor() const {

class FileOutputStream::FileOutputStreamImpl : public OSFile {
public:
Status Open(const std::string& path) { return OpenWritable(path); }
Status Open(const std::string& path, bool append) {
return OpenWriteable(path, append, true);
}
};

FileOutputStream::FileOutputStream() {
Expand All @@ -453,9 +481,14 @@ FileOutputStream::~FileOutputStream() {

Status FileOutputStream::Open(
const std::string& path, std::shared_ptr<FileOutputStream>* file) {
return Open(path, false, file);
}

Status FileOutputStream::Open(
const std::string& path, bool append, std::shared_ptr<FileOutputStream>* file) {
// private ctor
*file = std::shared_ptr<FileOutputStream>(new FileOutputStream());
return (*file)->impl_->Open(path);
return (*file)->impl_->Open(path, append);
}

Status FileOutputStream::Close() {
Expand All @@ -474,5 +507,152 @@ int FileOutputStream::file_descriptor() const {
return impl_->fd();
}

// ----------------------------------------------------------------------
// Implement MemoryMappedFile

class MemoryMappedFile::MemoryMappedFileImpl : public OSFile {
public:
MemoryMappedFileImpl() : OSFile(), data_(nullptr) {}

~MemoryMappedFileImpl() {
if (is_open_) {
munmap(data_, size_);
OSFile::Close();
}
}

Status Open(const std::string& path, FileMode::type mode) {
int prot_flags = PROT_READ;

if (mode != FileMode::READ) {
prot_flags |= PROT_WRITE;
const bool append = true;
RETURN_NOT_OK(OSFile::OpenWriteable(path, append, mode == FileMode::WRITE));
} else {
RETURN_NOT_OK(OSFile::OpenReadable(path));
}

void* result = mmap(nullptr, size_, prot_flags, MAP_SHARED, fd(), 0);
if (result == MAP_FAILED) {
std::stringstream ss;
ss << "Memory mapping file failed, errno: " << errno;
return Status::IOError(ss.str());
}
data_ = reinterpret_cast<uint8_t*>(result);
position_ = 0;

return Status::OK();
}

int64_t size() const { return size_; }

Status Seek(int64_t position) {
if (position < 0 || position >= size_) {
return Status::Invalid("position is out of bounds");
}
position_ = position;
return Status::OK();
}

int64_t position() { return position_; }

void advance(int64_t nbytes) { position_ = std::min(size_, position_ + nbytes); }

uint8_t* data() { return data_; }

uint8_t* head() { return data_ + position_; }

bool writable() { return mode_ != FileMode::READ; }

bool opened() { return is_open_; }

private:
int64_t position_;

// The memory map
uint8_t* data_;
};

MemoryMappedFile::MemoryMappedFile(FileMode::type mode) {
ReadableFileInterface::set_mode(mode);
}

MemoryMappedFile::~MemoryMappedFile() {}

Status MemoryMappedFile::Open(const std::string& path, FileMode::type mode,
std::shared_ptr<MemoryMappedFile>* out) {
std::shared_ptr<MemoryMappedFile> result(new MemoryMappedFile(mode));

result->impl_.reset(new MemoryMappedFileImpl());
RETURN_NOT_OK(result->impl_->Open(path, mode));

*out = result;
return Status::OK();
}

Status MemoryMappedFile::GetSize(int64_t* size) {
*size = impl_->size();
return Status::OK();
}

Status MemoryMappedFile::Tell(int64_t* position) {
*position = impl_->position();
return Status::OK();
}

Status MemoryMappedFile::Seek(int64_t position) {
return impl_->Seek(position);
}

Status MemoryMappedFile::Close() {
// munmap handled in pimpl dtor
return Status::OK();
}

Status MemoryMappedFile::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) {
nbytes = std::min(nbytes, impl_->size() - impl_->position());
std::memcpy(out, impl_->head(), nbytes);
*bytes_read = nbytes;
impl_->advance(nbytes);
return Status::OK();
}

Status MemoryMappedFile::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
nbytes = std::min(nbytes, impl_->size() - impl_->position());
*out = std::make_shared<Buffer>(impl_->head(), nbytes);
impl_->advance(nbytes);
return Status::OK();
}

bool MemoryMappedFile::supports_zero_copy() const {
return true;
}

Status MemoryMappedFile::WriteAt(int64_t position, const uint8_t* data, int64_t nbytes) {
if (!impl_->opened() || !impl_->writable()) {
return Status::IOError("Unable to write");
}

RETURN_NOT_OK(impl_->Seek(position));
return WriteInternal(data, nbytes);
}

Status MemoryMappedFile::Write(const uint8_t* data, int64_t nbytes) {
if (!impl_->opened() || !impl_->writable()) {
return Status::IOError("Unable to write");
}
if (nbytes + impl_->position() > impl_->size()) {
return Status::Invalid("Cannot write past end of memory map");
}

return WriteInternal(data, nbytes);
}

Status MemoryMappedFile::WriteInternal(const uint8_t* data, int64_t nbytes) {
memcpy(impl_->head(), data, nbytes);
impl_->advance(nbytes);
return Status::OK();
}

} // namespace io
} // namespace arrow
49 changes: 49 additions & 0 deletions cpp/src/arrow/io/file.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,13 @@ class ARROW_EXPORT FileOutputStream : public OutputStream {
public:
~FileOutputStream();

// When opening a new file, any existing file with the indicated path is
// truncated to 0 bytes, deleting any existing memory
static Status Open(const std::string& path, std::shared_ptr<FileOutputStream>* file);

static Status Open(
const std::string& path, bool append, std::shared_ptr<FileOutputStream>* file);

// OutputStream interface
Status Close() override;
Status Tell(int64_t* position) override;
Expand Down Expand Up @@ -88,6 +93,50 @@ class ARROW_EXPORT ReadableFile : public ReadableFileInterface {
std::unique_ptr<ReadableFileImpl> impl_;
};

// A file interface that uses memory-mapped files for memory interactions,
// supporting zero copy reads. The same class is used for both reading and
// writing.
//
// If opening a file in a writeable mode, it is not truncated first as with
// FileOutputStream
class ARROW_EXPORT MemoryMappedFile : public ReadWriteFileInterface {
public:
~MemoryMappedFile();

static Status Open(const std::string& path, FileMode::type mode,
std::shared_ptr<MemoryMappedFile>* out);

Status Close() override;

Status Tell(int64_t* position) override;

Status Seek(int64_t position) override;

// Required by ReadableFileInterface, copies memory into out
Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) override;

// Zero copy read
Status Read(int64_t nbytes, std::shared_ptr<Buffer>* out) override;

bool supports_zero_copy() const override;

Status Write(const uint8_t* data, int64_t nbytes) override;

Status WriteAt(int64_t position, const uint8_t* data, int64_t nbytes) override;

// @return: the size in bytes of the memory source
Status GetSize(int64_t* size) override;

private:
explicit MemoryMappedFile(FileMode::type mode);

Status WriteInternal(const uint8_t* data, int64_t nbytes);

// Hide the internal details of this class for now
class ARROW_NO_EXPORT MemoryMappedFileImpl;
std::unique_ptr<MemoryMappedFileImpl> impl_;
};

} // namespace io
} // namespace arrow

Expand Down
5 changes: 5 additions & 0 deletions cpp/src/arrow/io/interfaces.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,10 @@ Status ReadableFileInterface::ReadAt(
return Read(nbytes, out);
}

Status Writeable::Write(const std::string& data) {
return Write(reinterpret_cast<const uint8_t*>(data.c_str()),
static_cast<int64_t>(data.size()));
}

} // namespace io
} // namespace arrow
5 changes: 4 additions & 1 deletion cpp/src/arrow/io/interfaces.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include <cstdint>
#include <memory>
#include <string>

#include "arrow/util/macros.h"
#include "arrow/util/visibility.h"
Expand Down Expand Up @@ -67,9 +68,11 @@ class Seekable {
virtual Status Seek(int64_t position) = 0;
};

class Writeable {
class ARROW_EXPORT Writeable {
public:
virtual Status Write(const uint8_t* data, int64_t nbytes) = 0;

Status Write(const std::string& data);
};

class Readable {
Expand Down
Loading