Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

- Add header to all BP4 data files, metadata files and index table #1571

Merged
merged 1 commit into from
Jun 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 38 additions & 112 deletions source/adios2/engine/bp4/BP4Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -280,23 +280,7 @@ void BP4Writer::InitBPBuffer()
{
// Set the flag in the header of metadata index table to 0 again
// to indicate a new run begins
BufferSTL metadataIndex;
metadataIndex.m_Buffer.resize(8);
metadataIndex.m_Buffer.assign(metadataIndex.m_Buffer.size(),
'\0');
metadataIndex.m_Position = 0;

const uint64_t currentRunIsOver = 0;
helper::CopyToBuffer(metadataIndex.m_Buffer,
metadataIndex.m_Position,
&currentRunIsOver);

m_FileMetadataIndexManager.WriteFileAt(
metadataIndex.m_Buffer.data(), metadataIndex.m_Position, 56,
0);
m_FileMetadataIndexManager.FlushFiles();

m_FileMetadataIndexManager.SeekToFileEnd();
UpdateActiveFlag(true);

// Get the size of existing metadata file
m_BP4Serializer.m_PreMetadataFileLength =
Expand All @@ -307,6 +291,22 @@ void BP4Writer::InitBPBuffer()
}
}

if (m_BP4Serializer.m_PreDataFileLength == 0)
{
/* This is a new file.
* Make headers in data buffer and metadata buffer
*/
if (m_BP4Serializer.m_RankMPI == 0)
{
m_BP4Serializer.MakeHeader(m_BP4Serializer.m_Metadata, "Metadata",
false);
}
if (m_BP4Serializer.m_Aggregator.m_IsConsumer)
{
m_BP4Serializer.MakeHeader(m_BP4Serializer.m_Data, "Data", false);
}
}

m_BP4Serializer.PutProcessGroupIndex(
m_IO.m_Name, m_IO.m_HostLanguage,
m_FileDataManager.GetTransportsTypes());
Expand Down Expand Up @@ -409,68 +409,17 @@ void BP4Writer::WriteProfilingJSONFile()
}
}

/*generate the header for the metadata index file*/
void BP4Writer::PopulateMetadataIndexFileHeader(std::vector<char> &buffer,
size_t &position,
const uint8_t version,
const bool addSubfiles)
{
TAU_SCOPED_TIMER("BP4Writer::PopulateMetadataIndexFileHeader");
auto lf_CopyVersionChar = [](const std::string version,
std::vector<char> &buffer, size_t &position) {
helper::CopyToBuffer(buffer, position, version.c_str());
};

const std::string majorVersion(std::to_string(ADIOS2_VERSION_MAJOR));
const std::string minorVersion(std::to_string(ADIOS2_VERSION_MINOR));
const std::string patchVersion(std::to_string(ADIOS2_VERSION_PATCH));

const std::string versionLongTag("ADIOS-BP v" + majorVersion + "." +
minorVersion + "." + patchVersion);
const size_t versionLongTagSize = versionLongTag.size();
if (versionLongTagSize < 24)
{
helper::CopyToBuffer(buffer, position, versionLongTag.c_str(),
versionLongTagSize);
position += 24 - versionLongTagSize;
}
else
{
helper::CopyToBuffer(buffer, position, versionLongTag.c_str(), 24);
}

lf_CopyVersionChar(majorVersion, buffer, position);
lf_CopyVersionChar(minorVersion, buffer, position);
lf_CopyVersionChar(patchVersion, buffer, position);
++position;

const uint8_t endianness = helper::IsLittleEndian() ? 0 : 1;
helper::CopyToBuffer(buffer, position, &endianness);

if (addSubfiles)
{
const uint8_t zeros1 = 0;
helper::CopyToBuffer(buffer, position, &zeros1);
helper::CopyToBuffer(buffer, position, &version);
}
else
{
const uint16_t zeros2 = 0;
helper::CopyToBuffer(buffer, position, &zeros2);
}
helper::CopyToBuffer(buffer, position, &version);
position += 32;
}

/*write the content of metadata index file*/
void BP4Writer::PopulateMetadataIndexFileContent(
const uint64_t currentStep, const uint64_t mpirank,
BufferSTL &b, const uint64_t currentStep, const uint64_t mpirank,
const uint64_t pgIndexStart, const uint64_t variablesIndexStart,
const uint64_t attributesIndexStart, const uint64_t currentStepEndPos,
const uint64_t currentTimeStamp, std::vector<char> &buffer,
size_t &position)
const uint64_t currentTimeStamp)
{
TAU_SCOPED_TIMER("BP4Writer::PopulateMetadataIndexFileContent");
auto &buffer = b.m_Buffer;
auto &position = b.m_Position;
auto &absolutePosition = b.m_AbsolutePosition;
helper::CopyToBuffer(buffer, position, &currentStep);
helper::CopyToBuffer(buffer, position, &mpirank);
helper::CopyToBuffer(buffer, position, &pgIndexStart);
Expand All @@ -481,6 +430,15 @@ void BP4Writer::PopulateMetadataIndexFileContent(
position += 8;
}

void BP4Writer::UpdateActiveFlag(const bool active)
{
const char activeChar = (active ? '\1' : '\0');
m_FileMetadataIndexManager.WriteFileAt(
&activeChar, 1, m_BP4Serializer.m_ActiveFlagPosition, 0);
m_FileMetadataIndexManager.FlushFiles();
m_FileMetadataIndexManager.SeekToFileEnd();
}

void BP4Writer::WriteCollectiveMetadataFile(const bool isFinal)
{

Expand All @@ -495,18 +453,7 @@ void BP4Writer::WriteCollectiveMetadataFile(const bool isFinal)
{
// But the flag in the header of metadata index table needs to be
// modified to indicate current run is over.
BufferSTL metadataIndex;
metadataIndex.m_Buffer.resize(8);
metadataIndex.m_Buffer.assign(metadataIndex.m_Buffer.size(), '\0');
metadataIndex.m_Position = 0;

const uint64_t currentRunIsOver = 1;
helper::CopyToBuffer(metadataIndex.m_Buffer,
metadataIndex.m_Position, &currentRunIsOver);

m_FileMetadataIndexManager.WriteFileAt(
metadataIndex.m_Buffer.data(), metadataIndex.m_Position, 56, 0);
m_FileMetadataIndexManager.FlushFiles();
UpdateActiveFlag(false);
}
return;
}
Expand Down Expand Up @@ -540,9 +487,7 @@ void BP4Writer::WriteCollectiveMetadataFile(const bool isFinal)
m_BP4Serializer.m_PreMetadataFileLength;

BufferSTL metadataIndex;
metadataIndex.m_Buffer.resize(64);
metadataIndex.m_Buffer.assign(metadataIndex.m_Buffer.size(), '\0');
metadataIndex.m_Position = 0;
metadataIndex.Resize(128, "BP4 Index Table Entry");

uint64_t currentStep;
if (isFinal && m_BP4Serializer.m_MetadataSet.DataPGCount > 0)
Expand All @@ -562,23 +507,15 @@ void BP4Writer::WriteCollectiveMetadataFile(const bool isFinal)

if (currentStep == 1) // TimeStep starts from 1
{
PopulateMetadataIndexFileHeader(metadataIndex.m_Buffer,
metadataIndex.m_Position, 4, true);
m_FileMetadataIndexManager.WriteFiles(metadataIndex.m_Buffer.data(),
metadataIndex.m_Position);

metadataIndex.m_Buffer.resize(64);
metadataIndex.m_Buffer.assign(metadataIndex.m_Buffer.size(), '\0');
metadataIndex.m_Position = 0;
m_BP4Serializer.MakeHeader(metadataIndex, "Index Table", true);
}

std::time_t currentTimeStamp = std::time(nullptr);

PopulateMetadataIndexFileContent(
currentStep, m_BP4Serializer.m_RankMPI, pgIndexStartMetadataFile,
varIndexStartMetadataFile, attrIndexStartMetadataFile,
currentStepEndPos, currentTimeStamp, metadataIndex.m_Buffer,
metadataIndex.m_Position);
metadataIndex, currentStep, m_BP4Serializer.m_RankMPI,
pgIndexStartMetadataFile, varIndexStartMetadataFile,
attrIndexStartMetadataFile, currentStepEndPos, currentTimeStamp);

m_FileMetadataIndexManager.WriteFiles(metadataIndex.m_Buffer.data(),
metadataIndex.m_Position);
Expand All @@ -592,18 +529,7 @@ void BP4Writer::WriteCollectiveMetadataFile(const bool isFinal)
// Only one step of metadata is generated at close.
// The flag in the header of metadata index table
// needs to be modified to indicate current run is over.
BufferSTL metadataIndex;
metadataIndex.m_Buffer.resize(8);
metadataIndex.m_Buffer.assign(metadataIndex.m_Buffer.size(), '\0');
metadataIndex.m_Position = 0;

const uint64_t currentRunIsOver = 1;
helper::CopyToBuffer(metadataIndex.m_Buffer,
metadataIndex.m_Position, &currentRunIsOver);

m_FileMetadataIndexManager.WriteFileAt(
metadataIndex.m_Buffer.data(), metadataIndex.m_Position, 56, 0);
m_FileMetadataIndexManager.FlushFiles();
UpdateActiveFlag(false);
}
}
/*Clear the local indices buffer at the end of each step*/
Expand Down
9 changes: 3 additions & 6 deletions source/adios2/engine/bp4/BP4Writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,16 +94,13 @@ class BP4Writer : public core::Engine
* profilers*/
void WriteProfilingJSONFile();

void PopulateMetadataIndexFileHeader(std::vector<char> &buffer,
size_t &position, const uint8_t,
const bool addSubfiles);
void UpdateActiveFlag(const bool active);

void PopulateMetadataIndexFileContent(
const uint64_t currentStep, const uint64_t mpirank,
BufferSTL &buffer, const uint64_t currentStep, const uint64_t mpirank,
const uint64_t pgIndexStart, const uint64_t variablesIndexStart,
const uint64_t attributesIndexStart, const uint64_t currentStepEndPos,
const uint64_t currentTimeStamp, std::vector<char> &buffer,
size_t &position);
const uint64_t currentTimeStamp);

void WriteCollectiveMetadataFile(const bool isFinal = false);

Expand Down
9 changes: 8 additions & 1 deletion source/adios2/toolkit/format/bp4/BP4Base.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,13 @@ class BP4Base
size_t m_PreMetadataFileLength = 0;
size_t m_PreDataFileLength = 0;

/** Positions of flags in Index Table Header that Reader uses */
const size_t m_EndianFlagPosition = 36;
const size_t m_BPVersionPosition = 37;
const size_t m_ActiveFlagPosition = 38;
const size_t m_VersionTagPosition = 0;
const size_t m_VersionTagLength = 32;

/**
* Unique constructor
* @param mpiComm for m_BP1Aggregator
Expand Down Expand Up @@ -294,7 +301,7 @@ class BP4Base
};

/**
* Resizes the data buffer to hold new dataIn size
* Resizes the data buffer to hold additional new dataIn size
* @param dataIn input size for new data
* @param hint for exception handling
* @return
Expand Down
45 changes: 23 additions & 22 deletions source/adios2/toolkit/format/bp4/BP4Deserializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,14 @@ void BP4Deserializer::ParseMetadataIndex(const BufferSTL &bufferSTL)
{
const auto &buffer = bufferSTL.m_Buffer;
const size_t bufferSize = buffer.size();
size_t position = 0;
position += 28;

// Read header (64 bytes)

// long version string
size_t position = m_VersionTagPosition;
m_Minifooter.VersionTag.assign(&buffer[position], m_VersionTagLength);

position = m_EndianFlagPosition;
const uint8_t endianness = helper::ReadValue<uint8_t>(buffer, position);
m_Minifooter.IsLittleEndian = (endianness == 0) ? true : false;
#ifndef ADIOS2_HAVE_ENDIAN_REVERSE
Expand All @@ -78,33 +84,28 @@ void BP4Deserializer::ParseMetadataIndex(const BufferSTL &bufferSTL)
}
#endif

position += 1;

const int8_t fileType = helper::ReadValue<int8_t>(
buffer, position, m_Minifooter.IsLittleEndian);
if (fileType >= 3)
{
m_Minifooter.HasSubFiles = true;
}
else if (fileType == 0 || fileType == 2)
{
m_Minifooter.HasSubFiles = false;
}
// This has no flag in BP4 header. Always true
m_Minifooter.HasSubFiles = true;

// BP version
position = m_BPVersionPosition;
m_Minifooter.Version = helper::ReadValue<uint8_t>(
buffer, position, m_Minifooter.IsLittleEndian);
if (m_Minifooter.Version < 3)
if (m_Minifooter.Version != 4)
{
throw std::runtime_error("ERROR: ADIOS2 only supports bp format "
"version 3 and above, found " +
std::to_string(m_Minifooter.Version) +
" version \n");
throw std::runtime_error(
"ERROR: ADIOS2 BP4 Engine only supports bp format "
"version 4, found " +
std::to_string(m_Minifooter.Version) + " version \n");
}

position = 0;
m_Minifooter.VersionTag.assign(&buffer[position], 28);
// active flag, not used yet in the reader
position = m_ActiveFlagPosition;
const uint8_t activeFlag = helper::ReadValue<uint8_t>(
buffer, position, m_Minifooter.IsLittleEndian);

position += 64;
// Read each record now
position = 64;
while (position < bufferSize)
{
std::vector<uint64_t> ptrs;
Expand Down
Loading