Skip to content

Commit

Permalink
Merge pull request #2408 from JasonRuonanWang/ssc-mpi-join
Browse files Browse the repository at this point in the history
refined mpi handshake in preparation for adding mpi join for ssc
  • Loading branch information
JasonRuonanWang authored Aug 4, 2020
2 parents 5607d31 + c058098 commit bce597a
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 30 deletions.
17 changes: 6 additions & 11 deletions source/adios2/engine/ssc/SscReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -240,19 +240,14 @@ void SscReader::SyncMpiPattern()
{
TAU_SCOPED_TIMER_FUNC();

auto appRankMaps =
helper::Handshake(m_Name, 'r', m_OpenTimeoutSecs, CommAsMPI(m_Comm));

MPI_Group worldGroup;
MPI_Group streamGroup;
MPI_Group readerGroup;
MPI_Comm writerComm;
MPI_Comm readerComm;

MPI_Comm_group(MPI_COMM_WORLD, &worldGroup);
MPI_Group_incl(worldGroup, appRankMaps[1].size(), appRankMaps[1].data(),
&m_MpiAllWritersGroup);
MPI_Group_incl(worldGroup, appRankMaps[0].size(), appRankMaps[0].data(),
&streamGroup);

MPI_Comm_create_group(MPI_COMM_WORLD, streamGroup, 0, &m_StreamComm);
helper::HandshakeComm(m_Name, 'r', m_OpenTimeoutSecs, CommAsMPI(m_Comm),
streamGroup, m_MpiAllWritersGroup, readerGroup,
m_StreamComm, writerComm, readerComm);
}

bool SscReader::SyncWritePattern()
Expand Down
17 changes: 6 additions & 11 deletions source/adios2/engine/ssc/SscWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,19 +186,14 @@ void SscWriter::SyncMpiPattern()
{
TAU_SCOPED_TIMER_FUNC();

auto appRankMaps =
helper::Handshake(m_Name, 'w', m_OpenTimeoutSecs, CommAsMPI(m_Comm));

MPI_Group worldGroup;
MPI_Group streamGroup;
MPI_Group writerGroup;
MPI_Comm writerComm;
MPI_Comm readerComm;

MPI_Comm_group(MPI_COMM_WORLD, &worldGroup);
MPI_Group_incl(worldGroup, appRankMaps[2].size(), appRankMaps[2].data(),
&m_MpiAllReadersGroup);
MPI_Group_incl(worldGroup, appRankMaps[0].size(), appRankMaps[0].data(),
&streamGroup);

MPI_Comm_create_group(MPI_COMM_WORLD, streamGroup, 0, &m_StreamComm);
helper::HandshakeComm(m_Name, 'w', m_OpenTimeoutSecs, CommAsMPI(m_Comm),
streamGroup, writerGroup, m_MpiAllReadersGroup,
m_StreamComm, writerComm, readerComm);
}

void SscWriter::SyncWritePattern(bool finalStep)
Expand Down
30 changes: 26 additions & 4 deletions source/adios2/helper/adiosMpiHandshake.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,32 @@ namespace adios2
namespace helper
{

const std::vector<std::vector<int>> Handshake(const std::string &filename,
const char mode,
const int timeoutSeconds,
MPI_Comm localComm)
#ifndef _WIN32
void HandshakeComm(const std::string &filename, const char mode,
const int timeoutSeconds, MPI_Comm localComm,
MPI_Group &streamGroup, MPI_Group &writerGroup,
MPI_Group &readerGroup, MPI_Comm &streamComm,
MPI_Comm &writerComm, MPI_Comm &readerComm)
{
auto appRankMaps = HandshakeRank(filename, mode, timeoutSeconds, localComm);
MPI_Group worldGroup;
MPI_Comm_group(MPI_COMM_WORLD, &worldGroup);
MPI_Group_incl(worldGroup, static_cast<int>(appRankMaps[0].size()),
appRankMaps[0].data(), &streamGroup);
MPI_Group_incl(worldGroup, static_cast<int>(appRankMaps[1].size()),
appRankMaps[1].data(), &writerGroup);
MPI_Group_incl(worldGroup, static_cast<int>(appRankMaps[2].size()),
appRankMaps[2].data(), &readerGroup);
MPI_Comm_create_group(MPI_COMM_WORLD, streamGroup, 0, &streamComm);
MPI_Comm_create_group(MPI_COMM_WORLD, writerGroup, 0, &writerComm);
MPI_Comm_create_group(MPI_COMM_WORLD, readerGroup, 0, &readerComm);
}
#endif

const std::vector<std::vector<int>> HandshakeRank(const std::string &filename,
const char mode,
const int timeoutSeconds,
MPI_Comm localComm)
{
std::vector<std::vector<int>> ret(3);

Expand Down
16 changes: 12 additions & 4 deletions source/adios2/helper/adiosMpiHandshake.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,18 @@ namespace helper
* for stream *filename*. [1] is the vector of all writer ranks for stream
* *filename*. [2] is the vector of all reader ranks for stream *filename*.
*/
const std::vector<std::vector<int>> Handshake(const std::string &filename,
const char mode,
const int timeoutSeconds,
MPI_Comm localComm);
const std::vector<std::vector<int>> HandshakeRank(const std::string &filename,
const char mode,
const int timeoutSeconds,
MPI_Comm localComm);

#ifndef _WIN32
void HandshakeComm(const std::string &filename, const char mode,
const int timeoutSeconds, MPI_Comm localComm,
MPI_Group &streamGroup, MPI_Group &writerGroup,
MPI_Group &readerGroup, MPI_Comm &streamComm,
MPI_Comm &writerComm, MPI_Comm &readerComm);
#endif

} // end namespace helper
} // end namespace adios2
Expand Down

0 comments on commit bce597a

Please sign in to comment.