diff --git a/docs/user_guide/source/engines/bp5.rst b/docs/user_guide/source/engines/bp5.rst index 411a745e55..0dfc8ce396 100644 --- a/docs/user_guide/source/engines/bp5.rst +++ b/docs/user_guide/source/engines/bp5.rst @@ -125,6 +125,8 @@ This engine allows the user to fine tune the buffering operations through the fo #. **StatsLevel**: 1 turns on *Min/Max* calculation for every variable, 0 turns this off. Default is 1. It has some cost to generate this metadata so it can be turned off if there is no need for this information. + #. **MaxOpenFilesAtOnce**: Specify how many subfiles a process can keep open at once. Default is unlimited. If a dataset contains more subfiles than how many open file descriptors the system allows (see *ulimit -n*) then one can either try to raise that system limit (set it with *ulimit -n*), or set this parameter to force the reader to close some subfiles to stay within the limits. + ============================== ===================== =========================================================== **Key** **Value Format** **Default** and Examples @@ -147,9 +149,10 @@ This engine allows the user to fine tune the buffering operations through the fo AsyncOpen string On/Off **On**, Off, true, false AsyncWrite string On/Off **Off**, On, true, false DirectIO string On/Off **Off**, On, true, false - DirectIOAlignOffset integer **512** - DirectIOAlignBuffer integer set to DirectIOAlignOffset if unset + DirectIOAlignOffset integer >= 0 **512** + DirectIOAlignBuffer integer >= 0 set to DirectIOAlignOffset if unset StatsLevel integer, 0 or 1 **1**, ``0`` + MaxOpenFilesAtOnce integer >= 0 **UINT_MAX**, 1024, 1 ============================== ===================== =========================================================== diff --git a/source/adios2/engine/bp5/BP5Engine.h b/source/adios2/engine/bp5/BP5Engine.h index 20ed7a0110..99407e888c 100644 --- a/source/adios2/engine/bp5/BP5Engine.h +++ b/source/adios2/engine/bp5/BP5Engine.h @@ -159,7 +159,8 @@ class BP5Engine MACRO(ReaderShortCircuitReads, Bool, bool, false) \ MACRO(StatsLevel, UInt, unsigned int, 1) \ MACRO(StatsBlockSize, SizeBytes, size_t, DefaultStatsBlockSize) \ - MACRO(Threads, UInt, unsigned int, 1) + MACRO(Threads, UInt, unsigned int, 1) \ + MACRO(MaxOpenFilesAtOnce, UInt, unsigned int, UINT_MAX) struct BP5Params { diff --git a/source/adios2/engine/bp5/BP5Reader.cpp b/source/adios2/engine/bp5/BP5Reader.cpp index 1982115fed..d4d7eda3ea 100644 --- a/source/adios2/engine/bp5/BP5Reader.cpp +++ b/source/adios2/engine/bp5/BP5Reader.cpp @@ -9,6 +9,7 @@ #include "BP5Reader.h" #include "BP5Reader.tcc" +#include "adios2/helper/adiosMath.h" // SetWithinLimit #include #include @@ -186,10 +187,11 @@ void BP5Reader::EndStep() PerformGets(); } -void BP5Reader::ReadData(adios2::transportman::TransportMan &FileManager, - const size_t WriterRank, const size_t Timestep, - const size_t StartOffset, const size_t Length, - char *Destination) +std::pair +BP5Reader::ReadData(adios2::transportman::TransportMan &FileManager, + const size_t maxOpenFiles, const size_t WriterRank, + const size_t Timestep, const size_t StartOffset, + const size_t Length, char *Destination) { size_t FlushCount = m_MetadataIndexTable[Timestep][2]; size_t DataPosPos = m_MetadataIndexTable[Timestep][3]; @@ -197,17 +199,23 @@ void BP5Reader::ReadData(adios2::transportman::TransportMan &FileManager, m_WriterMap[m_WriterMapIndex[Timestep]].RankToSubfile[WriterRank]); // check if subfile is already opened + TP startSubfile = NOW(); if (FileManager.m_Transports.count(SubfileNum) == 0) { - if (FileManager.m_Transports.count(SubfileNum) == 0) + const std::string subFileName = GetBPSubStreamName( + m_Name, SubfileNum, m_Minifooter.HasSubFiles, true); + if (FileManager.m_Transports.size() >= maxOpenFiles) { - const std::string subFileName = GetBPSubStreamName( - m_Name, SubfileNum, m_Minifooter.HasSubFiles, true); - FileManager.OpenFileID(subFileName, SubfileNum, Mode::Read, - {{"transport", "File"}}, false); + auto m = FileManager.m_Transports.begin(); + FileManager.CloseFiles((int)m->first); } + FileManager.OpenFileID(subFileName, SubfileNum, Mode::Read, + {{"transport", "File"}}, false); } + TP endSubfile = NOW(); + double timeSubfile = DURATION(startSubfile, endSubfile); + TP startRead = NOW(); size_t InfoStartPos = DataPosPos + (WriterRank * (2 * FlushCount + 1) * sizeof(uint64_t)); size_t ThisFlushInfo = InfoStartPos; @@ -231,12 +239,21 @@ void BP5Reader::ReadData(adios2::transportman::TransportMan &FileManager, RemainingLength -= ThisDataSize; Offset = 0; if (RemainingLength == 0) - return; + { + break; + } + } + if (RemainingLength > 0) + { + ThisDataPos = + helper::ReadValue(m_MetadataIndex.m_Buffer, ThisFlushInfo, + m_Minifooter.IsLittleEndian); + FileManager.ReadFile(Destination, RemainingLength, ThisDataPos + Offset, + SubfileNum); } - ThisDataPos = helper::ReadValue( - m_MetadataIndex.m_Buffer, ThisFlushInfo, m_Minifooter.IsLittleEndian); - FileManager.ReadFile(Destination, RemainingLength, ThisDataPos + Offset, - SubfileNum); + TP endRead = NOW(); + double timeRead = DURATION(startRead, endRead); + return std::make_pair(timeSubfile, timeRead); } void BP5Reader::PerformGets() @@ -250,25 +267,66 @@ void BP5Reader::PerformGets() .RankToSubfile[r2.WriterRank]); }; - auto lf_Reader = - [&](adios2::transportman::TransportMan FileManager, - std::vector - ReadRequests, - size_t startReq, size_t nReq) -> bool { - for (size_t r = startReq; r < startReq + nReq; ++r) + // TP start = NOW(); + PERFSTUBS_SCOPED_TIMER("BP5Reader::PerformGets"); + size_t maxReadSize; + + // TP startGenerate = NOW(); + auto ReadRequests = + m_BP5Deserializer->GenerateReadRequests(false, &maxReadSize); + size_t nRequest = ReadRequests.size(); + // TP endGenerate = NOW(); + // double generateTime = DURATION(startGenerate, endGenerate); + + size_t nextRequest = 0; + std::mutex mutexReadRequests; + + auto lf_GetNextRequest = [&]() -> size_t { + std::lock_guard lockGuard(mutexReadRequests); + size_t reqidx = MaxSizeT; + if (nextRequest < nRequest) { - const auto &Req = ReadRequests[r]; - ReadData(FileManager, Req.WriterRank, Req.Timestep, Req.StartOffset, - Req.ReadLength, Req.DestinationAddr); - m_BP5Deserializer->FinalizeGet(Req); + reqidx = nextRequest; + ++nextRequest; } - return true; + return reqidx; }; - // TP start = NOW(); - PERFSTUBS_SCOPED_TIMER("BP5Reader::PerformGets"); - auto ReadRequests = m_BP5Deserializer->GenerateReadRequests(); - size_t nRequest = ReadRequests.size(); + auto lf_Reader = [&](adios2::transportman::TransportMan FileManager, + const size_t maxOpenFiles) + -> std::tuple { + double copyTotal = 0.0; + double readTotal = 0.0; + double subfileTotal = 0.0; + size_t nReads = 0; + std::vector buf(maxReadSize); + + while (true) + { + const auto reqidx = lf_GetNextRequest(); + if (reqidx > nRequest) + { + break; + } + auto &Req = ReadRequests[reqidx]; + if (!Req.DestinationAddr) + { + Req.DestinationAddr = buf.data(); + } + std::pair t = ReadData( + FileManager, maxOpenFiles, Req.WriterRank, Req.Timestep, + Req.StartOffset, Req.ReadLength, Req.DestinationAddr); + + TP startCopy = NOW(); + m_BP5Deserializer->FinalizeGet(Req, false); + TP endCopy = NOW(); + subfileTotal += t.first; + readTotal += t.second; + copyTotal += DURATION(startCopy, endCopy); + ++nReads; + } + return std::make_tuple(subfileTotal, readTotal, copyTotal, nReads); + }; // TP startRead = NOW(); // double sortTime = 0.0; @@ -281,10 +339,13 @@ void BP5Reader::PerformGets() // sortTime = DURATION(startSort, endSort); size_t nThreads = (m_Parameters.Threads < nRequest ? m_Parameters.Threads : nRequest); - size_t startReq = 0, n, rem; - n = nRequest / nThreads; - rem = nRequest % nThreads; - std::vector> futures(nThreads - 1); + + size_t maxOpenFiles = helper::SetWithinLimit( + (size_t)m_Parameters.MaxOpenFilesAtOnce / nThreads, (size_t)1, + MaxSizeT); + + std::vector>> + futures(nThreads - 1); helper::Comm singleComm; std::vector fileManagers( nThreads - 1, transportman::TransportMan(singleComm)); @@ -292,47 +353,70 @@ void BP5Reader::PerformGets() // then main thread process the last subset for (size_t tid = 0; tid < nThreads - 1; ++tid) { - size_t nReq = n; - if (tid < rem) - { - ++nReq; - } fileManagers[tid]; - futures[tid] = - std::async(std::launch::async, lf_Reader, fileManagers[tid], - ReadRequests, startReq, nReq); - startReq += nReq; + futures[tid] = std::async(std::launch::async, lf_Reader, + fileManagers[tid], maxOpenFiles); } // main thread runs last subset of reads - lf_Reader(m_DataFileManager, ReadRequests, startReq, - nRequest - startReq); + /*auto tMain = */ lf_Reader(m_DataFileManager, maxOpenFiles); + /*{ + double tSubfile = std::get<0>(tMain); + double tRead = std::get<1>(tMain); + double tCopy = std::get<2>(tMain); + size_t nReads = std::get<3>(tMain); + std::cout << " -> PerformGets() thread MAIN total = " + << tSubfile + tRead + tCopy << "s, subfile = " << tSubfile + << "s, read = " << tRead << "s, copy = " << tCopy + << ", nReads = " << nReads << std::endl; + }*/ // wait for all async threads + int tid = 1; for (auto &f : futures) { - f.get(); - } - // clear pending requests inside deserializer - { - std::vector empty; - m_BP5Deserializer->FinalizeGets(empty); + /*auto t = */ f.get(); + /*double tSubfile = std::get<0>(t); + double tRead = std::get<1>(t); + double tCopy = std::get<2>(t); + size_t nReads = std::get<3>(t); + std::cout << " -> PerformGets() thread " << tid + << " total = " << tSubfile + tRead + tCopy + << "s, subfile = " << tSubfile << "s, read = " << tRead + << "s, copy = " << tCopy << ", nReads = " << nReads + << std::endl;*/ + ++tid; } } else { - for (const auto &Req : ReadRequests) + size_t maxOpenFiles = helper::SetWithinLimit( + (size_t)m_Parameters.MaxOpenFilesAtOnce, (size_t)1, MaxSizeT); + std::vector buf(maxReadSize); + for (auto &Req : ReadRequests) { - ReadData(m_DataFileManager, Req.WriterRank, Req.Timestep, - Req.StartOffset, Req.ReadLength, Req.DestinationAddr); + if (!Req.DestinationAddr) + { + Req.DestinationAddr = buf.data(); + } + ReadData(m_DataFileManager, maxOpenFiles, Req.WriterRank, + Req.Timestep, Req.StartOffset, Req.ReadLength, + Req.DestinationAddr); + m_BP5Deserializer->FinalizeGet(Req, false); } - m_BP5Deserializer->FinalizeGets(ReadRequests); + } + + // clear pending requests inside deserializer + { + std::vector empty; + m_BP5Deserializer->FinalizeGets(empty); } /*TP end = NOW(); double t1 = DURATION(start, end); double t2 = DURATION(startRead, end); std::cout << " -> PerformGets() total = " << t1 << "s, Read loop = " << t2 - << "s, sort = " << sortTime << "s" << std::endl;*/ + << "s, sort = " << sortTime << "s, generate = " << generateTime + << ", nRequests = " << nRequest << std::endl;*/ } // PRIVATE diff --git a/source/adios2/engine/bp5/BP5Reader.h b/source/adios2/engine/bp5/BP5Reader.h index 8d9cebf477..f4fbcd1685 100644 --- a/source/adios2/engine/bp5/BP5Reader.h +++ b/source/adios2/engine/bp5/BP5Reader.h @@ -237,10 +237,11 @@ class BP5Reader : public BP5Engine, public Engine void InstallMetaMetaData(format::BufferSTL MetaMetadata); void InstallMetadataForTimestep(size_t Step); - void ReadData(adios2::transportman::TransportMan &FileManager, - const size_t WriterRank, const size_t Timestep, - const size_t StartOffset, const size_t Length, - char *Destination); + std::pair + ReadData(adios2::transportman::TransportMan &FileManager, + const size_t maxOpenFiles, const size_t WriterRank, + const size_t Timestep, const size_t StartOffset, + const size_t Length, char *Destination); struct WriterMapStruct { diff --git a/source/adios2/engine/sst/SstReader.cpp b/source/adios2/engine/sst/SstReader.cpp index 4abf2cc1f7..f34dcb2ca1 100644 --- a/source/adios2/engine/sst/SstReader.cpp +++ b/source/adios2/engine/sst/SstReader.cpp @@ -640,7 +640,9 @@ ADIOS2_FOREACH_STDTYPE_1ARG(declare_gets) void SstReader::BP5PerformGets() { - auto ReadRequests = m_BP5Deserializer->GenerateReadRequests(); + size_t maxReadSize; + auto ReadRequests = + m_BP5Deserializer->GenerateReadRequests(true, &maxReadSize); std::vector sstReadHandlers; for (const auto &Req : ReadRequests) { diff --git a/source/adios2/toolkit/format/bp5/BP5Deserializer.cpp b/source/adios2/toolkit/format/bp5/BP5Deserializer.cpp index fd5ba4c2e9..3ff891593f 100644 --- a/source/adios2/toolkit/format/bp5/BP5Deserializer.cpp +++ b/source/adios2/toolkit/format/bp5/BP5Deserializer.cpp @@ -1083,9 +1083,11 @@ static size_t CalcBlockLength(const size_t dimensionsSize, const size_t *count) } std::vector -BP5Deserializer::GenerateReadRequests() +BP5Deserializer::GenerateReadRequests(const bool doAllocTempBuffers, + size_t *maxReadSize) { std::vector Ret; + *maxReadSize = 0; for (size_t ReqIndex = 0; ReqIndex < PendingRequests.size(); ReqIndex++) { @@ -1122,7 +1124,14 @@ BP5Deserializer::GenerateReadRequests() helper::GetDataTypeSize(Req->VarRec->Type) * CalcBlockLength(Req->VarRec->DimCount, &writer_meta_base->Count[StartDim]); - RR.DestinationAddr = (char *)malloc(RR.ReadLength); + RR.DestinationAddr = nullptr; + if (doAllocTempBuffers) + { + RR.DestinationAddr = (char *)malloc(RR.ReadLength); + } + *maxReadSize = + (*maxReadSize < RR.ReadLength ? RR.ReadLength + : *maxReadSize); RR.Internal = NULL; RR.OffsetInBlock = 0; RR.ReqIndex = ReqIndex; @@ -1170,7 +1179,15 @@ BP5Deserializer::GenerateReadRequests() writer_meta_base->DataLocation[Block]; RR.ReadLength = writer_meta_base->DataLengths[Block]; - RR.DestinationAddr = (char *)malloc(RR.ReadLength); + RR.DestinationAddr = nullptr; + if (doAllocTempBuffers) + { + RR.DestinationAddr = + (char *)malloc(RR.ReadLength); + } + *maxReadSize = + (*maxReadSize < RR.ReadLength ? RR.ReadLength + : *maxReadSize); RR.Internal = NULL; RR.ReqIndex = ReqIndex; RR.BlockID = Block; @@ -1212,7 +1229,15 @@ BP5Deserializer::GenerateReadRequests() StartOffsetInBlock; RR.ReadLength = EndOffsetInBlock - StartOffsetInBlock; - RR.DestinationAddr = (char *)malloc(RR.ReadLength); + RR.DestinationAddr = nullptr; + if (doAllocTempBuffers) + { + RR.DestinationAddr = + (char *)malloc(RR.ReadLength); + } + *maxReadSize = + (*maxReadSize < RR.ReadLength ? RR.ReadLength + : *maxReadSize); RR.Internal = NULL; RR.OffsetInBlock = StartOffsetInBlock; RR.ReqIndex = ReqIndex; @@ -1227,7 +1252,7 @@ BP5Deserializer::GenerateReadRequests() return Ret; } -void BP5Deserializer::FinalizeGet(const ReadRequest &Read) +void BP5Deserializer::FinalizeGet(const ReadRequest &Read, const bool freeAddr) { auto Req = PendingRequests[Read.ReqIndex]; /*std::cout << " Req: block = " << Req.BlockID << " step = " << Req.Step @@ -1309,14 +1334,17 @@ void BP5Deserializer::FinalizeGet(const ReadRequest &Read) (char *)Req.Data, outStart, outCount, true, true, ElementSize, Dims(), Dims(), Dims(), Dims(), false, Req.MemSpace); - free((char *)Read.DestinationAddr); + if (freeAddr) + { + free((char *)Read.DestinationAddr); + } } void BP5Deserializer::FinalizeGets(std::vector &Reads) { for (const auto &Read : Reads) { - FinalizeGet(Read); + FinalizeGet(Read, true); } PendingRequests.clear(); } diff --git a/source/adios2/toolkit/format/bp5/BP5Deserializer.h b/source/adios2/toolkit/format/bp5/BP5Deserializer.h index aed9ce1218..d005d05706 100644 --- a/source/adios2/toolkit/format/bp5/BP5Deserializer.h +++ b/source/adios2/toolkit/format/bp5/BP5Deserializer.h @@ -61,8 +61,17 @@ class BP5Deserializer : virtual public BP5Base bool QueueGetSingle(core::VariableBase &variable, void *DestData, size_t Step); - std::vector GenerateReadRequests(); - void FinalizeGet(const ReadRequest &); + /* generate read requests. return vector of requests AND the size of + * the largest allocation block necessary for reading. + * input flag: true allocates a temporary buffer for each read request + * unless the request can go directly to user memory. + * False will not allocate a temporary buffer + * (RR.DestinationAddress==nullptr) but may also assign the user memory + * pointer for direct read in + */ + std::vector GenerateReadRequests(const bool doAllocTempBuffers, + size_t *maxReadSize); + void FinalizeGet(const ReadRequest &, const bool freeAddr); void FinalizeGets(std::vector &); MinVarInfo *AllRelativeStepsMinBlocksInfo(const VariableBase &var); diff --git a/source/utils/adios_reorganize/Reorganize.h b/source/utils/adios_reorganize/Reorganize.h index 3d46b84427..e8ce0e1f4d 100644 --- a/source/utils/adios_reorganize/Reorganize.h +++ b/source/utils/adios_reorganize/Reorganize.h @@ -75,8 +75,8 @@ class Reorganize : public Utils std::string rmethodname; // ADIOS read method std::string rmethodparam_str; // ADIOS read method parameter string - static const size_t max_read_buffer_size = 1024 * 1024 * 1024; - static const size_t max_write_buffer_size = 1024 * 1024 * 1024; + static const size_t max_read_buffer_size = 16 * 1024 * 1024 * 1024ULL; + static const size_t max_write_buffer_size = 16 * 1024 * 1024 * 1024ULL; // will stop if no data found for this time (-1: never stop) static const int timeout_sec = 300;