
Commit

Fix an issue when reading blocks in streaming mode, when some producers did not write any block to output. (#4332)
pnorbert authored Aug 30, 2024
1 parent fa5d3aa commit 75d801f
Showing 2 changed files with 195 additions and 1 deletion.
2 changes: 1 addition & 1 deletion source/adios2/toolkit/format/bp5/BP5Deserializer.cpp
@@ -1628,7 +1628,7 @@ BP5Deserializer::GenerateReadRequests(const bool doAllocTempBuffers, size_t *max
     MetaArrayRecOperator *writer_meta_base =
         (MetaArrayRecOperator *)GetMetadataBase((struct BP5VarRec *)Req->VarRec,
                                                 Step, WriterRank);
-    if (!writer_meta_base)
+    if (!writer_meta_base || !writer_meta_base->BlockCount)
     {
         continue; // Not writen on this step
     }
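The one-line change above tightens the guard in GenerateReadRequests: a writer rank used to be skipped only when its per-step metadata record was missing; it is now also skipped when the record exists but reports zero blocks, so producers that wrote nothing in a step no longer generate read requests. A minimal sketch of that pattern, using hypothetical, simplified types (the real MetaArrayRecOperator and read-request plumbing are omitted):

#include <cstddef>
#include <vector>

// Hypothetical, simplified stand-in for the per-writer, per-step metadata record.
struct WriterStepMeta
{
    std::size_t BlockCount = 0; // number of blocks this producer wrote in the step
};

// Sketch: a writer is ignored when its metadata is missing OR when it
// recorded zero blocks for the step (the case fixed by this commit).
void generateReadRequestsSketch(const std::vector<const WriterStepMeta *> &perWriterMeta)
{
    for (const WriterStepMeta *meta : perWriterMeta)
    {
        if (!meta || !meta->BlockCount)
        {
            continue; // nothing written by this producer on this step
        }
        // ... build one read request per block of this writer ...
    }
}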
194 changes: 194 additions & 0 deletions testing/adios2/engine/bp/TestBPWriteReadLocalVariables.cpp
@@ -1798,6 +1798,200 @@ TEST_F(BPWriteReadLocalVariables, ADIOS2BPWriteReadLocal2DChangeCount)
}
}

template <class T>
void print_vector(std::vector<T> &v)
{
    std::cout << "{ ";
    for (auto &e : v)
    {
        std::cout << e << " ";
    }
    std::cout << "}" << std::endl;
}

template <class T, size_t N>
void print_array(std::array<T, N> &a)
{
    std::cout << "{ ";
    for (auto &e : a)
    {
        std::cout << e << " ";
    }
    std::cout << "}" << std::endl;
}

TEST_F(BPWriteReadLocalVariables, ADIOS2BPWriteReadLocalVaryingNumberOfBlocks)
{
    /* Write a different number of blocks per step, skipping some ranks in some steps.
       Each block size is different too. Test reading each block properly.
       Only the first 8 MPI ranks do work, the others just loop empty.
    */
    constexpr int NSTEPS = 3;
    constexpr int MAXMPISIZE = 8;
    const int blocks[NSTEPS][MAXMPISIZE] = {
        {1, 1, 1, 1, 1, 1, 1, 1},
        {1, 0, 1, 0, 1, 0, 1, 0},
        {0, 1, 0, 1, 0, 1, 0, 1},
    };
    const size_t blocksizes[NSTEPS][MAXMPISIZE] = {
        {9, 8, 7, 6, 5, 6, 7, 3},
        {2, 0, 4, 0, 6, 0, 9, 0},
        {0, 1, 0, 4, 0, 7, 0, 5},
    };

    int mpiRank = 0;
    int mpiSize = 1;
#if ADIOS2_USE_MPI
    MPI_Comm_rank(MPI_COMM_WORLD, &mpiRank);
    MPI_Comm_size(MPI_COMM_WORLD, &mpiSize);
    const std::string fname("BPWRLocalVaryingNumberOfBlocks_" + engineName + "_MPI.bp");
#else
    const std::string fname("BPWRLocalVaryingNumberOfBlocks_" + engineName + ".bp");
#endif

#if ADIOS2_USE_MPI
    adios2::ADIOS adios(MPI_COMM_WORLD);
#else
    adios2::ADIOS adios;
#endif
    {
        adios2::IO io = adios.DeclareIO("TestIO");
        const adios2::Dims shape{};
        const adios2::Dims start{};
        const adios2::Dims count{(size_t)1};

        auto var_r32 = io.DefineVariable<float>("r32", shape, start, count);

        adios2::Engine bpWriter = io.Open(fname, adios2::Mode::Write);

        for (int step = 0; step < NSTEPS; ++step)
        {
            std::cout << "--- generateNewSmallTestData Step " << step << " rank " << mpiRank
                      << " nprocs " << mpiSize << " ---" << std::endl;
            SmallTestData currentTestData =
                generateNewSmallTestData(m_TestData, static_cast<int>(step), mpiRank, mpiSize);
            print_array(currentTestData.R32);

            int bpos = (mpiRank < MAXMPISIZE ? mpiRank : -1);

            bpWriter.BeginStep();
            if (bpos >= 0 && blocks[step][bpos])
            {
                size_t nx = blocksizes[step][bpos];
                var_r32.SetSelection({{}, {nx}});
                bpWriter.Put(var_r32, currentTestData.R32.data());
            }
            bpWriter.EndStep();
        }
        bpWriter.Close();
    }
#if ADIOS2_USE_MPI
    MPI_Barrier(MPI_COMM_WORLD);
#endif

    if (!mpiRank)
    {
        adios2::IO io = adios.DeclareIO("ReaderIO");
#if ADIOS2_USE_MPI
        adios2::Engine bpReader = io.Open(fname, adios2::Mode::Read, MPI_COMM_SELF);
#else
        adios2::Engine bpReader = io.Open(fname, adios2::Mode::Read);
#endif

        for (int step = 0; step < NSTEPS; ++step)
        {
            std::cout << "====== Read Step " << step << " ==========" << std::endl;
            bpReader.BeginStep();
            auto var_r32 = io.InquireVariable<float>("r32");

            size_t nblocks = 0;
            int nproc = (mpiSize < MAXMPISIZE ? mpiSize : MAXMPISIZE);
            for (size_t b = 0; b < static_cast<size_t>(nproc); ++b)
            {
                if (blocks[step][b] == 0)
                {
                    continue;
                }

                std::cout << " --- generateNewSmallTestData Step " << step << " rank " << mpiRank
                          << " nprocs " << mpiSize << " b=" << b << " nblocks=" << nblocks << " ---"
                          << std::endl;
                SmallTestData currentTestData = generateNewSmallTestData(
                    m_TestData, static_cast<int>(step), static_cast<int>(b), mpiSize);
                print_array(currentTestData.R32);

                var_r32.SetBlockSelection(nblocks);
                std::vector<float> dataIn;
                bpReader.Get(var_r32, dataIn, adios2::Mode::Sync);
                EXPECT_EQ(dataIn.size(), blocksizes[step][b]);

                for (size_t i = 0; i < dataIn.size(); ++i)
                {
                    EXPECT_EQ(dataIn[i], currentTestData.R32[i]);
                }

                ++nblocks;
            }
            bpReader.EndStep();
        }
        bpReader.Close();
    }
#if ADIOS2_USE_MPI
    MPI_Barrier(MPI_COMM_WORLD);
#endif

    if (!mpiRank)
    {
        adios2::IO io = adios.DeclareIO("ReaderIORRA");
#if ADIOS2_USE_MPI
        adios2::Engine bpReader = io.Open(fname, adios2::Mode::ReadRandomAccess, MPI_COMM_SELF);
#else
        adios2::Engine bpReader = io.Open(fname, adios2::Mode::ReadRandomAccess);
#endif

        auto var_r32 = io.InquireVariable<float>("r32");

        for (int step = 0; step < NSTEPS; ++step)
        {
            std::cout << "====== Read Step " << step << " ==========" << std::endl;
            var_r32.SetStepSelection({step, 1});

            size_t nblocks = 0;
            int nproc = (mpiSize < MAXMPISIZE ? mpiSize : MAXMPISIZE);
            for (size_t b = 0; b < static_cast<size_t>(nproc); ++b)
            {
                if (blocks[step][b] == 0)
                {
                    continue;
                }

                std::cout << " --- generateNewSmallTestData Step " << step << " rank " << mpiRank
                          << " nprocs " << mpiSize << " b=" << b << " nblocks=" << nblocks << " ---"
                          << std::endl;
                SmallTestData currentTestData = generateNewSmallTestData(
                    m_TestData, static_cast<int>(step), static_cast<int>(b), mpiSize);
                print_array(currentTestData.R32);

                var_r32.SetBlockSelection(nblocks);
                std::vector<float> dataIn;
                bpReader.Get(var_r32, dataIn, adios2::Mode::Sync);
                EXPECT_EQ(dataIn.size(), blocksizes[step][b]);

                for (size_t i = 0; i < dataIn.size(); ++i)
                {
                    EXPECT_EQ(dataIn[i], currentTestData.R32[i]);
                }

                ++nblocks;
            }
        }
        bpReader.Close();
    }
#if ADIOS2_USE_MPI
    MPI_Barrier(MPI_COMM_WORLD);
#endif
}

int main(int argc, char **argv)
{
#if ADIOS2_USE_MPI

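The new test hard-codes how many blocks each rank writes in each step. When that layout is not known in advance, a reader can discover it per step at run time; below is a hedged sketch (not part of this commit) that assumes the adios2::Engine::BlocksInfo API and reuses the variable name "r32" and file name from the test above:

// Sketch: read every block of the local array "r32" in each step without
// knowing the block layout beforehand.
adios2::Engine reader = io.Open(fname, adios2::Mode::Read);
while (reader.BeginStep() == adios2::StepStatus::OK)
{
    auto var_r32 = io.InquireVariable<float>("r32");
    const auto blocksInfo = reader.BlocksInfo(var_r32, reader.CurrentStep());
    for (const auto &info : blocksInfo)
    {
        var_r32.SetBlockSelection(info.BlockID);
        std::vector<float> dataIn;
        reader.Get(var_r32, dataIn, adios2::Mode::Sync);
        // For this 1-D local array, dataIn.size() should equal info.Count[0].
    }
    reader.EndStep();
}
reader.Close();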