Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Removed one-sided MPI options for fixed IO pattern in SSC #2685

Merged
merged 3 commits into from
May 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions docs/user_guide/source/engines/ssc.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,12 @@ The SSC engine takes the following parameters:

1. ``OpenTimeoutSecs``: Default **10**. Timeout in seconds for opening a stream. The SSC engine's open function will block until the RendezvousAppCount is reached, or timeout, whichever comes first. If it reaches the timeout, SSC will throw an exception.

2. ``MpiMode``: Default **TwoSided**. MPI communication modes to use. Besides the default TwoSided mode using two sided MPI communications, MPI_Isend and MPI_Irecv, for data transport, there are four one sided MPI modes: OneSidedFencePush, OneSidedPostPush, OneSidedFencePull, and OneSidedPostPull. Modes with **Push** are based on the push model and use MPI_Put for data transport, while modes with **Pull** are based on the pull model and use MPI_Get. Modes with **Fence** use MPI_Win_fence for synchronization, while modes with **Post** use MPI_Win_start, MPI_Win_complete, MPI_Win_post and MPI_Win_wait.

3. ``Threading``: Default **False**. SSC will use threads to hide the time cost for metadata manipulation and data transfer when this parameter is set to **true**. SSC will check if MPI is initialized with multi-thread enabled, and if not, then SSC will force this parameter to be **false**. Please do NOT enable threading when multiple I/O streams are opened in an application, as it will cause unpredictable errors.
2. ``Threading``: Default **False**. SSC will use threads to hide the time cost for metadata manipulation and data transfer when this parameter is set to **true**. SSC will check if MPI is initialized with multi-thread enabled, and if not, then SSC will force this parameter to be **false**. Please do NOT enable threading when multiple I/O streams are opened in an application, as it will cause unpredictable errors.

=============================== ================== ================================================
**Key** **Value Format** **Default** and Examples
=============================== ================== ================================================
OpenTimeoutSecs integer **10**, 2, 20, 200
MpiMode string **TwoSided**, OneSidedFencePush, OneSidedPostPush, OneSidedFencePull, OneSidedPostPull
Threading bool **false**, true
=============================== ================== ================================================

Expand Down
74 changes: 8 additions & 66 deletions source/adios2/engine/ssc/SscReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ SscReader::SscReader(IO &io, const std::string &name, const Mode mode,
{
TAU_SCOPED_TIMER_FUNC();

helper::GetParameter(m_IO.m_Parameters, "MpiMode", m_MpiMode);
helper::GetParameter(m_IO.m_Parameters, "Verbose", m_Verbosity);
helper::GetParameter(m_IO.m_Parameters, "Threading", m_Threading);
helper::GetParameter(m_IO.m_Parameters, "OpenTimeoutSecs",
Expand All @@ -40,28 +39,9 @@ SscReader::~SscReader() { TAU_SCOPED_TIMER_FUNC(); }

void SscReader::BeginStepConsequentFixed()
{
if (m_MpiMode == "twosided")
{
MPI_Waitall(static_cast<int>(m_MpiRequests.size()),
m_MpiRequests.data(), MPI_STATUS_IGNORE);
m_MpiRequests.clear();
}
else if (m_MpiMode == "onesidedfencepush")
{
MPI_Win_fence(0, m_MpiWin);
}
else if (m_MpiMode == "onesidedpostpush")
{
MPI_Win_wait(m_MpiWin);
}
else if (m_MpiMode == "onesidedfencepull")
{
MPI_Win_fence(0, m_MpiWin);
}
else if (m_MpiMode == "onesidedpostpull")
{
MPI_Win_complete(m_MpiWin);
}
MPI_Waitall(static_cast<int>(m_MpiRequests.size()), m_MpiRequests.data(),
MPI_STATUS_IGNORE);
m_MpiRequests.clear();
}

void SscReader::BeginStepFlexible(StepStatus &status)
Expand Down Expand Up @@ -278,46 +258,13 @@ void SscReader::EndStepFixed()
{
MPI_Win_free(&m_MpiWin);
SyncReadPattern();
MPI_Win_create(m_Buffer.data(), m_Buffer.size(), 1, MPI_INFO_NULL,
m_StreamComm, &m_MpiWin);
}
if (m_MpiMode == "twosided")
for (const auto &i : m_AllReceivingWriterRanks)
{
for (const auto &i : m_AllReceivingWriterRanks)
{
m_MpiRequests.emplace_back();
MPI_Irecv(m_Buffer.data() + i.second.first,
static_cast<int>(i.second.second), MPI_CHAR, i.first, 0,
m_StreamComm, &m_MpiRequests.back());
}
}
else if (m_MpiMode == "onesidedfencepush")
{
MPI_Win_fence(0, m_MpiWin);
}
else if (m_MpiMode == "onesidedpostpush")
{
MPI_Win_post(m_WriterGroup, 0, m_MpiWin);
}
else if (m_MpiMode == "onesidedfencepull")
{
MPI_Win_fence(0, m_MpiWin);
for (const auto &i : m_AllReceivingWriterRanks)
{
MPI_Get(m_Buffer.data() + i.second.first,
static_cast<int>(i.second.second), MPI_CHAR, i.first, 0,
static_cast<int>(i.second.second), MPI_CHAR, m_MpiWin);
}
}
else if (m_MpiMode == "onesidedpostpull")
{
MPI_Win_start(m_WriterGroup, 0, m_MpiWin);
for (const auto &i : m_AllReceivingWriterRanks)
{
MPI_Get(m_Buffer.data() + i.second.first,
static_cast<int>(i.second.second), MPI_CHAR, i.first, 0,
static_cast<int>(i.second.second), MPI_CHAR, m_MpiWin);
}
m_MpiRequests.emplace_back();
MPI_Irecv(m_Buffer.data() + i.second.first,
static_cast<int>(i.second.second), MPI_CHAR, i.first, 0,
m_StreamComm, &m_MpiRequests.back());
}
}

Expand Down Expand Up @@ -567,11 +514,6 @@ void SscReader::DoClose(const int transportIndex)
{
BeginStep();
}

if (m_WriterDefinitionsLocked && m_ReaderSelectionsLocked)
{
MPI_Win_free(&m_MpiWin);
}
}

} // end namespace engine
Expand Down
1 change: 0 additions & 1 deletion source/adios2/engine/ssc/SscReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ class SscReader : public Engine
int m_Verbosity = 0;
int m_OpenTimeoutSecs = 10;
bool m_Threading = false;
std::string m_MpiMode = "twosided";
};

} // end namespace engine
Expand Down
120 changes: 14 additions & 106 deletions source/adios2/engine/ssc/SscWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ SscWriter::SscWriter(IO &io, const std::string &name, const Mode mode,
{
TAU_SCOPED_TIMER_FUNC();

helper::GetParameter(m_IO.m_Parameters, "MpiMode", m_MpiMode);
helper::GetParameter(m_IO.m_Parameters, "Verbose", m_Verbosity);
helper::GetParameter(m_IO.m_Parameters, "Threading", m_Threading);
helper::GetParameter(m_IO.m_Parameters, "OpenTimeoutSecs",
Expand Down Expand Up @@ -108,53 +107,16 @@ void SscWriter::EndStepFirst()
m_StreamComm, &m_MpiWin);
MPI_Win_free(&m_MpiWin);
SyncReadPattern();
if (m_WriterDefinitionsLocked && m_ReaderSelectionsLocked)
{
MPI_Win_create(m_Buffer.data(), m_Buffer.size(), 1, MPI_INFO_NULL,
m_StreamComm, &m_MpiWin);
}
}

void SscWriter::EndStepConsequentFixed()
{
TAU_SCOPED_TIMER_FUNC();
if (m_MpiMode == "twosided")
{
for (const auto &i : m_AllSendingReaderRanks)
{
m_MpiRequests.emplace_back();
MPI_Isend(m_Buffer.data(), static_cast<int>(m_Buffer.size()),
MPI_CHAR, i.first, 0, m_StreamComm,
&m_MpiRequests.back());
}
}
else if (m_MpiMode == "onesidedfencepush")
{
MPI_Win_fence(0, m_MpiWin);
for (const auto &i : m_AllSendingReaderRanks)
{
MPI_Put(m_Buffer.data(), static_cast<int>(m_Buffer.size()),
MPI_CHAR, i.first, i.second.first,
static_cast<int>(m_Buffer.size()), MPI_CHAR, m_MpiWin);
}
}
else if (m_MpiMode == "onesidedpostpush")
{
MPI_Win_start(m_ReaderGroup, 0, m_MpiWin);
for (const auto &i : m_AllSendingReaderRanks)
{
MPI_Put(m_Buffer.data(), static_cast<int>(m_Buffer.size()),
MPI_CHAR, i.first, i.second.first,
static_cast<int>(m_Buffer.size()), MPI_CHAR, m_MpiWin);
}
}
else if (m_MpiMode == "onesidedfencepull")
for (const auto &i : m_AllSendingReaderRanks)
{
MPI_Win_fence(0, m_MpiWin);
}
else if (m_MpiMode == "onesidedpostpull")
{
MPI_Win_post(m_ReaderGroup, 0, m_MpiWin);
m_MpiRequests.emplace_back();
MPI_Isend(m_Buffer.data(), static_cast<int>(m_Buffer.size()), MPI_CHAR,
i.first, 0, m_StreamComm, &m_MpiRequests.back());
}
}

Expand Down Expand Up @@ -213,28 +175,9 @@ void SscWriter::Flush(const int transportIndex) { TAU_SCOPED_TIMER_FUNC(); }

void SscWriter::MpiWait()
{
if (m_MpiMode == "twosided")
{
MPI_Waitall(static_cast<int>(m_MpiRequests.size()),
m_MpiRequests.data(), MPI_STATUSES_IGNORE);
m_MpiRequests.clear();
}
else if (m_MpiMode == "onesidedfencepush")
{
MPI_Win_fence(0, m_MpiWin);
}
else if (m_MpiMode == "onesidedpostpush")
{
MPI_Win_complete(m_MpiWin);
}
else if (m_MpiMode == "onesidedfencepull")
{
MPI_Win_fence(0, m_MpiWin);
}
else if (m_MpiMode == "onesidedpostpull")
{
MPI_Win_wait(m_MpiWin);
}
MPI_Waitall(static_cast<int>(m_MpiRequests.size()), m_MpiRequests.data(),
MPI_STATUSES_IGNORE);
m_MpiRequests.clear();
}

void SscWriter::SyncMpiPattern()
Expand Down Expand Up @@ -418,50 +361,15 @@ void SscWriter::DoClose(const int transportIndex)

m_Buffer[0] = 1;

if (m_MpiMode == "twosided")
{
std::vector<MPI_Request> requests;
for (const auto &i : m_AllSendingReaderRanks)
{
requests.emplace_back();
MPI_Isend(m_Buffer.data(), 1, MPI_CHAR, i.first, 0,
m_StreamComm, &requests.back());
}
MPI_Waitall(static_cast<int>(requests.size()), requests.data(),
MPI_STATUS_IGNORE);
}
else if (m_MpiMode == "onesidedfencepush")
{
MPI_Win_fence(0, m_MpiWin);
for (const auto &i : m_AllSendingReaderRanks)
{
MPI_Put(m_Buffer.data(), 1, MPI_CHAR, i.first, 0, 1, MPI_CHAR,
m_MpiWin);
}
MPI_Win_fence(0, m_MpiWin);
}
else if (m_MpiMode == "onesidedpostpush")
{
MPI_Win_start(m_ReaderGroup, 0, m_MpiWin);
for (const auto &i : m_AllSendingReaderRanks)
{
MPI_Put(m_Buffer.data(), 1, MPI_CHAR, i.first, 0, 1, MPI_CHAR,
m_MpiWin);
}
MPI_Win_complete(m_MpiWin);
}
else if (m_MpiMode == "onesidedfencepull")
{
MPI_Win_fence(0, m_MpiWin);
MPI_Win_fence(0, m_MpiWin);
}
else if (m_MpiMode == "onesidedpostpull")
std::vector<MPI_Request> requests;
for (const auto &i : m_AllSendingReaderRanks)
{
MPI_Win_post(m_ReaderGroup, 0, m_MpiWin);
MPI_Win_wait(m_MpiWin);
requests.emplace_back();
MPI_Isend(m_Buffer.data(), 1, MPI_CHAR, i.first, 0, m_StreamComm,
&requests.back());
}

MPI_Win_free(&m_MpiWin);
MPI_Waitall(static_cast<int>(requests.size()), requests.data(),
MPI_STATUS_IGNORE);
}
else
{
Expand Down
1 change: 0 additions & 1 deletion source/adios2/engine/ssc/SscWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ class SscWriter : public Engine
int m_Verbosity = 0;
int m_OpenTimeoutSecs = 10;
bool m_Threading = false;
std::string m_MpiMode = "twosided";
};

} // end namespace engine
Expand Down
12 changes: 0 additions & 12 deletions testing/adios2/engine/ssc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,6 @@ if(ADIOS2_HAVE_MPI)
gtest_add_tests_helper(OnlyTwoSteps MPI_ONLY Ssc Engine.SSC. "")
SetupTestPipeline(Engine.SSC.SscEngineTest.TestSscOnlyTwoSteps.MPI "" TRUE)

gtest_add_tests_helper(OneSidedFencePush MPI_ONLY Ssc Engine.SSC. "")
SetupTestPipeline(Engine.SSC.SscEngineTest.TestSscOneSidedFencePush.MPI "" TRUE)

gtest_add_tests_helper(OneSidedPostPush MPI_ONLY Ssc Engine.SSC. "")
SetupTestPipeline(Engine.SSC.SscEngineTest.TestSscOneSidedPostPush.MPI "" TRUE)

gtest_add_tests_helper(OneSidedFencePull MPI_ONLY Ssc Engine.SSC. "")
SetupTestPipeline(Engine.SSC.SscEngineTest.TestSscOneSidedFencePull.MPI "" TRUE)

gtest_add_tests_helper(OneSidedPostPull MPI_ONLY Ssc Engine.SSC. "")
SetupTestPipeline(Engine.SSC.SscEngineTest.TestSscOneSidedPostPull.MPI "" TRUE)

gtest_add_tests_helper(Unbalanced MPI_ONLY Ssc Engine.SSC. "")
SetupTestPipeline(Engine.SSC.SscEngineTest.TestSscUnbalanced.MPI "" TRUE)

Expand Down
Loading