From 07fb1879a6ca1717978f0cc35d464e46490b1952 Mon Sep 17 00:00:00 2001 From: Norbert Podhorszki Date: Fri, 11 Aug 2023 06:51:41 -0400 Subject: [PATCH] Organize the processes into groups so that the two steps of metadata aggregation have more or less the same number of participants. This replaces in-node aggregation in the first step. The new strategy balances the size of metadata gathered in the two steps. --- source/adios2/engine/bp5/BP5Writer.cpp | 53 +++++++++++++++----------- source/adios2/engine/bp5/BP5Writer.h | 5 +++ 2 files changed, 35 insertions(+), 23 deletions(-) diff --git a/source/adios2/engine/bp5/BP5Writer.cpp b/source/adios2/engine/bp5/BP5Writer.cpp index 6888573ca2..caa4f09f73 100644 --- a/source/adios2/engine/bp5/BP5Writer.cpp +++ b/source/adios2/engine/bp5/BP5Writer.cpp @@ -574,14 +574,14 @@ void BP5Writer::EndStep() TSInfo.NewMetaMetaBlocks, {m}, {a}, {m_ThisTimestepDataSize}, {m_StartDataPos}); - if (m_Aggregator->m_Comm.Size() > 1) + if (m_AggregatorMetadata.m_Comm.Size() > 1) { // level 1 m_Profiler.Start("meta_gather1"); size_t LocalSize = MetaBuffer.size(); std::vector RecvCounts = - m_Aggregator->m_Comm.GatherValues(LocalSize, 0); + m_AggregatorMetadata.m_Comm.GatherValues(LocalSize, 0); std::vector RecvBuffer; - if (m_Aggregator->m_Comm.Rank() == 0) + if (m_AggregatorMetadata.m_Comm.Rank() == 0) { uint64_t TotalSize = 0; for (auto &n : RecvCounts) @@ -591,11 +591,11 @@ void BP5Writer::EndStep() << TotalSize << " bytes from aggregator group" << std::endl;*/ } - m_Aggregator->m_Comm.GathervArrays(MetaBuffer.data(), LocalSize, - RecvCounts.data(), RecvCounts.size(), - RecvBuffer.data(), 0); + m_AggregatorMetadata.m_Comm.GathervArrays( + MetaBuffer.data(), LocalSize, RecvCounts.data(), RecvCounts.size(), + RecvBuffer.data(), 0); m_Profiler.Stop("meta_gather1"); - if (m_Aggregator->m_Comm.Rank() == 0) + if (m_AggregatorMetadata.m_Comm.Rank() == 0) { std::vector UniqueMetaMetaBlocks; @@ -615,17 +615,17 @@ void BP5Writer::EndStep() 
m_Profiler.Stop("meta_lvl1"); m_Profiler.Start("meta_lvl2"); // level 2 - if (m_Aggregator->m_Comm.Rank() == 0) + if (m_AggregatorMetadata.m_Comm.Rank() == 0) { std::vector RecvBuffer; std::vector *buf; std::vector RecvCounts; size_t LocalSize = MetaBuffer.size(); - if (m_CommAggregators.Size() > 1) + if (m_CommMetadataAggregators.Size() > 1) { m_Profiler.Start("meta_gather2"); - RecvCounts = m_CommAggregators.GatherValues(LocalSize, 0); - if (m_CommAggregators.Rank() == 0) + RecvCounts = m_CommMetadataAggregators.GatherValues(LocalSize, 0); + if (m_CommMetadataAggregators.Rank() == 0) { uint64_t TotalSize = 0; for (auto &n : RecvCounts) @@ -636,7 +636,7 @@ void BP5Writer::EndStep() << std::endl;*/ } - m_CommAggregators.GathervArrays( + m_CommMetadataAggregators.GathervArrays( MetaBuffer.data(), LocalSize, RecvCounts.data(), RecvCounts.size(), RecvBuffer.data(), 0); buf = &RecvBuffer; @@ -648,7 +648,7 @@ void BP5Writer::EndStep() RecvCounts.push_back(LocalSize); } - if (m_CommAggregators.Rank() == 0) + if (m_CommMetadataAggregators.Rank() == 0) { std::vector UniqueMetaMetaBlocks; @@ -693,12 +693,12 @@ void BP5Writer::EndStep() if (TSInfo.AttributeEncodeBuffer) { - delete TSInfo.AttributeEncodeBuffer; + delete TSInfo.AttributeEncodeBuffer; } if (TSInfo.MetaEncodeBuffer) { - delete TSInfo.MetaEncodeBuffer; + delete TSInfo.MetaEncodeBuffer; } } @@ -860,8 +860,7 @@ uint64_t BP5Writer::CountStepsInMetadataIndex(format::BufferSTL &bufferSTL) switch (recordID) { - case IndexRecord::WriterMapRecord: - { + case IndexRecord::WriterMapRecord: { m_AppendWriterCount = helper::ReadValue(buffer, position, IsLittleEndian); m_AppendAggregatorCount = @@ -876,8 +875,7 @@ uint64_t BP5Writer::CountStepsInMetadataIndex(format::BufferSTL &bufferSTL) position += m_AppendWriterCount * sizeof(uint64_t); break; } - case IndexRecord::StepRecord: - { + case IndexRecord::StepRecord: { position += 2 * sizeof(uint64_t); // MetadataPos, MetadataSize const uint64_t FlushCount = 
helper::ReadValue(buffer, position, IsLittleEndian); @@ -947,8 +945,7 @@ uint64_t BP5Writer::CountStepsInMetadataIndex(format::BufferSTL &bufferSTL) switch (recordID) { - case IndexRecord::WriterMapRecord: - { + case IndexRecord::WriterMapRecord: { m_AppendWriterCount = helper::ReadValue(buffer, position, IsLittleEndian); m_AppendAggregatorCount = @@ -966,8 +963,7 @@ uint64_t BP5Writer::CountStepsInMetadataIndex(format::BufferSTL &bufferSTL) } break; } - case IndexRecord::StepRecord: - { + case IndexRecord::StepRecord: { m_AppendMetadataIndexPos = position - sizeof(unsigned char) - sizeof(uint64_t); // pos of RecordID const uint64_t MetadataPos = @@ -1061,6 +1057,17 @@ void BP5Writer::InitAggregator() int color = m_Aggregator->m_Comm.Rank(); m_CommAggregators = m_Comm.Split(color, 0, "creating level 2 chain of aggregators at Open"); + + /* Metadata aggregator for two-level metadata aggregation */ + { + size_t n = static_cast(m_Comm.Size()); + size_t a = (int)floor(sqrt((double)n)); + m_AggregatorMetadata.Init(a, a, m_Comm); + /* chain of rank 0s form the second level of aggregation */ + int color = m_AggregatorMetadata.m_Comm.Rank(); + m_CommMetadataAggregators = m_Comm.Split( + color, 0, "creating level 2 chain of aggregators at Open"); + } } void BP5Writer::InitTransports() diff --git a/source/adios2/engine/bp5/BP5Writer.h b/source/adios2/engine/bp5/BP5Writer.h index 829138b292..2517a5e50f 100644 --- a/source/adios2/engine/bp5/BP5Writer.h +++ b/source/adios2/engine/bp5/BP5Writer.h @@ -204,6 +204,11 @@ class BP5Writer : public BP5Engine, public core::Engine helper::Comm *DataWritingComm; // processes that write the same data file // aggregators only (valid if m_Aggregator->m_Comm.Rank() == 0) helper::Comm m_CommAggregators; + + /* two-level metadata aggregation */ + aggregator::MPIChain m_AggregatorMetadata; // first level + helper::Comm m_CommMetadataAggregators; // second level + adios2::profiling::JSONProfiler m_Profiler; protected: