Skip to content

Commit

Permalink
Add backward compatible read with BLOSC compression to BP4 files crea…
Browse files Browse the repository at this point in the history
…ted pre-2.8.

Bzip2 compressed file and testing is added to show how to test for failure.
  • Loading branch information
pnorbert committed Jul 7, 2022
1 parent 499d268 commit 62f0cca
Show file tree
Hide file tree
Showing 24 changed files with 682 additions and 24 deletions.
1 change: 1 addition & 0 deletions source/adios2/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ add_library(adios2_core
toolkit/format/bp/bp4/BP4Base.cpp
toolkit/format/bp/bp4/BP4Serializer.cpp toolkit/format/bp/bp4/BP4Serializer.tcc
toolkit/format/bp/bp4/BP4Deserializer.cpp toolkit/format/bp/bp4/BP4Deserializer.tcc
toolkit/format/bp/bpBackCompatOperation/compress/BPBackCompatBlosc.cpp

toolkit/profiling/iochrono/Timer.cpp
toolkit/profiling/iochrono/IOChrono.cpp
Expand Down
13 changes: 13 additions & 0 deletions source/adios2/toolkit/format/bp/BPBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

#include "adios2/helper/adiosFunctions.h"

#include "adios2/toolkit/format/bp/bpBackCompatOperation/compress/BPBackCompatBlosc.h"

namespace adios2
{
namespace format
Expand Down Expand Up @@ -487,5 +489,16 @@ ADIOS2_FOREACH_STDTYPE_1ARG(declare_template_instantiation)

size_t BPBase::DebugGetDataBufferSize() const { return m_Data.DebugGetSize(); }

std::shared_ptr<BPBackCompatOperation>
BPBase::SetBPBackCompatOperation(const std::string type) const noexcept
{
std::shared_ptr<BPBackCompatOperation> bpOp;
if (type == "blosc")
{
bpOp = std::make_shared<BPBackCompatBlosc>();
}
return bpOp;
}

} // end namespace format
} // end namespace adios2
13 changes: 13 additions & 0 deletions source/adios2/toolkit/format/bp/BPBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "adios2/common/ADIOSTypes.h"
#include "adios2/helper/adiosFunctions.h"
#include "adios2/toolkit/aggregator/mpi/MPIChain.h"
#include "adios2/toolkit/format/bp/bpBackCompatOperation/BPBackCompatOperation.h"
#include "adios2/toolkit/format/buffer/Buffer.h"
#include "adios2/toolkit/format/buffer/heap/BufferSTL.h"
#include "adios2/toolkit/profiling/iochrono/IOChrono.h"
Expand Down Expand Up @@ -144,6 +145,10 @@ class BPBase
uint64_t VarsIndexStart = 0;
uint64_t AttributesIndexStart = 0;
int8_t Version = -1;
uint8_t ADIOSVersionMajor = 0;
uint8_t ADIOSVersionMinor = 0;
uint8_t ADIOSVersionPatch = 0;
uint32_t ADIOSVersion = 0; // major*1M + minor*1k + patch e.g. 2007001
bool IsLittleEndian = true;
bool HasSubFiles = false;

Expand Down Expand Up @@ -472,6 +477,14 @@ class BPBase
TransformTypes TransformTypeEnum(const std::string transformType) const
noexcept;

/**
* Returns the proper derived class for BPOperation based on type
* @param type input, must be a supported type under BPOperation
* @return derived class if supported, false pointer if type not supported
*/
std::shared_ptr<BPBackCompatOperation>
SetBPBackCompatOperation(const std::string type) const noexcept;

struct ProcessGroupIndex
{
uint64_t Offset;
Expand Down
3 changes: 3 additions & 0 deletions source/adios2/toolkit/format/bp/bp4/BP4Base.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ class BP4Base : virtual public BPBase
static constexpr size_t m_ActiveFlagPosition = 38;
static constexpr size_t m_VersionTagPosition = 0;
static constexpr size_t m_VersionTagLength = 32;
static constexpr size_t m_VersionMajorPosition = 32;
static constexpr size_t m_VersionMinorPosition = 33;
static constexpr size_t m_VersionPatchPosition = 34;

/**
* Unique constructor
Expand Down
17 changes: 17 additions & 0 deletions source/adios2/toolkit/format/bp/bp4/BP4Deserializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,23 @@ void BP4Deserializer::ParseMetadataIndex(BufferSTL &bufferSTL,
// This has no flag in BP4 header. Always true
m_Minifooter.HasSubFiles = true;

// Writer's ADIOS version
position = m_VersionMajorPosition;
uint8_t ascii = helper::ReadValue<uint8_t>(buffer, position,
m_Minifooter.IsLittleEndian);
m_Minifooter.ADIOSVersionMajor = ascii - (uint8_t)'0';
position = m_VersionMinorPosition;
ascii = helper::ReadValue<uint8_t>(buffer, position,
m_Minifooter.IsLittleEndian);
m_Minifooter.ADIOSVersionMinor = ascii - (uint8_t)'0';
position = m_VersionPatchPosition;
ascii = helper::ReadValue<uint8_t>(buffer, position,
m_Minifooter.IsLittleEndian);
m_Minifooter.ADIOSVersionPatch = ascii - (uint8_t)'0';
m_Minifooter.ADIOSVersion = m_Minifooter.ADIOSVersionMajor * 1000000 +
m_Minifooter.ADIOSVersionMinor * 1000 +
m_Minifooter.ADIOSVersionPatch;

// BP version
position = m_BPVersionPosition;
m_Minifooter.Version = helper::ReadValue<uint8_t>(
Expand Down
3 changes: 3 additions & 0 deletions source/adios2/toolkit/format/bp/bp4/BP4Deserializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ class BP4Deserializer : virtual public BP4Base
const bool isRowMajorDestination,
const size_t threadID = 0);

void BackCompatDecompress(const helper::SubStreamBoxInfo &subStreamBoxInfo,
const size_t threadID = 0);

/**
* Clips and assigns memory to blockInfo.Data from a contiguous memory
* input
Expand Down
121 changes: 97 additions & 24 deletions source/adios2/toolkit/format/bp/bp4/BP4Deserializer.tcc
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,30 @@ void BP4Deserializer::SetVariableBlockInfo(
blockOperation.PreSizeOf = sizeof(T);

// read metadata from supported type and populate Info
std::memcpy(&blockOperation.PayloadSize, bpOpInfo.Metadata.data() + 8,
8);
if (m_Minifooter.ADIOSVersion >= 2008000)
{
std::memcpy(&blockOperation.PayloadSize,
bpOpInfo.Metadata.data() + 8, 8);
}
else
{
// Files made before 2.8 have incompatible compression operators
// Add backward compatible fixes here to parse compression metadata
// directly from the BP4 metadata format
std::shared_ptr<BPBackCompatOperation> bpOp =
SetBPBackCompatOperation(bpOpInfo.Type);
if (bpOp)
{
bpOp->GetMetadata(bpOpInfo.Metadata, blockOperation.Info);
blockOperation.PayloadSize = static_cast<size_t>(
std::stoull(blockOperation.Info.at("OutputSize")));
}
else
{
std::memcpy(&blockOperation.PayloadSize,
bpOpInfo.Metadata.data() + 8, 8);
}
}

subStreamInfo.OperationsInfo.push_back(std::move(blockOperation));
};
Expand Down Expand Up @@ -520,34 +542,43 @@ void BP4Deserializer::PostDataRead(
{
if (subStreamBoxInfo.OperationsInfo.size() > 0)
{
const helper::BlockOperationInfo &blockOperationInfo =
InitPostOperatorBlockData(subStreamBoxInfo.OperationsInfo);
if (m_Minifooter.ADIOSVersion >= 2008000)
{
const helper::BlockOperationInfo &blockOperationInfo =
InitPostOperatorBlockData(subStreamBoxInfo.OperationsInfo);

const size_t preOpPayloadSize =
helper::GetTotalSize(blockOperationInfo.PreCount) *
blockOperationInfo.PreSizeOf;
m_ThreadBuffers[threadID][0].resize(preOpPayloadSize);
const size_t preOpPayloadSize =
helper::GetTotalSize(blockOperationInfo.PreCount) *
blockOperationInfo.PreSizeOf;
m_ThreadBuffers[threadID][0].resize(preOpPayloadSize);

// get original block back
char *preOpData = m_ThreadBuffers[threadID][0].data();
const char *postOpData = m_ThreadBuffers[threadID][1].data();
// get original block back
char *preOpData = m_ThreadBuffers[threadID][0].data();
const char *postOpData = m_ThreadBuffers[threadID][1].data();

std::shared_ptr<core::Operator> op = nullptr;
for (auto &o : blockInfo.Operations)
{
if (o->m_Category == "compress" || o->m_Category == "plugin")
std::shared_ptr<core::Operator> op = nullptr;
for (auto &o : blockInfo.Operations)
{
op = o;
break;
if (o->m_Category == "compress" || o->m_Category == "plugin")
{
op = o;
break;
}
}
}
core::Decompress(postOpData, blockOperationInfo.PayloadSize, preOpData,
op);
core::Decompress(postOpData, blockOperationInfo.PayloadSize,
preOpData, op);

// clip block to match selection
helper::ClipVector(m_ThreadBuffers[threadID][0],
subStreamBoxInfo.Seeks.first,
subStreamBoxInfo.Seeks.second);
// clip block to match selection
helper::ClipVector(m_ThreadBuffers[threadID][0],
subStreamBoxInfo.Seeks.first,
subStreamBoxInfo.Seeks.second);
}
else
{
// Files made before 2.8 have incompatible compression operators
// Add backward compatible fixes in the function below
BackCompatDecompress(subStreamBoxInfo, threadID);
}
}

#ifdef ADIOS2_HAVE_ENDIAN_REVERSE
Expand All @@ -570,6 +601,48 @@ void BP4Deserializer::PostDataRead(
endianReverse, blockInfo.IsGPU);
}

void BP4Deserializer::BackCompatDecompress(
const helper::SubStreamBoxInfo &subStreamBoxInfo, const size_t threadID)
{
// Files made before 2.8 have incompatible compression operators
// Add backward compatible fixes here
const helper::BlockOperationInfo &blockOperationInfo =
InitPostOperatorBlockData(subStreamBoxInfo.OperationsInfo);

const size_t preOpPayloadSize =
helper::GetTotalSize(blockOperationInfo.PreCount) *
blockOperationInfo.PreSizeOf;
m_ThreadBuffers[threadID][0].resize(preOpPayloadSize);

std::string opType = blockOperationInfo.Info.at("Type");

// get original block back
char *preOpData = m_ThreadBuffers[threadID][0].data();
const char *postOpData = m_ThreadBuffers[threadID][1].data();
// get the right bp4Op
std::shared_ptr<BPBackCompatOperation> bp4Op =
SetBPBackCompatOperation(opType);

if (bp4Op)
{
bp4Op->GetData(postOpData, blockOperationInfo, preOpData);
// clip block to match selection
helper::ClipVector(m_ThreadBuffers[threadID][0],
subStreamBoxInfo.Seeks.first,
subStreamBoxInfo.Seeks.second);
}
else
{
helper::Throw<std::runtime_error>(
"Toolkit", "format::bp::BP4Deserializer", "PostDataRead",
"This file was created by pre-ADIOS 2.8.0 using "
"compression type " +
opType +
", for which there is no backward compatible reader in this "
"ADIOS version");
}
}

template <class T>
std::map<size_t, std::vector<typename core::Variable<T>::BPInfo>>
BP4Deserializer::AllStepsBlocksInfo(const core::Variable<T> &variable) const
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Distributed under the OSI-approved Apache License, Version 2.0. See
* accompanying file Copyright.txt for details.
*
* BPOperation.h :
*
* Created on: Jul 12, 2018
* Author: William F Godoy godoywf@ornl.gov
*/

#ifndef ADIOS2_TOOLKIT_FORMAT_BP_BPOPERATION_BPOPERATION_H_
#define ADIOS2_TOOLKIT_FORMAT_BP_BPOPERATION_BPOPERATION_H_

#include <string>
#include <vector>

#include "adios2/common/ADIOSMacros.h"
#include "adios2/core/Variable.h"
#include "adios2/helper/adiosFunctions.h"
#include "adios2/toolkit/format/buffer/heap/BufferSTL.h"

namespace adios2
{
namespace format
{

class BPBackCompatOperation
{
public:
BPBackCompatOperation() = default;
virtual ~BPBackCompatOperation() = default;

/**
* Deserializes metadata in the form of parameters
* @param buffer contains serialized metadata buffer
* @param info parameters info from metadata buffer
*/
virtual void GetMetadata(const std::vector<char> &buffer,
Params &info) const noexcept = 0;

virtual void GetData(const char *input,
const helper::BlockOperationInfo &blockOperationInfo,
char *dataOutput) const = 0;
};

} // end namespace format
} // end namespace adios2

#endif /* ADIOS2_TOOLKIT_FORMAT_BP_OPERATION_BPOPERATION_H_ */
Loading

0 comments on commit 62f0cca

Please sign in to comment.