From abb2b69f80d4fa96c17993fb3834054d4aa32da3 Mon Sep 17 00:00:00 2001 From: Norbert Podhorszki Date: Thu, 17 Dec 2020 11:48:13 -0500 Subject: [PATCH 1/6] Add a POSIX version of file drainer to be used on non-Windows machine. This can call fdatasync() to ensure writing to disk --- source/adios2/CMakeLists.txt | 3 + .../toolkit/burstbuffer/FileDrainer.cpp | 160 ----------- .../adios2/toolkit/burstbuffer/FileDrainer.h | 48 +--- .../burstbuffer/FileDrainerIOFstream.cpp | 194 ++++++++++++++ .../burstbuffer/FileDrainerIOFstream.h | 92 +++++++ .../burstbuffer/FileDrainerIOPosix.cpp | 251 ++++++++++++++++++ .../toolkit/burstbuffer/FileDrainerIOPosix.h | 108 ++++++++ 7 files changed, 655 insertions(+), 201 deletions(-) create mode 100644 source/adios2/toolkit/burstbuffer/FileDrainerIOFstream.cpp create mode 100644 source/adios2/toolkit/burstbuffer/FileDrainerIOFstream.h create mode 100644 source/adios2/toolkit/burstbuffer/FileDrainerIOPosix.cpp create mode 100644 source/adios2/toolkit/burstbuffer/FileDrainerIOPosix.h diff --git a/source/adios2/CMakeLists.txt b/source/adios2/CMakeLists.txt index 2b3a844474..e3024460a8 100644 --- a/source/adios2/CMakeLists.txt +++ b/source/adios2/CMakeLists.txt @@ -126,6 +126,9 @@ target_compile_features(adios2_core PUBLIC "$second; - } - else - { - InputFile f = std::make_shared(); - m_InputFileMap.emplace(path, f); - Open(f, path); - return f; - } -} - -OutputFile FileDrainer::GetFileForWrite(const std::string &path, bool append) -{ - auto it = m_OutputFileMap.find(path); - if (it != m_OutputFileMap.end()) - { - return it->second; - } - else - { - OutputFile f = std::make_shared(); - m_OutputFileMap.emplace(path, f); - Open(f, path, append); - return f; - } -} - -void FileDrainer::Open(InputFile &f, const std::string &path) -{ - - f->rdbuf()->pubsetbuf(0, 0); - f->open(path, std::ios::in | std::ios::binary); -} - -void FileDrainer::Open(OutputFile &f, const std::string &path, bool append) -{ - - if (append) - { - f->rdbuf()->pubsetbuf(0, 0); - f->open(path, std::ios::out | std::ios::app | std::ios::binary); - } - else - { - f->rdbuf()->pubsetbuf(0, 0); - f->open(path, std::ios::out | std::ios::trunc | std::ios::binary); - } -} - -void FileDrainer::Close(InputFile &f) { f->close(); } -void FileDrainer::Close(OutputFile &f) { f->close(); } - -bool FileDrainer::Good(InputFile &f) { return (f->good()); } -bool FileDrainer::Good(OutputFile &f) { return (f->good()); } - -void FileDrainer::CloseAll() -{ - for (auto it = m_OutputFileMap.begin(); it != m_OutputFileMap.end(); ++it) - { - // if (it->second->good()) - //{ - Close(it->second); - //} - } - m_OutputFileMap.clear(); - for (auto it = m_InputFileMap.begin(); it != m_InputFileMap.end(); ++it) - { - // if (it->second->good()) - //{ - Close(it->second); - //} - } - m_InputFileMap.clear(); -} - -void FileDrainer::Seek(InputFile &f, size_t offset, const std::string &path) -{ - f->seekg(offset, std::ios_base::beg); -} - -void FileDrainer::Seek(OutputFile &f, size_t offset, const std::string &path) -{ - f->seekp(offset, std::ios_base::beg); -} - -void FileDrainer::SeekEnd(OutputFile &f) { f->seekp(0, std::ios_base::end); } - -size_t FileDrainer::GetFileSize(InputFile &f) -{ - const auto currentOffset = f->tellg(); - f->seekg(0, std::ios_base::end); - auto fileSize = f->tellg(); - f->seekg(currentOffset, std::ios_base::beg); - return static_cast(fileSize); -} - -std::pair FileDrainer::Read(InputFile &f, size_t count, - char *buffer, - const std::string &path) -{ - size_t totalRead = 0; - double totalSlept = 0.0; - const double sleepUnit = 0.01; // seconds - while (count > 0) - { - const auto currentOffset = f->tellg(); - f->read(buffer, static_cast(count)); - const auto readSize = f->gcount(); - - if (readSize < static_cast(count)) - { - if (f->eof()) - { - std::chrono::duration d(sleepUnit); - std::this_thread::sleep_for(d); - f->clear(f->rdstate() & ~std::fstream::eofbit); - totalSlept += sleepUnit; - } - else - { - throw std::ios_base::failure( - "FileDrainer couldn't read from file " + path + - " offset = " + std::to_string(currentOffset) + - " count = " + std::to_string(count) + " bytes but only " + - std::to_string(totalRead + readSize) + ".\n"); - } - } - buffer += readSize; - count -= readSize; - totalRead += readSize; - } - return std::pair(totalRead, totalSlept); -} - -size_t FileDrainer::Write(OutputFile &f, size_t count, const char *buffer, - const std::string &path) -{ - f->write(buffer, static_cast(count)); - - if (f->bad()) - { - throw std::ios_base::failure( - "FileDrainer couldn't write to file " + path + - " count = " + std::to_string(count) + " bytes\n"); - } - - return count; -} - -void FileDrainer::Delete(OutputFile &f, const std::string &path) -{ - Close(f); - std::remove(path.c_str()); -} - void FileDrainer::SetVerbose(int verboseLevel, int rank) { m_Verbose = verboseLevel; diff --git a/source/adios2/toolkit/burstbuffer/FileDrainer.h b/source/adios2/toolkit/burstbuffer/FileDrainer.h index db0da66276..e58bc60450 100644 --- a/source/adios2/toolkit/burstbuffer/FileDrainer.h +++ b/source/adios2/toolkit/burstbuffer/FileDrainer.h @@ -23,6 +23,12 @@ #include "adios2/common/ADIOSTypes.h" +#ifdef _WIN32 +#include "FileDrainerIOFstream.h" +#else +#include "FileDrainerIOPosix.h" +#endif + namespace adios2 { namespace burstbuffer @@ -58,12 +64,7 @@ struct FileDrainOperation size_t fromOffset, size_t toOffset, const void *data); }; -typedef std::map> InputFileMap; -typedef std::map> OutputFileMap; -typedef std::shared_ptr InputFile; -typedef std::shared_ptr OutputFile; - -class FileDrainer +class FileDrainer : public FileDrainerIO { public: FileDrainer() = default; @@ -111,41 +112,6 @@ class FileDrainer int m_Rank = 0; int m_Verbose = 0; static const int errorState = -1; - - /** instead for Open, use this function */ - InputFile GetFileForRead(const std::string &path); - OutputFile GetFileForWrite(const std::string &path, bool append = false); - - /** return true if the File is usable (no previous errors) */ - bool Good(InputFile &f); - bool Good(OutputFile &f); - - void CloseAll(); - - void Seek(InputFile &f, size_t offset, const std::string &path); - void Seek(OutputFile &f, size_t offset, const std::string &path); - void SeekEnd(OutputFile &f); - - /** Read from file. Return a pair of - * - number of bytes written - * - time spent in waiting for file to be actually written to disk for this - * read to succeed. - */ - std::pair Read(InputFile &f, size_t count, char *buffer, - const std::string &path); - size_t Write(OutputFile &f, size_t count, const char *buffer, - const std::string &path); - - void Delete(OutputFile &f, const std::string &path); - -private: - InputFileMap m_InputFileMap; - OutputFileMap m_OutputFileMap; - void Open(InputFile &f, const std::string &path); - void Close(InputFile &f); - void Open(OutputFile &f, const std::string &path, bool append); - void Close(OutputFile &f); - size_t GetFileSize(InputFile &f); }; } // end namespace burstbuffer diff --git a/source/adios2/toolkit/burstbuffer/FileDrainerIOFstream.cpp b/source/adios2/toolkit/burstbuffer/FileDrainerIOFstream.cpp new file mode 100644 index 0000000000..7661d456e4 --- /dev/null +++ b/source/adios2/toolkit/burstbuffer/FileDrainerIOFstream.cpp @@ -0,0 +1,194 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. + * + * FileDrainer.cpp + * + * Created on: April 1, 2020 + * Author: Norbert Podhorszki + */ + +#include "FileDrainerIOFstream.h" + +#include +#include +#include // std::memcpy +#include // std::this_thread::sleep_for + +/// \cond EXCLUDE_FROM_DOXYGEN +#include //std::ios_base::failure +/// \endcond + +namespace adios2 +{ +namespace burstbuffer +{ + +InputFile FileDrainerIO::GetFileForRead(const std::string &path) +{ + auto it = m_InputFileMap.find(path); + if (it != m_InputFileMap.end()) + { + return it->second; + } + else + { + InputFile f = std::make_shared(); + m_InputFileMap.emplace(path, f); + Open(f, path); + return f; + } +} + +OutputFile FileDrainerIO::GetFileForWrite(const std::string &path, bool append) +{ + auto it = m_OutputFileMap.find(path); + if (it != m_OutputFileMap.end()) + { + return it->second; + } + else + { + OutputFile f = std::make_shared(); + m_OutputFileMap.emplace(path, f); + Open(f, path, append); + return f; + } +} + +void FileDrainerIO::Open(InputFile &f, const std::string &path) +{ + + f->rdbuf()->pubsetbuf(0, 0); + f->open(path, std::ios::in | std::ios::binary); +} + +void FileDrainerIO::Open(OutputFile &f, const std::string &path, bool append) +{ + + if (append) + { + f->rdbuf()->pubsetbuf(0, 0); + f->open(path, std::ios::out | std::ios::app | std::ios::binary); + } + else + { + f->rdbuf()->pubsetbuf(0, 0); + f->open(path, std::ios::out | std::ios::trunc | std::ios::binary); + } +} + +void FileDrainerIO::Close(InputFile &f) { f->close(); } +void FileDrainerIO::Close(OutputFile &f) { f->close(); } + +bool FileDrainerIO::Good(InputFile &f) { return (f->good()); } +bool FileDrainerIO::Good(OutputFile &f) { return (f->good()); } + +void FileDrainerIO::CloseAll() +{ + for (auto it = m_OutputFileMap.begin(); it != m_OutputFileMap.end(); ++it) + { + // if (it->second->good()) + //{ + Close(it->second); + //} + } + m_OutputFileMap.clear(); + for (auto it = m_InputFileMap.begin(); it != m_InputFileMap.end(); ++it) + { + // if (it->second->good()) + //{ + Close(it->second); + //} + } + m_InputFileMap.clear(); +} + +void FileDrainerIO::Seek(InputFile &f, size_t offset, const std::string &path) +{ + f->seekg(offset, std::ios_base::beg); +} + +void FileDrainerIO::Seek(OutputFile &f, size_t offset, const std::string &path) +{ + f->seekp(offset, std::ios_base::beg); +} + +void FileDrainerIO::SeekEnd(OutputFile &f) { f->seekp(0, std::ios_base::end); } + +size_t FileDrainerIO::GetFileSize(InputFile &f) +{ + const auto currentOffset = f->tellg(); + f->seekg(0, std::ios_base::end); + auto fileSize = f->tellg(); + f->seekg(currentOffset, std::ios_base::beg); + return static_cast(fileSize); +} + +std::pair FileDrainerIO::Read(InputFile &f, size_t count, + char *buffer, + const std::string &path) +{ + size_t totalRead = 0; + double totalSlept = 0.0; + const double sleepUnit = 0.01; // seconds + while (count > 0) + { + const auto currentOffset = f->tellg(); + f->read(buffer, static_cast(count)); + const auto readSize = f->gcount(); + + if (readSize < static_cast(count)) + { + if (f->eof()) + { + std::chrono::duration d(sleepUnit); + std::this_thread::sleep_for(d); + f->clear(f->rdstate() & ~std::fstream::eofbit); + totalSlept += sleepUnit; + } + else + { + throw std::ios_base::failure( + "FileDrainer couldn't read from file " + path + + " offset = " + std::to_string(currentOffset) + + " count = " + std::to_string(count) + " bytes but only " + + std::to_string(totalRead + readSize) + ".\n"); + } + } + buffer += readSize; + count -= readSize; + totalRead += readSize; + } + return std::pair(totalRead, totalSlept); +} + +size_t FileDrainerIO::Write(OutputFile &f, size_t count, const char *buffer, + const std::string &path) +{ + f->write(buffer, static_cast(count)); + + if (f->bad()) + { + throw std::ios_base::failure( + "FileDrainer couldn't write to file " + path + + " count = " + std::to_string(count) + " bytes\n"); + } + + return count; +} + +void FileDrainerIO::Delete(OutputFile &f, const std::string &path) +{ + Close(f); + std::remove(path.c_str()); +} + +void FileDrainerIO::SetVerbose(int verboseLevel, int rank) +{ + m_Verbose = verboseLevel; + m_Rank = rank; +} + +} // end namespace burstbuffer +} // end namespace adios2 diff --git a/source/adios2/toolkit/burstbuffer/FileDrainerIOFstream.h b/source/adios2/toolkit/burstbuffer/FileDrainerIOFstream.h new file mode 100644 index 0000000000..fa8859bd08 --- /dev/null +++ b/source/adios2/toolkit/burstbuffer/FileDrainerIOFstream.h @@ -0,0 +1,92 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. + * + * FileDrainerFstream.h + * + * Created on: April 1, 2020 + * Author: Norbert Podhorszki + */ + +#ifndef ADIOS2_TOOLKIT_BURSTBUFFER_FILEDRAINER_FSTREAM_H_ +#define ADIOS2_TOOLKIT_BURSTBUFFER_FILEDRAINER_FSTREAM_H_ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "adios2/common/ADIOSTypes.h" + +namespace adios2 +{ +namespace burstbuffer +{ + +typedef std::shared_ptr InputFile; +typedef std::shared_ptr OutputFile; + +class FileDrainerIO +{ +public: + FileDrainerIO() = default; + + virtual ~FileDrainerIO() = default; + + /** turn on verbosity. set rank to differentiate between the output of + * processes */ + void SetVerbose(int verboseLevel, int rank); + +protected: + /** rank of process just for stdout/stderr messages */ + int m_Rank = 0; + int m_Verbose = 0; + static const int errorState = -1; + + /** instead for Open, use this function */ + InputFile GetFileForRead(const std::string &path); + OutputFile GetFileForWrite(const std::string &path, bool append = false); + + /** return true if the File is usable (no previous errors) */ + bool Good(InputFile &f); + bool Good(OutputFile &f); + + void CloseAll(); + + void Seek(InputFile &f, size_t offset, const std::string &path); + void Seek(OutputFile &f, size_t offset, const std::string &path); + void SeekEnd(OutputFile &f); + + /** Read from file. Return a pair of + * - number of bytes written + * - time spent in waiting for file to be actually written to disk for this + * read to succeed. + */ + std::pair Read(InputFile &f, size_t count, char *buffer, + const std::string &path); + size_t Write(OutputFile &f, size_t count, const char *buffer, + const std::string &path); + + void Delete(OutputFile &f, const std::string &path); + +private: + typedef std::map InputFileMap; + typedef std::map OutputFileMap; + InputFileMap m_InputFileMap; + OutputFileMap m_OutputFileMap; + void Open(InputFile &f, const std::string &path); + void Close(InputFile &f); + void Open(OutputFile &f, const std::string &path, bool append); + void Close(OutputFile &f); + size_t GetFileSize(InputFile &f); +}; + +} // end namespace burstbuffer +} // end namespace adios2 + +#endif /* ADIOS2_TOOLKIT_BURSTBUFFER_FILEDRAINER_FSTREAM_H_ */ diff --git a/source/adios2/toolkit/burstbuffer/FileDrainerIOPosix.cpp b/source/adios2/toolkit/burstbuffer/FileDrainerIOPosix.cpp new file mode 100644 index 0000000000..988a6617c3 --- /dev/null +++ b/source/adios2/toolkit/burstbuffer/FileDrainerIOPosix.cpp @@ -0,0 +1,251 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. + * + * FileDrainerIOPosix.cpp + * + * Created on: April 1, 2020 + * Author: Norbert Podhorszki + */ + +#include "FileDrainerIOPosix.h" + +#ifndef _LARGEFILE64_SOURCE +#define _LARGEFILE64_SOURCE +#endif + +#include +#include // remove +#include // std::memcpy, strerror +#include // errno +#include // open +#include // write output +#include // open, fstat +#include // open +#include // std::this_thread::sleep_for +#include // write, close + +namespace adios2 +{ +namespace burstbuffer +{ + +InputFile FileDrainerIO::GetFileForRead(const std::string &path) +{ + auto it = m_InputFileMap.find(path); + if (it != m_InputFileMap.end()) + { + return it->second; + } + else + { + InputFile f = std::make_shared(); + m_InputFileMap.emplace(path, f); + Open(f, path); + return f; + } +} + +OutputFile FileDrainerIO::GetFileForWrite(const std::string &path, bool append) +{ + auto it = m_OutputFileMap.find(path); + if (it != m_OutputFileMap.end()) + { + return it->second; + } + else + { + OutputFile f = std::make_shared(); + m_OutputFileMap.emplace(path, f); + Open(f, path, append); + return f; + } +} + +std::string FileDrainerIO::SysErrMsg(const int errorNumber) const +{ + return std::string(": errno = " + std::to_string(errorNumber) + ": " + + strerror(errorNumber)); +} + +void FileDrainerIO::Open(InputFile &f, const std::string &path) +{ + errno = 0; + f->m_fd = open(path.c_str(), O_RDONLY); + f->m_Errno = errno; + f->m_Path = path; +} + +void FileDrainerIO::Open(OutputFile &f, const std::string &path, bool append) +{ + errno = 0; + if (append) + { + f->m_fd = open(path.c_str(), O_RDWR | O_CREAT, 0777); + lseek64(f->m_fd, 0, SEEK_END); + } + else + { + f->m_fd = open(path.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0666); + } + f->m_Errno = errno; + f->m_Path = path; +} + +void FileDrainerIO::Close(InputFile &f) +{ + errno = 0; + close(f->m_fd); + f->m_Errno = errno; +} +void FileDrainerIO::Close(OutputFile &f) +{ + errno = 0; + close(f->m_fd); + f->m_Errno = errno; +} + +bool FileDrainerIO::Good(InputFile &f) { return (f->m_Errno == 0); } +bool FileDrainerIO::Good(OutputFile &f) { return (f->m_Errno == 0); } + +void FileDrainerIO::CloseAll() +{ + for (auto it = m_OutputFileMap.begin(); it != m_OutputFileMap.end(); ++it) + { + // if (it->second->good()) + //{ + Close(it->second); + //} + } + m_OutputFileMap.clear(); + for (auto it = m_InputFileMap.begin(); it != m_InputFileMap.end(); ++it) + { + // if (it->second->good()) + //{ + Close(it->second); + //} + } + m_InputFileMap.clear(); +} + +void FileDrainerIO::Seek(InputFile &f, size_t offset, const std::string &path) +{ + lseek64(f->m_fd, static_cast(offset), SEEK_SET); +} + +void FileDrainerIO::Seek(OutputFile &f, size_t offset, const std::string &path) +{ + lseek64(f->m_fd, static_cast(offset), SEEK_SET); +} + +void FileDrainerIO::SeekEnd(OutputFile &f) { lseek64(f->m_fd, 0, SEEK_END); } + +size_t FileDrainerIO::GetFileSize(InputFile &f) +{ + struct stat fileStat; + errno = 0; + if (fstat(f->m_fd, &fileStat) == -1) + { + f->m_Errno = errno; + throw std::ios_base::failure("ERROR: couldn't get size of file " + + f->m_Path + SysErrMsg(errno)); + } + f->m_Errno = errno; + return static_cast(fileStat.st_size); +} + +std::pair FileDrainerIO::Read(InputFile &f, size_t count, + char *buffer, + const std::string &path) +{ + size_t totalRead = 0; + double totalSlept = 0.0; + const double sleepUnit = 0.01; // seconds + while (count > 0) + { + const auto currentOffset = lseek64(f->m_fd, 0, SEEK_CUR); + size_t readCount = + (count < DefaultMaxFileBatchSize ? count : DefaultMaxFileBatchSize); + errno = 0; + const auto readSize = read(f->m_fd, buffer, readCount); + if (readSize == -1) + { + if (errno == EINTR) + { + continue; + } + else + { + throw std::ios_base::failure( + "FileDrainer couldn't read from file " + path + + " offset = " + std::to_string(currentOffset) + + " count = " + std::to_string(count) + " bytes but only " + + std::to_string(totalRead + readSize) + ".\n"); + } + } + else if (readSize < static_cast(count)) + { + // need to wait for more data to come + std::chrono::duration d(sleepUnit); + std::this_thread::sleep_for(d); + totalSlept += sleepUnit; + } + buffer += readSize; + count -= readSize; + totalRead += readSize; + } + return std::pair(totalRead, totalSlept); +} + +size_t FileDrainerIO::Write(OutputFile &f, size_t count, const char *buffer, + const std::string &path) +{ + size_t totalWritten = 0; + while (count > 0) + { + const auto currentOffset = lseek64(f->m_fd, 0, SEEK_CUR); + size_t writeCount = + (count < DefaultMaxFileBatchSize ? count : DefaultMaxFileBatchSize); + errno = 0; + const auto writtenSize = write(f->m_fd, buffer, writeCount); + if (writtenSize == -1) + { + if (errno == EINTR) + { + continue; + } + else + { + throw std::ios_base::failure( + "FileDrainer couldn't write to file " + path + + " offset = " + std::to_string(currentOffset) + + " count = " + std::to_string(count) + " bytes but only " + + std::to_string(totalWritten + writtenSize) + ".\n"); + } + } + buffer += writtenSize; + count -= writtenSize; + totalWritten += writtenSize; + } +#if (_POSIX_C_SOURCE >= 199309L || _XOPEN_SOURCE >= 500) + fdatasync(f->m_fd); +#else + fsync(f->m_fd); +#endif + return totalWritten; +} + +void FileDrainerIO::Delete(OutputFile &f, const std::string &path) +{ + Close(f); + std::remove(path.c_str()); +} + +void FileDrainerIO::SetVerbose(int verboseLevel, int rank) +{ + m_Verbose = verboseLevel; + m_Rank = rank; +} + +} // end namespace burstbuffer +} // end namespace adios2 diff --git a/source/adios2/toolkit/burstbuffer/FileDrainerIOPosix.h b/source/adios2/toolkit/burstbuffer/FileDrainerIOPosix.h new file mode 100644 index 0000000000..e95456b4b8 --- /dev/null +++ b/source/adios2/toolkit/burstbuffer/FileDrainerIOPosix.h @@ -0,0 +1,108 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. + * + * FileDrainerFstream.h + * + * Created on: April 1, 2020 + * Author: Norbert Podhorszki + */ + +#ifndef ADIOS2_TOOLKIT_BURSTBUFFER_FILEDRAINER_POSIX_H_ +#define ADIOS2_TOOLKIT_BURSTBUFFER_FILEDRAINER_POSIX_H_ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "adios2/common/ADIOSTypes.h" + +namespace adios2 +{ +namespace burstbuffer +{ + +class PosixInputFile +{ +public: + int m_fd; + int m_Errno; + std::string m_Path; +}; +class PosixOutputFile +{ +public: + int m_fd; + int m_Errno; + std::string m_Path; +}; + +using InputFile = std::shared_ptr; +using OutputFile = std::shared_ptr; + +class FileDrainerIO +{ +public: + FileDrainerIO() = default; + + virtual ~FileDrainerIO() = default; + + /** turn on verbosity. set rank to differentiate between the output of + * processes */ + void SetVerbose(int verboseLevel, int rank); + +protected: + /** rank of process just for stdout/stderr messages */ + int m_Rank = 0; + int m_Verbose = 0; + static const int errorState = -1; + + /** instead for Open, use this function */ + InputFile GetFileForRead(const std::string &path); + OutputFile GetFileForWrite(const std::string &path, bool append = false); + + /** return true if the File is usable (no previous errors) */ + bool Good(InputFile &f); + bool Good(OutputFile &f); + + void CloseAll(); + + void Seek(InputFile &f, size_t offset, const std::string &path); + void Seek(OutputFile &f, size_t offset, const std::string &path); + void SeekEnd(OutputFile &f); + + /** Read from file. Return a pair of + * - number of bytes written + * - time spent in waiting for file to be actually written to disk for this + * read to succeed. + */ + std::pair Read(InputFile &f, size_t count, char *buffer, + const std::string &path); + size_t Write(OutputFile &f, size_t count, const char *buffer, + const std::string &path); + + void Delete(OutputFile &f, const std::string &path); + +private: + using InputFileMap = std::map; + using OutputFileMap = std::map; + InputFileMap m_InputFileMap; + OutputFileMap m_OutputFileMap; + std::string SysErrMsg(const int errorNumber) const; + void Open(InputFile &f, const std::string &path); + void Close(InputFile &f); + void Open(OutputFile &f, const std::string &path, bool append); + void Close(OutputFile &f); + size_t GetFileSize(InputFile &f); +}; + +} // end namespace burstbuffer +} // end namespace adios2 + +#endif /* ADIOS2_TOOLKIT_BURSTBUFFER_FILEDRAINER_POSIX_H_ */ From 2fdb1ddc5b14ee1fce5aa8673b86960af282b53d Mon Sep 17 00:00:00 2001 From: Norbert Podhorszki Date: Thu, 17 Dec 2020 16:43:47 -0500 Subject: [PATCH 2/6] fsync data less frequently, but read/write in small pieces --- .../burstbuffer/FileDrainerIOFstream.cpp | 2 + .../burstbuffer/FileDrainerIOFstream.h | 2 + .../burstbuffer/FileDrainerIOPosix.cpp | 11 +++-- .../toolkit/burstbuffer/FileDrainerIOPosix.h | 2 + .../burstbuffer/FileDrainerSingleThread.cpp | 47 +++++++++++++++++-- .../burstbuffer/FileDrainerSingleThread.h | 9 +++- 6 files changed, 63 insertions(+), 10 deletions(-) diff --git a/source/adios2/toolkit/burstbuffer/FileDrainerIOFstream.cpp b/source/adios2/toolkit/burstbuffer/FileDrainerIOFstream.cpp index 7661d456e4..2b3ddee02f 100644 --- a/source/adios2/toolkit/burstbuffer/FileDrainerIOFstream.cpp +++ b/source/adios2/toolkit/burstbuffer/FileDrainerIOFstream.cpp @@ -178,6 +178,8 @@ size_t FileDrainerIO::Write(OutputFile &f, size_t count, const char *buffer, return count; } +int FileDrainerIO::FileSync(OutputFile &f) { return 0; } + void FileDrainerIO::Delete(OutputFile &f, const std::string &path) { Close(f); diff --git a/source/adios2/toolkit/burstbuffer/FileDrainerIOFstream.h b/source/adios2/toolkit/burstbuffer/FileDrainerIOFstream.h index fa8859bd08..7239311913 100644 --- a/source/adios2/toolkit/burstbuffer/FileDrainerIOFstream.h +++ b/source/adios2/toolkit/burstbuffer/FileDrainerIOFstream.h @@ -72,6 +72,8 @@ class FileDrainerIO size_t Write(OutputFile &f, size_t count, const char *buffer, const std::string &path); + int FileSync(OutputFile &f); + void Delete(OutputFile &f, const std::string &path); private: diff --git a/source/adios2/toolkit/burstbuffer/FileDrainerIOPosix.cpp b/source/adios2/toolkit/burstbuffer/FileDrainerIOPosix.cpp index 988a6617c3..d14fecd231 100644 --- a/source/adios2/toolkit/burstbuffer/FileDrainerIOPosix.cpp +++ b/source/adios2/toolkit/burstbuffer/FileDrainerIOPosix.cpp @@ -227,12 +227,17 @@ size_t FileDrainerIO::Write(OutputFile &f, size_t count, const char *buffer, count -= writtenSize; totalWritten += writtenSize; } + + return totalWritten; +} + +int FileDrainerIO::FileSync(OutputFile &f) +{ #if (_POSIX_C_SOURCE >= 199309L || _XOPEN_SOURCE >= 500) - fdatasync(f->m_fd); + return fdatasync(f->m_fd); #else - fsync(f->m_fd); + return fsync(f->m_fd); #endif - return totalWritten; } void FileDrainerIO::Delete(OutputFile &f, const std::string &path) diff --git a/source/adios2/toolkit/burstbuffer/FileDrainerIOPosix.h b/source/adios2/toolkit/burstbuffer/FileDrainerIOPosix.h index e95456b4b8..a6215c4ca6 100644 --- a/source/adios2/toolkit/burstbuffer/FileDrainerIOPosix.h +++ b/source/adios2/toolkit/burstbuffer/FileDrainerIOPosix.h @@ -87,6 +87,8 @@ class FileDrainerIO size_t Write(OutputFile &f, size_t count, const char *buffer, const std::string &path); + int FileSync(OutputFile &f); + void Delete(OutputFile &f, const std::string &path); private: diff --git a/source/adios2/toolkit/burstbuffer/FileDrainerSingleThread.cpp b/source/adios2/toolkit/burstbuffer/FileDrainerSingleThread.cpp index b2a37e6ca4..ee36aed6b6 100644 --- a/source/adios2/toolkit/burstbuffer/FileDrainerSingleThread.cpp +++ b/source/adios2/toolkit/burstbuffer/FileDrainerSingleThread.cpp @@ -37,11 +37,13 @@ FileDrainerSingleThread::FileDrainerSingleThread() : FileDrainer() {} FileDrainerSingleThread::~FileDrainerSingleThread() { Join(); } -void FileDrainerSingleThread::SetBufferSize(size_t bufferSizeBytes) +void FileDrainerSingleThread::SetBufferSize(size_t bytes) { - bufferSize = bufferSizeBytes; + bufferSize = bytes; } +void FileDrainerSingleThread::SetFlushSize(size_t bytes) { flushSize = bytes; } + void FileDrainerSingleThread::Start() { th = std::thread(&FileDrainerSingleThread::DrainThread, this); @@ -208,15 +210,50 @@ void FileDrainerSingleThread::DrainThread() te = std::chrono::steady_clock::now(); timeWrite += te - ts; } + size_t nonflushed = 0; + std::chrono::duration sleeptime(0.001); const size_t batches = fdo.countBytes / bufferSize; - const size_t remainder = fdo.countBytes % bufferSize; + const size_t remainderBytes = fdo.countBytes % bufferSize; for (size_t b = 0; b < batches; ++b) { lf_Copy(fdo, fdr, fdw, bufferSize); + nonflushed += bufferSize; + if (nonflushed > flushSize) + { + + if (m_Verbose >= 2) + { +#ifndef NO_SANITIZE_THREAD + std::cout << "Sync " << m_Rank << ": sync " + << fdo.toFileName << " to disk " + << nonflushed << " bytes " + << std::endl; +#endif + } + + FileSync(fdw); + nonflushed = 0; + } + std::this_thread::sleep_for(sleeptime); } - if (remainder) + if (remainderBytes) { - lf_Copy(fdo, fdr, fdw, remainder); + lf_Copy(fdo, fdr, fdw, remainderBytes); + nonflushed += remainderBytes; + } + if (nonflushed > 0) + { + + if (m_Verbose >= 2) + { +#ifndef NO_SANITIZE_THREAD + std::cout << "Sync " << m_Rank << ": sync " + << fdo.toFileName << " to disk " + << nonflushed << " bytes " << std::endl; +#endif + } + FileSync(fdw); + std::this_thread::sleep_for(sleeptime); } } catch (std::ios_base::failure &e) diff --git a/source/adios2/toolkit/burstbuffer/FileDrainerSingleThread.h b/source/adios2/toolkit/burstbuffer/FileDrainerSingleThread.h index dc5924cff7..bedff81659 100644 --- a/source/adios2/toolkit/burstbuffer/FileDrainerSingleThread.h +++ b/source/adios2/toolkit/burstbuffer/FileDrainerSingleThread.h @@ -24,13 +24,17 @@ class FileDrainerSingleThread : public FileDrainer { public: - static const size_t defaultBufferSize = 4194304; // 4MB + // Size of drain buffer (read/write buffer owned by the thread) + static constexpr size_t defaultBufferSize = 4194304; // 4MB + // Sync to disk when draining reaches flush size + static constexpr size_t defaultFlushSize = 32 * defaultBufferSize; // 128MB FileDrainerSingleThread(); ~FileDrainerSingleThread(); - void SetBufferSize(size_t bufferSizeBytes); + void SetBufferSize(size_t bytes); + void SetFlushSize(size_t bytes); /** Create thread. * This will create a thread to continuously run and idle if there @@ -47,6 +51,7 @@ class FileDrainerSingleThread : public FileDrainer private: size_t bufferSize = defaultBufferSize; + size_t flushSize = defaultFlushSize; std::thread th; // created by constructor bool finish = false; std::mutex finishMutex; From 5ffc3c97ca34804e121f87ba85c41a3c238d2d8b Mon Sep 17 00:00:00 2001 From: Norbert Podhorszki Date: Thu, 17 Dec 2020 11:48:13 -0500 Subject: [PATCH 3/6] Add a POSIX version of file drainer to be used on non-Windows machine. This can call fdatasync() to ensure writing to disk --- source/adios2/CMakeLists.txt | 3 + .../toolkit/burstbuffer/FileDrainer.cpp | 160 ----------- .../adios2/toolkit/burstbuffer/FileDrainer.h | 48 +--- .../burstbuffer/FileDrainerIOFstream.cpp | 194 ++++++++++++++ .../burstbuffer/FileDrainerIOFstream.h | 92 +++++++ .../burstbuffer/FileDrainerIOPosix.cpp | 251 ++++++++++++++++++ .../toolkit/burstbuffer/FileDrainerIOPosix.h | 108 ++++++++ 7 files changed, 655 insertions(+), 201 deletions(-) create mode 100644 source/adios2/toolkit/burstbuffer/FileDrainerIOFstream.cpp create mode 100644 source/adios2/toolkit/burstbuffer/FileDrainerIOFstream.h create mode 100644 source/adios2/toolkit/burstbuffer/FileDrainerIOPosix.cpp create mode 100644 source/adios2/toolkit/burstbuffer/FileDrainerIOPosix.h diff --git a/source/adios2/CMakeLists.txt b/source/adios2/CMakeLists.txt index 2b3a844474..e3024460a8 100644 --- a/source/adios2/CMakeLists.txt +++ b/source/adios2/CMakeLists.txt @@ -126,6 +126,9 @@ target_compile_features(adios2_core PUBLIC "$second; - } - else - { - InputFile f = std::make_shared(); - m_InputFileMap.emplace(path, f); - Open(f, path); - return f; - } -} - -OutputFile FileDrainer::GetFileForWrite(const std::string &path, bool append) -{ - auto it = m_OutputFileMap.find(path); - if (it != m_OutputFileMap.end()) - { - return it->second; - } - else - { - OutputFile f = std::make_shared(); - m_OutputFileMap.emplace(path, f); - Open(f, path, append); - return f; - } -} - -void FileDrainer::Open(InputFile &f, const std::string &path) -{ - - f->rdbuf()->pubsetbuf(0, 0); - f->open(path, std::ios::in | std::ios::binary); -} - -void FileDrainer::Open(OutputFile &f, const std::string &path, bool append) -{ - - if (append) - { - f->rdbuf()->pubsetbuf(0, 0); - f->open(path, std::ios::out | std::ios::app | std::ios::binary); - } - else - { - f->rdbuf()->pubsetbuf(0, 0); - f->open(path, std::ios::out | std::ios::trunc | std::ios::binary); - } -} - -void FileDrainer::Close(InputFile &f) { f->close(); } -void FileDrainer::Close(OutputFile &f) { f->close(); } - -bool FileDrainer::Good(InputFile &f) { return (f->good()); } -bool FileDrainer::Good(OutputFile &f) { return (f->good()); } - -void FileDrainer::CloseAll() -{ - for (auto it = m_OutputFileMap.begin(); it != m_OutputFileMap.end(); ++it) - { - // if (it->second->good()) - //{ - Close(it->second); - //} - } - m_OutputFileMap.clear(); - for (auto it = m_InputFileMap.begin(); it != m_InputFileMap.end(); ++it) - { - // if (it->second->good()) - //{ - Close(it->second); - //} - } - m_InputFileMap.clear(); -} - -void FileDrainer::Seek(InputFile &f, size_t offset, const std::string &path) -{ - f->seekg(offset, std::ios_base::beg); -} - -void FileDrainer::Seek(OutputFile &f, size_t offset, const std::string &path) -{ - f->seekp(offset, std::ios_base::beg); -} - -void FileDrainer::SeekEnd(OutputFile &f) { f->seekp(0, std::ios_base::end); } - -size_t FileDrainer::GetFileSize(InputFile &f) -{ - const auto currentOffset = f->tellg(); - f->seekg(0, std::ios_base::end); - auto fileSize = f->tellg(); - f->seekg(currentOffset, std::ios_base::beg); - return static_cast(fileSize); -} - -std::pair FileDrainer::Read(InputFile &f, size_t count, - char *buffer, - const std::string &path) -{ - size_t totalRead = 0; - double totalSlept = 0.0; - const double sleepUnit = 0.01; // seconds - while (count > 0) - { - const auto currentOffset = f->tellg(); - f->read(buffer, static_cast(count)); - const auto readSize = f->gcount(); - - if (readSize < static_cast(count)) - { - if (f->eof()) - { - std::chrono::duration d(sleepUnit); - std::this_thread::sleep_for(d); - f->clear(f->rdstate() & ~std::fstream::eofbit); - totalSlept += sleepUnit; - } - else - { - throw std::ios_base::failure( - "FileDrainer couldn't read from file " + path + - " offset = " + std::to_string(currentOffset) + - " count = " + std::to_string(count) + " bytes but only " + - std::to_string(totalRead + readSize) + ".\n"); - } - } - buffer += readSize; - count -= readSize; - totalRead += readSize; - } - return std::pair(totalRead, totalSlept); -} - -size_t FileDrainer::Write(OutputFile &f, size_t count, const char *buffer, - const std::string &path) -{ - f->write(buffer, static_cast(count)); - - if (f->bad()) - { - throw std::ios_base::failure( - "FileDrainer couldn't write to file " + path + - " count = " + std::to_string(count) + " bytes\n"); - } - - return count; -} - -void FileDrainer::Delete(OutputFile &f, const std::string &path) -{ - Close(f); - std::remove(path.c_str()); -} - void FileDrainer::SetVerbose(int verboseLevel, int rank) { m_Verbose = verboseLevel; diff --git a/source/adios2/toolkit/burstbuffer/FileDrainer.h b/source/adios2/toolkit/burstbuffer/FileDrainer.h index db0da66276..e58bc60450 100644 --- a/source/adios2/toolkit/burstbuffer/FileDrainer.h +++ b/source/adios2/toolkit/burstbuffer/FileDrainer.h @@ -23,6 +23,12 @@ #include "adios2/common/ADIOSTypes.h" +#ifdef _WIN32 +#include "FileDrainerIOFstream.h" +#else +#include "FileDrainerIOPosix.h" +#endif + namespace adios2 { namespace burstbuffer @@ -58,12 +64,7 @@ struct FileDrainOperation size_t fromOffset, size_t toOffset, const void *data); }; -typedef std::map> InputFileMap; -typedef std::map> OutputFileMap; -typedef std::shared_ptr InputFile; -typedef std::shared_ptr OutputFile; - -class FileDrainer +class FileDrainer : public FileDrainerIO { public: FileDrainer() = default; @@ -111,41 +112,6 @@ class FileDrainer int m_Rank = 0; int m_Verbose = 0; static const int errorState = -1; - - /** instead for Open, use this function */ - InputFile GetFileForRead(const std::string &path); - OutputFile GetFileForWrite(const std::string &path, bool append = false); - - /** return true if the File is usable (no previous errors) */ - bool Good(InputFile &f); - bool Good(OutputFile &f); - - void CloseAll(); - - void Seek(InputFile &f, size_t offset, const std::string &path); - void Seek(OutputFile &f, size_t offset, const std::string &path); - void SeekEnd(OutputFile &f); - - /** Read from file. Return a pair of - * - number of bytes written - * - time spent in waiting for file to be actually written to disk for this - * read to succeed. - */ - std::pair Read(InputFile &f, size_t count, char *buffer, - const std::string &path); - size_t Write(OutputFile &f, size_t count, const char *buffer, - const std::string &path); - - void Delete(OutputFile &f, const std::string &path); - -private: - InputFileMap m_InputFileMap; - OutputFileMap m_OutputFileMap; - void Open(InputFile &f, const std::string &path); - void Close(InputFile &f); - void Open(OutputFile &f, const std::string &path, bool append); - void Close(OutputFile &f); - size_t GetFileSize(InputFile &f); }; } // end namespace burstbuffer diff --git a/source/adios2/toolkit/burstbuffer/FileDrainerIOFstream.cpp b/source/adios2/toolkit/burstbuffer/FileDrainerIOFstream.cpp new file mode 100644 index 0000000000..7661d456e4 --- /dev/null +++ b/source/adios2/toolkit/burstbuffer/FileDrainerIOFstream.cpp @@ -0,0 +1,194 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. + * + * FileDrainer.cpp + * + * Created on: April 1, 2020 + * Author: Norbert Podhorszki + */ + +#include "FileDrainerIOFstream.h" + +#include +#include +#include // std::memcpy +#include // std::this_thread::sleep_for + +/// \cond EXCLUDE_FROM_DOXYGEN +#include //std::ios_base::failure +/// \endcond + +namespace adios2 +{ +namespace burstbuffer +{ + +InputFile FileDrainerIO::GetFileForRead(const std::string &path) +{ + auto it = m_InputFileMap.find(path); + if (it != m_InputFileMap.end()) + { + return it->second; + } + else + { + InputFile f = std::make_shared(); + m_InputFileMap.emplace(path, f); + Open(f, path); + return f; + } +} + +OutputFile FileDrainerIO::GetFileForWrite(const std::string &path, bool append) +{ + auto it = m_OutputFileMap.find(path); + if (it != m_OutputFileMap.end()) + { + return it->second; + } + else + { + OutputFile f = std::make_shared(); + m_OutputFileMap.emplace(path, f); + Open(f, path, append); + return f; + } +} + +void FileDrainerIO::Open(InputFile &f, const std::string &path) +{ + + f->rdbuf()->pubsetbuf(0, 0); + f->open(path, std::ios::in | std::ios::binary); +} + +void FileDrainerIO::Open(OutputFile &f, const std::string &path, bool append) +{ + + if (append) + { + f->rdbuf()->pubsetbuf(0, 0); + f->open(path, std::ios::out | std::ios::app | std::ios::binary); + } + else + { + f->rdbuf()->pubsetbuf(0, 0); + f->open(path, std::ios::out | std::ios::trunc | std::ios::binary); + } +} + +void FileDrainerIO::Close(InputFile &f) { f->close(); } +void FileDrainerIO::Close(OutputFile &f) { f->close(); } + +bool FileDrainerIO::Good(InputFile &f) { return (f->good()); } +bool FileDrainerIO::Good(OutputFile &f) { return (f->good()); } + +void FileDrainerIO::CloseAll() +{ + for (auto it = m_OutputFileMap.begin(); it != m_OutputFileMap.end(); ++it) + { + // if (it->second->good()) + //{ + Close(it->second); + //} + } + m_OutputFileMap.clear(); + for (auto it = m_InputFileMap.begin(); it != m_InputFileMap.end(); ++it) + { + // if (it->second->good()) + //{ + Close(it->second); + //} + } + m_InputFileMap.clear(); +} + +void FileDrainerIO::Seek(InputFile &f, size_t offset, const std::string &path) +{ + f->seekg(offset, std::ios_base::beg); +} + +void FileDrainerIO::Seek(OutputFile &f, size_t offset, const std::string &path) +{ + f->seekp(offset, std::ios_base::beg); +} + +void FileDrainerIO::SeekEnd(OutputFile &f) { f->seekp(0, std::ios_base::end); } + +size_t FileDrainerIO::GetFileSize(InputFile &f) +{ + const auto currentOffset = f->tellg(); + f->seekg(0, std::ios_base::end); + auto fileSize = f->tellg(); + f->seekg(currentOffset, std::ios_base::beg); + return static_cast(fileSize); +} + +std::pair FileDrainerIO::Read(InputFile &f, size_t count, + char *buffer, + const std::string &path) +{ + size_t totalRead = 0; + double totalSlept = 0.0; + const double sleepUnit = 0.01; // seconds + while (count > 0) + { + const auto currentOffset = f->tellg(); + f->read(buffer, static_cast(count)); + const auto readSize = f->gcount(); + + if (readSize < static_cast(count)) + { + if (f->eof()) + { + std::chrono::duration d(sleepUnit); + std::this_thread::sleep_for(d); + f->clear(f->rdstate() & ~std::fstream::eofbit); + totalSlept += sleepUnit; + } + else + { + throw std::ios_base::failure( + "FileDrainer couldn't read from file " + path + + " offset = " + std::to_string(currentOffset) + + " count = " + std::to_string(count) + " bytes but only " + + std::to_string(totalRead + readSize) + ".\n"); + } + } + buffer += readSize; + count -= readSize; + totalRead += readSize; + } + return std::pair(totalRead, totalSlept); +} + +size_t FileDrainerIO::Write(OutputFile &f, size_t count, const char *buffer, + const std::string &path) +{ + f->write(buffer, static_cast(count)); + + if (f->bad()) + { + throw std::ios_base::failure( + "FileDrainer couldn't write to file " + path + + " count = " + std::to_string(count) + " bytes\n"); + } + + return count; +} + +void FileDrainerIO::Delete(OutputFile &f, const std::string &path) +{ + Close(f); + std::remove(path.c_str()); +} + +void FileDrainerIO::SetVerbose(int verboseLevel, int rank) +{ + m_Verbose = verboseLevel; + m_Rank = rank; +} + +} // end namespace burstbuffer +} // end namespace adios2 diff --git a/source/adios2/toolkit/burstbuffer/FileDrainerIOFstream.h b/source/adios2/toolkit/burstbuffer/FileDrainerIOFstream.h new file mode 100644 index 0000000000..fa8859bd08 --- /dev/null +++ b/source/adios2/toolkit/burstbuffer/FileDrainerIOFstream.h @@ -0,0 +1,92 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. + * + * FileDrainerFstream.h + * + * Created on: April 1, 2020 + * Author: Norbert Podhorszki + */ + +#ifndef ADIOS2_TOOLKIT_BURSTBUFFER_FILEDRAINER_FSTREAM_H_ +#define ADIOS2_TOOLKIT_BURSTBUFFER_FILEDRAINER_FSTREAM_H_ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "adios2/common/ADIOSTypes.h" + +namespace adios2 +{ +namespace burstbuffer +{ + +typedef std::shared_ptr InputFile; +typedef std::shared_ptr OutputFile; + +class FileDrainerIO +{ +public: + FileDrainerIO() = default; + + virtual ~FileDrainerIO() = default; + + /** turn on verbosity. set rank to differentiate between the output of + * processes */ + void SetVerbose(int verboseLevel, int rank); + +protected: + /** rank of process just for stdout/stderr messages */ + int m_Rank = 0; + int m_Verbose = 0; + static const int errorState = -1; + + /** instead for Open, use this function */ + InputFile GetFileForRead(const std::string &path); + OutputFile GetFileForWrite(const std::string &path, bool append = false); + + /** return true if the File is usable (no previous errors) */ + bool Good(InputFile &f); + bool Good(OutputFile &f); + + void CloseAll(); + + void Seek(InputFile &f, size_t offset, const std::string &path); + void Seek(OutputFile &f, size_t offset, const std::string &path); + void SeekEnd(OutputFile &f); + + /** Read from file. Return a pair of + * - number of bytes written + * - time spent in waiting for file to be actually written to disk for this + * read to succeed. + */ + std::pair Read(InputFile &f, size_t count, char *buffer, + const std::string &path); + size_t Write(OutputFile &f, size_t count, const char *buffer, + const std::string &path); + + void Delete(OutputFile &f, const std::string &path); + +private: + typedef std::map InputFileMap; + typedef std::map OutputFileMap; + InputFileMap m_InputFileMap; + OutputFileMap m_OutputFileMap; + void Open(InputFile &f, const std::string &path); + void Close(InputFile &f); + void Open(OutputFile &f, const std::string &path, bool append); + void Close(OutputFile &f); + size_t GetFileSize(InputFile &f); +}; + +} // end namespace burstbuffer +} // end namespace adios2 + +#endif /* ADIOS2_TOOLKIT_BURSTBUFFER_FILEDRAINER_FSTREAM_H_ */ diff --git a/source/adios2/toolkit/burstbuffer/FileDrainerIOPosix.cpp b/source/adios2/toolkit/burstbuffer/FileDrainerIOPosix.cpp new file mode 100644 index 0000000000..988a6617c3 --- /dev/null +++ b/source/adios2/toolkit/burstbuffer/FileDrainerIOPosix.cpp @@ -0,0 +1,251 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. + * + * FileDrainerIOPosix.cpp + * + * Created on: April 1, 2020 + * Author: Norbert Podhorszki + */ + +#include "FileDrainerIOPosix.h" + +#ifndef _LARGEFILE64_SOURCE +#define _LARGEFILE64_SOURCE +#endif + +#include +#include // remove +#include // std::memcpy, strerror +#include // errno +#include // open +#include // write output +#include // open, fstat +#include // open +#include // std::this_thread::sleep_for +#include // write, close + +namespace adios2 +{ +namespace burstbuffer +{ + +InputFile FileDrainerIO::GetFileForRead(const std::string &path) +{ + auto it = m_InputFileMap.find(path); + if (it != m_InputFileMap.end()) + { + return it->second; + } + else + { + InputFile f = std::make_shared(); + m_InputFileMap.emplace(path, f); + Open(f, path); + return f; + } +} + +OutputFile FileDrainerIO::GetFileForWrite(const std::string &path, bool append) +{ + auto it = m_OutputFileMap.find(path); + if (it != m_OutputFileMap.end()) + { + return it->second; + } + else + { + OutputFile f = std::make_shared(); + m_OutputFileMap.emplace(path, f); + Open(f, path, append); + return f; + } +} + +std::string FileDrainerIO::SysErrMsg(const int errorNumber) const +{ + return std::string(": errno = " + std::to_string(errorNumber) + ": " + + strerror(errorNumber)); +} + +void FileDrainerIO::Open(InputFile &f, const std::string &path) +{ + errno = 0; + f->m_fd = open(path.c_str(), O_RDONLY); + f->m_Errno = errno; + f->m_Path = path; +} + +void FileDrainerIO::Open(OutputFile &f, const std::string &path, bool append) +{ + errno = 0; + if (append) + { + f->m_fd = open(path.c_str(), O_RDWR | O_CREAT, 0777); + lseek64(f->m_fd, 0, SEEK_END); + } + else + { + f->m_fd = open(path.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0666); + } + f->m_Errno = errno; + f->m_Path = path; +} + +void FileDrainerIO::Close(InputFile &f) +{ + errno = 0; + close(f->m_fd); + f->m_Errno = errno; +} +void FileDrainerIO::Close(OutputFile &f) +{ + errno = 0; + close(f->m_fd); + f->m_Errno = errno; +} + +bool FileDrainerIO::Good(InputFile &f) { return (f->m_Errno == 0); } +bool FileDrainerIO::Good(OutputFile &f) { return (f->m_Errno == 0); } + +void FileDrainerIO::CloseAll() +{ + for (auto it = m_OutputFileMap.begin(); it != m_OutputFileMap.end(); ++it) + { + // if (it->second->good()) + //{ + Close(it->second); + //} + } + m_OutputFileMap.clear(); + for (auto it = m_InputFileMap.begin(); it != m_InputFileMap.end(); ++it) + { + // if (it->second->good()) + //{ + Close(it->second); + //} + } + m_InputFileMap.clear(); +} + +void FileDrainerIO::Seek(InputFile &f, size_t offset, const std::string &path) +{ + lseek64(f->m_fd, static_cast(offset), SEEK_SET); +} + +void FileDrainerIO::Seek(OutputFile &f, size_t offset, const std::string &path) +{ + lseek64(f->m_fd, static_cast(offset), SEEK_SET); +} + +void FileDrainerIO::SeekEnd(OutputFile &f) { lseek64(f->m_fd, 0, SEEK_END); } + +size_t FileDrainerIO::GetFileSize(InputFile &f) +{ + struct stat fileStat; + errno = 0; + if (fstat(f->m_fd, &fileStat) == -1) + { + f->m_Errno = errno; + throw std::ios_base::failure("ERROR: couldn't get size of file " + + f->m_Path + SysErrMsg(errno)); + } + f->m_Errno = errno; + return static_cast(fileStat.st_size); +} + +std::pair FileDrainerIO::Read(InputFile &f, size_t count, + char *buffer, + const std::string &path) +{ + size_t totalRead = 0; + double totalSlept = 0.0; + const double sleepUnit = 0.01; // seconds + while (count > 0) + { + const auto currentOffset = lseek64(f->m_fd, 0, SEEK_CUR); + size_t readCount = + (count < DefaultMaxFileBatchSize ? count : DefaultMaxFileBatchSize); + errno = 0; + const auto readSize = read(f->m_fd, buffer, readCount); + if (readSize == -1) + { + if (errno == EINTR) + { + continue; + } + else + { + throw std::ios_base::failure( + "FileDrainer couldn't read from file " + path + + " offset = " + std::to_string(currentOffset) + + " count = " + std::to_string(count) + " bytes but only " + + std::to_string(totalRead + readSize) + ".\n"); + } + } + else if (readSize < static_cast(count)) + { + // need to wait for more data to come + std::chrono::duration d(sleepUnit); + std::this_thread::sleep_for(d); + totalSlept += sleepUnit; + } + buffer += readSize; + count -= readSize; + totalRead += readSize; + } + return std::pair(totalRead, totalSlept); +} + +size_t FileDrainerIO::Write(OutputFile &f, size_t count, const char *buffer, + const std::string &path) +{ + size_t totalWritten = 0; + while (count > 0) + { + const auto currentOffset = lseek64(f->m_fd, 0, SEEK_CUR); + size_t writeCount = + (count < DefaultMaxFileBatchSize ? count : DefaultMaxFileBatchSize); + errno = 0; + const auto writtenSize = write(f->m_fd, buffer, writeCount); + if (writtenSize == -1) + { + if (errno == EINTR) + { + continue; + } + else + { + throw std::ios_base::failure( + "FileDrainer couldn't write to file " + path + + " offset = " + std::to_string(currentOffset) + + " count = " + std::to_string(count) + " bytes but only " + + std::to_string(totalWritten + writtenSize) + ".\n"); + } + } + buffer += writtenSize; + count -= writtenSize; + totalWritten += writtenSize; + } +#if (_POSIX_C_SOURCE >= 199309L || _XOPEN_SOURCE >= 500) + fdatasync(f->m_fd); +#else + fsync(f->m_fd); +#endif + return totalWritten; +} + +void FileDrainerIO::Delete(OutputFile &f, const std::string &path) +{ + Close(f); + std::remove(path.c_str()); +} + +void FileDrainerIO::SetVerbose(int verboseLevel, int rank) +{ + m_Verbose = verboseLevel; + m_Rank = rank; +} + +} // end namespace burstbuffer +} // end namespace adios2 diff --git a/source/adios2/toolkit/burstbuffer/FileDrainerIOPosix.h b/source/adios2/toolkit/burstbuffer/FileDrainerIOPosix.h new file mode 100644 index 0000000000..e95456b4b8 --- /dev/null +++ b/source/adios2/toolkit/burstbuffer/FileDrainerIOPosix.h @@ -0,0 +1,108 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. + * + * FileDrainerFstream.h + * + * Created on: April 1, 2020 + * Author: Norbert Podhorszki + */ + +#ifndef ADIOS2_TOOLKIT_BURSTBUFFER_FILEDRAINER_POSIX_H_ +#define ADIOS2_TOOLKIT_BURSTBUFFER_FILEDRAINER_POSIX_H_ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "adios2/common/ADIOSTypes.h" + +namespace adios2 +{ +namespace burstbuffer +{ + +class PosixInputFile +{ +public: + int m_fd; + int m_Errno; + std::string m_Path; +}; +class PosixOutputFile +{ +public: + int m_fd; + int m_Errno; + std::string m_Path; +}; + +using InputFile = std::shared_ptr; +using OutputFile = std::shared_ptr; + +class FileDrainerIO +{ +public: + FileDrainerIO() = default; + + virtual ~FileDrainerIO() = default; + + /** turn on verbosity. set rank to differentiate between the output of + * processes */ + void SetVerbose(int verboseLevel, int rank); + +protected: + /** rank of process just for stdout/stderr messages */ + int m_Rank = 0; + int m_Verbose = 0; + static const int errorState = -1; + + /** instead for Open, use this function */ + InputFile GetFileForRead(const std::string &path); + OutputFile GetFileForWrite(const std::string &path, bool append = false); + + /** return true if the File is usable (no previous errors) */ + bool Good(InputFile &f); + bool Good(OutputFile &f); + + void CloseAll(); + + void Seek(InputFile &f, size_t offset, const std::string &path); + void Seek(OutputFile &f, size_t offset, const std::string &path); + void SeekEnd(OutputFile &f); + + /** Read from file. Return a pair of + * - number of bytes written + * - time spent in waiting for file to be actually written to disk for this + * read to succeed. + */ + std::pair Read(InputFile &f, size_t count, char *buffer, + const std::string &path); + size_t Write(OutputFile &f, size_t count, const char *buffer, + const std::string &path); + + void Delete(OutputFile &f, const std::string &path); + +private: + using InputFileMap = std::map; + using OutputFileMap = std::map; + InputFileMap m_InputFileMap; + OutputFileMap m_OutputFileMap; + std::string SysErrMsg(const int errorNumber) const; + void Open(InputFile &f, const std::string &path); + void Close(InputFile &f); + void Open(OutputFile &f, const std::string &path, bool append); + void Close(OutputFile &f); + size_t GetFileSize(InputFile &f); +}; + +} // end namespace burstbuffer +} // end namespace adios2 + +#endif /* ADIOS2_TOOLKIT_BURSTBUFFER_FILEDRAINER_POSIX_H_ */ From 11265882e75f13e520fff4292df49aea4eb0bcb6 Mon Sep 17 00:00:00 2001 From: Norbert Podhorszki Date: Thu, 17 Dec 2020 16:43:47 -0500 Subject: [PATCH 4/6] fsync data less frequently, but read/write in small pieces --- .../burstbuffer/FileDrainerIOFstream.cpp | 2 + .../burstbuffer/FileDrainerIOFstream.h | 2 + .../burstbuffer/FileDrainerIOPosix.cpp | 11 +++-- .../toolkit/burstbuffer/FileDrainerIOPosix.h | 2 + .../burstbuffer/FileDrainerSingleThread.cpp | 47 +++++++++++++++++-- .../burstbuffer/FileDrainerSingleThread.h | 9 +++- 6 files changed, 63 insertions(+), 10 deletions(-) diff --git a/source/adios2/toolkit/burstbuffer/FileDrainerIOFstream.cpp b/source/adios2/toolkit/burstbuffer/FileDrainerIOFstream.cpp index 7661d456e4..2b3ddee02f 100644 --- a/source/adios2/toolkit/burstbuffer/FileDrainerIOFstream.cpp +++ b/source/adios2/toolkit/burstbuffer/FileDrainerIOFstream.cpp @@ -178,6 +178,8 @@ size_t FileDrainerIO::Write(OutputFile &f, size_t count, const char *buffer, return count; } +int FileDrainerIO::FileSync(OutputFile &f) { return 0; } + void FileDrainerIO::Delete(OutputFile &f, const std::string &path) { Close(f); diff --git a/source/adios2/toolkit/burstbuffer/FileDrainerIOFstream.h b/source/adios2/toolkit/burstbuffer/FileDrainerIOFstream.h index fa8859bd08..7239311913 100644 --- a/source/adios2/toolkit/burstbuffer/FileDrainerIOFstream.h +++ b/source/adios2/toolkit/burstbuffer/FileDrainerIOFstream.h @@ -72,6 +72,8 @@ class FileDrainerIO size_t Write(OutputFile &f, size_t count, const char *buffer, const std::string &path); + int FileSync(OutputFile &f); + void Delete(OutputFile &f, const std::string &path); private: diff --git a/source/adios2/toolkit/burstbuffer/FileDrainerIOPosix.cpp b/source/adios2/toolkit/burstbuffer/FileDrainerIOPosix.cpp index 988a6617c3..d14fecd231 100644 --- a/source/adios2/toolkit/burstbuffer/FileDrainerIOPosix.cpp +++ b/source/adios2/toolkit/burstbuffer/FileDrainerIOPosix.cpp @@ -227,12 +227,17 @@ size_t FileDrainerIO::Write(OutputFile &f, size_t count, const char *buffer, count -= writtenSize; totalWritten += writtenSize; } + + return totalWritten; +} + +int FileDrainerIO::FileSync(OutputFile &f) +{ #if (_POSIX_C_SOURCE >= 199309L || _XOPEN_SOURCE >= 500) - fdatasync(f->m_fd); + return fdatasync(f->m_fd); #else - fsync(f->m_fd); + return fsync(f->m_fd); #endif - return totalWritten; } void FileDrainerIO::Delete(OutputFile &f, const std::string &path) diff --git a/source/adios2/toolkit/burstbuffer/FileDrainerIOPosix.h b/source/adios2/toolkit/burstbuffer/FileDrainerIOPosix.h index e95456b4b8..a6215c4ca6 100644 --- a/source/adios2/toolkit/burstbuffer/FileDrainerIOPosix.h +++ b/source/adios2/toolkit/burstbuffer/FileDrainerIOPosix.h @@ -87,6 +87,8 @@ class FileDrainerIO size_t Write(OutputFile &f, size_t count, const char *buffer, const std::string &path); + int FileSync(OutputFile &f); + void Delete(OutputFile &f, const std::string &path); private: diff --git a/source/adios2/toolkit/burstbuffer/FileDrainerSingleThread.cpp b/source/adios2/toolkit/burstbuffer/FileDrainerSingleThread.cpp index b2a37e6ca4..ee36aed6b6 100644 --- a/source/adios2/toolkit/burstbuffer/FileDrainerSingleThread.cpp +++ b/source/adios2/toolkit/burstbuffer/FileDrainerSingleThread.cpp @@ -37,11 +37,13 @@ FileDrainerSingleThread::FileDrainerSingleThread() : FileDrainer() {} FileDrainerSingleThread::~FileDrainerSingleThread() { Join(); } -void FileDrainerSingleThread::SetBufferSize(size_t bufferSizeBytes) +void FileDrainerSingleThread::SetBufferSize(size_t bytes) { - bufferSize = bufferSizeBytes; + bufferSize = bytes; } +void FileDrainerSingleThread::SetFlushSize(size_t bytes) { flushSize = bytes; } + void FileDrainerSingleThread::Start() { th = std::thread(&FileDrainerSingleThread::DrainThread, this); @@ -208,15 +210,50 @@ void FileDrainerSingleThread::DrainThread() te = std::chrono::steady_clock::now(); timeWrite += te - ts; } + size_t nonflushed = 0; + std::chrono::duration sleeptime(0.001); const size_t batches = fdo.countBytes / bufferSize; - const size_t remainder = fdo.countBytes % bufferSize; + const size_t remainderBytes = fdo.countBytes % bufferSize; for (size_t b = 0; b < batches; ++b) { lf_Copy(fdo, fdr, fdw, bufferSize); + nonflushed += bufferSize; + if (nonflushed > flushSize) + { + + if (m_Verbose >= 2) + { +#ifndef NO_SANITIZE_THREAD + std::cout << "Sync " << m_Rank << ": sync " + << fdo.toFileName << " to disk " + << nonflushed << " bytes " + << std::endl; +#endif + } + + FileSync(fdw); + nonflushed = 0; + } + std::this_thread::sleep_for(sleeptime); } - if (remainder) + if (remainderBytes) { - lf_Copy(fdo, fdr, fdw, remainder); + lf_Copy(fdo, fdr, fdw, remainderBytes); + nonflushed += remainderBytes; + } + if (nonflushed > 0) + { + + if (m_Verbose >= 2) + { +#ifndef NO_SANITIZE_THREAD + std::cout << "Sync " << m_Rank << ": sync " + << fdo.toFileName << " to disk " + << nonflushed << " bytes " << std::endl; +#endif + } + FileSync(fdw); + std::this_thread::sleep_for(sleeptime); } } catch (std::ios_base::failure &e) diff --git a/source/adios2/toolkit/burstbuffer/FileDrainerSingleThread.h b/source/adios2/toolkit/burstbuffer/FileDrainerSingleThread.h index dc5924cff7..bedff81659 100644 --- a/source/adios2/toolkit/burstbuffer/FileDrainerSingleThread.h +++ b/source/adios2/toolkit/burstbuffer/FileDrainerSingleThread.h @@ -24,13 +24,17 @@ class FileDrainerSingleThread : public FileDrainer { public: - static const size_t defaultBufferSize = 4194304; // 4MB + // Size of drain buffer (read/write buffer owned by the thread) + static constexpr size_t defaultBufferSize = 4194304; // 4MB + // Sync to disk when draining reaches flush size + static constexpr size_t defaultFlushSize = 32 * defaultBufferSize; // 128MB FileDrainerSingleThread(); ~FileDrainerSingleThread(); - void SetBufferSize(size_t bufferSizeBytes); + void SetBufferSize(size_t bytes); + void SetFlushSize(size_t bytes); /** Create thread. * This will create a thread to continuously run and idle if there @@ -47,6 +51,7 @@ class FileDrainerSingleThread : public FileDrainer private: size_t bufferSize = defaultBufferSize; + size_t flushSize = defaultFlushSize; std::thread th; // created by constructor bool finish = false; std::mutex finishMutex; From d85cd56184c71823468f77df6454c39c4cffe6a7 Mon Sep 17 00:00:00 2001 From: Norbert Podhorszki Date: Tue, 5 Jan 2021 10:25:29 -0500 Subject: [PATCH 5/6] use lseek instead of lseek64 on Mac --- source/adios2/toolkit/burstbuffer/FileDrainerIOPosix.cpp | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/source/adios2/toolkit/burstbuffer/FileDrainerIOPosix.cpp b/source/adios2/toolkit/burstbuffer/FileDrainerIOPosix.cpp index d14fecd231..24482893d0 100644 --- a/source/adios2/toolkit/burstbuffer/FileDrainerIOPosix.cpp +++ b/source/adios2/toolkit/burstbuffer/FileDrainerIOPosix.cpp @@ -25,6 +25,11 @@ #include // std::this_thread::sleep_for #include // write, close +#ifdef __APPLE__ & __MACH__ +#define lseek64 lseek +#define open64 open +#endif + namespace adios2 { namespace burstbuffer From fb35f81f8ca6482560d4313061656276b247c93a Mon Sep 17 00:00:00 2001 From: Norbert Podhorszki Date: Tue, 5 Jan 2021 10:28:04 -0500 Subject: [PATCH 6/6] do this apple thing properly --- source/adios2/toolkit/burstbuffer/FileDrainerIOPosix.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/adios2/toolkit/burstbuffer/FileDrainerIOPosix.cpp b/source/adios2/toolkit/burstbuffer/FileDrainerIOPosix.cpp index 24482893d0..c6c0e998c2 100644 --- a/source/adios2/toolkit/burstbuffer/FileDrainerIOPosix.cpp +++ b/source/adios2/toolkit/burstbuffer/FileDrainerIOPosix.cpp @@ -25,9 +25,10 @@ #include // std::this_thread::sleep_for #include // write, close -#ifdef __APPLE__ & __MACH__ +#if defined(__APPLE__) && defined(__MACH__) #define lseek64 lseek #define open64 open +#define off64_t off_t #endif namespace adios2