Skip to content

Commit

Permalink
Merge pull request #2019 from JasonRuonanWang/ssc-multistream
Browse files Browse the repository at this point in the history
Add MpiHandshake to enable XGC-COUPLER-GENE communication pattern
  • Loading branch information
JasonRuonanWang authored Mar 6, 2020
2 parents 4e57dba + 70baa4d commit ddaeeb0
Show file tree
Hide file tree
Showing 10 changed files with 539 additions and 503 deletions.
1 change: 1 addition & 0 deletions source/adios2/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ if(ADIOS2_HAVE_MPI)
target_sources(adios2 PRIVATE
core/IOMPI.cpp
helper/adiosCommMPI.h helper/adiosCommMPI.cpp
helper/adiosMpiHandshake.h helper/adiosMpiHandshake.cpp
engine/insitumpi/InSituMPIWriter.cpp engine/insitumpi/InSituMPIWriter.tcc
engine/insitumpi/InSituMPIReader.cpp engine/insitumpi/InSituMPIReader.tcc
engine/insitumpi/InSituMPIFunctions.cpp engine/insitumpi/InSituMPISchedules.cpp
Expand Down
39 changes: 39 additions & 0 deletions source/adios2/engine/ssc/SscHelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,45 @@ void PrintMpiInfo(const MpiInfo &writersInfo, const MpiInfo &readersInfo)
std::cout << std::endl;
}

/**
 * Look up an integer-valued engine parameter.
 *
 * @param params map of engine parameter name -> string value
 * @param key    parameter name to look up
 * @param value  output; written only when the lookup and parse both succeed
 * @return true if key was present and its value parsed as a complete
 *         integer, false otherwise (a parse failure is also reported on
 *         stderr, matching the engine's existing best-effort style)
 */
bool GetParameter(const Params &params, const std::string &key, int &value)
{
    auto it = params.find(key);
    if (it == params.end())
    {
        return false;
    }
    try
    {
        std::size_t pos = 0;
        const int parsed = std::stoi(it->second, &pos);
        // std::stoi("12abc") silently succeeds and returns 12; reject any
        // value with trailing non-numeric characters so a typo in the
        // configuration is reported instead of half-parsed.
        if (pos != it->second.size())
        {
            throw std::invalid_argument("trailing characters");
        }
        value = parsed;
    }
    catch (...)
    {
        std::string error =
            "Engine parameter " + key + " can only be integer numbers";
        std::cerr << error << std::endl;
        return false;
    }
    return true;
}

/**
 * Look up a string-valued engine parameter.
 *
 * @param params map of engine parameter name -> string value
 * @param key    parameter name to look up
 * @param value  output; overwritten only when the key is present
 * @return true if the key was found, false otherwise
 */
bool GetParameter(const Params &params, const std::string &key,
                  std::string &value)
{
    const auto match = params.find(key);
    const bool found = (match != params.end());
    if (found)
    {
        value = match->second;
    }
    return found;
}

} // end namespace ssc
} // end namespace engine
} // end namespace core
Expand Down
4 changes: 4 additions & 0 deletions source/adios2/engine/ssc/SscHelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ void JsonToBlockVecVec(const std::string &input, BlockVecVec &output);

bool AreSameDims(const Dims &a, const Dims &b);

bool GetParameter(const Params &params, const std::string &key, int &value);
bool GetParameter(const Params &params, const std::string &key,
std::string &value);

} // end namespace ssc
} // end namespace engine
} // end namespace core
Expand Down
274 changes: 26 additions & 248 deletions source/adios2/engine/ssc/SscReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#include "SscReader.tcc"
#include "adios2/helper/adiosComm.h"
#include "adios2/helper/adiosCommMPI.h"
#include "adios2/helper/adiosFunctions.h"
#include "adios2/helper/adiosJSONcomplex.h"
#include "nlohmann/json.hpp"
Expand All @@ -31,24 +32,15 @@ SscReader::SscReader(IO &io, const std::string &name, const Mode mode,
m_ReaderRank = m_Comm.Rank();
m_ReaderSize = m_Comm.Size();

auto it = m_IO.m_Parameters.find("MpiMode");
if (it != m_IO.m_Parameters.end())
{
m_MpiMode = it->second;
}
it = m_IO.m_Parameters.find("Verbose");
if (it != m_IO.m_Parameters.end())
{
try
{
m_Verbosity = std::stoi(it->second);
}
catch (...)
{
std::cerr << "Engine parameter Verbose can only be integer numbers"
<< std::endl;
}
}
ssc::GetParameter(m_IO.m_Parameters, "MpiMode", m_MpiMode);
ssc::GetParameter(m_IO.m_Parameters, "Verbose", m_Verbosity);
ssc::GetParameter(m_IO.m_Parameters, "MaxFilenameLength",
m_MaxFilenameLength);
ssc::GetParameter(m_IO.m_Parameters, "RendezvousAppCount",
m_RendezvousAppCount);
ssc::GetParameter(m_IO.m_Parameters, "MaxStreamsPerApp",
m_MaxStreamsPerApp);
ssc::GetParameter(m_IO.m_Parameters, "OpenTimeoutSecs", m_OpenTimeoutSecs);

m_Buffer.resize(1);

Expand Down Expand Up @@ -116,12 +108,6 @@ StepStatus SscReader::BeginStep(const StepMode stepMode,
{
TAU_SCOPED_TIMER_FUNC();

if (m_Verbosity >= 5)
{
std::cout << "SscReader::BeginStep, World Rank " << m_WorldRank
<< ", Reader Rank " << m_ReaderRank << std::endl;
}

if (m_InitialStep)
{
m_InitialStep = false;
Expand Down Expand Up @@ -153,6 +139,13 @@ StepStatus SscReader::BeginStep(const StepMode stepMode,
}
}

if (m_Verbosity >= 5)
{
std::cout << "SscReader::BeginStep, World Rank " << m_WorldRank
<< ", Reader Rank " << m_ReaderRank << ", Step "
<< m_CurrentStep << std::endl;
}

if (m_Buffer[0] == 1)
{
return StepStatus::EndOfStream;
Expand Down Expand Up @@ -191,239 +184,29 @@ void SscReader::EndStep()

void SscReader::SyncMpiPattern()
{
TAU_SCOPED_TIMER_FUNC();

if (m_Verbosity >= 5)
{
std::cout << "SscReader::SyncMpiPattern, World Rank " << m_WorldRank
<< ", Reader Rank " << m_ReaderRank << std::endl;
}

TAU_SCOPED_TIMER_FUNC();
if (m_WorldSize == m_ReaderSize)
{
throw(std::runtime_error("no writers are found"));
}

std::vector<int> lrbuf;
std::vector<int> grbuf;
m_MpiHandshake.Handshake(m_Name, 'r', m_OpenTimeoutSecs, m_MaxStreamsPerApp,
m_MaxFilenameLength, m_RendezvousAppCount,
CommAsMPI(m_Comm));

// Process m_WorldRank == 0 to gather all the local rank m_WriterRank, and
// find out all the m_WriterRank == 0
if (m_WorldRank == 0)
for (const auto &app : m_MpiHandshake.GetWriterMap(m_Name))
{
grbuf.resize(m_WorldSize);
}

MPI_Gather(&m_ReaderRank, 1, MPI_INT, grbuf.data(), 1, MPI_INT, 0,
MPI_COMM_WORLD);

std::vector<int> AppStart; // m_WorldRank of the local rank 0 process
if (m_WorldRank == 0)
{
for (int i = 0; i < m_WorldSize; ++i)
{
if (grbuf[i] == 0)
{
AppStart.push_back(i);
}
}
m_AppSize = AppStart.size();
}

// Each local rank 0 process send their type (0 for writer, 1 for reader) to
// the world rank 0 process The AppStart are re-ordered to put all writers
// ahead of all the readers.
std::vector<int>
AppType; // Vector to record the type of the local rank 0 process
if (m_ReaderRank == 0) // Send type from each local rank 0 process to the
// world rank 0 process
{
if (m_WorldRank == 0) // App_ID
{
AppType.resize(m_AppSize);
for (int i = 0; i < m_AppSize; ++i)
{
if (i == 0)
{
AppType[i] = 1;
;
}
else
{
int tmp = 1;
MPI_Recv(&tmp, 1, MPI_INT, AppStart[i], 96, MPI_COMM_WORLD,
MPI_STATUS_IGNORE);
AppType[i] = tmp;
}
}
}
else
{
int tmp = 1; // type 1 for reader
MPI_Send(&tmp, 1, MPI_INT, 0, 96, MPI_COMM_WORLD); //
}
}

if (m_WorldRank == 0)
{
std::vector<int> AppWriter;
std::vector<int> AppReader;

for (int i = 0; i < m_AppSize; ++i)
{
if (AppType[i] == 0)
{
AppWriter.push_back(AppStart[i]);
}
else
{
AppReader.push_back(AppStart[i]);
}
}
m_WriterGlobalMpiInfo.resize(AppWriter.size());
m_ReaderGlobalMpiInfo.resize(AppReader.size());
AppStart = AppWriter;
AppStart.insert(AppStart.end(), AppReader.begin(), AppReader.end());
}

// Send the m_AppSize and m_AppID to each local rank 0 process
if (m_ReaderRank == 0) // Send m_AppID to each local rank 0 process
{
if (m_WorldRank == 0) // App_ID
{
for (int i = 0; i < m_AppSize; ++i)
{
MPI_Send(&i, 1, MPI_INT, AppStart[i], 99, MPI_COMM_WORLD); //
}
}
else
{
MPI_Recv(&m_AppID, 1, MPI_INT, 0, 99, MPI_COMM_WORLD,
MPI_STATUS_IGNORE);
}
}

m_Comm.Bcast(&m_AppID, sizeof(int),
0); // Local rank 0 process broadcast the m_AppID within the
// local communicator.

MPI_Bcast(&m_AppSize, 1, MPI_INT, 0, MPI_COMM_WORLD); // Bcast the m_AppSize

// In each local communicator, each local rank 0 process gathers the world
// rank of all the rest local processes.
if (m_ReaderRank == 0)
{
lrbuf.resize(m_ReaderSize);
}

m_Comm.Gather(&m_WorldRank, 1, lrbuf.data(), 1, 0);

// Send the WorldRank vector of each local communicator to the m_WorldRank
// == 0 process.
int WriterInfoSize = 0;
int ReaderInfoSize = 0;
if (m_ReaderRank == 0)
{
if (m_WorldRank == 0) // App_ID
{
for (int i = 0; i < m_WriterGlobalMpiInfo.size(); ++i)
{
if (i == 0)
{
m_WriterGlobalMpiInfo[i] = lrbuf;
++WriterInfoSize;
}
else
{
int j_writersize;
MPI_Recv(&j_writersize, 1, MPI_INT, AppStart[i], 96,
MPI_COMM_WORLD, MPI_STATUS_IGNORE); //
++WriterInfoSize;

m_WriterGlobalMpiInfo[i].resize(j_writersize);
MPI_Recv(m_WriterGlobalMpiInfo[i].data(), j_writersize,
MPI_INT, AppStart[i], 98, MPI_COMM_WORLD,
MPI_STATUS_IGNORE); //
}
}

for (int i = m_WriterGlobalMpiInfo.size(); i < m_AppSize; ++i)
{
if (i == 0)
{
m_ReaderGlobalMpiInfo[i] = lrbuf;
++ReaderInfoSize;
}
else
{
int j_readersize;
MPI_Recv(&j_readersize, 1, MPI_INT, AppStart[i], 95,
MPI_COMM_WORLD, MPI_STATUS_IGNORE); //
++ReaderInfoSize;

m_ReaderGlobalMpiInfo[i - m_WriterGlobalMpiInfo.size()]
.resize(j_readersize);
MPI_Recv(
m_ReaderGlobalMpiInfo[i - m_WriterGlobalMpiInfo.size()]
.data(),
j_readersize, MPI_INT, AppStart[i], 97, MPI_COMM_WORLD,
MPI_STATUS_IGNORE); //
}
}
}
else
{
MPI_Send(&m_ReaderSize, 1, MPI_INT, 0, 95, MPI_COMM_WORLD);
MPI_Send(lrbuf.data(), lrbuf.size(), MPI_INT, 0, 97,
MPI_COMM_WORLD);
}
}

// Broadcast m_WriterGlobalMpiInfo and m_ReaderGlobalMpiInfo to all the
// processes.
MPI_Bcast(&WriterInfoSize, 1, MPI_INT, 0,
MPI_COMM_WORLD); // Broadcast writerinfo size
MPI_Bcast(&ReaderInfoSize, 1, MPI_INT, 0, MPI_COMM_WORLD);

m_WriterGlobalMpiInfo.resize(WriterInfoSize);
m_ReaderGlobalMpiInfo.resize(ReaderInfoSize);

for (int i = 0; i < WriterInfoSize; ++i)
{
int ilen;
if (m_WorldRank == 0)
{
ilen = m_WriterGlobalMpiInfo[i].size();
}
MPI_Bcast(&ilen, 1, MPI_INT, 0, MPI_COMM_WORLD);
m_WriterGlobalMpiInfo[i].resize(ilen);
MPI_Bcast(m_WriterGlobalMpiInfo[i].data(), ilen, MPI_INT, 0,
MPI_COMM_WORLD); // Broadcast readerinfo size
}

for (int i = 0; i < ReaderInfoSize; ++i)
{
int ilen;
if (m_WorldRank == 0)
{
ilen = m_ReaderGlobalMpiInfo[i].size();
}
MPI_Bcast(&ilen, 1, MPI_INT, 0, MPI_COMM_WORLD);
m_ReaderGlobalMpiInfo[i].resize(ilen);
MPI_Bcast(m_ReaderGlobalMpiInfo[i].data(), ilen, MPI_INT, 0,
MPI_COMM_WORLD); // Broadcast readerinfo size
}

for (const auto &app : m_WriterGlobalMpiInfo)
{
for (int rank : app)
for (int rank : app.second)
{
m_AllWriterRanks.push_back(rank);
}
}

for (const auto &app : m_ReaderGlobalMpiInfo)
for (const auto &app : m_MpiHandshake.GetReaderMap(m_Name))
{
for (int rank : app)
for (int rank : app.second)
{
m_AllReaderRanks.push_back(rank);
}
Expand All @@ -433,11 +216,6 @@ void SscReader::SyncMpiPattern()
MPI_Comm_group(MPI_COMM_WORLD, &worldGroup);
MPI_Group_incl(worldGroup, m_AllWriterRanks.size(), m_AllWriterRanks.data(),
&m_MpiAllWritersGroup);

if (m_Verbosity >= 10 and m_WorldRank == 0)
{
ssc::PrintMpiInfo(m_WriterGlobalMpiInfo, m_ReaderGlobalMpiInfo);
}
}

void SscReader::SyncWritePattern()
Expand Down
Loading

0 comments on commit ddaeeb0

Please sign in to comment.