Skip to content

Commit

Permalink
Merge pull request #3350 from eisenhauer/BP5Struct
Browse files Browse the repository at this point in the history
  • Loading branch information
eisenhauer authored Sep 26, 2022
2 parents f5107e3 + 47fc6bc commit f5b1724
Show file tree
Hide file tree
Showing 20 changed files with 963 additions and 35 deletions.
12 changes: 12 additions & 0 deletions source/adios2/engine/bp5/BP5Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1198,6 +1198,18 @@ void BP5Reader::DoGetAbsoluteSteps(const VariableBase &variable,
ADIOS2_FOREACH_STDTYPE_1ARG(declare_type)
#undef declare_type

void BP5Reader::DoGetStructSync(VariableStruct &variable, void *data)
{
PERFSTUBS_SCOPED_TIMER("BP5Reader::Get");
GetSyncCommon(variable, data);
}

void BP5Reader::DoGetStructDeferred(VariableStruct &variable, void *data)
{
PERFSTUBS_SCOPED_TIMER("BP5Reader::Get");
GetDeferredCommon(variable, data);
}

void BP5Reader::DoClose(const int transportIndex)
{
PERFSTUBS_SCOPED_TIMER("BP5Reader::Close");
Expand Down
9 changes: 5 additions & 4 deletions source/adios2/engine/bp5/BP5Reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -205,11 +205,12 @@ class BP5Reader : public BP5Engine, public Engine

void DoClose(const int transportIndex = -1) final;

template <class T>
void GetSyncCommon(Variable<T> &variable, T *data);
void GetSyncCommon(VariableBase &variable, void *data);

template <class T>
void GetDeferredCommon(Variable<T> &variable, T *data);
void GetDeferredCommon(VariableBase &variable, void *data);

void DoGetStructSync(VariableStruct &, void *);
void DoGetStructDeferred(VariableStruct &, void *);

template <class T>
void ReadVariableBlocks(Variable<T> &variable);
Expand Down
6 changes: 2 additions & 4 deletions source/adios2/engine/bp5/BP5Reader.tcc
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,14 @@ namespace core
namespace engine
{

template <class T>
inline void BP5Reader::GetSyncCommon(Variable<T> &variable, T *data)
inline void BP5Reader::GetSyncCommon(VariableBase &variable, void *data)
{
bool need_sync = m_BP5Deserializer->QueueGet(variable, data);
if (need_sync)
PerformGets();
}

template <class T>
void BP5Reader::GetDeferredCommon(Variable<T> &variable, T *data)
void BP5Reader::GetDeferredCommon(VariableBase &variable, void *data)
{
(void)m_BP5Deserializer->QueueGet(variable, data);
}
Expand Down
28 changes: 23 additions & 5 deletions source/adios2/engine/bp5/BP5Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1756,12 +1756,21 @@ void BP5Writer::PutCommon(VariableBase &variable, const void *values, bool sync)
Count = variable.m_Count.data();
}

size_t ObjSize;
if (variable.m_Type == DataType::Struct)
{
ObjSize = variable.m_ElementSize;
}
else
{
ObjSize = helper::GetDataTypeSize(variable.m_Type);
}

if (!sync)
{
/* If arrays is small, force copying to internal buffer to aggregate
* small writes */
size_t n = helper::GetTotalSize(variable.m_Count) *
helper::GetDataTypeSize(variable.m_Type);
size_t n = helper::GetTotalSize(variable.m_Count) * ObjSize;
if (n < m_Parameters.MinDeferredSize)
{
sync = true;
Expand All @@ -1787,9 +1796,8 @@ void BP5Writer::PutCommon(VariableBase &variable, const void *values, bool sync)
(const char *)values, helper::CoreDims(ZeroDims),
variable.m_MemoryCount, sourceRowMajor, false, (char *)ptr,
variable.m_MemoryStart, variable.m_Count, sourceRowMajor, false,
helper::GetDataTypeSize(variable.m_Type), helper::CoreDims(),
helper::CoreDims(), helper::CoreDims(), helper::CoreDims(),
false /* safemode */, MemorySpace::Host);
ObjSize, helper::CoreDims(), helper::CoreDims(), helper::CoreDims(),
helper::CoreDims(), false /* safemode */, MemorySpace::Host);
}
else
{
Expand Down Expand Up @@ -1847,6 +1855,16 @@ ADIOS2_FOREACH_STDTYPE_1ARG(declare_type)
ADIOS2_FOREACH_PRIMITVE_STDTYPE_2ARGS(declare_type)
#undef declare_type

void BP5Writer::DoPutStructSync(VariableStruct &variable, const void *data)
{
PutCommon(variable, data, true);
}

void BP5Writer::DoPutStructDeferred(VariableStruct &variable, const void *data)
{
PutCommon(variable, data, false);
}

} // end namespace engine
} // end namespace core
} // end namespace adios2
5 changes: 5 additions & 0 deletions source/adios2/engine/bp5/BP5Writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,11 @@ class BP5Writer : public BP5Engine, public core::Engine
ADIOS2_FOREACH_PRIMITVE_STDTYPE_2ARGS(declare_type)
#undef declare_type

void DoPutStructSync(VariableStruct &, const void *) final;
void DoPutStructDeferred(VariableStruct &, const void *) final;

void PutStruct(VariableStruct &, const void *, bool);

void FlushData(const bool isFinal = false);

void DoClose(const int transportIndex = -1) final;
Expand Down
45 changes: 45 additions & 0 deletions source/adios2/engine/sst/SstReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -663,6 +663,51 @@ void SstReader::Init()
ADIOS2_FOREACH_STDTYPE_1ARG(declare_gets)
#undef declare_gets

void SstReader::DoGetStructSync(VariableStruct &variable, void *data)
{
PERFSTUBS_SCOPED_TIMER("BP5Reader::Get");
if (m_WriterMarshalMethod != SstMarshalBP5)
{
helper::Throw<std::runtime_error>(
"Engine", "SstReader", "GetStructSync",
"SST only supports struct transmission when BP5 marshalling is "
"selected");
}
bool need_sync = m_BP5Deserializer->QueueGet(variable, data);
if (need_sync)
BP5PerformGets();
}

void SstReader::DoGetStructDeferred(VariableStruct &variable, void *data)
{
PERFSTUBS_SCOPED_TIMER("SstReader::Get");
if (m_WriterMarshalMethod != SstMarshalBP5)
{
helper::Throw<std::runtime_error>(
"Engine", "SstReader", "GetStructSync",
"SST only supports struct transmission when BP5 marshalling is "
"selected");
}
m_BP5Deserializer->QueueGet(variable, data);
}

Dims *SstReader::VarShape(const VariableBase &Var, const size_t Step) const
{
if (m_WriterMarshalMethod != SstMarshalBP5)
return nullptr;

return m_BP5Deserializer->VarShape(Var, Step);
}

bool SstReader::VariableMinMax(const VariableBase &Var, const size_t Step,
MinMaxStruct &MinMax)
{
if (m_WriterMarshalMethod != SstMarshalBP5)
return false;

return m_BP5Deserializer->VariableMinMax(Var, Step, MinMax);
}

void SstReader::BP5PerformGets()
{
size_t maxReadSize;
Expand Down
6 changes: 6 additions & 0 deletions source/adios2/engine/sst/SstReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ class SstReader : public Engine
void PerformGets();
void Flush(const int transportIndex = -1) final;
MinVarInfo *MinBlocksInfo(const VariableBase &, const size_t Step) const;
Dims *VarShape(const VariableBase &, const size_t Step) const;
bool VariableMinMax(const VariableBase &, const size_t Step,
MinMaxStruct &MinMax);

private:
template <class T>
Expand Down Expand Up @@ -95,6 +98,9 @@ class SstReader : public Engine
ADIOS2_FOREACH_STDTYPE_1ARG(declare_type)
#undef declare_type

void DoGetStructSync(VariableStruct &, void *);
void DoGetStructDeferred(VariableStruct &, void *);

/**
* Called if destructor is called on an open engine. Should warn or take
* any non-complex measure that might help recover.
Expand Down
52 changes: 52 additions & 0 deletions source/adios2/engine/sst/SstWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ StepStatus SstWriter::BeginStep(StepMode mode, const float timeout_sec)
{
m_BP5Serializer = std::unique_ptr<format::BP5Serializer>(
new format::BP5Serializer());
m_BP5Serializer->m_StatsLevel = Params.StatsLevel;
}
m_BP5Serializer->InitStep(new format::MallocV("SstWriter", true));
m_BP5Serializer->m_Engine = this;
Expand Down Expand Up @@ -435,6 +436,57 @@ void SstWriter::Init()
ADIOS2_FOREACH_STDTYPE_1ARG(declare_type)
#undef declare_type

void SstWriter::PutStructCommon(VariableBase &variable, const void *data)
{
size_t *Shape = NULL;
size_t *Start = NULL;
size_t *Count = NULL;
size_t DimCount = 0;

if (m_BetweenStepPairs == false)
{
helper::Throw<std::logic_error>("Engine", "SstWriter", "PutSyncCommon",
"When using the SST engine in ADIOS2, "
"Put() calls must appear between "
"BeginStep/EndStep pairs");
}

if (Params.MarshalMethod != SstMarshalBP5)
{
helper::Throw<std::logic_error>(
"Engine", "SstWriter", "PutStructCommon",
"Support for struct types only exists when using BP5 marshalling");
}

if (variable.m_ShapeID == ShapeID::GlobalArray)
{
DimCount = variable.m_Shape.size();
Shape = variable.m_Shape.data();
Start = variable.m_Start.data();
Count = variable.m_Count.data();
}
else if (variable.m_ShapeID == ShapeID::LocalArray)
{
DimCount = variable.m_Count.size();
Count = variable.m_Count.data();
}
m_BP5Serializer->Marshal((void *)&variable, variable.m_Name.c_str(),
variable.m_Type, variable.m_ElementSize, DimCount,
Shape, Count, Start, data, true, nullptr);
}

void SstWriter::DoPutStructSync(VariableStruct &variable, const void *data)
{
PutStructCommon(variable, data);
}

void SstWriter::DoPutStructDeferred(VariableStruct &variable, const void *data)
{
PutStructCommon(variable, data);
}

void PutStruct(VariableStruct &, const void *, bool);

void SstWriter::DoClose(const int transportIndex) { SstWriterClose(m_Output); }

/**
Expand Down
5 changes: 5 additions & 0 deletions source/adios2/engine/sst/SstWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ class SstWriter : public Engine
ADIOS2_FOREACH_STDTYPE_1ARG(declare_type)
#undef declare_type

void DoPutStructSync(VariableStruct &, const void *) final;
void DoPutStructDeferred(VariableStruct &, const void *) final;

void PutStructCommon(VariableBase &, const void *);

template <class T>
void PutSyncCommon(Variable<T> &variable, const T *values);

Expand Down
20 changes: 16 additions & 4 deletions source/adios2/engine/sst/SstWriter.tcc
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,22 @@ void SstWriter::PutSyncCommon(Variable<T> &variable, const T *values)
}
else
{
m_BP5Serializer->Marshal((void *)&variable, variable.m_Name.c_str(),
variable.m_Type, variable.m_ElementSize,
DimCount, Shape, Count, Start, values,
true, nullptr);
if (variable.m_Type == DataType::String)
{
std::string &source = *(std::string *)values;
void *p = &(source[0]);
m_BP5Serializer->Marshal(
(void *)&variable, variable.m_Name.c_str(), variable.m_Type,
variable.m_ElementSize, DimCount, Shape, Count, Start, &p,
true, nullptr);
}
else
{
m_BP5Serializer->Marshal(
(void *)&variable, variable.m_Name.c_str(), variable.m_Type,
variable.m_ElementSize, DimCount, Shape, Count, Start,
values, true, nullptr);
}
}
}
else if (Params.MarshalMethod == SstMarshalBP)
Expand Down
Loading

0 comments on commit f5b1724

Please sign in to comment.