Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable memory selection in ssc reader #3313

Merged
merged 4 commits into from
Aug 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions source/adios2/engine/ssc/SscHelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ void DeserializeVariable(const Buffer &input, const ShapeID shapeId,
else
{
helper::Throw<std::runtime_error>("Engine", "SscHelper",
"Deserialize",
"DeserializeVariable",
"unknown variable data type");
}
}
Expand Down Expand Up @@ -416,7 +416,7 @@ void DeserializeAttribute(const Buffer &input, uint64_t &pos, IO &io,
else
{
helper::Throw<std::runtime_error>(
"Engine", "SscHelper", "Deserialize",
"Engine", "SscHelper", "DeserializeAttribute",
"unknown attribute data type");
}
}
Expand All @@ -428,6 +428,10 @@ void SerializeStructDefinitions(
const std::unordered_map<std::string, StructDefinition> &definitions,
Buffer &output)
{
if (definitions.empty())
{
return;
}
uint64_t pos = output.value<uint64_t>();
output[pos] = 65;
++pos;
Expand Down
2 changes: 2 additions & 0 deletions source/adios2/engine/ssc/SscHelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ struct BlockInfo
Dims shape;
Dims start;
Dims count;
Dims memStart;
Dims memCount;
size_t bufferStart;
size_t bufferCount;
std::vector<char> value;
Expand Down
32 changes: 24 additions & 8 deletions source/adios2/engine/ssc/SscReaderGeneric.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,11 @@ void SscReaderGeneric::PerformGets()
reinterpret_cast<char *>(br.data),
helper::CoreDims(br.start),
helper::CoreDims(br.count), true, true,
static_cast<int>(b.elementSize));
static_cast<int>(b.elementSize),
helper::CoreDims(b.start),
helper::CoreDims(b.count),
helper::CoreDims(br.memStart),
helper::CoreDims(br.memCount));
}
else if (b.shapeId == ShapeID::GlobalValue ||
b.shapeId == ShapeID::LocalValue)
Expand Down Expand Up @@ -442,12 +446,16 @@ void SscReaderGeneric::GetDeferredDeltaCommon(VariableBase &variable,
Dims vStart = variable.m_Start;
Dims vCount = variable.m_Count;
Dims vShape = variable.m_Shape;
Dims vMemStart = variable.m_MemoryStart;
Dims vMemCount = variable.m_MemoryCount;

if (m_IO.m_ArrayOrder != ArrayOrdering::RowMajor)
{
std::reverse(vStart.begin(), vStart.end());
std::reverse(vCount.begin(), vCount.end());
std::reverse(vShape.begin(), vShape.end());
std::reverse(vMemStart.begin(), vMemStart.end());
std::reverse(vMemCount.begin(), vMemCount.end());
}

m_LocalReadPattern.emplace_back();
Expand All @@ -459,6 +467,8 @@ void SscReaderGeneric::GetDeferredDeltaCommon(VariableBase &variable,
b.start = vStart;
b.count = vCount;
b.shape = vShape;
b.memStart = vMemStart;
b.memCount = vMemCount;
b.bufferStart = 0;
b.bufferCount = 0;
b.data = data;
Expand Down Expand Up @@ -507,12 +517,16 @@ void SscReaderGeneric::GetDeferred(VariableBase &variable, void *data)
helper::DimsArray vStart(variable.m_Start);
helper::DimsArray vCount(variable.m_Count);
helper::DimsArray vShape(variable.m_Shape);
helper::DimsArray vMemStart(variable.m_MemoryStart);
helper::DimsArray vMemCount(variable.m_MemoryCount);

if (m_IO.m_ArrayOrder != ArrayOrdering::RowMajor)
{
std::reverse(vStart.begin(), vStart.end());
std::reverse(vCount.begin(), vCount.end());
std::reverse(vShape.begin(), vShape.end());
std::reverse(vMemStart.begin(), vMemStart.end());
std::reverse(vMemCount.begin(), vMemCount.end());
}

if (m_CurrentStep == 0 || m_WriterDefinitionsLocked == false ||
Expand Down Expand Up @@ -546,13 +560,15 @@ void SscReaderGeneric::GetDeferred(VariableBase &variable, void *data)
if (b.shapeId == ShapeID::GlobalArray ||
b.shapeId == ShapeID::LocalArray)
{
helper::NdCopy(
m_Buffer.data<char>() + b.bufferStart,
helper::CoreDims(b.start),
helper::CoreDims(b.count), true, true,
reinterpret_cast<char *>(data), vStart, vCount,
true, true,
static_cast<int>(variable.m_ElementSize));
helper::NdCopy(m_Buffer.data<char>() + b.bufferStart,
helper::CoreDims(b.start),
helper::CoreDims(b.count), true, true,
reinterpret_cast<char *>(data), vStart,
vCount, true, true,
static_cast<int>(variable.m_ElementSize),
helper::CoreDims(b.start),
helper::CoreDims(b.count), vMemStart,
vMemCount);
}
else if (b.shapeId == ShapeID::GlobalValue ||
b.shapeId == ShapeID::LocalValue)
Expand Down
10 changes: 7 additions & 3 deletions source/adios2/engine/ssc/SscReaderNaive.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,13 +182,15 @@ void SscReaderNaive::GetDeferred(VariableBase &variable, void *data)

helper::DimsArray vStart(variable.m_Start);
helper::DimsArray vCount(variable.m_Count);
// Dims vShape = variable.m_Shape;
helper::DimsArray vMemStart(variable.m_MemoryStart);
helper::DimsArray vMemCount(variable.m_MemoryCount);

if (m_IO.m_ArrayOrder != ArrayOrdering::RowMajor)
{
std::reverse(vStart.begin(), vStart.end());
std::reverse(vCount.begin(), vCount.end());
// std::reverse(vShape.begin(), vShape.end());
std::reverse(vMemStart.begin(), vMemStart.end());
std::reverse(vMemCount.begin(), vMemCount.end());
}

for (const auto &b : m_BlockMap[variable.m_Name])
Expand All @@ -200,7 +202,9 @@ void SscReaderNaive::GetDeferred(VariableBase &variable, void *data)
helper::CoreDims(b.start), helper::CoreDims(b.count),
true, true, reinterpret_cast<char *>(data), vStart,
vCount, true, true,
static_cast<int>(variable.m_ElementSize));
static_cast<int>(variable.m_ElementSize),
helper::CoreDims(b.start), helper::CoreDims(b.count),
vMemStart, vMemCount);
}
else if (b.shapeId == ShapeID::GlobalValue ||
b.shapeId == ShapeID::LocalValue)
Expand Down
3 changes: 3 additions & 0 deletions testing/adios2/engine/ssc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ if(ADIOS2_HAVE_MPI AND NOT MSVC)
gtest_add_tests_helper(Base MPI_ONLY Ssc Engine.SSC. "")
SetupTestPipeline(Engine.SSC.SscEngineTest.TestSscBase.MPI "" TRUE)

gtest_add_tests_helper(3DMemSelect MPI_ONLY Ssc Engine.SSC. "")
SetupTestPipeline(Engine.SSC.SscEngineTest.TestSsc3DMemSelect.MPI "" TRUE)

gtest_add_tests_helper(Struct MPI_ONLY Ssc Engine.SSC. "")
SetupTestPipeline(Engine.SSC.SscEngineTest.TestSscStruct.MPI "" TRUE)

Expand Down
205 changes: 205 additions & 0 deletions testing/adios2/engine/ssc/TestSsc3DMemSelect.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
/*
* Distributed under the OSI-approved Apache License, Version 2.0. See
* accompanying file Copyright.txt for details.
*
* TestSsc3DMemSelect.cpp
*
* Created on: Aug 12, 2022
* Author: Jason Wang
*/

#include <adios2.h>
#include <gtest/gtest.h>
#include <numeric>
#include <thread>

using namespace adios2;
int mpiRank = 0;
int mpiSize = 1;
MPI_Comm mpiComm;

size_t steps = 100;

Dims shape = {4, 4, 4};
std::vector<int> global_data = {0, 1, 2, 3, 10, 11, 12, 13,
20, 21, 22, 23, 30, 31, 32, 33,

100, 101, 102, 103, 110, 111, 112, 113,
120, 121, 122, 123, 130, 131, 132, 133,

200, 201, 202, 203, 210, 211, 212, 213,
220, 221, 222, 223, 230, 231, 232, 233,

300, 301, 302, 303, 310, 311, 312, 313,
320, 321, 322, 323, 330, 331, 332, 333};

Dims start = {1, 2, 1};
Dims count = {2, 1, 2};
std::vector<int> writer_data = {121, 122, 221, 222};

Dims memStart = {0, 1, 1};
Dims memCount = {3, 3, 3};
std::vector<int> reader_data = {11, 12, 13, 21, 22, 23, 31, 32, 33,

111, 112, 113, 121, 122, 123, 131, 132, 133,

211, 212, 213, 221, 222, 223, 231, 232, 233};

class SscEngineTest : public ::testing::Test
{
public:
SscEngineTest() = default;
};

template <class T>
void PrintData(const T *data, const size_t step, const Dims &start,
const Dims &count)
{
size_t size = std::accumulate(count.begin(), count.end(), 1,
std::multiplies<size_t>());
std::cout << "Step: " << step << " Size:" << size << "\n";
size_t printsize = 128;

if (size < printsize)
{
printsize = size;
}
int s = 0;
for (size_t i = 0; i < printsize; ++i)
{
++s;
std::cout << data[i] << " ";
if (s == count[1])
{
std::cout << std::endl;
s = 0;
}
}

std::cout << "]" << std::endl;
}

void VerifyData(const int *data, size_t step, const Dims &start,
const Dims &count, const Dims &shape)
{
size_t size = std::accumulate(count.begin(), count.end(), 1,
std::multiplies<size_t>());
bool compressed = false;
for (size_t i = 0; i < size; ++i)
{
if (!compressed)
{
ASSERT_EQ(data[i], reader_data[i]);
}
}
}

void Writer(const Params &engineParams)
{
adios2::ADIOS adios(mpiComm);
adios2::IO io = adios.DeclareIO("io");
io.SetEngine("ssc");
io.SetParameters(engineParams);
auto varInts = io.DefineVariable<int>("varInts", shape, start, count);
adios2::Engine engine = io.Open("stream", adios2::Mode::Write);
for (size_t i = 0; i < steps; ++i)
{
engine.BeginStep();
engine.Put(varInts, writer_data.data(), adios2::Mode::Sync);
engine.EndStep();
}
engine.Close();
}

void Reader(const Params &engineParams)
{
adios2::ADIOS adios(mpiComm);
adios2::IO io = adios.DeclareIO("io");
io.SetEngine("ssc");
io.SetParameters(engineParams);
adios2::Engine engine = io.Open("stream", adios2::Mode::Read);
std::vector<int> myInts = reader_data;
while (true)
{
adios2::StepStatus status = engine.BeginStep(StepMode::Read, 5);
if (status == adios2::StepStatus::OK)
{
const auto &vars = io.AvailableVariables();
ASSERT_EQ(vars.size(), 1);
size_t currentStep = engine.CurrentStep();
adios2::Variable<int> varInts = io.InquireVariable<int>("varInts");
varInts.SetSelection({start, count});
varInts.SetMemorySelection({memStart, memCount});
engine.Get(varInts, myInts.data(), adios2::Mode::Sync);
VerifyData(myInts.data(), currentStep, memStart, memCount, shape);
engine.EndStep();
}
else if (status == adios2::StepStatus::EndOfStream)
{
break;
}
else if (status == adios2::StepStatus::NotReady)
{
continue;
}
}
engine.Close();
}

TEST_F(SscEngineTest, TestSsc3DMemSelect)
{
{
adios2::Params engineParams = {{"Verbose", "0"}};
int worldRank, worldSize;
MPI_Comm_rank(MPI_COMM_WORLD, &worldRank);
MPI_Comm_size(MPI_COMM_WORLD, &worldSize);
int mpiGroup = worldRank / (worldSize / 2);
MPI_Comm_split(MPI_COMM_WORLD, mpiGroup, worldRank, &mpiComm);
MPI_Comm_rank(mpiComm, &mpiRank);
MPI_Comm_size(mpiComm, &mpiSize);
if (mpiGroup == 0)
{
Writer(engineParams);
}
std::this_thread::sleep_for(std::chrono::milliseconds(1));
if (mpiGroup == 1)
{
Reader(engineParams);
}
MPI_Barrier(MPI_COMM_WORLD);
}
{
adios2::Params engineParams = {{"Verbose", "0"},
{"EngineMode", "naive"}};
int worldRank, worldSize;
MPI_Comm_rank(MPI_COMM_WORLD, &worldRank);
MPI_Comm_size(MPI_COMM_WORLD, &worldSize);
int mpiGroup = worldRank / (worldSize / 2);
MPI_Comm_split(MPI_COMM_WORLD, mpiGroup, worldRank, &mpiComm);
MPI_Comm_rank(mpiComm, &mpiRank);
MPI_Comm_size(mpiComm, &mpiSize);
if (mpiGroup == 0)
{
Writer(engineParams);
}
std::this_thread::sleep_for(std::chrono::milliseconds(1));
if (mpiGroup == 1)
{
Reader(engineParams);
}
MPI_Barrier(MPI_COMM_WORLD);
}
}

int main(int argc, char **argv)
{
MPI_Init(&argc, &argv);
int worldRank, worldSize;
MPI_Comm_rank(MPI_COMM_WORLD, &worldRank);
MPI_Comm_size(MPI_COMM_WORLD, &worldSize);
::testing::InitGoogleTest(&argc, argv);
int result = RUN_ALL_TESTS();

MPI_Finalize();
return result;
}