Skip to content

Commit

Permalink
Merge pull request ornladios#3750 from pnorbert/bp5-balanced-two-leve…
Browse files Browse the repository at this point in the history
…l-metadata-aggregation

Organize the processes into groups so that the two steps of metadata …
  • Loading branch information
pnorbert authored Aug 11, 2023
2 parents d143154 + 07fb187 commit 2d5d936
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 23 deletions.
53 changes: 30 additions & 23 deletions source/adios2/engine/bp5/BP5Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -574,14 +574,14 @@ void BP5Writer::EndStep()
TSInfo.NewMetaMetaBlocks, {m}, {a}, {m_ThisTimestepDataSize},
{m_StartDataPos});

if (m_Aggregator->m_Comm.Size() > 1)
if (m_AggregatorMetadata.m_Comm.Size() > 1)
{ // level 1
m_Profiler.Start("meta_gather1");
size_t LocalSize = MetaBuffer.size();
std::vector<size_t> RecvCounts =
m_Aggregator->m_Comm.GatherValues(LocalSize, 0);
m_AggregatorMetadata.m_Comm.GatherValues(LocalSize, 0);
std::vector<char> RecvBuffer;
if (m_Aggregator->m_Comm.Rank() == 0)
if (m_AggregatorMetadata.m_Comm.Rank() == 0)
{
uint64_t TotalSize = 0;
for (auto &n : RecvCounts)
Expand All @@ -591,11 +591,11 @@ void BP5Writer::EndStep()
<< TotalSize << " bytes from aggregator group"
<< std::endl;*/
}
m_Aggregator->m_Comm.GathervArrays(MetaBuffer.data(), LocalSize,
RecvCounts.data(), RecvCounts.size(),
RecvBuffer.data(), 0);
m_AggregatorMetadata.m_Comm.GathervArrays(
MetaBuffer.data(), LocalSize, RecvCounts.data(), RecvCounts.size(),
RecvBuffer.data(), 0);
m_Profiler.Stop("meta_gather1");
if (m_Aggregator->m_Comm.Rank() == 0)
if (m_AggregatorMetadata.m_Comm.Rank() == 0)
{
std::vector<format::BP5Base::MetaMetaInfoBlock>
UniqueMetaMetaBlocks;
Expand All @@ -615,17 +615,17 @@ void BP5Writer::EndStep()
m_Profiler.Stop("meta_lvl1");
m_Profiler.Start("meta_lvl2");
// level 2
if (m_Aggregator->m_Comm.Rank() == 0)
if (m_AggregatorMetadata.m_Comm.Rank() == 0)
{
std::vector<char> RecvBuffer;
std::vector<char> *buf;
std::vector<size_t> RecvCounts;
size_t LocalSize = MetaBuffer.size();
if (m_CommAggregators.Size() > 1)
if (m_CommMetadataAggregators.Size() > 1)
{
m_Profiler.Start("meta_gather2");
RecvCounts = m_CommAggregators.GatherValues(LocalSize, 0);
if (m_CommAggregators.Rank() == 0)
RecvCounts = m_CommMetadataAggregators.GatherValues(LocalSize, 0);
if (m_CommMetadataAggregators.Rank() == 0)
{
uint64_t TotalSize = 0;
for (auto &n : RecvCounts)
Expand All @@ -636,7 +636,7 @@ void BP5Writer::EndStep()
<< std::endl;*/
}

m_CommAggregators.GathervArrays(
m_CommMetadataAggregators.GathervArrays(
MetaBuffer.data(), LocalSize, RecvCounts.data(),
RecvCounts.size(), RecvBuffer.data(), 0);
buf = &RecvBuffer;
Expand All @@ -648,7 +648,7 @@ void BP5Writer::EndStep()
RecvCounts.push_back(LocalSize);
}

if (m_CommAggregators.Rank() == 0)
if (m_CommMetadataAggregators.Rank() == 0)
{
std::vector<format::BP5Base::MetaMetaInfoBlock>
UniqueMetaMetaBlocks;
Expand Down Expand Up @@ -693,12 +693,12 @@ void BP5Writer::EndStep()

if (TSInfo.AttributeEncodeBuffer)
{
delete TSInfo.AttributeEncodeBuffer;
delete TSInfo.AttributeEncodeBuffer;
}

if (TSInfo.MetaEncodeBuffer)
{
delete TSInfo.MetaEncodeBuffer;
delete TSInfo.MetaEncodeBuffer;
}
}

Expand Down Expand Up @@ -860,8 +860,7 @@ uint64_t BP5Writer::CountStepsInMetadataIndex(format::BufferSTL &bufferSTL)

switch (recordID)
{
case IndexRecord::WriterMapRecord:
{
case IndexRecord::WriterMapRecord: {
m_AppendWriterCount =
helper::ReadValue<uint64_t>(buffer, position, IsLittleEndian);
m_AppendAggregatorCount =
Expand All @@ -876,8 +875,7 @@ uint64_t BP5Writer::CountStepsInMetadataIndex(format::BufferSTL &bufferSTL)
position += m_AppendWriterCount * sizeof(uint64_t);
break;
}
case IndexRecord::StepRecord:
{
case IndexRecord::StepRecord: {
position += 2 * sizeof(uint64_t); // MetadataPos, MetadataSize
const uint64_t FlushCount =
helper::ReadValue<uint64_t>(buffer, position, IsLittleEndian);
Expand Down Expand Up @@ -947,8 +945,7 @@ uint64_t BP5Writer::CountStepsInMetadataIndex(format::BufferSTL &bufferSTL)

switch (recordID)
{
case IndexRecord::WriterMapRecord:
{
case IndexRecord::WriterMapRecord: {
m_AppendWriterCount =
helper::ReadValue<uint64_t>(buffer, position, IsLittleEndian);
m_AppendAggregatorCount =
Expand All @@ -966,8 +963,7 @@ uint64_t BP5Writer::CountStepsInMetadataIndex(format::BufferSTL &bufferSTL)
}
break;
}
case IndexRecord::StepRecord:
{
case IndexRecord::StepRecord: {
m_AppendMetadataIndexPos = position - sizeof(unsigned char) -
sizeof(uint64_t); // pos of RecordID
const uint64_t MetadataPos =
Expand Down Expand Up @@ -1061,6 +1057,17 @@ void BP5Writer::InitAggregator()
int color = m_Aggregator->m_Comm.Rank();
m_CommAggregators =
m_Comm.Split(color, 0, "creating level 2 chain of aggregators at Open");

/* Metadata aggregator for two-level metadata aggregation */
{
size_t n = static_cast<size_t>(m_Comm.Size());
size_t a = (int)floor(sqrt((double)n));
m_AggregatorMetadata.Init(a, a, m_Comm);
/* chain of rank 0s form the second level of aggregation */
int color = m_AggregatorMetadata.m_Comm.Rank();
m_CommMetadataAggregators = m_Comm.Split(
color, 0, "creating level 2 chain of aggregators at Open");
}
}

void BP5Writer::InitTransports()
Expand Down
5 changes: 5 additions & 0 deletions source/adios2/engine/bp5/BP5Writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,11 @@ class BP5Writer : public BP5Engine, public core::Engine
helper::Comm *DataWritingComm; // processes that write the same data file
// aggregators only (valid if m_Aggregator->m_Comm.Rank() == 0)
helper::Comm m_CommAggregators;

/* two-level metadata aggregation */
aggregator::MPIChain m_AggregatorMetadata; // first level
helper::Comm m_CommMetadataAggregators; // second level

adios2::profiling::JSONProfiler m_Profiler;

protected:
Expand Down

0 comments on commit 2d5d936

Please sign in to comment.