Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bp5 multithreaded read, dynamic version #3233

Merged
merged 3 commits into from
May 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions docs/user_guide/source/engines/bp5.rst
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ This engine allows the user to fine tune the buffering operations through the fo

#. **StatsLevel**: 1 turns on *Min/Max* calculation for every variable, 0 turns this off. Default is 1. It has some cost to generate this metadata so it can be turned off if there is no need for this information.

#. **MaxOpenFilesAtOnce**: Specify how many subfiles a process can keep open at once. Default is unlimited. If a dataset contains more subfiles than how many open file descriptors the system allows (see *ulimit -n*) then one can either try to raise that system limit (set it with *ulimit -n*), or set this parameter to force the reader to close some subfiles to stay within the limits.


============================== ===================== ===========================================================
**Key** **Value Format** **Default** and Examples
Expand All @@ -147,9 +149,10 @@ This engine allows the user to fine tune the buffering operations through the fo
AsyncOpen string On/Off **On**, Off, true, false
AsyncWrite string On/Off **Off**, On, true, false
DirectIO string On/Off **Off**, On, true, false
DirectIOAlignOffset integer **512**
DirectIOAlignBuffer integer set to DirectIOAlignOffset if unset
DirectIOAlignOffset integer >= 0 **512**
DirectIOAlignBuffer integer >= 0 set to DirectIOAlignOffset if unset
StatsLevel integer, 0 or 1 **1**, ``0``
MaxOpenFilesAtOnce integer >= 0 **UINT_MAX**, 1024, 1
============================== ===================== ===========================================================


Expand Down
3 changes: 2 additions & 1 deletion source/adios2/engine/bp5/BP5Engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,8 @@ class BP5Engine
MACRO(ReaderShortCircuitReads, Bool, bool, false) \
MACRO(StatsLevel, UInt, unsigned int, 1) \
MACRO(StatsBlockSize, SizeBytes, size_t, DefaultStatsBlockSize) \
MACRO(Threads, UInt, unsigned int, 1)
MACRO(Threads, UInt, unsigned int, 1) \
MACRO(MaxOpenFilesAtOnce, UInt, unsigned int, UINT_MAX)

struct BP5Params
{
Expand Down
194 changes: 139 additions & 55 deletions source/adios2/engine/bp5/BP5Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "BP5Reader.h"
#include "BP5Reader.tcc"

#include "adios2/helper/adiosMath.h" // SetWithinLimit
#include <adios2-perfstubs-interface.h>

#include <chrono>
Expand Down Expand Up @@ -186,28 +187,35 @@ void BP5Reader::EndStep()
PerformGets();
}

void BP5Reader::ReadData(adios2::transportman::TransportMan &FileManager,
const size_t WriterRank, const size_t Timestep,
const size_t StartOffset, const size_t Length,
char *Destination)
std::pair<double, double>
BP5Reader::ReadData(adios2::transportman::TransportMan &FileManager,
const size_t maxOpenFiles, const size_t WriterRank,
const size_t Timestep, const size_t StartOffset,
const size_t Length, char *Destination)
{
size_t FlushCount = m_MetadataIndexTable[Timestep][2];
size_t DataPosPos = m_MetadataIndexTable[Timestep][3];
size_t SubfileNum = static_cast<size_t>(
m_WriterMap[m_WriterMapIndex[Timestep]].RankToSubfile[WriterRank]);

// check if subfile is already opened
TP startSubfile = NOW();
if (FileManager.m_Transports.count(SubfileNum) == 0)
{
if (FileManager.m_Transports.count(SubfileNum) == 0)
const std::string subFileName = GetBPSubStreamName(
m_Name, SubfileNum, m_Minifooter.HasSubFiles, true);
if (FileManager.m_Transports.size() >= maxOpenFiles)
{
const std::string subFileName = GetBPSubStreamName(
m_Name, SubfileNum, m_Minifooter.HasSubFiles, true);
FileManager.OpenFileID(subFileName, SubfileNum, Mode::Read,
{{"transport", "File"}}, false);
auto m = FileManager.m_Transports.begin();
FileManager.CloseFiles((int)m->first);
}
FileManager.OpenFileID(subFileName, SubfileNum, Mode::Read,
{{"transport", "File"}}, false);
}
TP endSubfile = NOW();
double timeSubfile = DURATION(startSubfile, endSubfile);

TP startRead = NOW();
size_t InfoStartPos =
DataPosPos + (WriterRank * (2 * FlushCount + 1) * sizeof(uint64_t));
size_t ThisFlushInfo = InfoStartPos;
Expand All @@ -231,12 +239,21 @@ void BP5Reader::ReadData(adios2::transportman::TransportMan &FileManager,
RemainingLength -= ThisDataSize;
Offset = 0;
if (RemainingLength == 0)
return;
{
break;
}
}
if (RemainingLength > 0)
{
ThisDataPos =
helper::ReadValue<uint64_t>(m_MetadataIndex.m_Buffer, ThisFlushInfo,
m_Minifooter.IsLittleEndian);
FileManager.ReadFile(Destination, RemainingLength, ThisDataPos + Offset,
SubfileNum);
}
ThisDataPos = helper::ReadValue<uint64_t>(
m_MetadataIndex.m_Buffer, ThisFlushInfo, m_Minifooter.IsLittleEndian);
FileManager.ReadFile(Destination, RemainingLength, ThisDataPos + Offset,
SubfileNum);
TP endRead = NOW();
double timeRead = DURATION(startRead, endRead);
return std::make_pair(timeSubfile, timeRead);
}

void BP5Reader::PerformGets()
Expand All @@ -250,25 +267,66 @@ void BP5Reader::PerformGets()
.RankToSubfile[r2.WriterRank]);
};

auto lf_Reader =
[&](adios2::transportman::TransportMan FileManager,
std::vector<adios2::format::BP5Deserializer::ReadRequest>
ReadRequests,
size_t startReq, size_t nReq) -> bool {
for (size_t r = startReq; r < startReq + nReq; ++r)
// TP start = NOW();
PERFSTUBS_SCOPED_TIMER("BP5Reader::PerformGets");
size_t maxReadSize;

// TP startGenerate = NOW();
auto ReadRequests =
m_BP5Deserializer->GenerateReadRequests(false, &maxReadSize);
size_t nRequest = ReadRequests.size();
// TP endGenerate = NOW();
// double generateTime = DURATION(startGenerate, endGenerate);

size_t nextRequest = 0;
std::mutex mutexReadRequests;

auto lf_GetNextRequest = [&]() -> size_t {
std::lock_guard<std::mutex> lockGuard(mutexReadRequests);
size_t reqidx = MaxSizeT;
if (nextRequest < nRequest)
{
const auto &Req = ReadRequests[r];
ReadData(FileManager, Req.WriterRank, Req.Timestep, Req.StartOffset,
Req.ReadLength, Req.DestinationAddr);
m_BP5Deserializer->FinalizeGet(Req);
reqidx = nextRequest;
++nextRequest;
}
return true;
return reqidx;
};

// TP start = NOW();
PERFSTUBS_SCOPED_TIMER("BP5Reader::PerformGets");
auto ReadRequests = m_BP5Deserializer->GenerateReadRequests();
size_t nRequest = ReadRequests.size();
auto lf_Reader = [&](adios2::transportman::TransportMan FileManager,
const size_t maxOpenFiles)
-> std::tuple<double, double, double, size_t> {
double copyTotal = 0.0;
double readTotal = 0.0;
double subfileTotal = 0.0;
size_t nReads = 0;
std::vector<char> buf(maxReadSize);

while (true)
{
const auto reqidx = lf_GetNextRequest();
if (reqidx > nRequest)
{
break;
}
auto &Req = ReadRequests[reqidx];
if (!Req.DestinationAddr)
{
Req.DestinationAddr = buf.data();
}
std::pair<double, double> t = ReadData(
FileManager, maxOpenFiles, Req.WriterRank, Req.Timestep,
Req.StartOffset, Req.ReadLength, Req.DestinationAddr);

TP startCopy = NOW();
m_BP5Deserializer->FinalizeGet(Req, false);
TP endCopy = NOW();
subfileTotal += t.first;
readTotal += t.second;
copyTotal += DURATION(startCopy, endCopy);
++nReads;
}
return std::make_tuple(subfileTotal, readTotal, copyTotal, nReads);
};

// TP startRead = NOW();
// double sortTime = 0.0;
Expand All @@ -281,58 +339,84 @@ void BP5Reader::PerformGets()
// sortTime = DURATION(startSort, endSort);
size_t nThreads =
(m_Parameters.Threads < nRequest ? m_Parameters.Threads : nRequest);
size_t startReq = 0, n, rem;
n = nRequest / nThreads;
rem = nRequest % nThreads;
std::vector<std::future<bool>> futures(nThreads - 1);

size_t maxOpenFiles = helper::SetWithinLimit(
(size_t)m_Parameters.MaxOpenFilesAtOnce / nThreads, (size_t)1,
MaxSizeT);

std::vector<std::future<std::tuple<double, double, double, size_t>>>
futures(nThreads - 1);
helper::Comm singleComm;
std::vector<transportman::TransportMan> fileManagers(
nThreads - 1, transportman::TransportMan(singleComm));
// launch Threads-1 threads to process subsets of requests,
// then main thread process the last subset
for (size_t tid = 0; tid < nThreads - 1; ++tid)
{
size_t nReq = n;
if (tid < rem)
{
++nReq;
}
fileManagers[tid];
futures[tid] =
std::async(std::launch::async, lf_Reader, fileManagers[tid],
ReadRequests, startReq, nReq);
startReq += nReq;
futures[tid] = std::async(std::launch::async, lf_Reader,
fileManagers[tid], maxOpenFiles);
}
// main thread runs last subset of reads
lf_Reader(m_DataFileManager, ReadRequests, startReq,
nRequest - startReq);
/*auto tMain = */ lf_Reader(m_DataFileManager, maxOpenFiles);
/*{
double tSubfile = std::get<0>(tMain);
double tRead = std::get<1>(tMain);
double tCopy = std::get<2>(tMain);
size_t nReads = std::get<3>(tMain);
std::cout << " -> PerformGets() thread MAIN total = "
<< tSubfile + tRead + tCopy << "s, subfile = " << tSubfile
<< "s, read = " << tRead << "s, copy = " << tCopy
<< ", nReads = " << nReads << std::endl;
}*/

// wait for all async threads
int tid = 1;
for (auto &f : futures)
{
f.get();
}
// clear pending requests inside deserializer
{
std::vector<adios2::format::BP5Deserializer::ReadRequest> empty;
m_BP5Deserializer->FinalizeGets(empty);
/*auto t = */ f.get();
/*double tSubfile = std::get<0>(t);
double tRead = std::get<1>(t);
double tCopy = std::get<2>(t);
size_t nReads = std::get<3>(t);
std::cout << " -> PerformGets() thread " << tid
<< " total = " << tSubfile + tRead + tCopy
<< "s, subfile = " << tSubfile << "s, read = " << tRead
<< "s, copy = " << tCopy << ", nReads = " << nReads
<< std::endl;*/
++tid;
}
}
else
{
for (const auto &Req : ReadRequests)
size_t maxOpenFiles = helper::SetWithinLimit(
(size_t)m_Parameters.MaxOpenFilesAtOnce, (size_t)1, MaxSizeT);
std::vector<char> buf(maxReadSize);
for (auto &Req : ReadRequests)
{
ReadData(m_DataFileManager, Req.WriterRank, Req.Timestep,
Req.StartOffset, Req.ReadLength, Req.DestinationAddr);
if (!Req.DestinationAddr)
{
Req.DestinationAddr = buf.data();
}
ReadData(m_DataFileManager, maxOpenFiles, Req.WriterRank,
Req.Timestep, Req.StartOffset, Req.ReadLength,
Req.DestinationAddr);
m_BP5Deserializer->FinalizeGet(Req, false);
}
m_BP5Deserializer->FinalizeGets(ReadRequests);
}

// clear pending requests inside deserializer
{
std::vector<adios2::format::BP5Deserializer::ReadRequest> empty;
m_BP5Deserializer->FinalizeGets(empty);
}

/*TP end = NOW();
double t1 = DURATION(start, end);
double t2 = DURATION(startRead, end);
std::cout << " -> PerformGets() total = " << t1 << "s, Read loop = " << t2
<< "s, sort = " << sortTime << "s" << std::endl;*/
<< "s, sort = " << sortTime << "s, generate = " << generateTime
<< ", nRequests = " << nRequest << std::endl;*/
}

// PRIVATE
Expand Down
9 changes: 5 additions & 4 deletions source/adios2/engine/bp5/BP5Reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -237,10 +237,11 @@ class BP5Reader : public BP5Engine, public Engine

void InstallMetaMetaData(format::BufferSTL MetaMetadata);
void InstallMetadataForTimestep(size_t Step);
void ReadData(adios2::transportman::TransportMan &FileManager,
const size_t WriterRank, const size_t Timestep,
const size_t StartOffset, const size_t Length,
char *Destination);
std::pair<double, double>
ReadData(adios2::transportman::TransportMan &FileManager,
const size_t maxOpenFiles, const size_t WriterRank,
const size_t Timestep, const size_t StartOffset,
const size_t Length, char *Destination);

struct WriterMapStruct
{
Expand Down
4 changes: 3 additions & 1 deletion source/adios2/engine/sst/SstReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -640,7 +640,9 @@ ADIOS2_FOREACH_STDTYPE_1ARG(declare_gets)

void SstReader::BP5PerformGets()
{
auto ReadRequests = m_BP5Deserializer->GenerateReadRequests();
size_t maxReadSize;
auto ReadRequests =
m_BP5Deserializer->GenerateReadRequests(true, &maxReadSize);
std::vector<void *> sstReadHandlers;
for (const auto &Req : ReadRequests)
{
Expand Down
Loading