diff --git a/examples/basics/globalArray/globalArray_write.cpp b/examples/basics/globalArray/globalArray_write.cpp index 2b88a04960..b53d01d9ae 100644 --- a/examples/basics/globalArray/globalArray_write.cpp +++ b/examples/basics/globalArray/globalArray_write.cpp @@ -63,6 +63,8 @@ int main(int argc, char *argv[]) // Get io settings from the config file or // create one with default settings here adios2::IO io = adios.DeclareIO("Output"); + io.SetEngine("BP5"); + io.SetParameter("NumAggregators", "1"); /* * Define global array: type, name, global dimensions diff --git a/source/adios2/CMakeLists.txt b/source/adios2/CMakeLists.txt index 5168f47fc3..ac036822c9 100644 --- a/source/adios2/CMakeLists.txt +++ b/source/adios2/CMakeLists.txt @@ -99,6 +99,7 @@ add_library(adios2_core toolkit/aggregator/mpi/MPIAggregator.cpp toolkit/aggregator/mpi/MPIChain.cpp + toolkit/aggregator/mpi/MPIShmChain.cpp toolkit/burstbuffer/FileDrainer.cpp toolkit/burstbuffer/FileDrainerSingleThread.cpp @@ -124,7 +125,7 @@ if (ADIOS2_HAVE_BP5) target_sources(adios2_core PRIVATE engine/bp5/BP5Engine.cpp engine/bp5/BP5Reader.cpp engine/bp5/BP5Reader.tcc - engine/bp5/BP5Writer.cpp engine/bp5/BP5Writer.tcc + engine/bp5/BP5Writer.cpp engine/bp5/BP5Writer.tcc engine/bp5/BP5Writer_TwoLevelShm.cpp ) endif() diff --git a/source/adios2/engine/bp3/BP3Writer.cpp b/source/adios2/engine/bp3/BP3Writer.cpp index bf2aa9afc7..5e853716ba 100644 --- a/source/adios2/engine/bp3/BP3Writer.cpp +++ b/source/adios2/engine/bp3/BP3Writer.cpp @@ -123,6 +123,7 @@ void BP3Writer::Init() static_cast(m_BP3Serializer.m_SizeMPI)) { m_BP3Serializer.m_Aggregator.Init( + m_BP3Serializer.m_Parameters.NumAggregators, m_BP3Serializer.m_Parameters.NumAggregators, m_Comm); } InitTransports(); @@ -388,10 +389,10 @@ void BP3Writer::AggregateWriteData(const bool isFinal, const int transportIndex) // async? for (int r = 0; r < m_BP3Serializer.m_Aggregator.m_Size; ++r) { - aggregator::MPIAggregator::ExchangeRequests dataRequests = + aggregator::MPIChain::ExchangeRequests dataRequests = m_BP3Serializer.m_Aggregator.IExchange(m_BP3Serializer.m_Data, r); - aggregator::MPIAggregator::ExchangeAbsolutePositionRequests + aggregator::MPIChain::ExchangeAbsolutePositionRequests absolutePositionRequests = m_BP3Serializer.m_Aggregator.IExchangeAbsolutePosition( m_BP3Serializer.m_Data, r); diff --git a/source/adios2/engine/bp4/BP4Writer.cpp b/source/adios2/engine/bp4/BP4Writer.cpp index ba9fa834db..062318acad 100644 --- a/source/adios2/engine/bp4/BP4Writer.cpp +++ b/source/adios2/engine/bp4/BP4Writer.cpp @@ -127,6 +127,7 @@ void BP4Writer::Init() static_cast(m_BP4Serializer.m_SizeMPI)) { m_BP4Serializer.m_Aggregator.Init( + m_BP4Serializer.m_Parameters.NumAggregators, m_BP4Serializer.m_Parameters.NumAggregators, m_Comm); } InitTransports(); @@ -745,10 +746,10 @@ void BP4Writer::AggregateWriteData(const bool isFinal, const int transportIndex) // async? 
for (int r = 0; r < m_BP4Serializer.m_Aggregator.m_Size; ++r) { - aggregator::MPIAggregator::ExchangeRequests dataRequests = + aggregator::MPIChain::ExchangeRequests dataRequests = m_BP4Serializer.m_Aggregator.IExchange(m_BP4Serializer.m_Data, r); - aggregator::MPIAggregator::ExchangeAbsolutePositionRequests + aggregator::MPIChain::ExchangeAbsolutePositionRequests absolutePositionRequests = m_BP4Serializer.m_Aggregator.IExchangeAbsolutePosition( m_BP4Serializer.m_Data, r); diff --git a/source/adios2/engine/bp5/BP5Engine.cpp b/source/adios2/engine/bp5/BP5Engine.cpp index c2ac897c36..3be352b204 100644 --- a/source/adios2/engine/bp5/BP5Engine.cpp +++ b/source/adios2/engine/bp5/BP5Engine.cpp @@ -192,8 +192,6 @@ void BP5Engine::ParseParams(IO &io, struct BP5Params &Params) std::string value = itKey->second; parameter = helper::StringToByteUnits( value, "for Parameter key=" + key + "in call to Open"); - parameter = - helper::StringTo(value, " in Parameter key=" + key); } }; @@ -263,6 +261,37 @@ void BP5Engine::ParseParams(IO &io, struct BP5Params &Params) } }; + auto lf_SetAggregationTypeParameter = [&](const std::string key, + int ¶meter, int def) { + auto itKey = io.m_Parameters.find(key); + parameter = def; + if (itKey != io.m_Parameters.end()) + { + std::string value = itKey->second; + std::transform(value.begin(), value.end(), value.begin(), + ::tolower); + if (value == "everyonewrites" || value == "auto") + { + parameter = (int)AggregationType::EveryoneWrites; + } + else if (value == "everyonewritesserial") + { + parameter = (int)AggregationType::EveryoneWritesSerial; + } + else if (value == "twolevelshm") + { + parameter = (int)AggregationType::TwoLevelShm; + } + else + { + throw std::invalid_argument( + "ERROR: Unknown BP5 AggregationType parameter \"" + value + + "\" (must be \"auto\", \"everyonewrites\" or " + "\"twolevelshm\""); + } + } + }; + #define get_params(Param, Type, Typedecl, Default) \ lf_Set##Type##Parameter(#Param, Params.Param, Default); BP5_FOREACH_PARAMETER_TYPE_4ARGS(get_params); diff --git a/source/adios2/engine/bp5/BP5Engine.h b/source/adios2/engine/bp5/BP5Engine.h index 609197f5c4..7841caf5cf 100644 --- a/source/adios2/engine/bp5/BP5Engine.h +++ b/source/adios2/engine/bp5/BP5Engine.h @@ -93,6 +93,14 @@ class BP5Engine BufferVType UseBufferV = BufferVType::ChunkVType; + enum class AggregationType + { + EveryoneWrites, + EveryoneWritesSerial, + TwoLevelShm, + Auto + }; + #define BP5_FOREACH_PARAMETER_TYPE_4ARGS(MACRO) \ MACRO(OpenTimeoutSecs, Int, int, 3600) \ MACRO(BeginStepPollingFrequencySecs, Int, int, 0) \ @@ -103,6 +111,10 @@ class BP5Engine MACRO(verbose, Int, int, 0) \ MACRO(CollectiveMetadata, Bool, bool, true) \ MACRO(NumAggregators, UInt, unsigned int, 999999) \ + MACRO(NumSubFiles, UInt, unsigned int, 999999) \ + MACRO(FileSystemPageSize, UInt, unsigned int, 4096) \ + MACRO(AggregationType, AggregationType, int, \ + (int)AggregationType::TwoLevelShm) \ MACRO(AsyncTasks, Bool, bool, true) \ MACRO(GrowthFactor, Float, float, DefaultBufferGrowthFactor) \ MACRO(InitialBufferSize, SizeBytes, size_t, DefaultInitialBufferSize) \ diff --git a/source/adios2/engine/bp5/BP5Writer.cpp b/source/adios2/engine/bp5/BP5Writer.cpp index d13cf9330f..b4e5b5e19c 100644 --- a/source/adios2/engine/bp5/BP5Writer.cpp +++ b/source/adios2/engine/bp5/BP5Writer.cpp @@ -136,19 +136,50 @@ uint64_t BP5Writer::WriteMetadata( void BP5Writer::WriteData(format::BufferV *Data) { format::BufferV::BufferV_iovec DataVec = Data->DataVec(); + switch (m_Parameters.AggregationType) + { + case 
(int)AggregationType::EveryoneWrites: + WriteData_EveryoneWrites(Data, false); + break; + case (int)AggregationType::EveryoneWritesSerial: + WriteData_EveryoneWrites(Data, true); + break; + case (int)AggregationType::TwoLevelShm: + WriteData_TwoLevelShm(Data); + break; + default: + throw std::invalid_argument( + "Aggregation method " + + std::to_string(m_Parameters.AggregationType) + + "is not supported in BP5"); + } +} + +void BP5Writer::WriteData_EveryoneWrites(format::BufferV *Data, + bool SerializedWriters) +{ + const aggregator::MPIChain *a = + dynamic_cast(m_Aggregator); + + format::BufferV::BufferV_iovec DataVec = Data->DataVec(); // new step writing starts at offset m_DataPos on aggregator // others will wait for the position to arrive from the rank below - if (m_Aggregator.m_Comm.Rank() > 0) + if (a->m_Comm.Rank() > 0) { - m_Aggregator.m_Comm.Recv(&m_DataPos, 1, m_Aggregator.m_Comm.Rank() - 1, - 0, "Chain token in BP5Writer::WriteData"); + a->m_Comm.Recv(&m_DataPos, 1, a->m_Comm.Rank() - 1, 0, + "Chain token in BP5Writer::WriteData"); } + + // align to PAGE_SIZE + m_DataPos += helper::PaddingToAlignOffset(m_DataPos, + m_Parameters.FileSystemPageSize); m_StartDataPos = m_DataPos; - if (m_Aggregator.m_Comm.Rank() < m_Aggregator.m_Comm.Size() - 1) + if (!SerializedWriters && a->m_Comm.Rank() < a->m_Comm.Size() - 1) { + /* Send the token before writing so everyone can start writing asap */ int i = 0; uint64_t nextWriterPos = m_DataPos; while (DataVec[i].iov_base != NULL) @@ -156,9 +187,8 @@ void BP5Writer::WriteData(format::BufferV *Data) nextWriterPos += DataVec[i].iov_len; i++; } - m_Aggregator.m_Comm.Isend(&nextWriterPos, 1, - m_Aggregator.m_Comm.Rank() + 1, 0, - "Chain token in BP5Writer::WriteData"); + a->m_Comm.Isend(&nextWriterPos, 1, a->m_Comm.Rank() + 1, 0, + "Chain token in BP5Writer::WriteData"); } int i = 0; @@ -166,6 +196,7 @@ void BP5Writer::WriteData(format::BufferV *Data) { if (i == 0) { + m_FileDataManager.WriteFileAt((char *)DataVec[i].iov_base, DataVec[i].iov_len, m_StartDataPos); } @@ -178,23 +209,30 @@ void BP5Writer::WriteData(format::BufferV *Data) i++; } - if (m_Aggregator.m_Comm.Size() > 1) + if (SerializedWriters && a->m_Comm.Rank() < a->m_Comm.Size() - 1) + { + /* send token now, effectively serializing the writers in the chain */ + uint64_t nextWriterPos = m_DataPos; + a->m_Comm.Isend(&nextWriterPos, 1, a->m_Comm.Rank() + 1, 0, + "Chain token in BP5Writer::WriteData"); + } + + if (a->m_Comm.Size() > 1) { // at the end, last rank sends back the final data pos to first rank // so it can update its data pos - if (m_Aggregator.m_Comm.Rank() == m_Aggregator.m_Comm.Size() - 1) + if (a->m_Comm.Rank() == a->m_Comm.Size() - 1) { - m_Aggregator.m_Comm.Isend( - &m_DataPos, 1, 0, 0, - "Final chain token in BP5Writer::WriteData"); + a->m_Comm.Isend(&m_DataPos, 1, 0, 0, + "Final chain token in BP5Writer::WriteData"); } - if (m_Aggregator.m_Comm.Rank() == 0) + if (a->m_Comm.Rank() == 0) { - m_Aggregator.m_Comm.Recv(&m_DataPos, 1, - m_Aggregator.m_Comm.Size() - 1, 0, - "Chain token in BP5Writer::WriteData"); + a->m_Comm.Recv(&m_DataPos, 1, a->m_Comm.Size() - 1, 0, + "Chain token in BP5Writer::WriteData"); } } + delete[] DataVec; } @@ -351,15 +389,7 @@ void BP5Writer::Init() m_BP5Serializer.m_Engine = this; m_RankMPI = m_Comm.Rank(); InitParameters(); - if (m_Parameters.NumAggregators > static_cast(m_Comm.Size())) - { - m_Parameters.NumAggregators = static_cast(m_Comm.Size()); - } - // in BP5, aggregation is "always on", but processes may be alone, so - // 
m_Aggregator.m_IsActive is always true - // m_Aggregator.m_Comm.Rank() will always succeed (not abort) - // m_Aggregator.m_SubFileIndex is always set - m_Aggregator.Init(m_Parameters.NumAggregators, m_Comm); + InitAggregator(); InitTransports(); InitBPBuffer(); } @@ -382,6 +412,71 @@ void BP5Writer::InitParameters() ParseParams(m_IO, m_Parameters); m_WriteToBB = !(m_Parameters.BurstBufferPath.empty()); m_DrainBB = m_WriteToBB && m_Parameters.BurstBufferDrain; + + if (m_Parameters.NumAggregators > static_cast(m_Comm.Size())) + { + m_Parameters.NumAggregators = static_cast(m_Comm.Size()); + } + + if (m_Parameters.NumSubFiles > m_Parameters.NumAggregators) + { + m_Parameters.NumSubFiles = m_Parameters.NumAggregators; + } + + if (m_Parameters.FileSystemPageSize == 0) + { + m_Parameters.FileSystemPageSize = 4096; + } + if (m_Parameters.FileSystemPageSize > 67108864) + { + // Limiting to max 64MB page size + m_Parameters.FileSystemPageSize = 67108864; + } +} + +void BP5Writer::InitAggregator() +{ + // in BP5, aggregation is "always on", but processes may be alone, so + // m_Aggregator.m_IsActive is always true + // m_Aggregator.m_Comm.Rank() will always succeed (not abort) + // m_Aggregator.m_SubFileIndex is always set + if (m_Parameters.AggregationType == (int)AggregationType::EveryoneWrites || + m_Parameters.AggregationType == + (int)AggregationType::EveryoneWritesSerial) + { + m_Parameters.NumSubFiles = m_Parameters.NumAggregators; + m_AggregatorEveroneWrites.Init(m_Parameters.NumAggregators, + m_Parameters.NumSubFiles, m_Comm); + m_IAmDraining = m_AggregatorEveroneWrites.m_IsAggregator; + m_IAmWritingDataHeader = m_AggregatorEveroneWrites.m_IsAggregator; + m_IAmWritingData = true; + DataWritingComm = &m_AggregatorEveroneWrites.m_Comm; + m_Aggregator = static_cast( + &m_AggregatorEveroneWrites); + } + else + { + size_t numNodes = m_AggregatorTwoLevelShm.PreInit(m_Comm); + m_AggregatorTwoLevelShm.Init(m_Parameters.NumAggregators, + m_Parameters.NumSubFiles, m_Comm); + + /*std::cout << "Rank " << m_RankMPI << " aggr? " + << m_AggregatorTwoLevelShm.m_IsAggregator << " master? 
" + << m_AggregatorTwoLevelShm.m_IsMasterAggregator + << " aggr size = " << m_AggregatorTwoLevelShm.m_Size + << " rank = " << m_AggregatorTwoLevelShm.m_Rank + << " subfile = " << m_AggregatorTwoLevelShm.m_SubStreamIndex + << " type = " << m_Parameters.AggregationType + + << std::endl;*/ + + m_IAmDraining = m_AggregatorTwoLevelShm.m_IsMasterAggregator; + m_IAmWritingData = m_AggregatorTwoLevelShm.m_IsAggregator; + m_IAmWritingDataHeader = m_AggregatorTwoLevelShm.m_IsMasterAggregator; + DataWritingComm = &m_AggregatorTwoLevelShm.m_AggregatorChainComm; + m_Aggregator = + static_cast(&m_AggregatorTwoLevelShm); + } } void BP5Writer::InitTransports() @@ -414,18 +509,18 @@ void BP5Writer::InitTransports() // /path/name.bp.dir/name.bp.rank m_SubStreamNames = - GetBPSubStreamNames(transportsNames, m_Aggregator.m_SubStreamIndex); + GetBPSubStreamNames(transportsNames, m_Aggregator->m_SubStreamIndex); - if (m_Aggregator.m_IsAggregator) + if (m_IAmDraining) { - // Only aggregators will run draining processes + // Only (master)aggregators will run draining processes if (m_DrainBB) { const std::vector drainTransportNames = m_FileDataManager.GetFilesBaseNames( m_Name, m_IO.m_TransportsParameters); m_DrainSubStreamNames = GetBPSubStreamNames( - drainTransportNames, m_Aggregator.m_SubStreamIndex); + drainTransportNames, m_Aggregator->m_SubStreamIndex); /* start up BB thread */ // m_FileDrainer.SetVerbose( // m_Parameters.BurstBufferVerbose, @@ -463,11 +558,15 @@ void BP5Writer::InitTransports() m_IO.m_TransportsParameters[i]["asynctasks"] = "true"; } } - m_FileDataManager.OpenFiles(m_SubStreamNames, m_OpenMode, - m_IO.m_TransportsParameters, false, - m_Aggregator.m_Comm); - if (m_Aggregator.m_IsAggregator) + if (m_IAmWritingData) + { + m_FileDataManager.OpenFiles(m_SubStreamNames, m_OpenMode, + m_IO.m_TransportsParameters, false, + *DataWritingComm); + } + + if (m_IAmDraining) { if (m_DrainBB) { @@ -654,11 +753,26 @@ void BP5Writer::InitBPBuffer() * them yet so that Open() can stay free of writing to disk) */ - const uint64_t a = static_cast(m_Aggregator.m_SubStreamIndex); + const uint64_t a = static_cast(m_Aggregator->m_SubStreamIndex); + std::vector Assignment = m_Comm.GatherValues(a, 0); - m_Assignment = m_Comm.GatherValues(a, 0); - - if (m_Aggregator.m_IsAggregator) + if (m_Comm.Rank() == 0) + { + format::BufferSTL b; + MakeHeader(b, "Metadata", false); + m_FileMetadataManager.WriteFiles(b.m_Buffer.data(), b.m_Position); + m_MetaDataPos = b.m_Position; + format::BufferSTL bi; + MakeHeader(bi, "Index Table", true); + m_FileMetadataIndexManager.WriteFiles(bi.m_Buffer.data(), + bi.m_Position); + // where each rank's data will end up + m_FileMetadataIndexManager.WriteFiles((char *)Assignment.data(), + sizeof(Assignment[0]) * + Assignment.size()); + } + + if (m_Aggregator->m_IsAggregator) { m_DataPos = 0; } diff --git a/source/adios2/engine/bp5/BP5Writer.h b/source/adios2/engine/bp5/BP5Writer.h index 3be368e2de..c94a58bdc6 100644 --- a/source/adios2/engine/bp5/BP5Writer.h +++ b/source/adios2/engine/bp5/BP5Writer.h @@ -13,7 +13,9 @@ #include "adios2/core/Engine.h" #include "adios2/engine/bp5/BP5Engine.h" #include "adios2/helper/adiosComm.h" +#include "adios2/helper/adiosMemory.h" // PaddingToAlignOffset #include "adios2/toolkit/aggregator/mpi/MPIChain.h" +#include "adios2/toolkit/aggregator/mpi/MPIShmChain.h" #include "adios2/toolkit/burstbuffer/FileDrainerSingleThread.h" #include "adios2/toolkit/format/bp5/BP5Serializer.h" #include "adios2/toolkit/transportman/TransportMan.h" @@ -93,6 +95,8 @@ class 
BP5Writer : public BP5Engine, public core::Engine /** Parses parameters from IO SetParameters */ void InitParameters() final; + /** Set up the aggregator */ + void InitAggregator(); /** Parses transports and parameters from IO AddTransport */ void InitTransports() final; /** Allocates memory and starts a PG group */ @@ -127,6 +131,9 @@ class BP5Writer : public BP5Engine, public core::Engine /** Write Data to disk, in an aggregator chain */ void WriteData(format::BufferV *Data); + void WriteData_EveryoneWrites(format::BufferV *Data, + bool SerializedWriters); + void WriteData_TwoLevelShm(format::BufferV *Data); void PopulateMetadataIndexFileContent( format::BufferSTL &buffer, const uint64_t currentStep, @@ -140,17 +147,11 @@ class BP5Writer : public BP5Engine, public core::Engine void MarshalAttributes(); - /** - * N-to-N data buffers writes, including metadata file - * @param transportIndex - */ - // void WriteData(const bool isFinal, const int transportIndex = -1); - - /** - * N-to-M (aggregation) data buffers writes, including metadata file - * @param transportIndex - */ - void AggregateWriteData(const bool isFinal, const int transportIndex = -1); + /* Two-level-shm aggregator functions */ + void WriteMyOwnData(format::BufferV::BufferV_iovec DataVec); + void SendDataToAggregator(format::BufferV::BufferV_iovec DataVec, + const size_t TotalSize); + void WriteOthersData(const size_t TotalSize); template T *BufferDataCommon(const size_t payloadOffset, @@ -160,7 +161,13 @@ class BP5Writer : public BP5Engine, public core::Engine void PerformPutCommon(Variable &variable); /** manages all communication tasks in aggregation */ - aggregator::MPIChain m_Aggregator; + aggregator::MPIAggregator *m_Aggregator; // points to one of these below + aggregator::MPIShmChain m_AggregatorTwoLevelShm; + aggregator::MPIChain m_AggregatorEveroneWrites; + bool m_IAmDraining = false; + bool m_IAmWritingData = false; + helper::Comm *DataWritingComm; // processes that write the same data file + bool m_IAmWritingDataHeader = false; private: // updated during WriteMetaData diff --git a/source/adios2/engine/bp5/BP5Writer_TwoLevelShm.cpp b/source/adios2/engine/bp5/BP5Writer_TwoLevelShm.cpp new file mode 100644 index 0000000000..2aaced4374 --- /dev/null +++ b/source/adios2/engine/bp5/BP5Writer_TwoLevelShm.cpp @@ -0,0 +1,283 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. 
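For orientation, the new file that follows implements the TwoLevelShm write path. A rough outline of the flow, derived from the functions below (an annotation, not literal patch text):

    // WriteData_TwoLevelShm(Data), per output step (sketch):
    //  1. aggregators pass a chain token over m_AggregatorChainComm so each one
    //     learns its page-aligned start offset in the shared subfile
    //  2. an aggregator writes its own blocks (WriteMyOwnData), then drains the
    //     node-local shared-memory buffers filled by the other ranks
    //     (WriteOthersData)
    //  3. non-aggregator ranks take a shm token in rank order and copy their
    //     blocks into the double-buffered shm segment (SendDataToAggregator)
    //  4. the last aggregator in the chain sends the final offset back to the
    //     master aggregator, which needs it as the start position of the next step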
+ * + * BP5Writer.cpp + * + */ + +#include "BP5Writer.h" + +#include "adios2/common/ADIOSMacros.h" +#include "adios2/core/IO.h" +#include "adios2/helper/adiosFunctions.h" //CheckIndexRange +#include "adios2/toolkit/format/buffer/chunk/ChunkV.h" +#include "adios2/toolkit/format/buffer/malloc/MallocV.h" +#include "adios2/toolkit/transport/file/FileFStream.h" +#include + +#include +#include + +namespace adios2 +{ +namespace core +{ +namespace engine +{ + +using namespace adios2::format; + +void BP5Writer::WriteData_TwoLevelShm(format::BufferV *Data) +{ + const aggregator::MPIShmChain *a = + dynamic_cast(m_Aggregator); + + format::BufferV::BufferV_iovec DataVec = Data->DataVec(); + + // new step writing starts at offset m_DataPos on master aggregator + // other aggregators to the same file will need to wait for the position + // to arrive from the rank below + + // align to PAGE_SIZE (only valid on master aggregator at this point) + m_DataPos += helper::PaddingToAlignOffset(m_DataPos, + m_Parameters.FileSystemPageSize); + + /* + // Each aggregator needs to know the total size they write + // including alignment to page size + // This calculation is valid on aggregators only + std::vector mySizes = a->m_Comm.GatherValues(Data->Size()); + uint64_t myTotalSize = 0; + uint64_t pos = m_DataPos; + for (auto s : mySizes) + { + uint64_t alignment = + helper::PaddingToAlignOffset(pos, m_Parameters.FileSystemPageSize); + myTotalSize += alignment + s; + pos += alignment + s; + } + */ + + // Each aggregator needs to know the total size they write + // This calculation is valid on aggregators only + std::vector mySizes = a->m_Comm.GatherValues(Data->Size()); + uint64_t myTotalSize = 0; + for (auto s : mySizes) + { + myTotalSize += s; + } + + if (a->m_IsAggregator) + { + // In each aggregator chain, send from master down the line + // these total sizes, so every aggregator knows where to start + if (a->m_AggregatorChainComm.Rank() > 0) + { + a->m_AggregatorChainComm.Recv( + &m_DataPos, 1, a->m_AggregatorChainComm.Rank() - 1, 0, + "AggregatorChain token in BP5Writer::WriteData_TwoLevelShm"); + // align to PAGE_SIZE + m_DataPos += helper::PaddingToAlignOffset( + m_DataPos, m_Parameters.FileSystemPageSize); + } + m_StartDataPos = m_DataPos; // metadata needs this info + if (a->m_AggregatorChainComm.Rank() < + a->m_AggregatorChainComm.Size() - 1) + { + uint64_t nextWriterPos = m_DataPos + myTotalSize; + a->m_AggregatorChainComm.Isend( + &nextWriterPos, 1, a->m_AggregatorChainComm.Rank() + 1, 0, + "Chain token in BP5Writer::WriteData"); + } + else if (a->m_AggregatorChainComm.Size() > 1) + { + // send back final position from last aggregator in file to master + // aggregator + uint64_t nextWriterPos = m_DataPos + myTotalSize; + a->m_AggregatorChainComm.Isend( + &nextWriterPos, 1, 0, 0, "Chain token in BP5Writer::WriteData"); + } + + /*std::cout << "Rank " << m_Comm.Rank() + << " aggregator start writing step " << m_WriterStep + << " to subfile " << a->m_SubStreamIndex << " at pos " + << m_DataPos << " totalsize " << myTotalSize << std::endl;*/ + + // Send token to first non-aggregator to start filling shm + // Also informs next process its starting offset (for correct metadata) + if (a->m_Comm.Size() > 1) + { + uint64_t nextWriterPos = m_DataPos + Data->Size(); + a->m_Comm.Isend(&nextWriterPos, 1, a->m_Comm.Rank() + 1, 0, + "Shm token in BP5Writer::WriteData_TwoLevelShm"); + } + + WriteMyOwnData(DataVec); + + /* Write from shm until every non-aggr sent all data */ + if (a->m_Comm.Size() > 1) + { + 
WriteOthersData(myTotalSize - Data->Size()); + } + + // Master aggregator needs to know where the last writing ended by the + // last aggregator in the chain, so that it can start from the correct + // position at the next output step + if (a->m_AggregatorChainComm.Size() > 1 && + !a->m_AggregatorChainComm.Rank()) + { + a->m_AggregatorChainComm.Recv( + &m_DataPos, 1, a->m_AggregatorChainComm.Size() - 1, 0, + "Chain token in BP5Writer::WriteData"); + } + } + else + { + // non-aggregators fill shared buffer in marching order + // they also receive their starting offset this way + a->m_Comm.Recv(&m_StartDataPos, 1, a->m_Comm.Rank() - 1, 0, + "Shm token in BP5Writer::WriteData_TwoLevelShm"); + + /*std::cout << "Rank " << m_Comm.Rank() + << " non-aggregator recv token to fill shm = " + << m_StartDataPos << std::endl;*/ + + SendDataToAggregator(DataVec, Data->Size()); + + if (a->m_Comm.Rank() < a->m_Comm.Size() - 1) + { + uint64_t nextWriterPos = m_StartDataPos + Data->Size(); + a->m_Comm.Isend(&nextWriterPos, 1, a->m_Comm.Rank() + 1, 0, + "Shm token in BP5Writer::WriteData_TwoLevelShm"); + } + } + + delete[] DataVec; +} + +void BP5Writer::WriteMyOwnData(format::BufferV::BufferV_iovec DataVec) +{ + m_StartDataPos = m_DataPos; + int i = 0; + while (DataVec[i].iov_base != NULL) + { + if (i == 0) + { + m_FileDataManager.WriteFileAt((char *)DataVec[i].iov_base, + DataVec[i].iov_len, m_StartDataPos); + } + else + { + m_FileDataManager.WriteFiles((char *)DataVec[i].iov_base, + DataVec[i].iov_len); + } + m_DataPos += DataVec[i].iov_len; + i++; + } +} + +void BP5Writer::SendDataToAggregator(format::BufferV::BufferV_iovec DataVec, + const size_t TotalSize) +{ + /* Only one process is running this function at once + See shmFillerToken in the caller function + + In a loop, copy the local data into the shared memory, alternating + between the two segments. + */ + + aggregator::MPIShmChain *a = + dynamic_cast(m_Aggregator); + + size_t sent = 0; + int block = 0; + size_t temp_offset = 0; + while (DataVec[block].iov_base != nullptr) + { + // potentially blocking call waiting on Aggregator + aggregator::MPIShmChain::ShmDataBuffer *b = a->LockProducerBuffer(); + // b->max_size: how much we can copy + // b->actual_size: how much we actually copy + b->actual_size = 0; + while (true) + { + if (DataVec[block].iov_base == nullptr) + { + break; + } + /* Copy n bytes from the current block, current offset to shm + making sure to use up to shm_size bytes + */ + size_t n = DataVec[block].iov_len - temp_offset; + if (n > (b->max_size - b->actual_size)) + { + n = b->max_size - b->actual_size; + } + std::memcpy(&b->buf[b->actual_size], + (const char *)DataVec[block].iov_base + temp_offset, n); + b->actual_size += n; + + /* Have we processed the entire block or staying with it? */ + if (n + temp_offset < DataVec[block].iov_len) + { + temp_offset += n; + } + else + { + temp_offset = 0; + ++block; + } + + /* Have we reached the max allowed shm size ?*/ + if (b->actual_size >= b->max_size) + { + break; + } + } + sent += b->actual_size; + + /*std::cout << "Rank " << m_Comm.Rank() + << " filled shm, data_size = " << b->actual_size + << " block = " << block << " temp offset = " << temp_offset + << " sent = " << sent + << " buf = " << static_cast(b->buf) << " = [" + << (int)b->buf[0] << (int)b->buf[1] << "..." 
+ << (int)b->buf[b->actual_size - 2] + << (int)b->buf[b->actual_size - 1] << "]" << std::endl;*/ + + a->UnlockProducerBuffer(); + } +} +void BP5Writer::WriteOthersData(size_t TotalSize) +{ + /* Only an Aggregator calls this function */ + aggregator::MPIShmChain *a = + dynamic_cast(m_Aggregator); + + size_t wrote = 0; + while (wrote < TotalSize) + { + // potentially blocking call waiting on some non-aggr process + aggregator::MPIShmChain::ShmDataBuffer *b = a->LockConsumerBuffer(); + + /*std::cout << "Rank " << m_Comm.Rank() + << " write from shm, data_size = " << b->actual_size + << " total so far = " << wrote + << " buf = " << static_cast(b->buf) << " = [" + << (int)b->buf[0] << (int)b->buf[1] << "..." + << (int)b->buf[b->actual_size - 2] + << (int)b->buf[b->actual_size - 1] << "]" << std::endl;*/ + + // b->actual_size: how much we need to write + m_FileDataManager.WriteFiles(b->buf, b->actual_size); + + wrote += b->actual_size; + + a->UnlockConsumerBuffer(); + } +} + +} // end namespace engine +} // end namespace core +} // end namespace adios2 diff --git a/source/adios2/helper/adiosComm.cpp b/source/adios2/helper/adiosComm.cpp index 42ce4d7fce..4c119f65cc 100644 --- a/source/adios2/helper/adiosComm.cpp +++ b/source/adios2/helper/adiosComm.cpp @@ -114,6 +114,31 @@ std::vector Comm::GetGathervDisplacements(const size_t *counts, return displacements; } +Comm::Win Comm::Win_allocate_shared(size_t size, int disp_unit, void *baseptr, + const std::string &hint) +{ + return m_Impl->Win_allocate_shared(size, disp_unit, baseptr, hint); +} +int Comm::Win_shared_query(Comm::Win &win, int rank, size_t *size, + int *disp_unit, void *baseptr, + const std::string &hint) +{ + return m_Impl->Win_shared_query(win, rank, size, disp_unit, baseptr, hint); +} +int Comm::Win_free(Win &win, const std::string &hint) +{ + return m_Impl->Win_free(win, hint); +} +int Comm::Win_Lock(LockType lock_type, int rank, int assert, Win &win, + const std::string &hint) +{ + return m_Impl->Win_Lock(lock_type, rank, assert, win, hint); +} +int Comm::Win_Unlock(int rank, Win &win, const std::string &hint) +{ + return m_Impl->Win_Unlock(rank, win, hint); +} + Comm::Req::Req() = default; Comm::Req::Req(std::unique_ptr impl) : m_Impl(std::move(impl)) {} @@ -135,6 +160,27 @@ Comm::Status Comm::Req::Wait(const std::string &hint) return status; } +Comm::Win::Win() = default; + +Comm::Win::Win(std::unique_ptr impl) : m_Impl(std::move(impl)) {} + +Comm::Win::~Win() = default; + +Comm::Win::Win(Win &&win) = default; + +Comm::Win &Comm::Win::operator=(Win &&win) = default; + +int Comm::Win::Free(const std::string &hint) +{ + int status = 0; + if (m_Impl) + { + status = m_Impl->Free(hint); + m_Impl.reset(); + } + return status; +} + CommImpl::~CommImpl() = default; size_t CommImpl::SizeOf(Datatype datatype) { return ToSize(datatype); } @@ -149,9 +195,18 @@ Comm::Req CommImpl::MakeReq(std::unique_ptr impl) return Comm::Req(std::move(impl)); } +Comm::Win CommImpl::MakeWin(std::unique_ptr impl) +{ + return Comm::Win(std::move(impl)); +} + CommImpl *CommImpl::Get(Comm const &comm) { return comm.m_Impl.get(); } +CommWinImpl *CommWinImpl::Get(Comm::Win const &win) { return win.m_Impl.get(); } + CommReqImpl::~CommReqImpl() = default; +CommWinImpl::~CommWinImpl() = default; + } // end namespace helper } // end namespace adios2 diff --git a/source/adios2/helper/adiosComm.h b/source/adios2/helper/adiosComm.h index f8e2c7cf9e..4f4ed2fc54 100644 --- a/source/adios2/helper/adiosComm.h +++ b/source/adios2/helper/adiosComm.h @@ -19,6 +19,7 @@ namespace 
helper class CommImpl; class CommReqImpl; +class CommWinImpl; /** @brief Encapsulation for communication in a multi-process environment. */ class Comm @@ -26,6 +27,7 @@ class Comm public: class Req; class Status; + class Win; /** * @brief Enumeration of element-wise accumulation operations. @@ -49,6 +51,15 @@ class Comm None, }; + /** + * @brief Enumeration of locking operations. + */ + enum class LockType + { + Exclusive, + Shared + }; + /** * @brief Default constructor. Produces an empty communicator. * @@ -254,6 +265,16 @@ class Comm Req Irecv(T *buffer, const size_t count, int source, int tag, const std::string &hint = std::string()) const; + Win Win_allocate_shared(size_t size, int disp_unit, void *baseptr, + const std::string &hint = std::string()); + int Win_shared_query(Win &win, int rank, size_t *size, int *disp_unit, + void *baseptr, + const std::string &hint = std::string()); + int Win_free(Win &win, const std::string &hint = std::string()); + int Win_Lock(LockType lock_type, int rank, int assert, Win &win, + const std::string &hint = std::string()); + int Win_Unlock(int rank, Win &win, const std::string &hint = std::string()); + private: friend class CommImpl; @@ -340,6 +361,59 @@ class Comm::Status bool Cancelled = false; }; +class Comm::Win +{ +public: + /** + * @brief Default constructor. Produces an empty Win. + * + * An empty Win may not be used. + */ + Win(); + + /** + * @brief Move constructor. Moves Win state from that given. + * + * The moved-from Win is left empty and may not be used. + */ + Win(Win &&); + + /** + * @brief Deleted copy constructor. A Win may not be copied. + */ + Win(Win const &) = delete; + + ~Win(); + + /** + * @brief Move assignment. Moves Win state from that given. + * + * The moved-from Win is left empty and may not be used. + */ + Win &operator=(Win &&); + + /** + * @brief Deleted copy assignment. A Win may not be copied. + */ + Win &operator=(Win const &) = delete; + + /** + * @brief Free the Win object. + * + * On return, the Win is empty. 
+ * For an MPI Win object this is equivalent to the call MPI_Win_free() + */ + int Free(const std::string &hint = std::string()); + +private: + friend class CommImpl; + friend class CommWinImpl; + + explicit Win(std::unique_ptr impl); + + std::unique_ptr m_Impl; +}; + class CommImpl { public: @@ -437,10 +511,23 @@ class CommImpl int source, int tag, const std::string &hint) const = 0; + virtual Comm::Win Win_allocate_shared(size_t size, int disp_unit, + void *baseptr, + const std::string &hint) const = 0; + virtual int Win_shared_query(Comm::Win &win, int rank, size_t *size, + int *disp_unit, void *baseptr, + const std::string &hint) const = 0; + virtual int Win_free(Comm::Win &win, const std::string &hint) const = 0; + virtual int Win_Lock(Comm::LockType lock_type, int rank, int assert, + Comm::Win &win, const std::string &hint) const = 0; + virtual int Win_Unlock(int rank, Comm::Win &win, + const std::string &hint) const = 0; + static size_t SizeOf(Datatype datatype); static Comm MakeComm(std::unique_ptr impl); static Comm::Req MakeReq(std::unique_ptr impl); + static Comm::Win MakeWin(std::unique_ptr impl); static CommImpl *Get(Comm const &comm); }; @@ -451,6 +538,14 @@ class CommReqImpl virtual Comm::Status Wait(const std::string &hint) = 0; }; +class CommWinImpl +{ +public: + virtual ~CommWinImpl() = 0; + virtual int Free(const std::string &hint) = 0; + static CommWinImpl *Get(Comm::Win const &win); +}; + } // end namespace helper } // end namespace adios2 diff --git a/source/adios2/helper/adiosCommDummy.cpp b/source/adios2/helper/adiosCommDummy.cpp index eddf9827d5..7c4ae8fd00 100644 --- a/source/adios2/helper/adiosCommDummy.cpp +++ b/source/adios2/helper/adiosCommDummy.cpp @@ -38,6 +38,17 @@ class CommReqImplDummy : public CommReqImpl CommReqImplDummy::~CommReqImplDummy() = default; +class CommWinImplDummy : public CommWinImpl +{ +public: + CommWinImplDummy() {} + ~CommWinImplDummy() override; + + int Free(const std::string &hint) override; +}; + +CommWinImplDummy::~CommWinImplDummy() = default; + class CommImplDummy : public CommImpl { public: @@ -104,6 +115,16 @@ class CommImplDummy : public CommImpl Comm::Req Irecv(void *buffer, size_t count, Datatype datatype, int source, int tag, const std::string &hint) const override; + + Comm::Win Win_allocate_shared(size_t size, int disp_unit, void *baseptr, + const std::string &hint) const override; + int Win_shared_query(Comm::Win &win, int rank, size_t *size, int *disp_unit, + void *baseptr, const std::string &hint) const override; + int Win_free(Comm::Win &win, const std::string &hint) const override; + int Win_Lock(Comm::LockType lock_type, int rank, int assert, Comm::Win &win, + const std::string &hint) const override; + int Win_Unlock(int rank, Comm::Win &win, + const std::string &hint) const override; }; CommImplDummy::~CommImplDummy() = default; @@ -285,12 +306,50 @@ Comm::Req CommImplDummy::Irecv(void *, size_t, Datatype, int, int, return MakeReq(std::move(req)); } +Comm::Win CommImplDummy::Win_allocate_shared(size_t size, int disp_unit, + void *baseptr, + const std::string &) const +{ + auto win = std::unique_ptr(new CommWinImplDummy()); + // TODO: How do you set the out pointer to NULL? baseptr = nullptr; + return MakeWin(std::move(win)); +} + +int CommImplDummy::Win_shared_query(Comm::Win &win, int rank, size_t *size, + int *disp_unit, void *baseptr, + const std::string &) const +{ + *size = 0; + *disp_unit = 1; + // TODO: How do you set the out pointer to NULL? 
baseptr = nullptr; + return 0; +} + +int CommImplDummy::Win_free(Comm::Win &win, const std::string &) const +{ + win.Free(); + return 0; +} + +int CommImplDummy::Win_Lock(Comm::LockType lock_type, int rank, int assert, + Comm::Win &win, const std::string &) const +{ + return 0; +} +int CommImplDummy::Win_Unlock(int rank, Comm::Win &win, + const std::string &) const +{ + return 0; +} + Comm::Status CommReqImplDummy::Wait(const std::string &hint) { Comm::Status status; return status; } +int CommWinImplDummy::Free(const std::string &hint) { return 0; } + Comm CommDummy() { auto comm = std::unique_ptr(new CommImplDummy()); diff --git a/source/adios2/helper/adiosCommMPI.cpp b/source/adios2/helper/adiosCommMPI.cpp index eebc4e7251..0ccf1e39e3 100644 --- a/source/adios2/helper/adiosCommMPI.cpp +++ b/source/adios2/helper/adiosCommMPI.cpp @@ -43,6 +43,10 @@ const MPI_Op OpToMPI[] = { MPI_Op ToMPI(Comm::Op op) { return OpToMPI[int(op)]; } +const int LockTypeToMPI[] = {MPI_LOCK_EXCLUSIVE, MPI_LOCK_SHARED}; + +int ToMPI(Comm::LockType lock_type) { return LockTypeToMPI[int(lock_type)]; } + const MPI_Datatype DatatypeToMPI[] = { MPI_SIGNED_CHAR, MPI_CHAR, @@ -108,6 +112,19 @@ class CommReqImplMPI : public CommReqImpl CommReqImplMPI::~CommReqImplMPI() = default; +class CommWinImplMPI : public CommWinImpl +{ +public: + CommWinImplMPI() {} + ~CommWinImplMPI() override; + + int Free(const std::string &hint) override; + + MPI_Win m_Win; +}; + +CommWinImplMPI::~CommWinImplMPI() = default; + class CommImplMPI : public CommImpl { public: @@ -177,6 +194,16 @@ class CommImplMPI : public CommImpl Comm::Req Irecv(void *buffer, size_t count, Datatype datatype, int source, int tag, const std::string &hint) const override; + + Comm::Win Win_allocate_shared(size_t size, int disp_unit, void *baseptr, + const std::string &hint) const override; + int Win_shared_query(Comm::Win &win, int rank, size_t *size, int *disp_unit, + void *baseptr, const std::string &hint) const override; + int Win_free(Comm::Win &win, const std::string &hint) const override; + int Win_Lock(Comm::LockType lock_type, int rank, int assert, Comm::Win &win, + const std::string &hint) const override; + int Win_Unlock(int rank, Comm::Win &win, + const std::string &hint) const override; }; CommImplMPI::~CommImplMPI() @@ -519,6 +546,56 @@ Comm::Req CommImplMPI::Irecv(void *buffer, size_t count, Datatype datatype, return MakeReq(std::move(req)); } +Comm::Win CommImplMPI::Win_allocate_shared(size_t size, int disp_unit, + void *baseptr, + const std::string &hint) const +{ + auto w = std::unique_ptr(new CommWinImplMPI()); + MPI_Aint asize = static_cast(size); + CheckMPIReturn(MPI_Win_allocate_shared(asize, disp_unit, MPI_INFO_NULL, + m_MPIComm, baseptr, &w->m_Win), + "in call to Win_allocate_shared " + hint + "\n"); + return MakeWin(std::move(w)); +} + +int CommImplMPI::Win_shared_query(Comm::Win &win, int rank, size_t *size, + int *disp_unit, void *baseptr, + const std::string &hint) const +{ + CommWinImplMPI *w = dynamic_cast(CommWinImpl::Get(win)); + MPI_Aint asize; + int ret = MPI_Win_shared_query(w->m_Win, rank, &asize, disp_unit, baseptr); + CheckMPIReturn(ret, "in call to Win_shared_query " + hint + "\n"); + *size = static_cast(asize); + return ret; +} + +int CommImplMPI::Win_free(Comm::Win &win, const std::string &hint) const +{ + CommWinImplMPI *w = dynamic_cast(CommWinImpl::Get(win)); + int ret = MPI_Win_free(&w->m_Win); + CheckMPIReturn(ret, "in call to Win_free " + hint + "\n"); + return ret; +} + +int CommImplMPI::Win_Lock(Comm::LockType 
lock_type, int rank, int assert, + Comm::Win &win, const std::string &hint) const +{ + CommWinImplMPI *w = dynamic_cast(CommWinImpl::Get(win)); + int mpi_lock_type = ToMPI(lock_type); + int ret = MPI_Win_lock(mpi_lock_type, rank, assert, w->m_Win); + CheckMPIReturn(ret, "in call to Win_Lock " + hint + "\n"); + return ret; +} +int CommImplMPI::Win_Unlock(int rank, Comm::Win &win, + const std::string &hint) const +{ + CommWinImplMPI *w = dynamic_cast(CommWinImpl::Get(win)); + int ret = MPI_Win_unlock(rank, w->m_Win); + CheckMPIReturn(ret, "in call to Win_Lock " + hint + "\n"); + return ret; +} + Comm::Status CommReqImplMPI::Wait(const std::string &hint) { Comm::Status status; @@ -580,6 +657,11 @@ Comm::Status CommReqImplMPI::Wait(const std::string &hint) return status; } +int CommWinImplMPI::Free(const std::string &hint) +{ + return MPI_Win_free(&m_Win); +} + Comm CommWithMPI(MPI_Comm mpiComm) { static InitMPI const initMPI; diff --git a/source/adios2/helper/adiosMemory.cpp b/source/adios2/helper/adiosMemory.cpp index 3bf8c89adc..549c2c3af4 100644 --- a/source/adios2/helper/adiosMemory.cpp +++ b/source/adios2/helper/adiosMemory.cpp @@ -297,5 +297,15 @@ size_t PaddingToAlignPointer(const void *ptr) return padSize; } +uint64_t PaddingToAlignOffset(uint64_t offset, uint64_t alignment_size) +{ + uint64_t padSize = alignment_size - (offset % alignment_size); + if (padSize == alignment_size) + { + padSize = 0; + } + return padSize; +} + } // end namespace helper } // end namespace adios2 diff --git a/source/adios2/helper/adiosMemory.h b/source/adios2/helper/adiosMemory.h index 9d5e40fbd5..009cf1b318 100644 --- a/source/adios2/helper/adiosMemory.h +++ b/source/adios2/helper/adiosMemory.h @@ -243,6 +243,10 @@ size_t PayloadSize(const T *data, const Dims &count) noexcept; */ size_t PaddingToAlignPointer(const void *ptr); +/** Calculate padding to an arbitrary offset to be aligned to + * the size alignment_size */ +uint64_t PaddingToAlignOffset(uint64_t offset, uint64_t alignment_size); + } // end namespace helper } // end namespace adios2 diff --git a/source/adios2/toolkit/aggregator/mpi/MPIAggregator.cpp b/source/adios2/toolkit/aggregator/mpi/MPIAggregator.cpp index f995ba27e6..7af472fc72 100644 --- a/source/adios2/toolkit/aggregator/mpi/MPIAggregator.cpp +++ b/source/adios2/toolkit/aggregator/mpi/MPIAggregator.cpp @@ -19,27 +19,14 @@ namespace aggregator MPIAggregator::MPIAggregator() {} -MPIAggregator::~MPIAggregator() -{ - if (m_IsActive) - { - m_Comm.Free("freeing aggregators comm in MPIAggregator " - "destructor, not recommended"); - } -} +MPIAggregator::~MPIAggregator() { Close(); } -void MPIAggregator::Init(const size_t subStreams, +void MPIAggregator::Init(const size_t numAggregators, const size_t subStreams, helper::Comm const &parentComm) -{ -} - -void MPIAggregator::SwapBuffers(const int step) noexcept {} -void MPIAggregator::ResetBuffers() noexcept {} - -format::Buffer &MPIAggregator::GetConsumerBuffer(format::Buffer &buffer) { - return buffer; + m_NumAggregators = numAggregators; + m_SubStreams = subStreams; } void MPIAggregator::Close() diff --git a/source/adios2/toolkit/aggregator/mpi/MPIAggregator.h b/source/adios2/toolkit/aggregator/mpi/MPIAggregator.h index ac61425373..67f2b08770 100644 --- a/source/adios2/toolkit/aggregator/mpi/MPIAggregator.h +++ b/source/adios2/toolkit/aggregator/mpi/MPIAggregator.h @@ -31,6 +31,11 @@ class MPIAggregator /** current substream index from 0 to m_SubStreams-1 */ size_t m_SubStreamIndex = 0; + /** total number of aggregators + * (BP3/BP4 uses 
aggregators = substreams) + */ + size_t m_NumAggregators = 0; + /** split Communicator for a substream: producers and consumer (rank=0) */ helper::Comm m_Comm; @@ -57,41 +62,11 @@ class MPIAggregator virtual ~MPIAggregator(); - virtual void Init(const size_t subStreams, helper::Comm const &parentComm); - - struct ExchangeRequests - { - helper::Comm::Req m_SendSize; - helper::Comm::Req m_SendData; - helper::Comm::Req m_RecvData; - }; - - virtual ExchangeRequests IExchange(format::Buffer &buffer, - const int step) = 0; - - struct ExchangeAbsolutePositionRequests - { - helper::Comm::Req m_Send; - helper::Comm::Req m_Recv; - }; - - virtual ExchangeAbsolutePositionRequests - IExchangeAbsolutePosition(format::Buffer &buffer, const int step) = 0; - - virtual void - WaitAbsolutePosition(ExchangeAbsolutePositionRequests &requests, - const int step) = 0; - - virtual void Wait(ExchangeRequests &requests, const int step) = 0; - - virtual void SwapBuffers(const int step) noexcept; - - virtual void ResetBuffers() noexcept; - - virtual format::Buffer &GetConsumerBuffer(format::Buffer &buffer); + virtual void Init(const size_t numAggregators, const size_t subStreams, + helper::Comm const &parentComm); /** closes current aggregator, frees m_Comm */ - void Close(); + virtual void Close(); protected: /** Init m_Comm splitting assigning ranks to subStreams (balanced except for diff --git a/source/adios2/toolkit/aggregator/mpi/MPIChain.cpp b/source/adios2/toolkit/aggregator/mpi/MPIChain.cpp index eea9c50f15..b81be2ba7b 100644 --- a/source/adios2/toolkit/aggregator/mpi/MPIChain.cpp +++ b/source/adios2/toolkit/aggregator/mpi/MPIChain.cpp @@ -18,8 +18,11 @@ namespace aggregator MPIChain::MPIChain() : MPIAggregator() {} -void MPIChain::Init(const size_t subStreams, helper::Comm const &parentComm) +void MPIChain::Init(const size_t numAggregators, const size_t subStreams, + helper::Comm const &parentComm) { + /* numAggregators ignored here as BP3/BP4 uses substreams = aggregators */ + m_NumAggregators = subStreams; if (subStreams > 0) { InitComm(subStreams, parentComm); @@ -39,6 +42,8 @@ void MPIChain::Init(const size_t subStreams, helper::Comm const &parentComm) } } +void MPIChain::Close() { MPIAggregator::Close(); } + MPIChain::ExchangeRequests MPIChain::IExchange(format::Buffer &buffer, const int step) { diff --git a/source/adios2/toolkit/aggregator/mpi/MPIChain.h b/source/adios2/toolkit/aggregator/mpi/MPIChain.h index a26895d8d2..9a8570c243 100644 --- a/source/adios2/toolkit/aggregator/mpi/MPIChain.h +++ b/source/adios2/toolkit/aggregator/mpi/MPIChain.h @@ -22,27 +22,44 @@ class MPIChain : public MPIAggregator { public: + /* Chain aggregator used by BP3/BP4 */ MPIChain(); ~MPIChain() = default; - void Init(const size_t subStreams, helper::Comm const &parentComm) final; + void Init(const size_t numAggregators, const size_t subStreams, + helper::Comm const &parentComm) final; - ExchangeRequests IExchange(format::Buffer &buffer, const int step) final; + void Close() final; - ExchangeAbsolutePositionRequests - IExchangeAbsolutePosition(format::Buffer &buffer, const int step) final; + struct ExchangeRequests + { + helper::Comm::Req m_SendSize; + helper::Comm::Req m_SendData; + helper::Comm::Req m_RecvData; + }; + + ExchangeRequests IExchange(format::Buffer &buffer, const int step); - void Wait(ExchangeRequests &requests, const int step) final; + struct ExchangeAbsolutePositionRequests + { + helper::Comm::Req m_Send; + helper::Comm::Req m_Recv; + }; + + ExchangeAbsolutePositionRequests + 
IExchangeAbsolutePosition(format::Buffer &buffer, const int step); void WaitAbsolutePosition(ExchangeAbsolutePositionRequests &requests, - const int step) final; + const int step); + + void Wait(ExchangeRequests &requests, const int step); - void SwapBuffers(const int step) noexcept final; + void SwapBuffers(const int step) noexcept; - void ResetBuffers() noexcept final; + void ResetBuffers() noexcept; - format::Buffer &GetConsumerBuffer(format::Buffer &buffer) final; + format::Buffer &GetConsumerBuffer(format::Buffer &buffer); private: bool m_IsInExchangeAbsolutePosition = false; diff --git a/source/adios2/toolkit/aggregator/mpi/MPIShmChain.cpp b/source/adios2/toolkit/aggregator/mpi/MPIShmChain.cpp new file mode 100644 index 0000000000..6f1fb04ee3 --- /dev/null +++ b/source/adios2/toolkit/aggregator/mpi/MPIShmChain.cpp @@ -0,0 +1,392 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. + * + * MPIShmChain.h + * + * Created on: July 5, 2021 + * Author: Norbert Podhorszki pnorbert@ornl.gov + */ +#include "MPIShmChain.h" + +#include + +namespace adios2 +{ +namespace aggregator +{ + +MPIShmChain::MPIShmChain() : MPIAggregator() {} + +MPIShmChain::~MPIShmChain() +{ + Close(); + /*if (m_IsActive) + { + m_NodeComm.Free("free per-node comm in ~MPIShmChain()"); + m_OnePerNodeComm.Free("free chain of nodes in ~MPIShmChain()"); + m_AllAggregatorsComm.Free( + "free comm of all aggregators in ~MPIShmChain()"); + m_AggregatorChainComm.Free( + "free chains of aggregators in ~MPIShmChain()"); + }*/ +} + +void MPIShmChain::Close() +{ + if (m_IsActive) + { + m_NodeComm.Free("free per-node comm in ~MPIShmChain()"); + m_OnePerNodeComm.Free("free chain of nodes in ~MPIShmChain()"); + m_AllAggregatorsComm.Free( + "free comm of all aggregators in ~MPIShmChain()"); + m_AggregatorChainComm.Free( + "free chains of aggregators in ~MPIShmChain()"); + } + MPIAggregator::Close(); +} + +size_t MPIShmChain::PreInit(helper::Comm const &parentComm) +{ + /* Communicator connecting ranks on each Compute Node */ + m_NodeComm = parentComm.GroupByShm("creating per-node comm at Open"); + int NodeRank = m_NodeComm.Rank(); + + /* + * Communicators connecting rank N of each node + * We are only interested in the chain of rank 0s + */ + int color = (NodeRank ? 
1 : 0); + m_OnePerNodeComm = + parentComm.Split(color, 0, "creating chain of nodes at Open"); + + /* Number of nodes */ + if (!NodeRank) + { + m_NumNodes = static_cast(m_OnePerNodeComm.Size()); + } + m_NumNodes = m_NodeComm.BroadcastValue(m_NumNodes, 0); + PreInitCalled = true; + return m_NumNodes; +} + +void MPIShmChain::Init(const size_t numAggregators, const size_t subStreams, + helper::Comm const &parentComm) +{ + if (!PreInitCalled) + { + PreInit(parentComm); + } + + // int AllRank = parentComm.Rank(); + // int AllSize = parentComm.Size(); + int NodeRank = m_NodeComm.Rank(); + size_t NodeSize = static_cast(m_NodeComm.Size()); + + /* Number of aggregators per node */ + size_t aggregatorPerNode = numAggregators / m_NumNodes; + if (aggregatorPerNode == 0) + { + aggregatorPerNode = 1; /* default */ + } + if (aggregatorPerNode > NodeSize) + { + aggregatorPerNode = NodeSize; + } + + /* Create main communicator that splits the node comm into one or more + * aggregator chains */ + float k = + static_cast(NodeSize) / static_cast(aggregatorPerNode); + float c = static_cast(NodeRank) / k; + int color = static_cast(c); + m_Comm = m_NodeComm.Split(color, 0, "creating aggregator groups at Open"); + m_Rank = m_Comm.Rank(); + m_Size = m_Comm.Size(); + if (m_Rank != 0) + { + m_IsAggregator = false; + m_IsMasterAggregator = false; + } + + /* Identify parent rank of aggregator process within each chain */ + if (!m_Rank) + { + m_AggregatorRank = parentComm.Rank(); + } + m_AggregatorRank = m_Comm.BroadcastValue(m_AggregatorRank, 0); + + /* Communicator for all Aggregators */ + color = (m_Rank ? 1 : 0); + m_AllAggregatorsComm = + parentComm.Split(color, 0, "creating comm of all aggregators at Open"); + + /* Total number of aggregators */ + if (!NodeRank) + { + m_NumAggregators = static_cast(m_AllAggregatorsComm.Size()); + } + m_NumAggregators = m_NodeComm.BroadcastValue(m_NumAggregators); + + /* Number of substreams */ + m_SubStreams = subStreams; + if (m_SubStreams == 0) + { + m_SubStreams = m_NumAggregators; /* default */ + } + if (m_SubStreams > m_NumAggregators) + { + m_SubStreams = m_NumAggregators; + } + + if (!m_Rank) + { + k = static_cast(m_NumAggregators) / + static_cast(m_SubStreams); + /* 1.0 <= k <= m_NumAggregators */ + c = static_cast(m_AllAggregatorsComm.Rank()) / k; + m_SubStreamIndex = static_cast(c); + } + m_SubStreamIndex = m_Comm.BroadcastValue(m_SubStreamIndex); + + /* Create the communicator to connect aggregators writing to the same + * substream */ + color = static_cast(m_SubStreamIndex); + m_AggregatorChainComm = m_AllAggregatorsComm.Split( + color, 0, "creating chains of aggregators at Open"); + + if (m_AggregatorChainComm.Rank() != 0) + { + m_IsMasterAggregator = false; + } + + m_IsActive = true; + + helper::Comm::Req sendRequest = HandshakeLinks_Start(); + + /* Create the shared memory segment */ + if (m_Comm.Size() > 1) + { + CreateShm(); + } + + HandshakeLinks_Complete(sendRequest); +} + +// PRIVATE +helper::Comm::Req MPIShmChain::HandshakeLinks_Start() +{ + int link = -1; + + helper::Comm::Req sendRequest; + if (m_Rank > 0) // send + { + sendRequest = m_Comm.Isend( + &m_Rank, 1, m_Rank - 1, 0, + "Isend handshake with neighbor, MPIChain aggregator, at Open"); + } + + if (m_Rank < m_Size - 1) // receive + { + helper::Comm::Req receiveRequest = m_Comm.Irecv( + &link, 1, m_Rank + 1, 0, + "Irecv handshake with neighbor, MPIChain aggregator, at Open"); + + receiveRequest.Wait("Irecv Wait handshake with neighbor, MPIChain " + "aggregator, at Open"); + } + + if (m_Rank > 0) + { + 
sendRequest.Wait("Isend wait handshake with neighbor, MPIChain " + "aggregator, at Open"); + } + return sendRequest; +} + +void MPIShmChain::HandshakeLinks_Complete(helper::Comm::Req &req) +{ + if (m_Rank > 0) + { + req.Wait("Isend wait handshake with neighbor, MPIChain " + "aggregator, at Open"); + } +} + +void MPIShmChain::CreateShm() +{ + void *ptr; + if (!m_Rank) + { + m_Win = m_Comm.Win_allocate_shared(sizeof(ShmSegment), 1, &ptr); + } + + if (m_Rank) + { + m_Win = m_Comm.Win_allocate_shared(0, 1, &ptr); + size_t shmsize; + int disp_unit; + m_Comm.Win_shared_query(m_Win, 0, &shmsize, &disp_unit, &ptr); + } + m_Shm = reinterpret_cast(ptr); + m_Shm->producerBuffer = LastBufferUsed::None; + m_Shm->consumerBuffer = LastBufferUsed::None; + m_Shm->NumBuffersFull = 0; + m_Shm->sdbA.buf = nullptr; + m_Shm->sdbA.max_size = SHM_BUF_SIZE; + m_Shm->sdbB.buf = nullptr; + m_Shm->sdbB.max_size = SHM_BUF_SIZE; + + /*std::cout << "Rank " << m_Rank << " shm = " << ptr + << " bufA = " << static_cast(m_Shm->bufA) + << " bufB = " << static_cast(m_Shm->bufB) << std::endl;*/ +} + +void MPIShmChain::DestroyShm() { m_Comm.Win_free(m_Win); } + +/* + The buffering strategy is the following. + Assumptions: 1. Only one Producer (and one Consumer) is active at a time. + + The first Producer fills buffer A first then B and then is always + alternating, blocking when Consumer is behind (NumBuffersFull == 2). + The next Producer will continue with the alternating pattern where the + previous Producer has finished. + + The Consumer is blocked until there is at least one buffer available. + It takes buffer A at the first call, then it alternates between the two + buffers. + + MPI_Win locking is used to modify the m_Shm variables exclusively (very short + time) C++ atomic locks are used to give long term exclusive access to one + buffer to be filled or consumed. + + The sleeping phases, to wait on the other party to catch up, are outside of + the locking code areas. + + Note: the m_Shm->sdbX.buf pointers must be set on the local process every + time, even tough it is stored on the shared memory segment, because the + address of the segment is different on every process. Failing to set on the + local process causes this pointer pointing to an invalid address (set on + another process). + + Note: the sdbA and sdbB structs are stored on the shared memory segment + because they contain 'actual_size' which is set on the Producer and used by + the Consumer. 
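
  To make the hand-off concrete, a rough usage sketch of the two sides
  (abridged from SendDataToAggregator() and WriteOthersData() in
  BP5Writer_TwoLevelShm.cpp above; illustrative only, not literal code):

    // producer side: one non-aggregator rank at a time (see shm token passing)
    while (/* this rank still has blocks to send */)
    {
        ShmDataBuffer *b = a->LockProducerBuffer(); // may block on the consumer
        // copy up to b->max_size bytes of local data into b->buf
        b->actual_size = n;
        a->UnlockProducerBuffer();                  // ++NumBuffersFull
    }

    // consumer side: the aggregator of the group
    size_t wrote = 0;
    while (wrote < TotalSize)
    {
        ShmDataBuffer *b = a->LockConsumerBuffer(); // may block on the producer
        m_FileDataManager.WriteFiles(b->buf, b->actual_size);
        wrote += b->actual_size;
        a->UnlockConsumerBuffer();                  // --NumBuffersFull
    }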
+ +*/ + +MPIShmChain::ShmDataBuffer *MPIShmChain::LockProducerBuffer() +{ + MPIShmChain::ShmDataBuffer *sdb = nullptr; + + // Sleep until there is a buffer available at all + while (m_Shm->NumBuffersFull == 2) + { + std::this_thread::sleep_for(std::chrono::duration(0.00001)); + } + + m_Comm.Win_Lock(helper::Comm::LockType::Exclusive, 0, 0, m_Win); + if (m_Shm->producerBuffer == LastBufferUsed::A) + + { + m_Shm->producerBuffer = LastBufferUsed::B; + sdb = &m_Shm->sdbB; + // point to shm data buffer (in local process memory) + sdb->buf = m_Shm->bufB; + } + else // None or B + { + m_Shm->producerBuffer = LastBufferUsed::A; + sdb = &m_Shm->sdbA; + // point to shm data buffer (in local process memory) + sdb->buf = m_Shm->bufA; + } + m_Comm.Win_Unlock(0, m_Win); + + // We determined we want a specific buffer + // Now we need to get a lock on it in case consumer is using it + if (m_Shm->producerBuffer == LastBufferUsed::A) + { + m_Shm->lockA.lock(); + } + else + { + m_Shm->lockB.lock(); + } + + return sdb; +} + +void MPIShmChain::UnlockProducerBuffer() +{ + if (m_Shm->producerBuffer == LastBufferUsed::A) + { + m_Shm->lockA.unlock(); + } + else + { + m_Shm->lockB.unlock(); + } + ++m_Shm->NumBuffersFull; +} + +MPIShmChain::ShmDataBuffer *MPIShmChain::LockConsumerBuffer() +{ + MPIShmChain::ShmDataBuffer *sdb = nullptr; + + // Sleep until there is at least one buffer filled + while (m_Shm->NumBuffersFull < 1) + { + std::this_thread::sleep_for(std::chrono::duration(0.00001)); + } + // At this point we know buffer A has content or going to have content + // when we successfully lock it + + m_Comm.Win_Lock(helper::Comm::LockType::Exclusive, 0, 0, m_Win); + if (m_Shm->consumerBuffer == LastBufferUsed::A) + + { + m_Shm->consumerBuffer = LastBufferUsed::B; + sdb = &m_Shm->sdbB; + // point to shm data buffer (in local process memory) + sdb->buf = m_Shm->bufB; + } + else // None or B + { + m_Shm->consumerBuffer = LastBufferUsed::A; + sdb = &m_Shm->sdbA; + // point to shm data buffer (in local process memory) + sdb->buf = m_Shm->bufA; + } + m_Comm.Win_Unlock(0, m_Win); + + // We determined we want a specific buffer + // Now we need to get a lock on it in case producer is using it + if (m_Shm->consumerBuffer == LastBufferUsed::A) + { + m_Shm->lockA.lock(); + } + else + { + m_Shm->lockB.lock(); + } + + return sdb; +} + +void MPIShmChain::UnlockConsumerBuffer() +{ + if (m_Shm->consumerBuffer == LastBufferUsed::A) + { + m_Shm->lockA.unlock(); + } + else + { + m_Shm->lockB.unlock(); + } + --m_Shm->NumBuffersFull; +} + +} // end namespace aggregator +} // end namespace adios2 diff --git a/source/adios2/toolkit/aggregator/mpi/MPIShmChain.h b/source/adios2/toolkit/aggregator/mpi/MPIShmChain.h new file mode 100644 index 0000000000..35dd9fdbde --- /dev/null +++ b/source/adios2/toolkit/aggregator/mpi/MPIShmChain.h @@ -0,0 +1,168 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. 
+ * + * MPIShmChain.h + * + * Created on: July 5, 2021 + * Author: Norbert Podhorszki pnorbert@ornl.gov + * + */ + +#ifndef ADIOS2_TOOLKIT_AGGREGATOR_MPI_MPISHMCHAIN_H_ +#define ADIOS2_TOOLKIT_AGGREGATOR_MPI_MPISHMCHAIN_H_ + +#include "adios2/common/ADIOSConfig.h" +#include "adios2/toolkit/aggregator/mpi/MPIAggregator.h" + +#include +#include +#include + +namespace adios2 +{ +namespace aggregator +{ + +class Spinlock +{ + /* from + * https://wang-yimu.com/a-tutorial-on-shared-memory-inter-process-communication + */ +public: + Spinlock() { flag_.clear(); } + void lock() + { + while (!try_lock()) + { + std::this_thread::sleep_for(std::chrono::duration(0.00001)); + } + } + void unlock() { flag_.clear(); } + +private: + inline bool try_lock() { return !flag_.test_and_set(); } + std::atomic_flag flag_; //{ATOMIC_FLAG_INIT}; +}; + +constexpr size_t SHM_BUF_SIZE = 4194304; // 4MB +// we allocate 2x this size + a bit for shared memory segment + +/** A one- or two-layer aggregator chain for using Shared memory within a + * compute node. + * Use MPI split type to group processes within a node into one chain. + * Depending on the number of aggregators, multiple nodes may be merged into + * a two-layer chain, (multiple Aggregators and one Master). + * Otherwise its a simple chain with one Aggregator=Master. + * + * m_Comm is the communicator that split for each node + * + */ + +class MPIShmChain : public MPIAggregator +{ + +public: + MPIShmChain(); + + ~MPIShmChain(); + + /* Create a per-node communicator and return number of nodes */ + size_t PreInit(helper::Comm const &parentComm); + + void Init(const size_t numAggregators, const size_t subStreams, + helper::Comm const &parentComm) final; + + void Close() final; + + /** + * true: the Master (aggregator) process in the chain + * always m_Rank == m_Comm.Rank() == 0 for a master aggregator + * same as (m_AggregatorChainComm.Rank() == 0) + */ + bool m_IsMasterAggregator = true; + + /* + Variables set in PreInit + */ + + bool PreInitCalled = false; + /* Communicator per compute node */ + helper::Comm m_NodeComm; + /* Communicator connecting rank 0 on each node + Useful only on rank 0s of m_NodeComm */ + helper::Comm m_OnePerNodeComm; + /* Number of Compute Nodes + * (size of m_OnePerNodeComm created from rank 0s of m_NodeComm) + */ + size_t m_NumNodes; + + /* + Variables set in Init + */ + + /* Communicator connecting all aggregators + (rank 0 of each aggregator group) + Useful only on aggregators themselves + */ + helper::Comm m_AllAggregatorsComm; + + /* Communicator connecting the aggregators + that write to the same substream. 
+       rank 0 becomes a MasterAggregator
+       Useful only on aggregators themselves
+    */
+    helper::Comm m_AggregatorChainComm;
+
+    struct ShmDataBuffer
+    {
+        size_t max_size;    // max size for buf
+        size_t actual_size; // size of actual content
+        // points to data buffer in shared memory
+        // Warning: This is a different address on every process
+        char *buf;
+    };
+
+    ShmDataBuffer *LockProducerBuffer();
+    void UnlockProducerBuffer();
+    ShmDataBuffer *LockConsumerBuffer();
+    void UnlockConsumerBuffer();
+    void ResetBuffers() noexcept;
+
+private:
+    helper::Comm::Req HandshakeLinks_Start();
+    void HandshakeLinks_Complete(helper::Comm::Req &req);
+
+    helper::Comm::Win m_Win;
+    void CreateShm();
+    void DestroyShm();
+
+    enum class LastBufferUsed
+    {
+        None,
+        A,
+        B
+    };
+
+    struct ShmSegment
+    {
+        LastBufferUsed producerBuffer;
+        LastBufferUsed consumerBuffer;
+        unsigned int NumBuffersFull;
+        // user facing structs
+        ShmDataBuffer sdbA;
+        ShmDataBuffer sdbB;
+        // locks for individual buffers (sdb and buf)
+        aggregator::Spinlock lockA;
+        aggregator::Spinlock lockB;
+        // the actual data buffers
+        char bufA[SHM_BUF_SIZE];
+        char bufB[SHM_BUF_SIZE];
+    };
+    ShmSegment *m_Shm;
+};
+
+} // end namespace aggregator
+} // end namespace adios2
+
+#endif /* ADIOS2_TOOLKIT_AGGREGATOR_MPI_MPISHMCHAIN_H_ */
diff --git a/source/adios2/toolkit/transport/Transport.h b/source/adios2/toolkit/transport/Transport.h
index 9e0a87176c..688febe86b 100644
--- a/source/adios2/toolkit/transport/Transport.h
+++ b/source/adios2/toolkit/transport/Transport.h
@@ -75,7 +75,7 @@ class Transport
      * @param chainComm
      * @param async
      */
-    virtual void OpenChain(const std::string &name, const Mode openMode,
+    virtual void OpenChain(const std::string &name, Mode openMode,
                            const helper::Comm &chainComm,
                            const bool async = false);
 
diff --git a/source/adios2/toolkit/transport/file/FileFStream.cpp b/source/adios2/toolkit/transport/file/FileFStream.cpp
index 50e7169567..c71e5fdfe7 100644
--- a/source/adios2/toolkit/transport/file/FileFStream.cpp
+++ b/source/adios2/toolkit/transport/file/FileFStream.cpp
@@ -101,6 +101,96 @@ void FileFStream::Open(const std::string &name, const Mode openMode,
     }
 }
 
+void FileFStream::OpenChain(const std::string &name, Mode openMode,
+                            const helper::Comm &chainComm, const bool async)
+{
+    auto lf_AsyncOpenWrite = [&](const std::string &name) -> void {
+        ProfilerStart("open");
+        m_FileStream.open(name, std::fstream::out | std::fstream::binary |
+                                    std::fstream::trunc);
+        ProfilerStop("open");
+    };
+
+    bool createOnAppend = true;
+    int token = 1;
+    m_Name = name;
+    CheckName();
+
+    if (chainComm.Rank() > 0)
+    {
+        chainComm.Recv(&token, 1, chainComm.Rank() - 1, 0,
+                       "Chain token in FileFStream::OpenChain");
+        if (openMode == Mode::Write)
+        {
+            openMode = Mode::Append;
+            createOnAppend = false;
+        }
+    }
+
+    m_OpenMode = openMode;
+    switch (m_OpenMode)
+    {
+    case (Mode::Write):
+        if (async && chainComm.Size() == 1)
+        {
+            m_IsOpening = true;
+            m_OpenFuture =
+                std::async(std::launch::async, lf_AsyncOpenWrite, name);
+        }
+        else
+        {
+            ProfilerStart("open");
+            m_FileStream.open(name, std::fstream::out | std::fstream::binary |
+                                        std::fstream::trunc);
+            ProfilerStop("open");
+        }
+        break;
+
+    case (Mode::Append):
+        ProfilerStart("open");
+
+        if (createOnAppend)
+        {
+            m_FileStream.open(name, std::fstream::in | std::fstream::out |
+                                        std::fstream::binary);
+            m_FileStream.seekp(0, std::ios_base::end);
+        }
+        else
+        {
+            m_FileStream.open(name, std::fstream::in | std::fstream::out |
+                                        std::fstream::binary);
+            m_FileStream.seekp(0, std::ios_base::beg);
+        }
+
+        ProfilerStop("open");
+        break;
+
+    case (Mode::Read):
+        ProfilerStart("open");
+        m_FileStream.open(name, std::fstream::in | std::fstream::binary);
+        ProfilerStop("open");
+        break;
+
+    default:
+        CheckFile("unknown open mode for file " + m_Name +
+                  ", in call to stream open");
+    }
+
+    if (!m_IsOpening)
+    {
+        CheckFile(
+            "couldn't open file " + m_Name +
+            ", check permissions or path existence, in call to fstream open");
+        m_IsOpen = true;
+    }
+
+    if (chainComm.Rank() < chainComm.Size() - 1)
+    {
+        chainComm.Isend(&token, 1, chainComm.Rank() + 1, 0,
+                        "Sending Chain token in FileFStream::OpenChain");
+    }
+}
+
 void FileFStream::SetBuffer(char *buffer, size_t size)
 {
     if (!buffer && size != 0)
diff --git a/source/adios2/toolkit/transport/file/FileFStream.h b/source/adios2/toolkit/transport/file/FileFStream.h
index 1c44e9bd9b..9e47c4b2e8 100644
--- a/source/adios2/toolkit/transport/file/FileFStream.h
+++ b/source/adios2/toolkit/transport/file/FileFStream.h
@@ -35,6 +35,10 @@ class FileFStream : public Transport
     void Open(const std::string &name, const Mode openMode,
               const bool async = false) final;
 
+    void OpenChain(const std::string &name, Mode openMode,
+                   const helper::Comm &chainComm,
+                   const bool async = false) final;
+
     void SetBuffer(char *buffer, size_t size) final;
 
     void Write(const char *buffer, size_t size, size_t start = MaxSizeT) final;
diff --git a/source/adios2/toolkit/transport/file/FilePOSIX.cpp b/source/adios2/toolkit/transport/file/FilePOSIX.cpp
index f2fabfeb31..7ec300be4a 100644
--- a/source/adios2/toolkit/transport/file/FilePOSIX.cpp
+++ b/source/adios2/toolkit/transport/file/FilePOSIX.cpp
@@ -18,6 +18,8 @@
 #include <fcntl.h>  // open
 #include <unistd.h> // write, close
+#include
+
 /// \cond EXCLUDE_FROM_DOXYGEN
 #include <ios> //std::ios_base::failure
 /// \endcond
 
@@ -120,7 +122,7 @@ void FilePOSIX::Open(const std::string &name, const Mode openMode,
     }
 }
 
-void FilePOSIX::OpenChain(const std::string &name, const Mode openMode,
+void FilePOSIX::OpenChain(const std::string &name, Mode openMode,
                           const helper::Comm &chainComm, const bool async)
 {
     auto lf_AsyncOpenWrite = [&](const std::string &name) -> int {
@@ -132,6 +134,7 @@ void FilePOSIX::OpenChain(const std::string &name, const Mode openMode,
         return FD;
     };
 
+    bool createOnAppend = true;
     int token = 1;
     m_Name = name;
     CheckName();
@@ -140,6 +143,11 @@
     {
         chainComm.Recv(&token, 1, chainComm.Rank() - 1, 0,
                        "Chain token in FilePOSIX::OpenChain");
+        if (openMode == Mode::Write)
+        {
+            openMode = Mode::Append;
+            createOnAppend = false;
+        }
     }
 
     m_OpenMode = openMode;
@@ -147,8 +155,9 @@
     {
 
    case (Mode::Write):
-        if (async)
+        if (async && chainComm.Size() == 1)
        {
+            // only single process open can do it asynchronously
            m_IsOpening = true;
            m_OpenFuture =
                std::async(std::launch::async, lf_AsyncOpenWrite, name);
@@ -167,9 +176,19 @@
    case (Mode::Append):
        ProfilerStart("open");
        errno = 0;
-        // m_FileDescriptor = open(m_Name.c_str(), O_RDWR);
-        m_FileDescriptor = open(m_Name.c_str(), O_RDWR | O_CREAT, 0777);
-        lseek(m_FileDescriptor, 0, SEEK_END);
+
+        if (createOnAppend)
+        {
+            m_FileDescriptor = open(m_Name.c_str(), O_RDWR | O_CREAT, 0777);
+            lseek(m_FileDescriptor, 0, SEEK_END);
+        }
+        else
+        {
+            /* This case runs on rank > 0 when called with Write mode */
+            m_FileDescriptor = open(m_Name.c_str(), O_RDWR);
+            lseek(m_FileDescriptor, 0, SEEK_SET);
+        }
+
        m_Errno = errno;
        ProfilerStop("open");
        break;
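Both OpenChain() implementations follow the same ordering idea: rank 0 of the chain creates and truncates the file, each later rank waits for a token from its predecessor, reopens the now-existing file without truncating it, and forwards the token so the next rank can proceed. A stripped-down sketch of that protocol in plain MPI, independent of the Transport classes (the tag, flags, and helper name are illustrative assumptions, not part of this patch):

// Minimal illustration of the chained-open ordering, assuming an MPI build.
#include <mpi.h>
#include <fcntl.h>
#include <unistd.h>

// Every rank of chainComm ends up with an fd on the same file; only rank 0
// truncates it, and the opens happen strictly in rank order.
int ChainedOpen(const char *name, MPI_Comm chainComm)
{
    int rank, size, token = 1;
    MPI_Comm_rank(chainComm, &rank);
    MPI_Comm_size(chainComm, &size);

    if (rank > 0)
    {
        // Wait until the predecessor has finished opening the file
        MPI_Recv(&token, 1, MPI_INT, rank - 1, 0, chainComm,
                 MPI_STATUS_IGNORE);
    }

    int fd;
    if (rank == 0)
    {
        fd = open(name, O_WRONLY | O_CREAT | O_TRUNC, 0777); // create once
    }
    else
    {
        fd = open(name, O_RDWR); // reopen, never truncate
        lseek(fd, 0, SEEK_SET);
    }

    if (rank < size - 1)
    {
        // Let the successor proceed; the file now exists on disk
        MPI_Send(&token, 1, MPI_INT, rank + 1, 0, chainComm);
    }
    return fd;
}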
@@ -243,6 +262,11 @@ void FilePOSIX::Write(const char *buffer, size_t size, size_t start)
                 ", in call to POSIX lseek" + SysErrMsg());
         }
     }
+    else
+    {
+        const auto pos = lseek(m_FileDescriptor, 0, SEEK_CUR);
+        start = static_cast<size_t>(pos);
+    }
 
     if (size > DefaultMaxFileBatchSize)
     {
diff --git a/source/adios2/toolkit/transport/file/FilePOSIX.h b/source/adios2/toolkit/transport/file/FilePOSIX.h
index 8d9b0300d2..5e6cb7f94a 100644
--- a/source/adios2/toolkit/transport/file/FilePOSIX.h
+++ b/source/adios2/toolkit/transport/file/FilePOSIX.h
@@ -37,7 +37,7 @@ class FilePOSIX : public Transport
     void Open(const std::string &name, const Mode openMode,
               const bool async = false) final;
 
-    void OpenChain(const std::string &name, const Mode openMode,
+    void OpenChain(const std::string &name, Mode openMode,
                    const helper::Comm &chainComm,
                    const bool async = false) final;
 
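The Write() hunk above records the file's current offset into start when the caller omits it, so consecutive Write() calls can simply continue where the previous one ended. A hedged sketch of an aggregator-side pattern that relies on this, combining the shared-memory chain and the POSIX transport from this patch; the offset computation, the per-producer size list, and the function name are assumptions, not code from the patch:

// Sketch only: internal ADIOS2 headers added/modified by this patch.
#include "adios2/toolkit/aggregator/mpi/MPIShmChain.h"
#include "adios2/toolkit/transport/file/FilePOSIX.h"
#include <vector>

using adios2::aggregator::MPIShmChain;
using adios2::transport::FilePOSIX;

void AggregateToSubfile(MPIShmChain &chain, FilePOSIX &file,
                        const char *myData, size_t mySize, size_t myOffset,
                        const std::vector<size_t> &producerSizes)
{
    // Position once with an explicit start; myOffset is assumed to have been
    // agreed on beforehand (e.g. an exclusive scan of block sizes).
    file.Write(myData, mySize, myOffset);

    // Every further chunk is written without a start argument, so it lands at
    // the current file offset, right after the previous write.
    for (const size_t expected : producerSizes)
    {
        size_t received = 0;
        while (received < expected)
        {
            MPIShmChain::ShmDataBuffer *b = chain.LockConsumerBuffer();
            file.Write(b->buf, b->actual_size);
            received += b->actual_size;
            chain.UnlockConsumerBuffer();
        }
    }
}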