Skip to content

Commit

Permalink
Merge pull request #2807 from eisenhauer/BP5Flush
Browse files Browse the repository at this point in the history
BP5 flush data
  • Loading branch information
eisenhauer authored Jul 31, 2021
2 parents 7adcf1c + 4db4fe2 commit c5d6778
Show file tree
Hide file tree
Showing 12 changed files with 265 additions and 102 deletions.
17 changes: 12 additions & 5 deletions source/adios2/engine/bp5/BP5Engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,11 +147,18 @@ class BP5Engine
* BP5 header for "Index Table" (64 bytes)
* for each Writer, what aggregator writes its data
* uint16_t [ WriterCount]
* for each timestep:
* uint64_t 0 : CombinedMetaDataPos
* uint64_t 1 : CombinedMetaDataSize
* for each Writer
* uint64_t DataPos (in the file above)
* for each timestep: (size (WriterCount + 2 ) 64-bit ints
* uint64_t 0 : CombinedMetaDataPos
* uint64_t 1 : CombinedMetaDataSize
* uint64_t 2 : FlushCount
* for each Writer
* for each flush before the last:
* uint64_t DataPos (in the file above)
* uint64_t DataSize
* for the final flush:
* uint64_t DataPos (in the file above)
* So, each timestep takes sizeof(uint64_t)* (3 + ((FlushCount-1)*2 +
*1) * WriterCount) bytes
*
* MetaMetadata file (mmd.0) contains FFS format information
* for each meta metadata item:
Expand Down
56 changes: 49 additions & 7 deletions source/adios2/engine/bp5/BP5Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,10 @@ void BP5Reader::ReadData(const size_t WriterRank, const size_t Timestep,
const size_t StartOffset, const size_t Length,
char *Destination)
{
size_t DataStartPos = m_MetadataIndexTable[Timestep][2];
size_t FlushCount = m_MetadataIndexTable[Timestep][2];
size_t DataPosPos = m_MetadataIndexTable[Timestep][3];
size_t SubfileNum = m_WriterToFileMap[WriterRank];
DataStartPos += WriterRank * sizeof(uint64_t);
size_t DataStart = helper::ReadValue<uint64_t>(
m_MetadataIndex.m_Buffer, DataStartPos, m_Minifooter.IsLittleEndian);

// check if subfile is already opened
if (m_DataFileManager.m_Transports.count(SubfileNum) == 0)
{
Expand All @@ -152,7 +151,35 @@ void BP5Reader::ReadData(const size_t WriterRank, const size_t Timestep,
m_DataFileManager.OpenFileID(subFileName, SubfileNum, Mode::Read,
{{"transport", "File"}}, false);
}
m_DataFileManager.ReadFile(Destination, Length, DataStart + StartOffset,

size_t InfoStartPos =
DataPosPos + (WriterRank * (2 * FlushCount + 1) * sizeof(uint64_t));
size_t ThisFlushInfo = InfoStartPos;
size_t RemainingLength = Length;
size_t ThisDataPos;
size_t Offset = StartOffset;
for (int flush = 0; flush < FlushCount; flush++)
{

ThisDataPos =
helper::ReadValue<uint64_t>(m_MetadataIndex.m_Buffer, ThisFlushInfo,
m_Minifooter.IsLittleEndian);
size_t ThisDataSize =
helper::ReadValue<uint64_t>(m_MetadataIndex.m_Buffer, ThisFlushInfo,
m_Minifooter.IsLittleEndian);
if (ThisDataSize > RemainingLength)
ThisDataSize = RemainingLength;
m_DataFileManager.ReadFile(Destination, ThisDataSize,
ThisDataPos + Offset, SubfileNum);
Destination += ThisDataSize;
RemainingLength -= ThisDataSize;
Offset = 0;
if (RemainingLength == 0)
return;
}
ThisDataPos = helper::ReadValue<uint64_t>(
m_MetadataIndex.m_Buffer, ThisFlushInfo, m_Minifooter.IsLittleEndian);
m_DataFileManager.ReadFile(Destination, RemainingLength, ThisDataPos,
SubfileNum);
}

Expand Down Expand Up @@ -601,19 +628,34 @@ void BP5Reader::ParseMetadataIndex(format::BufferSTL &bufferSTL,
buffer, position, m_Minifooter.IsLittleEndian);
const uint64_t MetadataSize = helper::ReadValue<uint64_t>(
buffer, position, m_Minifooter.IsLittleEndian);
const uint64_t FlushCount = helper::ReadValue<uint64_t>(
buffer, position, m_Minifooter.IsLittleEndian);

ptrs.push_back(MetadataPos);
ptrs.push_back(MetadataSize);
ptrs.push_back(FlushCount);
ptrs.push_back(position);
m_MetadataIndexTable[currentStep] = ptrs;
#ifdef DUMPDATALOCINFO
for (uint64_t i = 0; i < m_WriterCount; i++)
{
size_t DataPosPos = ptrs[2] + sizeof(uint64_t) * i;
size_t DataPosPos = ptrs[3];
std::cout << "Writer " << i << " data at ";
for (uint64_t j = 0; j < FlushCount; j++)
{
const uint64_t DataPos = helper::ReadValue<uint64_t>(
buffer, DataPosPos, m_Minifooter.IsLittleEndian);
const uint64_t DataSize = helper::ReadValue<uint64_t>(
buffer, DataPosPos, m_Minifooter.IsLittleEndian);
std::cout << "loc:" << DataPos << " siz:" << DataSize << "; ";
}
const uint64_t DataPos = helper::ReadValue<uint64_t>(
buffer, DataPosPos, m_Minifooter.IsLittleEndian);
std::cout << "loc:" << DataPos << std::endl;
}
#endif

position += sizeof(uint64_t) * m_WriterCount;
position += sizeof(uint64_t) * m_WriterCount * ((2 * FlushCount) + 1);
m_StepsCount++;
currentStep++;
} while (!oneStepOnly && position < buffer.size());
Expand Down
114 changes: 78 additions & 36 deletions source/adios2/engine/bp5/BP5Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ StepStatus BP5Writer::BeginStep(StepMode mode, const float timeoutSeconds)
false /* always copy */,
m_Parameters.BufferChunkSize));
}
m_ThisTimestepDataSize = 0;

return StepStatus::OK;
}

Expand Down Expand Up @@ -242,23 +244,50 @@ void BP5Writer::WriteMetadataFileIndex(uint64_t MetaDataPos,

m_FileMetadataManager.FlushFiles();

uint64_t buf[2];
std::vector<uint64_t> buf;
buf.resize(3 + ((FlushPosSizeInfo.size() * 2) + 1) * m_Comm.Size());
buf[0] = MetaDataPos;
buf[1] = MetaDataSize;
m_FileMetadataIndexManager.WriteFiles((char *)buf, sizeof(buf));
m_FileMetadataIndexManager.WriteFiles((char *)m_WriterDataPos.data(),
m_WriterDataPos.size() *
sizeof(uint64_t));
/*std::cout << "Write Index positions = {";
for (size_t i = 0; i < m_WriterDataPos.size(); ++i)
{
std::cout << m_WriterDataPos[i];
if (i < m_WriterDataPos.size() - 1)
buf[2] = FlushPosSizeInfo.size();

uint64_t pos = 3;

for (int writer = 0; writer < m_Comm.Size(); writer++)
{
for (int flushNum = 0; flushNum < FlushPosSizeInfo.size(); flushNum++)
{
std::cout << ", ";
buf[pos + (flushNum * 2)] = FlushPosSizeInfo[flushNum][2 * writer];
buf[pos + (flushNum * 2) + 1] =
FlushPosSizeInfo[flushNum][2 * writer + 1];
}
buf[pos + FlushPosSizeInfo.size() * 2] = m_WriterDataPos[writer];
pos += (FlushPosSizeInfo.size() * 2) + 1;
}
std::cout << "}" << std::endl;*/

m_FileMetadataIndexManager.WriteFiles((char *)buf.data(),
buf.size() * sizeof(uint64_t));

#ifdef DUMPDATALOCINFO
std::cout << "Flush count is :" << FlushPosSizeInfo.size() << std::endl;
std::cout << "Write Index positions = {" << std::endl;

for (size_t i = 0; i < m_Comm.Size(); ++i)
{
std::cout << "Writer " << i << " has data at: " << std::endl;
uint64_t eachWriterSize = FlushPosSizeInfo.size() * 2 + 1;
for (size_t j = 0; j < FlushPosSizeInfo.size(); ++j)
{
std::cout << "loc:" << buf[3 + eachWriterSize * i + j * 2]
<< " siz:" << buf[3 + eachWriterSize * i + j * 2 + 1]
<< std::endl;
}
std::cout << "loc:" << buf[3 + eachWriterSize * (i + 1) - 1]
<< std::endl;
}
std::cout << "}" << std::endl;
#endif
/* reset for next timestep */
FlushPosSizeInfo.clear();
}

void BP5Writer::MarshalAttributes()
Expand Down Expand Up @@ -332,10 +361,11 @@ void BP5Writer::EndStep()

WriteData(TSInfo.DataBuffer);

m_ThisTimestepDataSize += TSInfo.DataBuffer->Size();

std::vector<char> MetaBuffer = m_BP5Serializer.CopyMetadataToContiguous(
TSInfo.NewMetaMetaBlocks, TSInfo.MetaEncodeBuffer,
TSInfo.AttributeEncodeBuffer, TSInfo.DataBuffer->Size(),
m_StartDataPos);
TSInfo.AttributeEncodeBuffer, m_ThisTimestepDataSize, m_StartDataPos);

size_t LocalSize = MetaBuffer.size();
std::vector<size_t> RecvCounts = m_Comm.GatherValues(LocalSize, 0);
Expand Down Expand Up @@ -765,7 +795,7 @@ void BP5Writer::MakeHeader(format::BufferSTL &b, const std::string fileType,
(helper::IsRowMajor(m_IO.m_HostLanguage) == false) ? 'y' : 'n';
helper::CopyToBuffer(buffer, position, &columnMajor);

// byte 45-63: unused
// byte 49-63: unused
position += 15;
absolutePosition = position;
}
Expand Down Expand Up @@ -807,41 +837,53 @@ void BP5Writer::InitBPBuffer()
}
}

void BP5Writer::DoFlush(const bool isFinal, const int transportIndex)
void BP5Writer::FlushData(const bool isFinal)
{
m_FileMetadataManager.FlushFiles();
m_FileMetaMetadataManager.FlushFiles();
m_FileDataManager.FlushFiles();

// true: advances step
// BufferV *DataBuf;
BufferV *DataBuf;
if (m_Parameters.BufferVType == (int)BufferVType::MallocVType)
{
// DataBuf = m_BP5Serializer.ReinitStepData(new
// MallocV("BP5Writer", false,
// m_Parameters.InitialBufferSize,
// m_Parameters.GrowthFactor));
DataBuf = m_BP5Serializer.ReinitStepData(
new MallocV("BP5Writer", false, m_Parameters.InitialBufferSize,
m_Parameters.GrowthFactor));
}
else
{
// DataBuf = m_BP5Serializer.ReinitStepData(new
// ChunkV("BP5Writer",
// false /* always
// copy
//*/,
// m_Parameters.BufferChunkSize));
DataBuf = m_BP5Serializer.ReinitStepData(
new ChunkV("BP5Writer", false /* always copy */,
m_Parameters.BufferChunkSize));
}

// WriteData(DataBuf);
// delete DataBuf;
WriteData(DataBuf);

m_ThisTimestepDataSize += DataBuf->Size();

if (!isFinal)
{
size_t tmp[2];
// aggregate start pos and data size to rank 0
tmp[0] = m_StartDataPos;
tmp[1] = DataBuf->Size();

std::vector<size_t> RecvBuffer;
if (m_Comm.Rank() == 0)
{
RecvBuffer.resize(m_Comm.Size() * 2);
}
m_Comm.GatherArrays(tmp, 2, RecvBuffer.data(), 0);
if (m_Comm.Rank() == 0)
{
FlushPosSizeInfo.push_back(RecvBuffer);
}
}
delete DataBuf;
}

void BP5Writer::Flush(const int transportIndex) { FlushData(false); }

void BP5Writer::DoClose(const int transportIndex)
{
PERFSTUBS_SCOPED_TIMER("BP5Writer::Close");

DoFlush(true, transportIndex);

m_FileDataManager.CloseFiles(transportIndex);
// Delete files from temporary storage if draining was on

Expand Down
10 changes: 9 additions & 1 deletion source/adios2/engine/bp5/BP5Writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class BP5Writer : public BP5Engine, public core::Engine
size_t CurrentStep() const final;
void PerformPuts() final;
void EndStep() final;
void Flush(const int transportIndex = -1) final;

private:
/** Single object controlling BP buffering */
Expand Down Expand Up @@ -131,7 +132,7 @@ class BP5Writer : public BP5Engine, public core::Engine
ADIOS2_FOREACH_PRIMITVE_STDTYPE_2ARGS(declare_type)
#undef declare_type

void DoFlush(const bool isFinal = false, const int transportIndex = -1);
void FlushData(const bool isFinal = false);

void DoClose(const int transportIndex = -1) final;

Expand Down Expand Up @@ -199,6 +200,11 @@ class BP5Writer : public BP5Engine, public core::Engine
*/
uint64_t m_DataPos = 0;

/*
* Total data written this timestep
*/
uint64_t m_ThisTimestepDataSize = 0;

/** rank 0 collects m_StartDataPos in this vector for writing it
* to the index file
*/
Expand All @@ -210,6 +216,8 @@ class BP5Writer : public BP5Engine, public core::Engine
// where each writer rank writes its data, init in InitBPBuffer;
std::vector<uint64_t> m_Assignment;

std::vector<std::vector<size_t>> FlushPosSizeInfo;

void MakeHeader(format::BufferSTL &b, const std::string fileType,
const bool isActive);
};
Expand Down
22 changes: 20 additions & 2 deletions source/adios2/toolkit/format/bp5/BP5Serializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -482,13 +482,14 @@ void BP5Serializer::Marshal(void *Variable, const char *Name,

if (span == nullptr)
{
DataOffset = CurDataBuffer->AddToVec(ElemCount * ElemSize, Data,
DataOffset = m_PriorDataBufferSizeTotal +
CurDataBuffer->AddToVec(ElemCount * ElemSize, Data,
ElemSize, Sync);
}
else
{
*span = CurDataBuffer->Allocate(ElemCount * ElemSize, ElemSize);
DataOffset = span->globalPos;
DataOffset = m_PriorDataBufferSizeTotal + span->globalPos;
}

if (!AlreadyWritten)
Expand Down Expand Up @@ -636,6 +637,21 @@ void BP5Serializer::InitStep(BufferV *DataBuffer)
throw std::logic_error("BP5Serializer:: InitStep without prior close");
}
CurDataBuffer = DataBuffer;
m_PriorDataBufferSizeTotal = 0;
}

BufferV *BP5Serializer::ReinitStepData(BufferV *DataBuffer)
{
if (CurDataBuffer == NULL)
{
throw std::logic_error("BP5Serializer:: ReinitStep without prior Init");
}
m_PriorDataBufferSizeTotal += CurDataBuffer->AddToVec(
0, NULL, sizeof(max_align_t), true); // output block size aligned

BufferV *tmp = CurDataBuffer;
CurDataBuffer = DataBuffer;
return tmp;
}

BP5Serializer::TimestepInfo BP5Serializer::CloseTimestep(int timestep)
Expand Down Expand Up @@ -696,6 +712,8 @@ BP5Serializer::TimestepInfo BP5Serializer::CloseTimestep(int timestep)
MBase->DataBlockSize = CurDataBuffer->AddToVec(
0, NULL, sizeof(max_align_t), true); // output block size aligned

MBase->DataBlockSize += m_PriorDataBufferSizeTotal;

void *MetaDataBlock = FFSencode(MetaEncodeBuffer, Info.MetaFormat,
MetadataBuf, &MetaDataSize);
BufferFFS *Metadata =
Expand Down
Loading

0 comments on commit c5d6778

Please sign in to comment.