Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use mmap for external memory. #9282

Merged
merged 53 commits into from
Jun 19, 2023
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
b1573d4
use mmap for external memory.
trivialfis Jun 9, 2023
23a89b0
reduce size.
trivialfis Jun 9, 2023
fa5d460
cleanup.
trivialfis Jun 9, 2023
9ebd4ef
abstract into a dmlc stream.
trivialfis Jun 9, 2023
3832fd5
cleanup.
trivialfis Jun 9, 2023
18c3544
Cleanup.
trivialfis Jun 9, 2023
a04dc39
macos.
trivialfis Jun 9, 2023
1e0405e
debug.
trivialfis Jun 11, 2023
ba358af
Fix.
trivialfis Jun 11, 2023
a6202d0
cleanup.
trivialfis Jun 11, 2023
117fb97
Cleanup.
trivialfis Jun 12, 2023
9ee1643
Skip python test.
trivialfis Jun 12, 2023
da00b6d
lint.
trivialfis Jun 12, 2023
05ce49b
Add test.
trivialfis Jun 12, 2023
ed635d3
rename.
trivialfis Jun 12, 2023
9b5c686
cleanup.
trivialfis Jun 12, 2023
1b0dab2
Remove page in grad-based sampling.
trivialfis Jun 12, 2023
f383f76
remove page in uniform sampling.
trivialfis Jun 12, 2023
68b838d
remove in no sampling.
trivialfis Jun 12, 2023
4b5d38f
GPU initialization.
trivialfis Jun 12, 2023
4521f04
use ctx.
trivialfis Jun 12, 2023
39ed218
comment.
trivialfis Jun 12, 2023
a736125
lint.
trivialfis Jun 12, 2023
a61a079
doc.
trivialfis Jun 12, 2023
4989269
windows mmap
trivialfis Jun 13, 2023
9068cc8
Merge branch 'ext-mmap-win' into ext-mmap
trivialfis Jun 13, 2023
341c8fb
compile
trivialfis Jun 13, 2023
2660c66
Pad the file for windows.
trivialfis Jun 13, 2023
58c0d99
Avoid padding the data.
trivialfis Jun 13, 2023
5baf5ca
Cleanup.
trivialfis Jun 13, 2023
925245c
debug.
trivialfis Jun 13, 2023
bcf4cdb
Fix.
trivialfis Jun 13, 2023
c195db1
Cleanup.
trivialfis Jun 13, 2023
9bbecf5
Cleanup.
trivialfis Jun 13, 2023
d3987e8
cleanup.
trivialfis Jun 13, 2023
788f2b6
GPU compilation.
trivialfis Jun 13, 2023
e88f561
lint.
trivialfis Jun 13, 2023
6a02601
log time.
trivialfis Jun 13, 2023
9dd5812
improve the tests.
trivialfis Jun 13, 2023
94b8a0d
Timer.
trivialfis Jun 14, 2023
a4e11d3
fix win leak
trivialfis Jun 14, 2023
8cdbb87
mingw
trivialfis Jun 14, 2023
22ae3f6
reduce page number.
trivialfis Jun 14, 2023
3d76acc
Merge remote-tracking branch 'jiamingy/ext-mmap' into ext-mmap
trivialfis Jun 14, 2023
c8726c3
polishing.
trivialfis Jun 14, 2023
b5b57a0
Improve test.
trivialfis Jun 14, 2023
076a788
fix.
trivialfis Jun 14, 2023
8b993ff
Forbid pointer to bool cast.
trivialfis Jun 14, 2023
6169fdc
cleanup.
trivialfis Jun 15, 2023
914a186
read-only.
trivialfis Jun 15, 2023
f3e39ac
read-only.
trivialfis Jun 15, 2023
de4f71c
fix.
trivialfis Jun 15, 2023
6552242
Don't blame.
trivialfis Jun 15, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions demo/guide-python/external_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,10 @@ def main(tmpdir: str) -> xgboost.Booster:
missing = np.NaN
Xy = xgboost.DMatrix(it, missing=missing, enable_categorical=False)

# Other tree methods including ``hist`` and ``gpu_hist`` also work, see tutorial in
# doc for details.
# Other tree methods including ``approx``, ``hist``, and ``gpu_hist`` are supported,
# see tutorial in doc for details.
booster = xgboost.train(
{"tree_method": "approx", "max_depth": 2},
{"tree_method": "hist", "max_depth": 4},
Xy,
evals=[(Xy, "Train")],
num_boost_round=10,
Expand Down
13 changes: 13 additions & 0 deletions doc/tutorials/external_memory.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,19 @@ The feature is still experimental and not yet ready for production use. In this
we will introduce both methods. Please note that training on data from external memory is
not supported by ``exact`` tree method.

.. warning::

The implementation of external memory uses ``mmap`` and is not tested against system
errors like disconnected network devices (``SIGBUS``). In addition, Windows is not yet
supported.

.. note::

When external memory is used, the CPU training performance is IO bound, meaning the
training speed is almost exclusively determined by the disk IO speed. For GPU, please
read on and see the gradient-based sampling with external memory. During benchmarking, we
used an NVMe connected to a PCIe slot; the performance is "usable" with ``hist`` on CPU.

*************
Data Iterator
*************
Expand Down
8 changes: 8 additions & 0 deletions python-package/xgboost/testing/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,14 @@ def no_ipv6() -> PytestSkip:
return {"condition": not has_ipv6(), "reason": "IPv6 is required to be enabled."}


def no_unix() -> PytestSkip:
    """PyTest skip mark for tests that require a Unix-like (non-Windows) system."""
    return {
        "condition": system() == "Windows",
        "reason": "A Unix-like system is required.",
    }


def no_ubjson() -> PytestSkip:
return no_mod("ubjson")

Expand Down
2 changes: 1 addition & 1 deletion rabit/include/rabit/internal/io.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ struct MemoryFixSizeBuffer : public SeekStream {
return curr_ptr_ == buffer_size_;
}

private:
protected:
/*! \brief in memory buffer */
char *p_buffer_;
/*! \brief current pointer */
Expand Down
63 changes: 53 additions & 10 deletions src/common/io.cc
Original file line number Diff line number Diff line change
@@ -1,24 +1,26 @@
/*!
* Copyright (c) by XGBoost Contributors 2019-2022
/**
* Copyright 2019-2023, by XGBoost Contributors
*/
#if defined(__unix__)
#if defined(__unix__) || defined(__APPLE__)
#include <fcntl.h> // for open, O_RDONLY
#include <sys/mman.h> // for mmap, mmap64, munmap
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#endif // defined(__unix__)
#include <unistd.h> // for close
#endif // defined(__unix__)
#include <algorithm>
#include <cerrno> // for errno
#include <cstdio>
#include <fstream>
#include <string>
#include <limits> // for numeric_limits
#include <memory>
#include <string>
#include <utility>
#include <cstdio>

#include "xgboost/logging.h"
#include "io.h"
#include "xgboost/logging.h"

namespace xgboost {
namespace common {

size_t PeekableInStream::Read(void* dptr, size_t size) {
size_t nbuffer = buffer_.length() - buffer_ptr_;
if (nbuffer == 0) return strm_->Read(dptr, size);
Expand Down Expand Up @@ -155,5 +157,46 @@ std::string FileExtension(std::string fname, bool lower) {
return "";
}
}

std::size_t PadPageForMmap(std::size_t file_bytes, dmlc::Stream* fo) {
  // Page size reported by the OS; mmap offsets must be multiples of it.
  std::size_t const page_size = getpagesize();
  CHECK(page_size != 0 && page_size % 2 == 0) << "Failed to get page size on the current system.";
  CHECK_NE(file_bytes, 0) << "Empty page encountered.";
  // Overflow-safe ceiling division: number of whole pages needed to hold the file.
  bool const has_partial_page = (file_bytes % page_size) != 0;
  std::size_t const n_pages = file_bytes / page_size + (has_partial_page ? 1 : 0);
  std::size_t const padded_bytes = n_pages * page_size;
  // Append zero bytes so that the next page written starts on a page boundary.
  std::vector<std::uint8_t> zeros(padded_bytes - file_bytes, 0);
  fo->Write(zeros.data(), zeros.size());
  return padded_bytes;
}

// Map `[offset, offset + length)` of the file at `path` into the process address space.
//
// The file is always opened O_RDONLY: the mapping below is MAP_PRIVATE
// (copy-on-write), so even a writable mapping never propagates stores back to the
// file, and read access on the descriptor suffices.
// `offset` must be page-aligned, which `PadPageForMmap()` guarantees for cache files.
// Returns a pointer to the mapped region; any failure aborts via CHECK.
void* PrivateMmapStream::Open(StringView path, bool read_only, std::size_t offset,
                              std::size_t length) {
  // NOTE(review): assumes `StringView::c_str()` yields a NUL-terminated string --
  // confirm, since generic string views need not be.
  fd_ = open(path.c_str(), O_RDONLY);
  CHECK_GE(fd_, 0) << "Failed to open:" << path << ". " << strerror(errno);

  char* ptr{nullptr};
  int prot{PROT_READ};
  if (!read_only) {
    // Writable pages stay private to this process (copy-on-write).
    prot |= PROT_WRITE;
  }
#if defined(__linux__) || defined(__GLIBC__)
  // mmap64 accepts a 64-bit offset even on 32-bit builds.
  ptr = reinterpret_cast<char*>(mmap64(nullptr, length, prot, MAP_PRIVATE, fd_, offset));
#elif defined(_MSC_VER)
  LOG(FATAL) << "External memory is not implemented for Windows.";
#else
  // Plain mmap takes an off_t offset; reject offsets this platform cannot represent.
  CHECK_LE(offset, std::numeric_limits<off_t>::max())
      << "File size has exceeded the limit on the current system.";
  ptr = reinterpret_cast<char*>(mmap(nullptr, length, prot, MAP_PRIVATE, fd_, offset));
#endif  // defined(__linux__)
  CHECK_NE(ptr, MAP_FAILED) << "Failed to map: " << path << ". " << strerror(errno);
  return ptr;
}

/**
 * @brief Unmap the region and close the backing file descriptor.
 *
 * Failures abort via `CHECK` (with `errno` detail) rather than throw, since this
 * runs in a destructor.
 */
PrivateMmapStream::~PrivateMmapStream() {
  // `p_buffer_`/`buffer_size_` are the mapped region inherited from MemoryFixSizeBuffer.
  CHECK_NE(munmap(p_buffer_, buffer_size_), -1)
      << "Failed to munmap: " << path_ << ". " << strerror(errno);
  CHECK_NE(close(fd_), -1) << "Failed to close: " << path_ << ". " << strerror(errno);
}
} // namespace common
} // namespace xgboost
43 changes: 40 additions & 3 deletions src/common/io.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*!
* Copyright by XGBoost Contributors 2014-2022
/**
* Copyright 2014-2023, XGBoost Contributors
* \file io.h
* \brief general stream interface for serialization, I/O
* \author Tianqi Chen
Expand All @@ -10,9 +10,11 @@

#include <dmlc/io.h>
#include <rabit/rabit.h>
#include <string>
#include <xgboost/string_view.h>

#include <cstring>
#include <fstream>
#include <string> // for string

#include "common.h"

Expand Down Expand Up @@ -127,6 +129,41 @@ inline std::string ReadAll(std::string const &path) {
return content;
}

/**
* @brief Pad the output file for a page to make it mmap compatible.
*
* @param file_bytes The size of the output file
* @param fo Stream used to write the file.
*
* @return The file size after being padded.
*/
std::size_t PadPageForMmap(std::size_t file_bytes, dmlc::Stream* fo);

/**
 * @brief Private mmap file, copy-on-write. File must be properly aligned by `PadPageForMmap()`.
 */
class PrivateMmapStream : public MemoryFixSizeBuffer {
  std::int32_t fd_;   // descriptor backing the mapping; closed in the destructor.
  std::string path_;  // mapped file path, kept for error reporting and cleanup.

  /**
   * @brief Map `[offset, offset + length)` of the file into memory.
   * @return Pointer to the mapped region.
   */
  void* Open(StringView path, bool read_only, std::size_t offset, std::size_t length);

 public:
  /**
   * @brief Construct a private mmap stream.
   *
   * @param path File path.
   * @param read_only See the `prot` parameter of `mmap` for details.
   * @param offset See the `offset` parameter of `mmap` for details.
   * @param length See the `length` parameter of `mmap` for details.
   */
  explicit PrivateMmapStream(std::string path, bool read_only, std::size_t offset,
                             std::size_t length)
      : MemoryFixSizeBuffer{Open(StringView{path}, read_only, offset, length), length},
        // `path` was only viewed by `Open` in the base initializer above, so it is
        // safe (and cheaper) to move it into the member here instead of copying.
        path_{std::move(path)} {}

  ~PrivateMmapStream() override;
};
} // namespace common
} // namespace xgboost
#endif // XGBOOST_COMMON_IO_H_
92 changes: 55 additions & 37 deletions src/data/sparse_page_source.h
Original file line number Diff line number Diff line change
@@ -1,28 +1,28 @@
/*!
* Copyright 2014-2022 by XGBoost Contributors
/**
* Copyright 2014-2023, XGBoost Contributors
* \file sparse_page_source.h
*/
#ifndef XGBOOST_DATA_SPARSE_PAGE_SOURCE_H_
#define XGBOOST_DATA_SPARSE_PAGE_SOURCE_H_

#include <algorithm> // std::min
#include <algorithm> // for min
#include <future> // async
#include <map>
#include <memory>
#include <string>
#include <thread>
#include <utility>
#include <vector>
#include <future>
#include <thread>
#include <map>
#include <memory>

#include "xgboost/base.h"
#include "xgboost/data.h"

#include "adapter.h"
#include "sparse_page_writer.h"
#include "proxy_dmatrix.h"

#include "../common/common.h"
#include "../common/io.h" // for PrivateMmapStream, PadPageForMMAP
#include "../common/timer.h"
#include "adapter.h"
#include "dmlc/common.h" // OMPException
#include "proxy_dmatrix.h"
#include "sparse_page_writer.h"
#include "xgboost/base.h"
#include "xgboost/data.h"

namespace xgboost {
namespace data {
Expand Down Expand Up @@ -54,6 +54,9 @@ struct Cache {
std::string ShardName() {
return ShardName(this->name, this->format);
}
// Append the byte count of a newly written page to the offset record.
void Push(std::size_t n_bytes) { offset.emplace_back(n_bytes); }

// The write is completed.
void Commit() {
Expand Down Expand Up @@ -95,20 +98,19 @@ class SparsePageSourceImpl : public BatchIteratorImpl<S> {
uint32_t n_batches_ {0};

std::shared_ptr<Cache> cache_info_;
std::unique_ptr<dmlc::Stream> fo_;

using Ring = std::vector<std::future<std::shared_ptr<S>>>;
// A ring storing futures to data. Since the DMatrix iterator is forward only, so we
// can pre-fetch data in a ring.
std::unique_ptr<Ring> ring_{new Ring};
dmlc::OMPException exec_;

bool ReadCache() {
CHECK(!at_end_);
if (!cache_info_->written) {
return false;
}
if (fo_) {
fo_.reset(); // flush the data to disk.
if (ring_->empty()) {
ring_->resize(n_batches_);
}
// An heuristic for number of pre-fetched batches. We can make it part of BatchParam
Expand All @@ -117,34 +119,43 @@ class SparsePageSourceImpl : public BatchIteratorImpl<S> {

size_t n_prefetch_batches = std::min(kPreFetch, n_batches_);
CHECK_GT(n_prefetch_batches, 0) << "total batches:" << n_batches_;
size_t fetch_it = count_;
std::size_t fetch_it = count_;

exec_.Rethrow();

for (size_t i = 0; i < n_prefetch_batches; ++i, ++fetch_it) {
for (std::size_t i = 0; i < n_prefetch_batches; ++i, ++fetch_it) {
fetch_it %= n_batches_; // ring
if (ring_->at(fetch_it).valid()) {
continue;
}
auto const *self = this; // make sure it's const
auto const* self = this; // make sure it's const
CHECK_LT(fetch_it, cache_info_->offset.size());
ring_->at(fetch_it) = std::async(std::launch::async, [fetch_it, self]() {
common::Timer timer;
timer.Start();
std::unique_ptr<SparsePageFormat<S>> fmt{CreatePageFormat<S>("raw")};
auto n = self->cache_info_->ShardName();
size_t offset = self->cache_info_->offset.at(fetch_it);
std::unique_ptr<dmlc::SeekStream> fi{dmlc::SeekStream::CreateForRead(n.c_str())};
fi->Seek(offset);
CHECK_EQ(fi->Tell(), offset);
ring_->at(fetch_it) = std::async(std::launch::async, [fetch_it, self, this]() {
auto page = std::make_shared<S>();
CHECK(fmt->Read(page.get(), fi.get()));
LOG(INFO) << "Read a page in " << timer.ElapsedSeconds() << " seconds.";
this->exec_.Run([&] {
common::Timer timer;
timer.Start();
std::unique_ptr<SparsePageFormat<S>> fmt{CreatePageFormat<S>("raw")};
auto n = self->cache_info_->ShardName();

std::uint64_t offset = self->cache_info_->offset.at(fetch_it);
std::uint64_t length = self->cache_info_->offset.at(fetch_it + 1) - offset;

auto fi = std::make_unique<common::PrivateMmapStream>(n, true, offset, length);
CHECK(fmt->Read(page.get(), fi.get()));
LOG(INFO) << "Read a page in " << timer.ElapsedSeconds() << " seconds.";
});
return page;
});
}

CHECK_EQ(std::count_if(ring_->cbegin(), ring_->cend(), [](auto const& f) { return f.valid(); }),
n_prefetch_batches)
<< "Sparse DMatrix assumes forward iteration.";
page_ = (*ring_)[count_].get();
CHECK(!(*ring_)[count_].valid());
exec_.Rethrow();

return true;
}

Expand All @@ -153,16 +164,22 @@ class SparsePageSourceImpl : public BatchIteratorImpl<S> {
common::Timer timer;
timer.Start();
std::unique_ptr<SparsePageFormat<S>> fmt{CreatePageFormat<S>("raw")};
if (!fo_) {
auto n = cache_info_->ShardName();
fo_.reset(dmlc::Stream::Create(n.c_str(), "w"));

auto name = cache_info_->ShardName();
std::unique_ptr<dmlc::Stream> fo;
if (this->Iter() == 0) {
fo.reset(dmlc::Stream::Create(name.c_str(), "w"));
} else {
fo.reset(dmlc::Stream::Create(name.c_str(), "a"));
}
auto bytes = fmt->Write(*page_, fo_.get());
timer.Stop();

auto bytes = fmt->Write(*page_, fo.get());
auto padded = common::PadPageForMmap(bytes, fo.get());

timer.Stop();
LOG(INFO) << static_cast<double>(bytes) / 1024.0 / 1024.0 << " MB written in "
<< timer.ElapsedSeconds() << " seconds.";
cache_info_->offset.push_back(bytes);
cache_info_->Push(padded);
}

virtual void Fetch() = 0;
Expand Down Expand Up @@ -259,6 +276,7 @@ class SparsePageSource : public SparsePageSourceImpl<SparsePage> {
}

if (at_end_) {
CHECK_EQ(cache_info_->offset.size(), n_batches_ + 1);
cache_info_->Commit();
if (n_batches_ != 0) {
CHECK_EQ(count_, n_batches_);
Expand Down
Loading