Skip to content

Commit

Permalink
Merge pull request #1571 from pnorbert/bp4headers
Browse files Browse the repository at this point in the history
- Add header to all BP4 data files, metadata files and index table
  • Loading branch information
pnorbert authored Jun 29, 2019
2 parents 8d759c3 + e1f8717 commit 7c2bf0e
Show file tree
Hide file tree
Showing 10 changed files with 277 additions and 164 deletions.
150 changes: 38 additions & 112 deletions source/adios2/engine/bp4/BP4Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -280,23 +280,7 @@ void BP4Writer::InitBPBuffer()
{
// Set the flag in the header of metadata index table to 0 again
// to indicate a new run begins
BufferSTL metadataIndex;
metadataIndex.m_Buffer.resize(8);
metadataIndex.m_Buffer.assign(metadataIndex.m_Buffer.size(),
'\0');
metadataIndex.m_Position = 0;

const uint64_t currentRunIsOver = 0;
helper::CopyToBuffer(metadataIndex.m_Buffer,
metadataIndex.m_Position,
&currentRunIsOver);

m_FileMetadataIndexManager.WriteFileAt(
metadataIndex.m_Buffer.data(), metadataIndex.m_Position, 56,
0);
m_FileMetadataIndexManager.FlushFiles();

m_FileMetadataIndexManager.SeekToFileEnd();
UpdateActiveFlag(true);

// Get the size of existing metadata file
m_BP4Serializer.m_PreMetadataFileLength =
Expand All @@ -307,6 +291,22 @@ void BP4Writer::InitBPBuffer()
}
}

if (m_BP4Serializer.m_PreDataFileLength == 0)
{
/* This is a new file.
* Make headers in data buffer and metadata buffer
*/
if (m_BP4Serializer.m_RankMPI == 0)
{
m_BP4Serializer.MakeHeader(m_BP4Serializer.m_Metadata, "Metadata",
false);
}
if (m_BP4Serializer.m_Aggregator.m_IsConsumer)
{
m_BP4Serializer.MakeHeader(m_BP4Serializer.m_Data, "Data", false);
}
}

m_BP4Serializer.PutProcessGroupIndex(
m_IO.m_Name, m_IO.m_HostLanguage,
m_FileDataManager.GetTransportsTypes());
Expand Down Expand Up @@ -409,68 +409,17 @@ void BP4Writer::WriteProfilingJSONFile()
}
}

/*generate the header for the metadata index file*/
void BP4Writer::PopulateMetadataIndexFileHeader(std::vector<char> &buffer,
size_t &position,
const uint8_t version,
const bool addSubfiles)
{
TAU_SCOPED_TIMER("BP4Writer::PopulateMetadataIndexFileHeader");
auto lf_CopyVersionChar = [](const std::string version,
std::vector<char> &buffer, size_t &position) {
helper::CopyToBuffer(buffer, position, version.c_str());
};

const std::string majorVersion(std::to_string(ADIOS2_VERSION_MAJOR));
const std::string minorVersion(std::to_string(ADIOS2_VERSION_MINOR));
const std::string patchVersion(std::to_string(ADIOS2_VERSION_PATCH));

const std::string versionLongTag("ADIOS-BP v" + majorVersion + "." +
minorVersion + "." + patchVersion);
const size_t versionLongTagSize = versionLongTag.size();
if (versionLongTagSize < 24)
{
helper::CopyToBuffer(buffer, position, versionLongTag.c_str(),
versionLongTagSize);
position += 24 - versionLongTagSize;
}
else
{
helper::CopyToBuffer(buffer, position, versionLongTag.c_str(), 24);
}

lf_CopyVersionChar(majorVersion, buffer, position);
lf_CopyVersionChar(minorVersion, buffer, position);
lf_CopyVersionChar(patchVersion, buffer, position);
++position;

const uint8_t endianness = helper::IsLittleEndian() ? 0 : 1;
helper::CopyToBuffer(buffer, position, &endianness);

if (addSubfiles)
{
const uint8_t zeros1 = 0;
helper::CopyToBuffer(buffer, position, &zeros1);
helper::CopyToBuffer(buffer, position, &version);
}
else
{
const uint16_t zeros2 = 0;
helper::CopyToBuffer(buffer, position, &zeros2);
}
helper::CopyToBuffer(buffer, position, &version);
position += 32;
}

/*write the content of metadata index file*/
void BP4Writer::PopulateMetadataIndexFileContent(
const uint64_t currentStep, const uint64_t mpirank,
BufferSTL &b, const uint64_t currentStep, const uint64_t mpirank,
const uint64_t pgIndexStart, const uint64_t variablesIndexStart,
const uint64_t attributesIndexStart, const uint64_t currentStepEndPos,
const uint64_t currentTimeStamp, std::vector<char> &buffer,
size_t &position)
const uint64_t currentTimeStamp)
{
TAU_SCOPED_TIMER("BP4Writer::PopulateMetadataIndexFileContent");
auto &buffer = b.m_Buffer;
auto &position = b.m_Position;
auto &absolutePosition = b.m_AbsolutePosition;
helper::CopyToBuffer(buffer, position, &currentStep);
helper::CopyToBuffer(buffer, position, &mpirank);
helper::CopyToBuffer(buffer, position, &pgIndexStart);
Expand All @@ -481,6 +430,15 @@ void BP4Writer::PopulateMetadataIndexFileContent(
position += 8;
}

void BP4Writer::UpdateActiveFlag(const bool active)
{
const char activeChar = (active ? '\1' : '\0');
m_FileMetadataIndexManager.WriteFileAt(
&activeChar, 1, m_BP4Serializer.m_ActiveFlagPosition, 0);
m_FileMetadataIndexManager.FlushFiles();
m_FileMetadataIndexManager.SeekToFileEnd();
}

void BP4Writer::WriteCollectiveMetadataFile(const bool isFinal)
{

Expand All @@ -495,18 +453,7 @@ void BP4Writer::WriteCollectiveMetadataFile(const bool isFinal)
{
// But the flag in the header of metadata index table needs to be
// modified to indicate current run is over.
BufferSTL metadataIndex;
metadataIndex.m_Buffer.resize(8);
metadataIndex.m_Buffer.assign(metadataIndex.m_Buffer.size(), '\0');
metadataIndex.m_Position = 0;

const uint64_t currentRunIsOver = 1;
helper::CopyToBuffer(metadataIndex.m_Buffer,
metadataIndex.m_Position, &currentRunIsOver);

m_FileMetadataIndexManager.WriteFileAt(
metadataIndex.m_Buffer.data(), metadataIndex.m_Position, 56, 0);
m_FileMetadataIndexManager.FlushFiles();
UpdateActiveFlag(false);
}
return;
}
Expand Down Expand Up @@ -540,9 +487,7 @@ void BP4Writer::WriteCollectiveMetadataFile(const bool isFinal)
m_BP4Serializer.m_PreMetadataFileLength;

BufferSTL metadataIndex;
metadataIndex.m_Buffer.resize(64);
metadataIndex.m_Buffer.assign(metadataIndex.m_Buffer.size(), '\0');
metadataIndex.m_Position = 0;
metadataIndex.Resize(128, "BP4 Index Table Entry");

uint64_t currentStep;
if (isFinal && m_BP4Serializer.m_MetadataSet.DataPGCount > 0)
Expand All @@ -562,23 +507,15 @@ void BP4Writer::WriteCollectiveMetadataFile(const bool isFinal)

if (currentStep == 1) // TimeStep starts from 1
{
PopulateMetadataIndexFileHeader(metadataIndex.m_Buffer,
metadataIndex.m_Position, 4, true);
m_FileMetadataIndexManager.WriteFiles(metadataIndex.m_Buffer.data(),
metadataIndex.m_Position);

metadataIndex.m_Buffer.resize(64);
metadataIndex.m_Buffer.assign(metadataIndex.m_Buffer.size(), '\0');
metadataIndex.m_Position = 0;
m_BP4Serializer.MakeHeader(metadataIndex, "Index Table", true);
}

std::time_t currentTimeStamp = std::time(nullptr);

PopulateMetadataIndexFileContent(
currentStep, m_BP4Serializer.m_RankMPI, pgIndexStartMetadataFile,
varIndexStartMetadataFile, attrIndexStartMetadataFile,
currentStepEndPos, currentTimeStamp, metadataIndex.m_Buffer,
metadataIndex.m_Position);
metadataIndex, currentStep, m_BP4Serializer.m_RankMPI,
pgIndexStartMetadataFile, varIndexStartMetadataFile,
attrIndexStartMetadataFile, currentStepEndPos, currentTimeStamp);

m_FileMetadataIndexManager.WriteFiles(metadataIndex.m_Buffer.data(),
metadataIndex.m_Position);
Expand All @@ -592,18 +529,7 @@ void BP4Writer::WriteCollectiveMetadataFile(const bool isFinal)
// Only one step of metadata is generated at close.
// The flag in the header of metadata index table
// needs to be modified to indicate current run is over.
BufferSTL metadataIndex;
metadataIndex.m_Buffer.resize(8);
metadataIndex.m_Buffer.assign(metadataIndex.m_Buffer.size(), '\0');
metadataIndex.m_Position = 0;

const uint64_t currentRunIsOver = 1;
helper::CopyToBuffer(metadataIndex.m_Buffer,
metadataIndex.m_Position, &currentRunIsOver);

m_FileMetadataIndexManager.WriteFileAt(
metadataIndex.m_Buffer.data(), metadataIndex.m_Position, 56, 0);
m_FileMetadataIndexManager.FlushFiles();
UpdateActiveFlag(false);
}
}
/*Clear the local indices buffer at the end of each step*/
Expand Down
9 changes: 3 additions & 6 deletions source/adios2/engine/bp4/BP4Writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,16 +94,13 @@ class BP4Writer : public core::Engine
* profilers*/
void WriteProfilingJSONFile();

void PopulateMetadataIndexFileHeader(std::vector<char> &buffer,
size_t &position, const uint8_t,
const bool addSubfiles);
void UpdateActiveFlag(const bool active);

void PopulateMetadataIndexFileContent(
const uint64_t currentStep, const uint64_t mpirank,
BufferSTL &buffer, const uint64_t currentStep, const uint64_t mpirank,
const uint64_t pgIndexStart, const uint64_t variablesIndexStart,
const uint64_t attributesIndexStart, const uint64_t currentStepEndPos,
const uint64_t currentTimeStamp, std::vector<char> &buffer,
size_t &position);
const uint64_t currentTimeStamp);

void WriteCollectiveMetadataFile(const bool isFinal = false);

Expand Down
9 changes: 8 additions & 1 deletion source/adios2/toolkit/format/bp4/BP4Base.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,13 @@ class BP4Base
size_t m_PreMetadataFileLength = 0;
size_t m_PreDataFileLength = 0;

/** Positions of flags in Index Table Header that Reader uses */
const size_t m_EndianFlagPosition = 36;
const size_t m_BPVersionPosition = 37;
const size_t m_ActiveFlagPosition = 38;
const size_t m_VersionTagPosition = 0;
const size_t m_VersionTagLength = 32;

/**
* Unique constructor
* @param mpiComm for m_BP1Aggregator
Expand Down Expand Up @@ -294,7 +301,7 @@ class BP4Base
};

/**
* Resizes the data buffer to hold new dataIn size
* Resizes the data buffer to hold additional new dataIn size
* @param dataIn input size for new data
* @param hint for exception handling
* @return
Expand Down
45 changes: 23 additions & 22 deletions source/adios2/toolkit/format/bp4/BP4Deserializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,14 @@ void BP4Deserializer::ParseMetadataIndex(const BufferSTL &bufferSTL)
{
const auto &buffer = bufferSTL.m_Buffer;
const size_t bufferSize = buffer.size();
size_t position = 0;
position += 28;

// Read header (64 bytes)

// long version string
size_t position = m_VersionTagPosition;
m_Minifooter.VersionTag.assign(&buffer[position], m_VersionTagLength);

position = m_EndianFlagPosition;
const uint8_t endianness = helper::ReadValue<uint8_t>(buffer, position);
m_Minifooter.IsLittleEndian = (endianness == 0) ? true : false;
#ifndef ADIOS2_HAVE_ENDIAN_REVERSE
Expand All @@ -78,33 +84,28 @@ void BP4Deserializer::ParseMetadataIndex(const BufferSTL &bufferSTL)
}
#endif

position += 1;

const int8_t fileType = helper::ReadValue<int8_t>(
buffer, position, m_Minifooter.IsLittleEndian);
if (fileType >= 3)
{
m_Minifooter.HasSubFiles = true;
}
else if (fileType == 0 || fileType == 2)
{
m_Minifooter.HasSubFiles = false;
}
// This has no flag in BP4 header. Always true
m_Minifooter.HasSubFiles = true;

// BP version
position = m_BPVersionPosition;
m_Minifooter.Version = helper::ReadValue<uint8_t>(
buffer, position, m_Minifooter.IsLittleEndian);
if (m_Minifooter.Version < 3)
if (m_Minifooter.Version != 4)
{
throw std::runtime_error("ERROR: ADIOS2 only supports bp format "
"version 3 and above, found " +
std::to_string(m_Minifooter.Version) +
" version \n");
throw std::runtime_error(
"ERROR: ADIOS2 BP4 Engine only supports bp format "
"version 4, found " +
std::to_string(m_Minifooter.Version) + " version \n");
}

position = 0;
m_Minifooter.VersionTag.assign(&buffer[position], 28);
// active flag, not used yet in the reader
position = m_ActiveFlagPosition;
const uint8_t activeFlag = helper::ReadValue<uint8_t>(
buffer, position, m_Minifooter.IsLittleEndian);

position += 64;
// Read each record now
position = 64;
while (position < bufferSize)
{
std::vector<uint64_t> ptrs;
Expand Down
Loading

0 comments on commit 7c2bf0e

Please sign in to comment.