diff --git a/source/adios2/CMakeLists.txt b/source/adios2/CMakeLists.txt index 2b3a844474..e3024460a8 100644 --- a/source/adios2/CMakeLists.txt +++ b/source/adios2/CMakeLists.txt @@ -126,6 +126,9 @@ target_compile_features(adios2_core PUBLIC "$second; - } - else - { - InputFile f = std::make_shared(); - m_InputFileMap.emplace(path, f); - Open(f, path); - return f; - } -} - -OutputFile FileDrainer::GetFileForWrite(const std::string &path, bool append) -{ - auto it = m_OutputFileMap.find(path); - if (it != m_OutputFileMap.end()) - { - return it->second; - } - else - { - OutputFile f = std::make_shared(); - m_OutputFileMap.emplace(path, f); - Open(f, path, append); - return f; - } -} - -void FileDrainer::Open(InputFile &f, const std::string &path) -{ - - f->rdbuf()->pubsetbuf(0, 0); - f->open(path, std::ios::in | std::ios::binary); -} - -void FileDrainer::Open(OutputFile &f, const std::string &path, bool append) -{ - - if (append) - { - f->rdbuf()->pubsetbuf(0, 0); - f->open(path, std::ios::out | std::ios::app | std::ios::binary); - } - else - { - f->rdbuf()->pubsetbuf(0, 0); - f->open(path, std::ios::out | std::ios::trunc | std::ios::binary); - } -} - -void FileDrainer::Close(InputFile &f) { f->close(); } -void FileDrainer::Close(OutputFile &f) { f->close(); } - -bool FileDrainer::Good(InputFile &f) { return (f->good()); } -bool FileDrainer::Good(OutputFile &f) { return (f->good()); } - -void FileDrainer::CloseAll() -{ - for (auto it = m_OutputFileMap.begin(); it != m_OutputFileMap.end(); ++it) - { - // if (it->second->good()) - //{ - Close(it->second); - //} - } - m_OutputFileMap.clear(); - for (auto it = m_InputFileMap.begin(); it != m_InputFileMap.end(); ++it) - { - // if (it->second->good()) - //{ - Close(it->second); - //} - } - m_InputFileMap.clear(); -} - -void FileDrainer::Seek(InputFile &f, size_t offset, const std::string &path) -{ - f->seekg(offset, std::ios_base::beg); -} - -void FileDrainer::Seek(OutputFile &f, size_t offset, const std::string &path) -{ - f->seekp(offset, std::ios_base::beg); -} - -void FileDrainer::SeekEnd(OutputFile &f) { f->seekp(0, std::ios_base::end); } - -size_t FileDrainer::GetFileSize(InputFile &f) -{ - const auto currentOffset = f->tellg(); - f->seekg(0, std::ios_base::end); - auto fileSize = f->tellg(); - f->seekg(currentOffset, std::ios_base::beg); - return static_cast(fileSize); -} - -std::pair FileDrainer::Read(InputFile &f, size_t count, - char *buffer, - const std::string &path) -{ - size_t totalRead = 0; - double totalSlept = 0.0; - const double sleepUnit = 0.01; // seconds - while (count > 0) - { - const auto currentOffset = f->tellg(); - f->read(buffer, static_cast(count)); - const auto readSize = f->gcount(); - - if (readSize < static_cast(count)) - { - if (f->eof()) - { - std::chrono::duration d(sleepUnit); - std::this_thread::sleep_for(d); - f->clear(f->rdstate() & ~std::fstream::eofbit); - totalSlept += sleepUnit; - } - else - { - throw std::ios_base::failure( - "FileDrainer couldn't read from file " + path + - " offset = " + std::to_string(currentOffset) + - " count = " + std::to_string(count) + " bytes but only " + - std::to_string(totalRead + readSize) + ".\n"); - } - } - buffer += readSize; - count -= readSize; - totalRead += readSize; - } - return std::pair(totalRead, totalSlept); -} - -size_t FileDrainer::Write(OutputFile &f, size_t count, const char *buffer, - const std::string &path) -{ - f->write(buffer, static_cast(count)); - - if (f->bad()) - { - throw std::ios_base::failure( - "FileDrainer couldn't write to file " + path + - " count = " + std::to_string(count) + " bytes\n"); - } - - return count; -} - -void FileDrainer::Delete(OutputFile &f, const std::string &path) -{ - Close(f); - std::remove(path.c_str()); -} - void FileDrainer::SetVerbose(int verboseLevel, int rank) { m_Verbose = verboseLevel; diff --git a/source/adios2/toolkit/burstbuffer/FileDrainer.h b/source/adios2/toolkit/burstbuffer/FileDrainer.h index db0da66276..e58bc60450 100644 --- a/source/adios2/toolkit/burstbuffer/FileDrainer.h +++ b/source/adios2/toolkit/burstbuffer/FileDrainer.h @@ -23,6 +23,12 @@ #include "adios2/common/ADIOSTypes.h" +#ifdef _WIN32 +#include "FileDrainerIOFstream.h" +#else +#include "FileDrainerIOPosix.h" +#endif + namespace adios2 { namespace burstbuffer @@ -58,12 +64,7 @@ struct FileDrainOperation size_t fromOffset, size_t toOffset, const void *data); }; -typedef std::map> InputFileMap; -typedef std::map> OutputFileMap; -typedef std::shared_ptr InputFile; -typedef std::shared_ptr OutputFile; - -class FileDrainer +class FileDrainer : public FileDrainerIO { public: FileDrainer() = default; @@ -111,41 +112,6 @@ class FileDrainer int m_Rank = 0; int m_Verbose = 0; static const int errorState = -1; - - /** instead for Open, use this function */ - InputFile GetFileForRead(const std::string &path); - OutputFile GetFileForWrite(const std::string &path, bool append = false); - - /** return true if the File is usable (no previous errors) */ - bool Good(InputFile &f); - bool Good(OutputFile &f); - - void CloseAll(); - - void Seek(InputFile &f, size_t offset, const std::string &path); - void Seek(OutputFile &f, size_t offset, const std::string &path); - void SeekEnd(OutputFile &f); - - /** Read from file. Return a pair of - * - number of bytes written - * - time spent in waiting for file to be actually written to disk for this - * read to succeed. - */ - std::pair Read(InputFile &f, size_t count, char *buffer, - const std::string &path); - size_t Write(OutputFile &f, size_t count, const char *buffer, - const std::string &path); - - void Delete(OutputFile &f, const std::string &path); - -private: - InputFileMap m_InputFileMap; - OutputFileMap m_OutputFileMap; - void Open(InputFile &f, const std::string &path); - void Close(InputFile &f); - void Open(OutputFile &f, const std::string &path, bool append); - void Close(OutputFile &f); - size_t GetFileSize(InputFile &f); }; } // end namespace burstbuffer diff --git a/source/adios2/toolkit/burstbuffer/FileDrainerIOFstream.cpp b/source/adios2/toolkit/burstbuffer/FileDrainerIOFstream.cpp new file mode 100644 index 0000000000..2b3ddee02f --- /dev/null +++ b/source/adios2/toolkit/burstbuffer/FileDrainerIOFstream.cpp @@ -0,0 +1,196 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. + * + * FileDrainer.cpp + * + * Created on: April 1, 2020 + * Author: Norbert Podhorszki + */ + +#include "FileDrainerIOFstream.h" + +#include +#include +#include // std::memcpy +#include // std::this_thread::sleep_for + +/// \cond EXCLUDE_FROM_DOXYGEN +#include //std::ios_base::failure +/// \endcond + +namespace adios2 +{ +namespace burstbuffer +{ + +InputFile FileDrainerIO::GetFileForRead(const std::string &path) +{ + auto it = m_InputFileMap.find(path); + if (it != m_InputFileMap.end()) + { + return it->second; + } + else + { + InputFile f = std::make_shared(); + m_InputFileMap.emplace(path, f); + Open(f, path); + return f; + } +} + +OutputFile FileDrainerIO::GetFileForWrite(const std::string &path, bool append) +{ + auto it = m_OutputFileMap.find(path); + if (it != m_OutputFileMap.end()) + { + return it->second; + } + else + { + OutputFile f = std::make_shared(); + m_OutputFileMap.emplace(path, f); + Open(f, path, append); + return f; + } +} + +void FileDrainerIO::Open(InputFile &f, const std::string &path) +{ + + f->rdbuf()->pubsetbuf(0, 0); + f->open(path, std::ios::in | std::ios::binary); +} + +void FileDrainerIO::Open(OutputFile &f, const std::string &path, bool append) +{ + + if (append) + { + f->rdbuf()->pubsetbuf(0, 0); + f->open(path, std::ios::out | std::ios::app | std::ios::binary); + } + else + { + f->rdbuf()->pubsetbuf(0, 0); + f->open(path, std::ios::out | std::ios::trunc | std::ios::binary); + } +} + +void FileDrainerIO::Close(InputFile &f) { f->close(); } +void FileDrainerIO::Close(OutputFile &f) { f->close(); } + +bool FileDrainerIO::Good(InputFile &f) { return (f->good()); } +bool FileDrainerIO::Good(OutputFile &f) { return (f->good()); } + +void FileDrainerIO::CloseAll() +{ + for (auto it = m_OutputFileMap.begin(); it != m_OutputFileMap.end(); ++it) + { + // if (it->second->good()) + //{ + Close(it->second); + //} + } + m_OutputFileMap.clear(); + for (auto it = m_InputFileMap.begin(); it != m_InputFileMap.end(); ++it) + { + // if (it->second->good()) + //{ + Close(it->second); + //} + } + m_InputFileMap.clear(); +} + +void FileDrainerIO::Seek(InputFile &f, size_t offset, const std::string &path) +{ + f->seekg(offset, std::ios_base::beg); +} + +void FileDrainerIO::Seek(OutputFile &f, size_t offset, const std::string &path) +{ + f->seekp(offset, std::ios_base::beg); +} + +void FileDrainerIO::SeekEnd(OutputFile &f) { f->seekp(0, std::ios_base::end); } + +size_t FileDrainerIO::GetFileSize(InputFile &f) +{ + const auto currentOffset = f->tellg(); + f->seekg(0, std::ios_base::end); + auto fileSize = f->tellg(); + f->seekg(currentOffset, std::ios_base::beg); + return static_cast(fileSize); +} + +std::pair FileDrainerIO::Read(InputFile &f, size_t count, + char *buffer, + const std::string &path) +{ + size_t totalRead = 0; + double totalSlept = 0.0; + const double sleepUnit = 0.01; // seconds + while (count > 0) + { + const auto currentOffset = f->tellg(); + f->read(buffer, static_cast(count)); + const auto readSize = f->gcount(); + + if (readSize < static_cast(count)) + { + if (f->eof()) + { + std::chrono::duration d(sleepUnit); + std::this_thread::sleep_for(d); + f->clear(f->rdstate() & ~std::fstream::eofbit); + totalSlept += sleepUnit; + } + else + { + throw std::ios_base::failure( + "FileDrainer couldn't read from file " + path + + " offset = " + std::to_string(currentOffset) + + " count = " + std::to_string(count) + " bytes but only " + + std::to_string(totalRead + readSize) + ".\n"); + } + } + buffer += readSize; + count -= readSize; + totalRead += readSize; + } + return std::pair(totalRead, totalSlept); +} + +size_t FileDrainerIO::Write(OutputFile &f, size_t count, const char *buffer, + const std::string &path) +{ + f->write(buffer, static_cast(count)); + + if (f->bad()) + { + throw std::ios_base::failure( + "FileDrainer couldn't write to file " + path + + " count = " + std::to_string(count) + " bytes\n"); + } + + return count; +} + +int FileDrainerIO::FileSync(OutputFile &f) { return 0; } + +void FileDrainerIO::Delete(OutputFile &f, const std::string &path) +{ + Close(f); + std::remove(path.c_str()); +} + +void FileDrainerIO::SetVerbose(int verboseLevel, int rank) +{ + m_Verbose = verboseLevel; + m_Rank = rank; +} + +} // end namespace burstbuffer +} // end namespace adios2 diff --git a/source/adios2/toolkit/burstbuffer/FileDrainerIOFstream.h b/source/adios2/toolkit/burstbuffer/FileDrainerIOFstream.h new file mode 100644 index 0000000000..7239311913 --- /dev/null +++ b/source/adios2/toolkit/burstbuffer/FileDrainerIOFstream.h @@ -0,0 +1,94 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. + * + * FileDrainerFstream.h + * + * Created on: April 1, 2020 + * Author: Norbert Podhorszki + */ + +#ifndef ADIOS2_TOOLKIT_BURSTBUFFER_FILEDRAINER_FSTREAM_H_ +#define ADIOS2_TOOLKIT_BURSTBUFFER_FILEDRAINER_FSTREAM_H_ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "adios2/common/ADIOSTypes.h" + +namespace adios2 +{ +namespace burstbuffer +{ + +typedef std::shared_ptr InputFile; +typedef std::shared_ptr OutputFile; + +class FileDrainerIO +{ +public: + FileDrainerIO() = default; + + virtual ~FileDrainerIO() = default; + + /** turn on verbosity. set rank to differentiate between the output of + * processes */ + void SetVerbose(int verboseLevel, int rank); + +protected: + /** rank of process just for stdout/stderr messages */ + int m_Rank = 0; + int m_Verbose = 0; + static const int errorState = -1; + + /** instead for Open, use this function */ + InputFile GetFileForRead(const std::string &path); + OutputFile GetFileForWrite(const std::string &path, bool append = false); + + /** return true if the File is usable (no previous errors) */ + bool Good(InputFile &f); + bool Good(OutputFile &f); + + void CloseAll(); + + void Seek(InputFile &f, size_t offset, const std::string &path); + void Seek(OutputFile &f, size_t offset, const std::string &path); + void SeekEnd(OutputFile &f); + + /** Read from file. Return a pair of + * - number of bytes written + * - time spent in waiting for file to be actually written to disk for this + * read to succeed. + */ + std::pair Read(InputFile &f, size_t count, char *buffer, + const std::string &path); + size_t Write(OutputFile &f, size_t count, const char *buffer, + const std::string &path); + + int FileSync(OutputFile &f); + + void Delete(OutputFile &f, const std::string &path); + +private: + typedef std::map InputFileMap; + typedef std::map OutputFileMap; + InputFileMap m_InputFileMap; + OutputFileMap m_OutputFileMap; + void Open(InputFile &f, const std::string &path); + void Close(InputFile &f); + void Open(OutputFile &f, const std::string &path, bool append); + void Close(OutputFile &f); + size_t GetFileSize(InputFile &f); +}; + +} // end namespace burstbuffer +} // end namespace adios2 + +#endif /* ADIOS2_TOOLKIT_BURSTBUFFER_FILEDRAINER_FSTREAM_H_ */ diff --git a/source/adios2/toolkit/burstbuffer/FileDrainerIOPosix.cpp b/source/adios2/toolkit/burstbuffer/FileDrainerIOPosix.cpp new file mode 100644 index 0000000000..c6c0e998c2 --- /dev/null +++ b/source/adios2/toolkit/burstbuffer/FileDrainerIOPosix.cpp @@ -0,0 +1,262 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. + * + * FileDrainerIOPosix.cpp + * + * Created on: April 1, 2020 + * Author: Norbert Podhorszki + */ + +#include "FileDrainerIOPosix.h" + +#ifndef _LARGEFILE64_SOURCE +#define _LARGEFILE64_SOURCE +#endif + +#include +#include // remove +#include // std::memcpy, strerror +#include // errno +#include // open +#include // write output +#include // open, fstat +#include // open +#include // std::this_thread::sleep_for +#include // write, close + +#if defined(__APPLE__) && defined(__MACH__) +#define lseek64 lseek +#define open64 open +#define off64_t off_t +#endif + +namespace adios2 +{ +namespace burstbuffer +{ + +InputFile FileDrainerIO::GetFileForRead(const std::string &path) +{ + auto it = m_InputFileMap.find(path); + if (it != m_InputFileMap.end()) + { + return it->second; + } + else + { + InputFile f = std::make_shared(); + m_InputFileMap.emplace(path, f); + Open(f, path); + return f; + } +} + +OutputFile FileDrainerIO::GetFileForWrite(const std::string &path, bool append) +{ + auto it = m_OutputFileMap.find(path); + if (it != m_OutputFileMap.end()) + { + return it->second; + } + else + { + OutputFile f = std::make_shared(); + m_OutputFileMap.emplace(path, f); + Open(f, path, append); + return f; + } +} + +std::string FileDrainerIO::SysErrMsg(const int errorNumber) const +{ + return std::string(": errno = " + std::to_string(errorNumber) + ": " + + strerror(errorNumber)); +} + +void FileDrainerIO::Open(InputFile &f, const std::string &path) +{ + errno = 0; + f->m_fd = open(path.c_str(), O_RDONLY); + f->m_Errno = errno; + f->m_Path = path; +} + +void FileDrainerIO::Open(OutputFile &f, const std::string &path, bool append) +{ + errno = 0; + if (append) + { + f->m_fd = open(path.c_str(), O_RDWR | O_CREAT, 0777); + lseek64(f->m_fd, 0, SEEK_END); + } + else + { + f->m_fd = open(path.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0666); + } + f->m_Errno = errno; + f->m_Path = path; +} + +void FileDrainerIO::Close(InputFile &f) +{ + errno = 0; + close(f->m_fd); + f->m_Errno = errno; +} +void FileDrainerIO::Close(OutputFile &f) +{ + errno = 0; + close(f->m_fd); + f->m_Errno = errno; +} + +bool FileDrainerIO::Good(InputFile &f) { return (f->m_Errno == 0); } +bool FileDrainerIO::Good(OutputFile &f) { return (f->m_Errno == 0); } + +void FileDrainerIO::CloseAll() +{ + for (auto it = m_OutputFileMap.begin(); it != m_OutputFileMap.end(); ++it) + { + // if (it->second->good()) + //{ + Close(it->second); + //} + } + m_OutputFileMap.clear(); + for (auto it = m_InputFileMap.begin(); it != m_InputFileMap.end(); ++it) + { + // if (it->second->good()) + //{ + Close(it->second); + //} + } + m_InputFileMap.clear(); +} + +void FileDrainerIO::Seek(InputFile &f, size_t offset, const std::string &path) +{ + lseek64(f->m_fd, static_cast(offset), SEEK_SET); +} + +void FileDrainerIO::Seek(OutputFile &f, size_t offset, const std::string &path) +{ + lseek64(f->m_fd, static_cast(offset), SEEK_SET); +} + +void FileDrainerIO::SeekEnd(OutputFile &f) { lseek64(f->m_fd, 0, SEEK_END); } + +size_t FileDrainerIO::GetFileSize(InputFile &f) +{ + struct stat fileStat; + errno = 0; + if (fstat(f->m_fd, &fileStat) == -1) + { + f->m_Errno = errno; + throw std::ios_base::failure("ERROR: couldn't get size of file " + + f->m_Path + SysErrMsg(errno)); + } + f->m_Errno = errno; + return static_cast(fileStat.st_size); +} + +std::pair FileDrainerIO::Read(InputFile &f, size_t count, + char *buffer, + const std::string &path) +{ + size_t totalRead = 0; + double totalSlept = 0.0; + const double sleepUnit = 0.01; // seconds + while (count > 0) + { + const auto currentOffset = lseek64(f->m_fd, 0, SEEK_CUR); + size_t readCount = + (count < DefaultMaxFileBatchSize ? count : DefaultMaxFileBatchSize); + errno = 0; + const auto readSize = read(f->m_fd, buffer, readCount); + if (readSize == -1) + { + if (errno == EINTR) + { + continue; + } + else + { + throw std::ios_base::failure( + "FileDrainer couldn't read from file " + path + + " offset = " + std::to_string(currentOffset) + + " count = " + std::to_string(count) + " bytes but only " + + std::to_string(totalRead + readSize) + ".\n"); + } + } + else if (readSize < static_cast(count)) + { + // need to wait for more data to come + std::chrono::duration d(sleepUnit); + std::this_thread::sleep_for(d); + totalSlept += sleepUnit; + } + buffer += readSize; + count -= readSize; + totalRead += readSize; + } + return std::pair(totalRead, totalSlept); +} + +size_t FileDrainerIO::Write(OutputFile &f, size_t count, const char *buffer, + const std::string &path) +{ + size_t totalWritten = 0; + while (count > 0) + { + const auto currentOffset = lseek64(f->m_fd, 0, SEEK_CUR); + size_t writeCount = + (count < DefaultMaxFileBatchSize ? count : DefaultMaxFileBatchSize); + errno = 0; + const auto writtenSize = write(f->m_fd, buffer, writeCount); + if (writtenSize == -1) + { + if (errno == EINTR) + { + continue; + } + else + { + throw std::ios_base::failure( + "FileDrainer couldn't write to file " + path + + " offset = " + std::to_string(currentOffset) + + " count = " + std::to_string(count) + " bytes but only " + + std::to_string(totalWritten + writtenSize) + ".\n"); + } + } + buffer += writtenSize; + count -= writtenSize; + totalWritten += writtenSize; + } + + return totalWritten; +} + +int FileDrainerIO::FileSync(OutputFile &f) +{ +#if (_POSIX_C_SOURCE >= 199309L || _XOPEN_SOURCE >= 500) + return fdatasync(f->m_fd); +#else + return fsync(f->m_fd); +#endif +} + +void FileDrainerIO::Delete(OutputFile &f, const std::string &path) +{ + Close(f); + std::remove(path.c_str()); +} + +void FileDrainerIO::SetVerbose(int verboseLevel, int rank) +{ + m_Verbose = verboseLevel; + m_Rank = rank; +} + +} // end namespace burstbuffer +} // end namespace adios2 diff --git a/source/adios2/toolkit/burstbuffer/FileDrainerIOPosix.h b/source/adios2/toolkit/burstbuffer/FileDrainerIOPosix.h new file mode 100644 index 0000000000..a6215c4ca6 --- /dev/null +++ b/source/adios2/toolkit/burstbuffer/FileDrainerIOPosix.h @@ -0,0 +1,110 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. + * + * FileDrainerFstream.h + * + * Created on: April 1, 2020 + * Author: Norbert Podhorszki + */ + +#ifndef ADIOS2_TOOLKIT_BURSTBUFFER_FILEDRAINER_POSIX_H_ +#define ADIOS2_TOOLKIT_BURSTBUFFER_FILEDRAINER_POSIX_H_ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "adios2/common/ADIOSTypes.h" + +namespace adios2 +{ +namespace burstbuffer +{ + +class PosixInputFile +{ +public: + int m_fd; + int m_Errno; + std::string m_Path; +}; +class PosixOutputFile +{ +public: + int m_fd; + int m_Errno; + std::string m_Path; +}; + +using InputFile = std::shared_ptr; +using OutputFile = std::shared_ptr; + +class FileDrainerIO +{ +public: + FileDrainerIO() = default; + + virtual ~FileDrainerIO() = default; + + /** turn on verbosity. set rank to differentiate between the output of + * processes */ + void SetVerbose(int verboseLevel, int rank); + +protected: + /** rank of process just for stdout/stderr messages */ + int m_Rank = 0; + int m_Verbose = 0; + static const int errorState = -1; + + /** instead for Open, use this function */ + InputFile GetFileForRead(const std::string &path); + OutputFile GetFileForWrite(const std::string &path, bool append = false); + + /** return true if the File is usable (no previous errors) */ + bool Good(InputFile &f); + bool Good(OutputFile &f); + + void CloseAll(); + + void Seek(InputFile &f, size_t offset, const std::string &path); + void Seek(OutputFile &f, size_t offset, const std::string &path); + void SeekEnd(OutputFile &f); + + /** Read from file. Return a pair of + * - number of bytes written + * - time spent in waiting for file to be actually written to disk for this + * read to succeed. + */ + std::pair Read(InputFile &f, size_t count, char *buffer, + const std::string &path); + size_t Write(OutputFile &f, size_t count, const char *buffer, + const std::string &path); + + int FileSync(OutputFile &f); + + void Delete(OutputFile &f, const std::string &path); + +private: + using InputFileMap = std::map; + using OutputFileMap = std::map; + InputFileMap m_InputFileMap; + OutputFileMap m_OutputFileMap; + std::string SysErrMsg(const int errorNumber) const; + void Open(InputFile &f, const std::string &path); + void Close(InputFile &f); + void Open(OutputFile &f, const std::string &path, bool append); + void Close(OutputFile &f); + size_t GetFileSize(InputFile &f); +}; + +} // end namespace burstbuffer +} // end namespace adios2 + +#endif /* ADIOS2_TOOLKIT_BURSTBUFFER_FILEDRAINER_POSIX_H_ */ diff --git a/source/adios2/toolkit/burstbuffer/FileDrainerSingleThread.cpp b/source/adios2/toolkit/burstbuffer/FileDrainerSingleThread.cpp index b2a37e6ca4..ee36aed6b6 100644 --- a/source/adios2/toolkit/burstbuffer/FileDrainerSingleThread.cpp +++ b/source/adios2/toolkit/burstbuffer/FileDrainerSingleThread.cpp @@ -37,11 +37,13 @@ FileDrainerSingleThread::FileDrainerSingleThread() : FileDrainer() {} FileDrainerSingleThread::~FileDrainerSingleThread() { Join(); } -void FileDrainerSingleThread::SetBufferSize(size_t bufferSizeBytes) +void FileDrainerSingleThread::SetBufferSize(size_t bytes) { - bufferSize = bufferSizeBytes; + bufferSize = bytes; } +void FileDrainerSingleThread::SetFlushSize(size_t bytes) { flushSize = bytes; } + void FileDrainerSingleThread::Start() { th = std::thread(&FileDrainerSingleThread::DrainThread, this); @@ -208,15 +210,50 @@ void FileDrainerSingleThread::DrainThread() te = std::chrono::steady_clock::now(); timeWrite += te - ts; } + size_t nonflushed = 0; + std::chrono::duration sleeptime(0.001); const size_t batches = fdo.countBytes / bufferSize; - const size_t remainder = fdo.countBytes % bufferSize; + const size_t remainderBytes = fdo.countBytes % bufferSize; for (size_t b = 0; b < batches; ++b) { lf_Copy(fdo, fdr, fdw, bufferSize); + nonflushed += bufferSize; + if (nonflushed > flushSize) + { + + if (m_Verbose >= 2) + { +#ifndef NO_SANITIZE_THREAD + std::cout << "Sync " << m_Rank << ": sync " + << fdo.toFileName << " to disk " + << nonflushed << " bytes " + << std::endl; +#endif + } + + FileSync(fdw); + nonflushed = 0; + } + std::this_thread::sleep_for(sleeptime); } - if (remainder) + if (remainderBytes) { - lf_Copy(fdo, fdr, fdw, remainder); + lf_Copy(fdo, fdr, fdw, remainderBytes); + nonflushed += remainderBytes; + } + if (nonflushed > 0) + { + + if (m_Verbose >= 2) + { +#ifndef NO_SANITIZE_THREAD + std::cout << "Sync " << m_Rank << ": sync " + << fdo.toFileName << " to disk " + << nonflushed << " bytes " << std::endl; +#endif + } + FileSync(fdw); + std::this_thread::sleep_for(sleeptime); } } catch (std::ios_base::failure &e) diff --git a/source/adios2/toolkit/burstbuffer/FileDrainerSingleThread.h b/source/adios2/toolkit/burstbuffer/FileDrainerSingleThread.h index dc5924cff7..bedff81659 100644 --- a/source/adios2/toolkit/burstbuffer/FileDrainerSingleThread.h +++ b/source/adios2/toolkit/burstbuffer/FileDrainerSingleThread.h @@ -24,13 +24,17 @@ class FileDrainerSingleThread : public FileDrainer { public: - static const size_t defaultBufferSize = 4194304; // 4MB + // Size of drain buffer (read/write buffer owned by the thread) + static constexpr size_t defaultBufferSize = 4194304; // 4MB + // Sync to disk when draining reaches flush size + static constexpr size_t defaultFlushSize = 32 * defaultBufferSize; // 128MB FileDrainerSingleThread(); ~FileDrainerSingleThread(); - void SetBufferSize(size_t bufferSizeBytes); + void SetBufferSize(size_t bytes); + void SetFlushSize(size_t bytes); /** Create thread. * This will create a thread to continuously run and idle if there @@ -47,6 +51,7 @@ class FileDrainerSingleThread : public FileDrainer private: size_t bufferSize = defaultBufferSize; + size_t flushSize = defaultFlushSize; std::thread th; // created by constructor bool finish = false; std::mutex finishMutex;