ssc reader: add optional "Threading" parameter

What the patch does
  * docs: extends the ``Threading`` entry of the SSC engine parameter list
    with a warning about applications that open multiple I/O streams.
  * SscReader.cpp: reads the "Threading" parameter into m_Threading and splits
    BeginStep()/EndStep() into helpers (BeginStepConsequentFixed,
    BeginStepFlexible, EndStepFixed, EndStepFirstFlexible,
    EndStepConsequentFlexible, EndBeginStepFirstFlexible,
    EndBeginStepConsequentFlexible).
  * When the I/O pattern is not locked on both sides and m_Threading is set,
    EndStep() starts m_EndStepThread to run the matching
    EndBeginStep*Flexible helper; the next BeginStep() joins that thread
    instead of calling BeginStepFlexible() itself.
  * SscReader.h: declares the helpers, m_StepStatus, m_EndStepThread and
    m_Threading, and moves m_MpiMode next to the other parameters.

Changes made while reviewing (added lines only; hunk sizes are unchanged)
  * m_StepStatus now has an initializer (StepStatus::OK). BeginStepFlexible()
    writes it only on the final step, yet BeginStep() compares it against
    StepStatus::EndOfStream after every call, so it was read uninitialized.
  * BeginStepConsequentFixed() passes MPI_STATUSES_IGNORE to MPI_Waitall;
    MPI_STATUS_IGNORE is the single-status constant.
  * Template arguments that had been stripped from the text were restored:
    static_cast<int>, std::vector<MPI_Request>, Variable<T>.
  * Line breaks and indentation were reconstructed from a flattened copy of
    this patch; confirm whitespace against the upstream commit before
    applying.

NOTE(review)
  * Nothing in these hunks joins m_EndStepThread except BeginStep(). If the
    reader can be closed or destroyed between EndStep() and the next
    BeginStep(), the std::thread destructor would call std::terminate;
    confirm that the close/destructor path joins it.
  * std::thread requires <thread>; the include list of SscReader.h is not
    part of this patch, so confirm it is already pulled in.
  * The "index" lines below still carry the blob ids of the unreviewed patch.

diff --git a/docs/user_guide/source/engines/ssc.rst b/docs/user_guide/source/engines/ssc.rst
index b5b8a555a4..24578e6ccf 100644
--- a/docs/user_guide/source/engines/ssc.rst
+++ b/docs/user_guide/source/engines/ssc.rst
@@ -10,7 +10,7 @@ The SSC engine takes the following parameters:
 
 2. ``MpiMode``: Default **TwoSided**. MPI communication modes to use. Besides the default TwoSided mode using two sided MPI communications, MPI_Isend and MPI_Irecv, for data transport, there are four one sided MPI modes: OneSidedFencePush, OneSidedPostPush, OneSidedFencePull, and OneSidedPostPull. Modes with **Push** are based on the push model and use MPI_Put for data transport, while modes with **Pull** are based on the pull model and use MPI_Get. Modes with **Fence** use MPI_Win_fence for synchronization, while modes with **Post** use MPI_Win_start, MPI_Win_complete, MPI_Win_post and MPI_Win_wait.
 
-3. ``Threading``: Default **False**. SSC will use threads to hide the time cost of metadata manipulation and data transfer when this parameter is set to **true**. SSC will check if MPI is initialized with multi-thread enabled, and if not, then SSC will force this parameter to be **false**.
+3. ``Threading``: Default **False**. SSC will use threads to hide the time cost for metadata manipulation and data transfer when this parameter is set to **true**. SSC will check if MPI is initialized with multi-thread enabled, and if not, then SSC will force this parameter to be **false**. Please do NOT enable threading when multiple I/O streams are opened in an application, as it will cause unpredictable errors.
 
 =============================== ================== ================================================
  **Key**                         **Value Format**   **Default** and Examples
diff --git a/source/adios2/engine/ssc/SscReader.cpp b/source/adios2/engine/ssc/SscReader.cpp
index 24083d53fe..a645849f49 100644
--- a/source/adios2/engine/ssc/SscReader.cpp
+++ b/source/adios2/engine/ssc/SscReader.cpp
@@ -31,6 +31,7 @@ SscReader::SscReader(IO &io, const std::string &name, const Mode mode,
 
     helper::GetParameter(m_IO.m_Parameters, "MpiMode", m_MpiMode);
     helper::GetParameter(m_IO.m_Parameters, "Verbose", m_Verbosity);
+    helper::GetParameter(m_IO.m_Parameters, "Threading", m_Threading);
     helper::GetParameter(m_IO.m_Parameters, "OpenTimeoutSecs",
                          m_OpenTimeoutSecs);
 
@@ -43,6 +44,49 @@ SscReader::SscReader(IO &io, const std::string &name, const Mode mode,
 
 SscReader::~SscReader() { TAU_SCOPED_TIMER_FUNC(); }
 
+void SscReader::BeginStepConsequentFixed()
+{
+    if (m_MpiMode == "twosided")
+    {
+        MPI_Waitall(static_cast<int>(m_MpiRequests.size()),
+                    m_MpiRequests.data(), MPI_STATUSES_IGNORE);
+        m_MpiRequests.clear();
+    }
+    else if (m_MpiMode == "onesidedfencepush")
+    {
+        MPI_Win_fence(0, m_MpiWin);
+    }
+    else if (m_MpiMode == "onesidedpostpush")
+    {
+        MPI_Win_wait(m_MpiWin);
+    }
+    else if (m_MpiMode == "onesidedfencepull")
+    {
+        MPI_Win_fence(0, m_MpiWin);
+    }
+    else if (m_MpiMode == "onesidedpostpull")
+    {
+        MPI_Win_complete(m_MpiWin);
+    }
+}
+
+void SscReader::BeginStepFlexible(StepStatus &status)
+{
+    m_AllReceivingWriterRanks.clear();
+    m_Buffer.resize(1, 0);
+    m_GlobalWritePattern.clear();
+    m_GlobalWritePattern.resize(m_StreamSize);
+    m_LocalReadPattern.clear();
+    m_GlobalWritePatternJson.clear();
+    bool finalStep = SyncWritePattern();
+    if (finalStep)
+    {
+        status = StepStatus::EndOfStream;
+        return;
+    }
+    MPI_Win_create(NULL, 0, 1, MPI_INFO_NULL, m_StreamComm, &m_MpiWin);
+}
+
 StepStatus SscReader::BeginStep(const StepMode stepMode,
                                 const float timeoutSeconds)
 {
@@ -62,45 +106,23 @@ StepStatus SscReader::BeginStep(const StepMode stepMode,
     if (m_CurrentStep == 0 || m_WriterDefinitionsLocked == false ||
         m_ReaderSelectionsLocked == false)
     {
-        m_AllReceivingWriterRanks.clear();
-        m_Buffer.resize(1, 0);
-        m_GlobalWritePattern.clear();
-        m_GlobalWritePattern.resize(m_StreamSize);
-        m_LocalReadPattern.clear();
-        m_GlobalWritePatternJson.clear();
-        bool finalStep = SyncWritePattern();
-        if (finalStep)
-        {
-            return StepStatus::EndOfStream;
-        }
-
-        MPI_Win_create(NULL, 0, 1, MPI_INFO_NULL, m_StreamComm, &m_MpiWin);
-    }
-    else
-    {
-        if (m_MpiMode == "twosided")
+        if (m_Threading && m_EndStepThread.joinable())
         {
-            MPI_Waitall(static_cast<int>(m_MpiRequests.size()),
-                        m_MpiRequests.data(), MPI_STATUS_IGNORE);
-            m_MpiRequests.clear();
+            m_EndStepThread.join();
         }
-        else if (m_MpiMode == "onesidedfencepush")
+        else
         {
-            MPI_Win_fence(0, m_MpiWin);
+            BeginStepFlexible(m_StepStatus);
         }
-        else if (m_MpiMode == "onesidedpostpush")
+        if (m_StepStatus == StepStatus::EndOfStream)
         {
-            MPI_Win_wait(m_MpiWin);
-        }
-        else if (m_MpiMode == "onesidedfencepull")
-        {
-            MPI_Win_fence(0, m_MpiWin);
-        }
-        else if (m_MpiMode == "onesidedpostpull")
-        {
-            MPI_Win_complete(m_MpiWin);
+            return StepStatus::EndOfStream;
         }
     }
+    else
+    {
+        BeginStepConsequentFixed();
+    }
 
     for (const auto &r : m_GlobalWritePattern)
     {
@@ -254,6 +276,75 @@ void SscReader::PerformGets()
 
 size_t SscReader::CurrentStep() const { return m_CurrentStep; }
 
+void SscReader::EndStepFixed()
+{
+    if (m_CurrentStep == 0)
+    {
+        MPI_Win_free(&m_MpiWin);
+        SyncReadPattern();
+        MPI_Win_create(m_Buffer.data(), m_Buffer.size(), 1, MPI_INFO_NULL,
+                       m_StreamComm, &m_MpiWin);
+    }
+    if (m_MpiMode == "twosided")
+    {
+        for (const auto &i : m_AllReceivingWriterRanks)
+        {
+            m_MpiRequests.emplace_back();
+            MPI_Irecv(m_Buffer.data() + i.second.first,
+                      static_cast<int>(i.second.second), MPI_CHAR, i.first, 0,
+                      m_StreamComm, &m_MpiRequests.back());
+        }
+    }
+    else if (m_MpiMode == "onesidedfencepush")
+    {
+        MPI_Win_fence(0, m_MpiWin);
+    }
+    else if (m_MpiMode == "onesidedpostpush")
+    {
+        MPI_Win_post(m_MpiAllWritersGroup, 0, m_MpiWin);
+    }
+    else if (m_MpiMode == "onesidedfencepull")
+    {
+        MPI_Win_fence(0, m_MpiWin);
+        for (const auto &i : m_AllReceivingWriterRanks)
+        {
+            MPI_Get(m_Buffer.data() + i.second.first,
+                    static_cast<int>(i.second.second), MPI_CHAR, i.first, 0,
+                    static_cast<int>(i.second.second), MPI_CHAR, m_MpiWin);
+        }
+    }
+    else if (m_MpiMode == "onesidedpostpull")
+    {
+        MPI_Win_start(m_MpiAllWritersGroup, 0, m_MpiWin);
+        for (const auto &i : m_AllReceivingWriterRanks)
+        {
+            MPI_Get(m_Buffer.data() + i.second.first,
+                    static_cast<int>(i.second.second), MPI_CHAR, i.first, 0,
+                    static_cast<int>(i.second.second), MPI_CHAR, m_MpiWin);
+        }
+    }
+}
+
+void SscReader::EndStepFirstFlexible()
+{
+    MPI_Win_free(&m_MpiWin);
+    SyncReadPattern();
+}
+
+void SscReader::EndStepConsequentFlexible() { MPI_Win_free(&m_MpiWin); }
+
+void SscReader::EndBeginStepFirstFlexible()
+{
+    EndStepFirstFlexible();
+    BeginStepFlexible(m_StepStatus);
+}
+
+void SscReader::EndBeginStepConsequentFlexible()
+{
+    EndStepConsequentFlexible();
+    BeginStepFlexible(m_StepStatus);
+}
+
 void SscReader::EndStep()
 {
     TAU_SCOPED_TIMER_FUNC();
@@ -267,69 +358,41 @@ void SscReader::EndStep()
 
     PerformGets();
 
-    if (m_WriterDefinitionsLocked &&
-        m_ReaderSelectionsLocked) // fixed IO pattern
+    if (m_WriterDefinitionsLocked && m_ReaderSelectionsLocked)
+    {
+        EndStepFixed();
+    }
+    else
     {
         if (m_CurrentStep == 0)
         {
-            MPI_Win_free(&m_MpiWin);
-            SyncReadPattern();
-            MPI_Win_create(m_Buffer.data(), m_Buffer.size(), 1, MPI_INFO_NULL,
-                           m_StreamComm, &m_MpiWin);
-        }
-        if (m_MpiMode == "twosided")
-        {
-            for (const auto &i : m_AllReceivingWriterRanks)
+            if (m_Threading)
             {
-                m_MpiRequests.emplace_back();
-                MPI_Irecv(m_Buffer.data() + i.second.first,
-                          static_cast<int>(i.second.second), MPI_CHAR, i.first,
-                          0, m_StreamComm, &m_MpiRequests.back());
+                m_EndStepThread =
+                    std::thread(&SscReader::EndBeginStepFirstFlexible, this);
             }
-        }
-        else if (m_MpiMode == "onesidedfencepush")
-        {
-            MPI_Win_fence(0, m_MpiWin);
-        }
-        else if (m_MpiMode == "onesidedpostpush")
-        {
-            MPI_Win_post(m_MpiAllWritersGroup, 0, m_MpiWin);
-        }
-        else if (m_MpiMode == "onesidedfencepull")
-        {
-            MPI_Win_fence(0, m_MpiWin);
-            for (const auto &i : m_AllReceivingWriterRanks)
+            else
             {
-                MPI_Get(m_Buffer.data() + i.second.first,
-                        static_cast<int>(i.second.second), MPI_CHAR, i.first, 0,
-                        static_cast<int>(i.second.second), MPI_CHAR, m_MpiWin);
+                EndStepFirstFlexible();
             }
         }
-        else if (m_MpiMode == "onesidedpostpull")
+        else
         {
-            MPI_Win_start(m_MpiAllWritersGroup, 0, m_MpiWin);
-            for (const auto &i : m_AllReceivingWriterRanks)
+            if (m_Threading)
             {
-                MPI_Get(m_Buffer.data() + i.second.first,
-                        static_cast<int>(i.second.second), MPI_CHAR, i.first, 0,
-                        static_cast<int>(i.second.second), MPI_CHAR, m_MpiWin);
+                m_EndStepThread = std::thread(
+                    &SscReader::EndBeginStepConsequentFlexible, this);
+            }
+            else
+            {
+                EndStepConsequentFlexible();
             }
-        }
-    }
-    else // flexible IO pattern
-    {
-        MPI_Win_free(&m_MpiWin);
-        if (m_CurrentStep == 0)
-        {
-            SyncReadPattern();
         }
     }
 
     m_StepBegun = false;
 }
 
-// PRIVATE
-
 void SscReader::SyncMpiPattern()
 {
     TAU_SCOPED_TIMER_FUNC();
diff --git a/source/adios2/engine/ssc/SscReader.h b/source/adios2/engine/ssc/SscReader.h
index 27e503fe4d..22ee005d93 100644
--- a/source/adios2/engine/ssc/SscReader.h
+++ b/source/adios2/engine/ssc/SscReader.h
@@ -52,8 +52,9 @@ class SscReader : public Engine
     MPI_Win m_MpiWin;
     MPI_Group m_MpiAllWritersGroup;
     MPI_Comm m_StreamComm;
-    std::string m_MpiMode = "twosided";
     std::vector<MPI_Request> m_MpiRequests;
+    StepStatus m_StepStatus = StepStatus::OK;
+    std::thread m_EndStepThread;
 
     int m_StreamRank;
     int m_StreamSize;
@@ -63,6 +64,13 @@ class SscReader : public Engine
     void SyncMpiPattern();
     bool SyncWritePattern();
     void SyncReadPattern();
+    void BeginStepConsequentFixed();
+    void BeginStepFlexible(StepStatus &status);
+    void EndStepFixed();
+    void EndStepFirstFlexible();
+    void EndStepConsequentFlexible();
+    void EndBeginStepFirstFlexible();
+    void EndBeginStepConsequentFlexible();
 
 #define declare_type(T)                                                        \
     void DoGetSync(Variable<T> &, T *) final;                                  \
@@ -89,6 +97,8 @@ class SscReader : public Engine
 
     int m_Verbosity = 0;
     int m_OpenTimeoutSecs = 10;
+    bool m_Threading = false;
+    std::string m_MpiMode = "twosided";
 };
 
 } // end namespace engine