Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added attributes into dataman #984

Merged
merged 2 commits into from
Nov 2, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion source/adios2/engine/dataman/DataManReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,16 @@ StepStatus DataManReader::BeginStep(StepMode stepMode,
"[DataManReader::BeginStep] Step mode is not supported!"));
}

vars = m_MetaDataMap[m_CurrentStep];
auto currentStepIt = m_MetaDataMap.find(m_CurrentStep);
if (currentStepIt != m_MetaDataMap.end())
{
vars = currentStepIt->second;
}
}

if (m_CurrentStep == 0)
{
m_DataManDeserializer.GetAttributes(m_IO);
}

for (const auto &i : *vars)
Expand Down
4 changes: 4 additions & 0 deletions source/adios2/engine/dataman/DataManWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ void DataManWriter::EndStep()
{
for (size_t i = 0; i < m_TransportChannels; ++i)
{
if (m_CurrentStep == 0)
{
m_DataManSerializer[i]->PutAttributes(m_IO, m_MPIRank);
}
const std::shared_ptr<std::vector<char>> buf =
m_DataManSerializer[i]->Get();
m_BufferSize = buf->size() * 2;
Expand Down
42 changes: 41 additions & 1 deletion source/adios2/toolkit/format/dataman/DataManDeserializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,18 @@ int DataManDeserializer::Put(

for (auto stepMapIt = metaJ.begin(); stepMapIt != metaJ.end(); ++stepMapIt)
{
if (stepMapIt.key() == "G" || stepMapIt.key() == "A")
{
for (const auto &rankVec : stepMapIt.value())
{
for (const auto &gVar : rankVec)
{
m_GlobalVars[gVar["N"].get<std::string>()] = gVar;
}
}
continue;
}

for (auto rankMapIt = stepMapIt.value().begin();
rankMapIt != stepMapIt.value().end(); ++rankMapIt)
{
Expand All @@ -69,10 +81,10 @@ int DataManDeserializer::Put(
try
{
// compulsory properties
var.step = stoull(stepMapIt.key());
var.name = varBlock["N"].get<std::string>();
var.start = varBlock["O"].get<Dims>();
var.count = varBlock["C"].get<Dims>();
var.step = varBlock["T"].get<size_t>();
var.size = varBlock["I"].get<size_t>();

// optional properties
Expand Down Expand Up @@ -246,5 +258,33 @@ bool DataManDeserializer::HasOverlap(Dims in_start, Dims in_count,
return true;
}

void DataManDeserializer::GetAttributes(core::IO &io)
{
std::lock_guard<std::mutex> l(m_Mutex);
for (const auto &j : m_GlobalVars)
{
const std::string type(j["Y"].get<std::string>());
if (type == "unknown")
{
}
#define declare_type(T) \
else if (type == helper::GetType<T>()) \
{ \
if (j["V"].get<bool>()) \
{ \
io.DefineAttribute<T>(j["N"].get<std::string>(), j["G"].get<T>()); \
} \
else \
{ \
io.DefineAttribute<T>(j["N"].get<std::string>(), \
j["G"].get<std::vector<T>>().data(), \
j["G"].get<std::vector<T>>().size()); \
} \
}
ADIOS2_FOREACH_ATTRIBUTE_TYPE_1ARG(declare_type)
#undef declare_type
}
}

} // namespace format
} // namespace adios2
3 changes: 3 additions & 0 deletions source/adios2/toolkit/format/dataman/DataManDeserializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <nlohmann/json.hpp>

#include "adios2/ADIOSTypes.h"
#include "adios2/core/IO.h"
#include "adios2/core/Variable.h"

#include <mutex>
Expand Down Expand Up @@ -56,6 +57,7 @@ class DataManDeserializer
GetMetaData(const size_t step);
const std::unordered_map<size_t, std::shared_ptr<std::vector<DataManVar>>>
GetMetaData();
void GetAttributes(core::IO &io);

private:
bool HasOverlap(Dims in_start, Dims in_count, Dims out_start,
Expand All @@ -69,6 +71,7 @@ class DataManDeserializer
size_t m_MinStep = std::numeric_limits<size_t>::max();
bool m_IsRowMajor;
bool m_IsLittleEndian;
nlohmann::json m_GlobalVars;

std::mutex m_Mutex;
};
Expand Down
23 changes: 22 additions & 1 deletion source/adios2/toolkit/format/dataman/DataManSerializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ void DataManSerializer::New(size_t size)

const std::shared_ptr<std::vector<char>> DataManSerializer::Get()
{
std::vector<uint8_t> metacbor = nlohmann::json::to_msgpack(m_Metadata);
std::vector<uint8_t> metacbor(nlohmann::json::to_msgpack(m_Metadata));
size_t metasize = metacbor.size();
m_Buffer->resize(m_Position + metasize);
(reinterpret_cast<uint64_t *>(m_Buffer->data()))[0] = m_Position;
Expand Down Expand Up @@ -97,5 +97,26 @@ bool DataManSerializer::IsCompressionAvailable(const std::string &method,
return false;
}

void DataManSerializer::PutAttributes(core::IO &io, const int rank)
{
const auto attributesDataMap = io.GetAttributesDataMap();
for (const auto &attributePair : attributesDataMap)
{
const std::string name(attributePair.first);
const std::string type(attributePair.second.first);
if (type == "unknown")
{
}
#define declare_type(T) \
else if (type == helper::GetType<T>()) \
{ \
core::Attribute<T> &attribute = *io.InquireAttribute<T>(name); \
PutAttribute(attribute, rank); \
}
ADIOS2_FOREACH_ATTRIBUTE_TYPE_1ARG(declare_type)
#undef declare_type
}
}

} // namespace format
} // namespace adios2
33 changes: 25 additions & 8 deletions source/adios2/toolkit/format/dataman/DataManSerializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <nlohmann/json.hpp>

#include "adios2/ADIOSTypes.h"
#include "adios2/core/IO.h"
#include "adios2/core/Variable.h"

#include <mutex>
Expand All @@ -23,13 +24,15 @@
// C - Count
// D - Data Object ID or File Name
// E - Endian
// G - Global Value
// H - Meatadata Hash
// I - Data Size
// M - Major
// N - Variable Name
// O - Start
// P - Position of Memory Block
// S - Shape
// V - Is Single Value
// X - Index (Used only in deserializer)
// Y - Data Type
// Z - Compression Method
Expand All @@ -44,39 +47,53 @@ class DataManSerializer
{
public:
DataManSerializer(bool isRowMajor, bool isLittleEndian);

void New(size_t size);

template <class T>
void Put(const T *inputData, const std::string &varName,
const Dims &varShape, const Dims &varStart, const Dims &varCount,
const std::string &doid, const size_t step, const int rank,
std::string address, const Params &params);
const std::string &address, const Params &params);

template <class T>
void Put(const core::Variable<T> &variable, const std::string &doid,
const size_t step, const int rank, std::string address,
const size_t step, const int rank, const std::string &address,
const Params &params);

void PutAttributes(core::IO &io, const int rank);

const std::shared_ptr<std::vector<char>> Get();

float GetMetaRatio();

static std::shared_ptr<std::vector<char>> EndSignal(size_t step);

private:
std::shared_ptr<std::vector<char>> m_Buffer;
nlohmann::json m_Metadata;
std::vector<char> m_CompressBuffer;
size_t m_Position = 0;
bool m_IsRowMajor;
bool m_IsLittleEndian;
template <class T>
bool Zfp(nlohmann::json &metaj, size_t &datasize, const T *inputData,
const Dims &varCount, const Params &params);

template <class T>
bool Sz(nlohmann::json &metaj, size_t &datasize, const T *inputData,
const Dims &varCount, const Params &params);

template <class T>
bool BZip2(nlohmann::json &metaj, size_t &datasize, const T *inputData,
const Dims &varCount, const Params &params);

template <class T>
void PutAttribute(const core::Attribute<T> &attribute, const int rank);

bool IsCompressionAvailable(const std::string &method,
const std::string &type, const Dims &count);

std::shared_ptr<std::vector<char>> m_Buffer;
nlohmann::json m_Metadata;
std::vector<char> m_CompressBuffer;
size_t m_Position = 0;
bool m_IsRowMajor;
bool m_IsLittleEndian;
};

} // end namespace format
Expand Down
28 changes: 23 additions & 5 deletions source/adios2/toolkit/format/dataman/DataManSerializer.tcc
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ namespace format
template <class T>
void DataManSerializer::Put(const core::Variable<T> &variable,
const std::string &doid, const size_t step,
const int rank, std::string address,
const int rank, const std::string &address,
const Params &params)
{
Put(variable.GetData(), variable.m_Name, variable.m_Shape, variable.m_Start,
Expand All @@ -47,7 +47,7 @@ void DataManSerializer::Put(const T *inputData, const std::string &varName,
const Dims &varShape, const Dims &varStart,
const Dims &varCount, const std::string &doid,
const size_t step, const int rank,
std::string address, const Params &params)
const std::string &address, const Params &params)
{

nlohmann::json metaj;
Expand All @@ -56,7 +56,6 @@ void DataManSerializer::Put(const T *inputData, const std::string &varName,
metaj["O"] = varStart;
metaj["C"] = varCount;
metaj["S"] = varShape;
metaj["T"] = step;
metaj["D"] = doid;
metaj["M"] = m_IsRowMajor;
metaj["E"] = m_IsLittleEndian;
Expand Down Expand Up @@ -126,7 +125,7 @@ void DataManSerializer::Put(const T *inputData, const std::string &varName,

if (m_Buffer->capacity() < m_Position + datasize)
{
m_Buffer->reserve(m_Buffer->capacity() * 2);
m_Buffer->reserve((m_Position + datasize) * 2);
}

m_Buffer->resize(m_Position + datasize);
Expand All @@ -142,7 +141,7 @@ void DataManSerializer::Put(const T *inputData, const std::string &varName,
}
m_Position += datasize;

m_Metadata[std::to_string(step)][std::to_string(rank)].push_back(metaj);
m_Metadata[std::to_string(step)][std::to_string(rank)].emplace_back(metaj);
}

template <class T>
Expand Down Expand Up @@ -263,6 +262,25 @@ bool DataManSerializer::BZip2(nlohmann::json &metaj, size_t &datasize,
return false;
}

template <class T>
void DataManSerializer::PutAttribute(const core::Attribute<T> &attribute,
const int rank)
{
m_Metadata["A"][std::to_string(rank)].emplace_back();
auto &j = m_Metadata["A"][std::to_string(rank)].back();
j["N"] = attribute.m_Name;
j["Y"] = attribute.m_Type;
j["V"] = attribute.m_IsSingleValue;
if (attribute.m_IsSingleValue)
{
j["G"] = attribute.m_DataSingleValue;
}
else
{
j["G"] = attribute.m_DataArray;
}
}

} // namespace format
} // namespace adios2

Expand Down
4 changes: 4 additions & 0 deletions testing/adios2/engine/dataman/TestDataMan.h
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ void DataManWriter(const Dims &shape, const Dims &start, const Dims &count,
"bpComplexes", shape, start, count);
auto bpDComplexes = dataManIO.DefineVariable<std::complex<double>>(
"bpDComplexes", shape, start, count);
dataManIO.DefineAttribute<int>("AttInt", 110);
adios2::Engine dataManWriter =
dataManIO.Open("stream", adios2::Mode::Write);
for (int i = 0; i < steps; ++i)
Expand Down Expand Up @@ -383,6 +384,9 @@ void DataManReaderP2P(const Dims &shape, const Dims &start, const Dims &count,
break;
}
}
auto attInt = dataManIO.InquireAttribute<int>("AttInt");
ASSERT_EQ(110, attInt.Data()[0]);
ASSERT_NE(111, attInt.Data()[0]);
ASSERT_EQ(i, steps);
dataManReader.Close();
print_lines = 0;
Expand Down