Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Filedrain posix #2547

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions source/adios2/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ target_compile_features(adios2_core PUBLIC "$<BUILD_INTERFACE:${ADIOS2_CXX11_FEA

if(UNIX)
target_sources(adios2_core PRIVATE toolkit/transport/file/FilePOSIX.cpp)
target_sources(adios2_core PRIVATE toolkit/burstbuffer/FileDrainerIOPosix.cpp)
else()
target_sources(adios2_core PRIVATE toolkit/burstbuffer/FileDrainerIOFstream.cpp)
endif()

if(ADIOS2_HAVE_MPI)
Expand Down
160 changes: 0 additions & 160 deletions source/adios2/toolkit/burstbuffer/FileDrainer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,166 +120,6 @@ void FileDrainer::AddOperationDelete(const std::string &toFileName)
AddOperation(DrainOperation::Delete, emptyStr, toFileName, 0, 0, 0);
}

InputFile FileDrainer::GetFileForRead(const std::string &path)
{
auto it = m_InputFileMap.find(path);
if (it != m_InputFileMap.end())
{
return it->second;
}
else
{
InputFile f = std::make_shared<std::ifstream>();
m_InputFileMap.emplace(path, f);
Open(f, path);
return f;
}
}

OutputFile FileDrainer::GetFileForWrite(const std::string &path, bool append)
{
auto it = m_OutputFileMap.find(path);
if (it != m_OutputFileMap.end())
{
return it->second;
}
else
{
OutputFile f = std::make_shared<std::ofstream>();
m_OutputFileMap.emplace(path, f);
Open(f, path, append);
return f;
}
}

void FileDrainer::Open(InputFile &f, const std::string &path)
{

f->rdbuf()->pubsetbuf(0, 0);
f->open(path, std::ios::in | std::ios::binary);
}

void FileDrainer::Open(OutputFile &f, const std::string &path, bool append)
{

if (append)
{
f->rdbuf()->pubsetbuf(0, 0);
f->open(path, std::ios::out | std::ios::app | std::ios::binary);
}
else
{
f->rdbuf()->pubsetbuf(0, 0);
f->open(path, std::ios::out | std::ios::trunc | std::ios::binary);
}
}

void FileDrainer::Close(InputFile &f) { f->close(); }
void FileDrainer::Close(OutputFile &f) { f->close(); }

bool FileDrainer::Good(InputFile &f) { return (f->good()); }
bool FileDrainer::Good(OutputFile &f) { return (f->good()); }

void FileDrainer::CloseAll()
{
for (auto it = m_OutputFileMap.begin(); it != m_OutputFileMap.end(); ++it)
{
// if (it->second->good())
//{
Close(it->second);
//}
}
m_OutputFileMap.clear();
for (auto it = m_InputFileMap.begin(); it != m_InputFileMap.end(); ++it)
{
// if (it->second->good())
//{
Close(it->second);
//}
}
m_InputFileMap.clear();
}

void FileDrainer::Seek(InputFile &f, size_t offset, const std::string &path)
{
f->seekg(offset, std::ios_base::beg);
}

void FileDrainer::Seek(OutputFile &f, size_t offset, const std::string &path)
{
f->seekp(offset, std::ios_base::beg);
}

void FileDrainer::SeekEnd(OutputFile &f) { f->seekp(0, std::ios_base::end); }

size_t FileDrainer::GetFileSize(InputFile &f)
{
const auto currentOffset = f->tellg();
f->seekg(0, std::ios_base::end);
auto fileSize = f->tellg();
f->seekg(currentOffset, std::ios_base::beg);
return static_cast<size_t>(fileSize);
}

std::pair<size_t, double> FileDrainer::Read(InputFile &f, size_t count,
char *buffer,
const std::string &path)
{
size_t totalRead = 0;
double totalSlept = 0.0;
const double sleepUnit = 0.01; // seconds
while (count > 0)
{
const auto currentOffset = f->tellg();
f->read(buffer, static_cast<std::streamsize>(count));
const auto readSize = f->gcount();

if (readSize < static_cast<std::streamsize>(count))
{
if (f->eof())
{
std::chrono::duration<double> d(sleepUnit);
std::this_thread::sleep_for(d);
f->clear(f->rdstate() & ~std::fstream::eofbit);
totalSlept += sleepUnit;
}
else
{
throw std::ios_base::failure(
"FileDrainer couldn't read from file " + path +
" offset = " + std::to_string(currentOffset) +
" count = " + std::to_string(count) + " bytes but only " +
std::to_string(totalRead + readSize) + ".\n");
}
}
buffer += readSize;
count -= readSize;
totalRead += readSize;
}
return std::pair<size_t, double>(totalRead, totalSlept);
}

size_t FileDrainer::Write(OutputFile &f, size_t count, const char *buffer,
const std::string &path)
{
f->write(buffer, static_cast<std::streamsize>(count));

if (f->bad())
{
throw std::ios_base::failure(
"FileDrainer couldn't write to file " + path +
" count = " + std::to_string(count) + " bytes\n");
}

return count;
}

void FileDrainer::Delete(OutputFile &f, const std::string &path)
{
Close(f);
std::remove(path.c_str());
}

void FileDrainer::SetVerbose(int verboseLevel, int rank)
{
m_Verbose = verboseLevel;
Expand Down
48 changes: 7 additions & 41 deletions source/adios2/toolkit/burstbuffer/FileDrainer.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@

#include "adios2/common/ADIOSTypes.h"

#ifdef _WIN32
#include "FileDrainerIOFstream.h"
#else
#include "FileDrainerIOPosix.h"
#endif

namespace adios2
{
namespace burstbuffer
Expand Down Expand Up @@ -58,12 +64,7 @@ struct FileDrainOperation
size_t fromOffset, size_t toOffset, const void *data);
};

typedef std::map<std::string, std::shared_ptr<std::ifstream>> InputFileMap;
typedef std::map<std::string, std::shared_ptr<std::ofstream>> OutputFileMap;
typedef std::shared_ptr<std::ifstream> InputFile;
typedef std::shared_ptr<std::ofstream> OutputFile;

class FileDrainer
class FileDrainer : public FileDrainerIO
{
public:
FileDrainer() = default;
Expand Down Expand Up @@ -111,41 +112,6 @@ class FileDrainer
int m_Rank = 0;
int m_Verbose = 0;
static const int errorState = -1;

/** instead for Open, use this function */
InputFile GetFileForRead(const std::string &path);
OutputFile GetFileForWrite(const std::string &path, bool append = false);

/** return true if the File is usable (no previous errors) */
bool Good(InputFile &f);
bool Good(OutputFile &f);

void CloseAll();

void Seek(InputFile &f, size_t offset, const std::string &path);
void Seek(OutputFile &f, size_t offset, const std::string &path);
void SeekEnd(OutputFile &f);

/** Read from file. Return a pair of
* - number of bytes written
* - time spent in waiting for file to be actually written to disk for this
* read to succeed.
*/
std::pair<size_t, double> Read(InputFile &f, size_t count, char *buffer,
const std::string &path);
size_t Write(OutputFile &f, size_t count, const char *buffer,
const std::string &path);

void Delete(OutputFile &f, const std::string &path);

private:
InputFileMap m_InputFileMap;
OutputFileMap m_OutputFileMap;
void Open(InputFile &f, const std::string &path);
void Close(InputFile &f);
void Open(OutputFile &f, const std::string &path, bool append);
void Close(OutputFile &f);
size_t GetFileSize(InputFile &f);
};

} // end namespace burstbuffer
Expand Down
Loading