Skip to content

Commit

Permalink
ARROW-294: [C++] Do not use platform-dependent fopen/fclose functions…
Browse files Browse the repository at this point in the history
… for MemoryMappedFile

Also adds a test case for ARROW-340.

Author: Wes McKinney <wes.mckinney@twosigma.com>

Closes #265 from wesm/ARROW-294 and squashes the following commits:

42a83a4 [Wes McKinney] Remove duplicated includes
3928ab0 [Wes McKinney] Base MemoryMappedFile implementation on common OSFile interface. Add test case for ARROW-340.
  • Loading branch information
wesm authored and xhochy committed Jan 3, 2017
1 parent 9f7d4ae commit d9df556
Show file tree
Hide file tree
Showing 7 changed files with 359 additions and 323 deletions.
208 changes: 194 additions & 14 deletions cpp/src/arrow/io/file.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@

#endif // _MSC_VER

// defines that
// defines that don't exist in MinGW
#if defined(__MINGW32__)
#define ARROW_WRITE_SHMODE S_IRUSR | S_IWUSR
#elif defined(_MSC_VER) // Visual Studio
Expand Down Expand Up @@ -174,7 +174,8 @@ static inline Status FileOpenReadable(const std::string& filename, int* fd) {
return CheckOpenResult(ret, errno_actual, filename.c_str(), filename.size());
}

static inline Status FileOpenWriteable(const std::string& filename, int* fd) {
static inline Status FileOpenWriteable(
const std::string& filename, bool write_only, bool truncate, int* fd) {
int ret;
errno_t errno_actual = 0;

Expand All @@ -186,13 +187,31 @@ static inline Status FileOpenWriteable(const std::string& filename, int* fd) {
memcpy(wpath.data(), filename.data(), filename.size());
memcpy(wpath.data() + nwchars, L"\0", sizeof(wchar_t));

errno_actual = _wsopen_s(fd, wpath.data(), _O_WRONLY | _O_CREAT | _O_BINARY | _O_TRUNC,
_SH_DENYNO, _S_IWRITE);
int oflag = _O_CREAT | _O_BINARY;

if (truncate) { oflag |= _O_TRUNC; }

if (write_only) {
oflag |= _O_WRONLY;
} else {
oflag |= _O_RDWR;
}

errno_actual = _wsopen_s(fd, wpath.data(), oflag, _SH_DENYNO, _S_IWRITE);
ret = *fd;

#else
ret = *fd =
open(filename.c_str(), O_WRONLY | O_CREAT | O_BINARY | O_TRUNC, ARROW_WRITE_SHMODE);
int oflag = O_CREAT | O_BINARY;

if (truncate) { oflag |= O_TRUNC; }

if (write_only) {
oflag |= O_WRONLY;
} else {
oflag |= O_RDWR;
}

ret = *fd = open(filename.c_str(), oflag, ARROW_WRITE_SHMODE);
#endif
return CheckOpenResult(ret, errno_actual, filename.c_str(), filename.size());
}
Expand Down Expand Up @@ -296,22 +315,27 @@ class OSFile {

~OSFile() {}

Status OpenWritable(const std::string& path) {
RETURN_NOT_OK(FileOpenWriteable(path, &fd_));
// Open `path` for writing and store the resulting descriptor in fd_.
//
// append: when true the file is opened without truncation and size_ is
//   initialised from the current file size; when false the file is truncated
//   on open and size_ starts at 0.
// write_only: when true the file is opened write-only, otherwise read-write.
//
// Returns the first failing Status from FileOpenWriteable / FileGetSize.
Status OpenWriteable(const std::string& path, bool append, bool write_only) {
  RETURN_NOT_OK(FileOpenWriteable(path, write_only, !append, &fd_));
  path_ = path;
  is_open_ = true;
  // A write-only handle must be recorded as WRITE, not READ: checks of the
  // form "mode_ != FileMode::READ" would otherwise treat it as read-only.
  mode_ = write_only ? FileMode::WRITE : FileMode::READWRITE;

  if (append) {
    RETURN_NOT_OK(FileGetSize(fd_, &size_));
  } else {
    size_ = 0;
  }
  return Status::OK();
}

// Open `path` read-only and cache its size in size_.
//
// Returns the first failing Status from FileOpenReadable / FileGetSize.
Status OpenReadable(const std::string& path) {
  RETURN_NOT_OK(FileOpenReadable(path, &fd_));

  // Record the open descriptor before the next fallible call (the same order
  // OpenWriteable uses), so that a FileGetSize failure still leaves the object
  // marked open and code that checks is_open_ can close the descriptor.
  path_ = path;
  is_open_ = true;
  mode_ = FileMode::READ;

  RETURN_NOT_OK(FileGetSize(fd_, &size_));

  // The position should be 0 after GetSize
  // RETURN_NOT_OK(Seek(0));
  return Status::OK();
}

Expand Down Expand Up @@ -346,12 +370,14 @@ class OSFile {

// Returns the cached size_ member (assigned when the file is opened).
int64_t size() const { return size_; }

private:
protected:
std::string path_;

// File descriptor
int fd_;

FileMode::type mode_;

bool is_open_;
int64_t size_;
};
Expand Down Expand Up @@ -440,7 +466,9 @@ int ReadableFile::file_descriptor() const {

class FileOutputStream::FileOutputStreamImpl : public OSFile {
public:
Status Open(const std::string& path) { return OpenWritable(path); }
// Open `path` as a write-only output file. `append` is forwarded to
// OpenWriteable, which skips truncation when it is true.
Status Open(const std::string& path, bool append) {
  const bool write_only = true;
  return OpenWriteable(path, append, write_only);
}
};

FileOutputStream::FileOutputStream() {
Expand All @@ -453,9 +481,14 @@ FileOutputStream::~FileOutputStream() {

// Convenience overload: opens `path` with append disabled by delegating to
// the three-argument Open.
Status FileOutputStream::Open(
    const std::string& path, std::shared_ptr<FileOutputStream>* file) {
  const bool append = false;
  return Open(path, append, file);
}

// Create a FileOutputStream for `path` and store it in `*file`.
//
// append: forwarded to the implementation's Open; when false the existing
//   file contents are truncated, when true they are kept.
//
// `*file` is assigned before the underlying open is attempted, so it is set
// even when a non-OK Status is returned.
Status FileOutputStream::Open(
    const std::string& path, bool append, std::shared_ptr<FileOutputStream>* file) {
  // private ctor
  *file = std::shared_ptr<FileOutputStream>(new FileOutputStream());
  // Single return that honours `append`; the previous one-argument call
  // ignored it and made this statement unreachable.
  return (*file)->impl_->Open(path, append);
}

Status FileOutputStream::Close() {
Expand All @@ -474,5 +507,152 @@ int FileOutputStream::file_descriptor() const {
return impl_->fd();
}

// ----------------------------------------------------------------------
// Implement MemoryMappedFile

// Implementation detail of MemoryMappedFile: opens the file through the
// OSFile helpers and maps its whole current size into memory with mmap.
class MemoryMappedFile::MemoryMappedFileImpl : public OSFile {
 public:
  // position_ is initialised here so it never holds an indeterminate value,
  // even when Open() has not run or failed before reaching its assignment.
  MemoryMappedFileImpl() : OSFile(), position_(0), data_(nullptr) {}

  ~MemoryMappedFileImpl() {
    if (is_open_) {
      // data_ is still null when mmap failed in Open(); only unmap a mapping
      // that was actually established.
      if (data_ != nullptr) { munmap(data_, size_); }
      OSFile::Close();
    }
  }

  // Open `path` and map it into memory.
  //
  // FileMode::READ opens the file read-only. Any other mode opens it through
  // OpenWriteable with append == true (so the existing contents are not
  // truncated) and adds PROT_WRITE to the mapping; FileMode::WRITE selects a
  // write-only descriptor.
  //
  // Returns IOError when mmap fails, or the Status of the failed open.
  Status Open(const std::string& path, FileMode::type mode) {
    int prot_flags = PROT_READ;

    if (mode != FileMode::READ) {
      prot_flags |= PROT_WRITE;
      const bool append = true;
      RETURN_NOT_OK(OSFile::OpenWriteable(path, append, mode == FileMode::WRITE));
    } else {
      RETURN_NOT_OK(OSFile::OpenReadable(path));
    }

    void* result = mmap(nullptr, size_, prot_flags, MAP_SHARED, fd(), 0);
    if (result == MAP_FAILED) {
      std::stringstream ss;
      ss << "Memory mapping file failed, errno: " << errno;
      return Status::IOError(ss.str());
    }
    data_ = reinterpret_cast<uint8_t*>(result);
    position_ = 0;

    return Status::OK();
  }

  // Size in bytes of the mapped region (the file size recorded at open).
  int64_t size() const { return size_; }

  // Move the cursor to `position`; Invalid unless 0 <= position < size_.
  Status Seek(int64_t position) {
    if (position < 0 || position >= size_) {
      return Status::Invalid("position is out of bounds");
    }
    position_ = position;
    return Status::OK();
  }

  // Current cursor offset from the start of the mapping.
  int64_t position() { return position_; }

  // Move the cursor forward by `nbytes`, never beyond size_.
  void advance(int64_t nbytes) { position_ = std::min(size_, position_ + nbytes); }

  // Start of the mapped region.
  uint8_t* data() { return data_; }

  // Address corresponding to the current cursor position.
  uint8_t* head() { return data_ + position_; }

  // True unless the file was opened in FileMode::READ.
  bool writable() { return mode_ != FileMode::READ; }

  bool opened() { return is_open_; }

 private:
  // Cursor offset used by reads and writes.
  int64_t position_;

  // The memory map
  uint8_t* data_;
};

// Constructor (declared private in the header); passes `mode` to
// ReadableFileInterface::set_mode. The impl_ object is not created here; the
// static Open() factory installs it after construction.
MemoryMappedFile::MemoryMappedFile(FileMode::type mode) {
ReadableFileInterface::set_mode(mode);
}

// Defined in this translation unit, where MemoryMappedFileImpl is a complete
// type, so that the impl_ unique_ptr can destroy it.
MemoryMappedFile::~MemoryMappedFile() = default;

// Factory: builds a MemoryMappedFile for `path` in the given `mode`.
// `*out` is assigned only when the underlying open and mapping succeed;
// otherwise the failing Status is returned and `*out` is left untouched.
Status MemoryMappedFile::Open(const std::string& path, FileMode::type mode,
    std::shared_ptr<MemoryMappedFile>* out) {
  // The constructor is private, so the object is created with an explicit new.
  std::shared_ptr<MemoryMappedFile> mapped_file(new MemoryMappedFile(mode));
  mapped_file->impl_.reset(new MemoryMappedFileImpl());

  RETURN_NOT_OK(mapped_file->impl_->Open(path, mode));

  *out = mapped_file;
  return Status::OK();
}

// Writes the size in bytes of the mapped region to `*size`; always OK.
Status MemoryMappedFile::GetSize(int64_t* size) {
  const int64_t mapped_size = impl_->size();
  *size = mapped_size;
  return Status::OK();
}

// Writes the current cursor offset within the mapping to `*position`;
// always OK.
Status MemoryMappedFile::Tell(int64_t* position) {
  const int64_t current = impl_->position();
  *position = current;
  return Status::OK();
}

// Moves the cursor to `position`. Bounds validation is done by the
// implementation object, whose Status is returned unchanged.
Status MemoryMappedFile::Seek(int64_t position) {
  Status seek_status = impl_->Seek(position);
  return seek_status;
}

// Performs no work and always returns OK: the mapping and the file are
// released when the implementation object is destroyed.
Status MemoryMappedFile::Close() {
// munmap handled in pimpl dtor
return Status::OK();
}

// Copy up to `nbytes` bytes from the current position into `out`.
//
// The request is clamped to the bytes remaining in the mapping; the number
// actually copied is stored in `*bytes_read` and the cursor advances by that
// amount. Returns Invalid for a negative `nbytes`.
Status MemoryMappedFile::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) {
  // A negative count would be converted to a huge size_t by memcpy.
  if (nbytes < 0) { return Status::Invalid("nbytes must be non-negative"); }

  nbytes = std::min(nbytes, impl_->size() - impl_->position());
  if (nbytes > 0) { std::memcpy(out, impl_->head(), static_cast<size_t>(nbytes)); }
  *bytes_read = nbytes;
  impl_->advance(nbytes);
  return Status::OK();
}

// Zero-copy read: `*out` becomes a Buffer that points directly at the mapped
// memory starting at the current position.
//
// The request is clamped to the bytes remaining in the mapping and the cursor
// advances by the resulting length. Returns Invalid for a negative `nbytes`.
// NOTE(review): the Buffer is built from a raw pointer into the mapping and
// does not appear to keep the mapping alive — confirm lifetime expectations.
Status MemoryMappedFile::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
  // Reject negative lengths instead of creating a negative-sized Buffer and
  // moving the cursor backwards.
  if (nbytes < 0) { return Status::Invalid("nbytes must be non-negative"); }

  nbytes = std::min(nbytes, impl_->size() - impl_->position());
  *out = std::make_shared<Buffer>(impl_->head(), nbytes);
  impl_->advance(nbytes);
  return Status::OK();
}

// Always true: the shared_ptr<Buffer> overload of Read builds its Buffer
// directly over the mapped memory rather than copying.
bool MemoryMappedFile::supports_zero_copy() const {
return true;
}

// Write `nbytes` bytes from `data` into the mapping starting at `position`.
//
// Returns IOError when the file is not open or not writable, and Invalid when
// `position` is out of bounds, `nbytes` is negative, or the write would run
// past the end of the mapping. The cursor is moved to `position` before the
// length is validated, so it stays there if the length check fails.
Status MemoryMappedFile::WriteAt(int64_t position, const uint8_t* data, int64_t nbytes) {
  if (!impl_->opened() || !impl_->writable()) {
    return Status::IOError("Unable to write");
  }
  if (nbytes < 0) { return Status::Invalid("nbytes must be non-negative"); }

  RETURN_NOT_OK(impl_->Seek(position));

  // Same bound Write() enforces; without it memcpy would write beyond the
  // mapped region. The subtraction form cannot overflow.
  if (nbytes > impl_->size() - impl_->position()) {
    return Status::Invalid("Cannot write past end of memory map");
  }
  return WriteInternal(data, nbytes);
}

// Write `nbytes` bytes from `data` at the current position and advance the
// cursor.
//
// Returns IOError when the file is not open or not writable, and Invalid when
// `nbytes` is negative or the write would run past the end of the mapping.
Status MemoryMappedFile::Write(const uint8_t* data, int64_t nbytes) {
  if (!impl_->opened() || !impl_->writable()) {
    return Status::IOError("Unable to write");
  }
  // A negative count would pass the bound check below and then be converted
  // to a huge size_t by memcpy.
  if (nbytes < 0) { return Status::Invalid("nbytes must be non-negative"); }

  // Compare against the remaining space so the check cannot overflow.
  if (nbytes > impl_->size() - impl_->position()) {
    return Status::Invalid("Cannot write past end of memory map");
  }

  return WriteInternal(data, nbytes);
}

// Unchecked copy of `nbytes` bytes from `data` to the current position,
// followed by advancing the cursor. Callers perform all validation.
Status MemoryMappedFile::WriteInternal(const uint8_t* data, int64_t nbytes) {
  uint8_t* const destination = impl_->head();
  memcpy(destination, data, nbytes);
  impl_->advance(nbytes);
  return Status::OK();
}

} // namespace io
} // namespace arrow
49 changes: 49 additions & 0 deletions cpp/src/arrow/io/file.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,13 @@ class ARROW_EXPORT FileOutputStream : public OutputStream {
public:
~FileOutputStream();

// When opening a new file, any existing file with the indicated path is
// truncated to 0 bytes, deleting any existing data
static Status Open(const std::string& path, std::shared_ptr<FileOutputStream>* file);

static Status Open(
const std::string& path, bool append, std::shared_ptr<FileOutputStream>* file);

// OutputStream interface
Status Close() override;
Status Tell(int64_t* position) override;
Expand Down Expand Up @@ -88,6 +93,50 @@ class ARROW_EXPORT ReadableFile : public ReadableFileInterface {
std::unique_ptr<ReadableFileImpl> impl_;
};

// A file interface that uses memory-mapped files for memory interactions,
// supporting zero copy reads. The same class is used for both reading and
// writing.
//
// Unlike FileOutputStream, opening a file in a writeable mode does not
// truncate it first
// Memory-mapped file supporting both reads and writes through a cursor
// position. Instances are created with the static Open() factory; the
// constructor is private.
class ARROW_EXPORT MemoryMappedFile : public ReadWriteFileInterface {
public:
~MemoryMappedFile();

// Opens and memory-maps the file at `path` in the given `mode`. On success
// `*out` holds the new object; on failure a non-OK Status is returned.
static Status Open(const std::string& path, FileMode::type mode,
std::shared_ptr<MemoryMappedFile>* out);

// Currently returns OK without doing any work; the mapping is released when
// the implementation object is destroyed.
Status Close() override;

// Stores the current cursor position in `*position`.
Status Tell(int64_t* position) override;

// Moves the cursor to `position`; returns Invalid when `position` is negative
// or not less than the mapped size.
Status Seek(int64_t position) override;

// Required by ReadableFileInterface, copies memory into out
// Reads at most `nbytes` (clamped to the bytes remaining after the cursor),
// reports the count in `*bytes_read` and advances the cursor.
Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) override;

// Zero copy read
// `*out` refers directly to the mapped memory; the length is clamped to the
// bytes remaining after the cursor, which is then advanced.
// NOTE(review): the returned Buffer does not appear to keep the mapping
// alive — confirm lifetime expectations with callers.
Status Read(int64_t nbytes, std::shared_ptr<Buffer>* out) override;

// Always returns true.
bool supports_zero_copy() const override;

// Writes `nbytes` bytes at the cursor and advances it. Returns IOError when
// the file is not open or not writable, and Invalid when the write would go
// past the end of the memory map.
Status Write(const uint8_t* data, int64_t nbytes) override;

// Seeks to `position`, then writes `nbytes` bytes from `data`. Returns
// IOError when the file is not open or not writable.
Status WriteAt(int64_t position, const uint8_t* data, int64_t nbytes) override;

// @return: the size in bytes of the memory source
Status GetSize(int64_t* size) override;

private:
// Records `mode`; used only by Open().
explicit MemoryMappedFile(FileMode::type mode);

// Copies `nbytes` bytes to the cursor and advances it, without validation.
Status WriteInternal(const uint8_t* data, int64_t nbytes);

// Hide the internal details of this class for now
class ARROW_NO_EXPORT MemoryMappedFileImpl;
std::unique_ptr<MemoryMappedFileImpl> impl_;
};

} // namespace io
} // namespace arrow

Expand Down
Loading

0 comments on commit d9df556

Please sign in to comment.