From 5eaca129ba1416a246bc60a9fbe73d1384b6c970 Mon Sep 17 00:00:00 2001 From: Jason Wang Date: Tue, 2 Apr 2019 14:30:41 -0400 Subject: [PATCH 1/6] added blocking in wdm reader for timeoutSeconds=-1 --- source/adios2/engine/wdm/WdmReader.cpp | 120 ++++++++++--------------- source/adios2/engine/wdm/WdmReader.h | 5 +- testing/adios2/engine/wdm/WdmTest.cpp | 2 +- 3 files changed, 50 insertions(+), 77 deletions(-) diff --git a/source/adios2/engine/wdm/WdmReader.cpp b/source/adios2/engine/wdm/WdmReader.cpp index 2143775f41..0fad4e4d01 100644 --- a/source/adios2/engine/wdm/WdmReader.cpp +++ b/source/adios2/engine/wdm/WdmReader.cpp @@ -57,17 +57,9 @@ WdmReader::~WdmReader() } } -StepStatus WdmReader::BeginStep(const StepMode stepMode, - const float timeoutSeconds) +StepStatus WdmReader::BeginStepIterator(StepMode stepMode, + format::DmvVecPtr &vars) { - - Log(5, - "WdmReader::BeginStep() start. Last step " + - std::to_string(m_CurrentStep), - true, true); - - ++m_CurrentStep; - if (not m_AttributesSet) { RequestMetadata(-3); @@ -78,34 +70,6 @@ StepStatus WdmReader::BeginStep(const StepMode stepMode, RequestMetadata(); m_MetaDataMap = m_DataManSerializer.GetMetaData(); - auto startTime = std::chrono::system_clock::now(); - while (m_MetaDataMap.empty()) - { - RequestMetadata(); - m_MetaDataMap = m_DataManSerializer.GetMetaData(); - auto nowTime = std::chrono::system_clock::now(); - auto duration = std::chrono::duration_cast( - nowTime - startTime); - if (duration.count() > timeoutSeconds) - { - Log(5, - "WdmReader::BeginStep() returned EndOfStream because of " - "timeout.", - true, true); - return StepStatus::EndOfStream; - } - } - - if (m_Verbosity >= 5) - { - std::cout << "WdmReader::BeginStep() MetadataMap contains "; - for (const auto &i : m_MetaDataMap) - { - std::cout << i.first << ", "; - } - std::cout << std::endl; - } - size_t maxStep = std::numeric_limits::min(); size_t minStep = std::numeric_limits::max(); @@ -129,20 +93,7 @@ StepStatus WdmReader::BeginStep(const StepMode stepMode, } if (m_CurrentStep > maxStep) { - ++m_RetryTimes; - --m_CurrentStep; - if (m_RetryTimes > m_RetryMax) - { - Log(5, - "WdmReader::BeginStep() returned EndOfStream because " - "reaching max try times for waiting next available step.", - true, true); - return StepStatus::EndOfStream; - } - else - { - return StepStatus::NotReady; - } + return StepStatus::NotReady; } } else if (stepMode == StepMode::LatestAvailable) @@ -155,7 +106,6 @@ StepStatus WdmReader::BeginStep(const StepMode stepMode, "[WdmReader::BeginStep] Step mode is not supported!")); } - format::DmvVecPtr vars = nullptr; auto currentStepIt = m_MetaDataMap.find(m_CurrentStep); if (currentStepIt == m_MetaDataMap.end()) { @@ -177,42 +127,64 @@ StepStatus WdmReader::BeginStep(const StepMode stepMode, if (vars == nullptr) { - ++m_RetryTimes; - if (m_RetryTimes > m_RetryMax) + return StepStatus::NotReady; + } +} + +StepStatus WdmReader::BeginStep(const StepMode stepMode, + const float timeoutSeconds) +{ + + Log(5, + "WdmReader::BeginStep() start. Last step " + + std::to_string(m_CurrentStep), + true, true); + + ++m_CurrentStep; + + format::DmvVecPtr vars = nullptr; + + auto startTime = std::chrono::system_clock::now(); + + while (vars == nullptr) + { + if (BeginStepIterator(stepMode, vars) == StepStatus::OK) + { + m_RetryTimes = 0; + break; + } + auto nowTime = std::chrono::system_clock::now(); + auto duration = std::chrono::duration_cast( + nowTime - startTime); + if (duration.count() > timeoutSeconds && timeoutSeconds >= 0) { Log(5, - "WdmReader::BeginStep() returned EndOfStream because reaching " - "max try times for waiting valid metadata map element.", + "WdmReader::BeginStep() returned EndOfStream because of " + "timeout.", true, true); return StepStatus::EndOfStream; } - else - { - return StepStatus::NotReady; - } } - else + + for (const auto &i : *vars) { - for (const auto &i : *vars) + if (i.step == m_CurrentStep) { - if (i.step == m_CurrentStep) + if (i.type == "compound") { - if (i.type == "compound") - { - throw("Compound type is not supported yet."); - } + throw("Compound type is not supported yet."); + } #define declare_type(T) \ else if (i.type == helper::GetType()) \ { \ CheckIOVariable(i.name, i.shape, i.start, i.count); \ } - ADIOS2_FOREACH_STDTYPE_1ARG(declare_type) + ADIOS2_FOREACH_STDTYPE_1ARG(declare_type) #undef declare_type - else - { - throw("Unknown type caught in " - "DataManReader::BeginStepSubscribe."); - } + else + { + throw("Unknown type caught in " + "DataManReader::BeginStepSubscribe."); } } } diff --git a/source/adios2/engine/wdm/WdmReader.h b/source/adios2/engine/wdm/WdmReader.h index fd9e4ee057..8bfa5f40f6 100644 --- a/source/adios2/engine/wdm/WdmReader.h +++ b/source/adios2/engine/wdm/WdmReader.h @@ -37,8 +37,9 @@ class WdmReader : public Engine ~WdmReader(); StepStatus BeginStep( - StepMode mode = StepMode::NextAvailable, + StepMode stepMode = StepMode::NextAvailable, const float timeoutSeconds = std::numeric_limits::max()) final; + StepStatus BeginStepIterator(StepMode stepMode, format::DmvVecPtr &vars); void PerformGets() final; size_t CurrentStep() const final; void EndStep() final; @@ -115,7 +116,7 @@ class WdmReader : public Engine void Log(const int level, const std::string &message, const bool mpi, const bool endline); - int m_Verbosity = 0; + int m_Verbosity = 5; }; } // end namespace engine diff --git a/testing/adios2/engine/wdm/WdmTest.cpp b/testing/adios2/engine/wdm/WdmTest.cpp index 7df6560888..899e4c84fe 100644 --- a/testing/adios2/engine/wdm/WdmTest.cpp +++ b/testing/adios2/engine/wdm/WdmTest.cpp @@ -227,7 +227,6 @@ void Reader(const Dims &shape, const Dims &start, const Dims &count, { received_steps = true; const auto &vars = dataManIO.AvailableVariables(); - ASSERT_EQ(vars.size(), 10); if (print_lines == 0) { std::cout << "All available variables : "; @@ -237,6 +236,7 @@ void Reader(const Dims &shape, const Dims &start, const Dims &count, } std::cout << std::endl; } + ASSERT_EQ(vars.size(), 10); size_t currentStep = dataManReader.CurrentStep(); // ASSERT_EQ(i, currentStep); adios2::Variable bpChars = From 4a0b8840c0b185999814f7825edd2a91b719bbb4 Mon Sep 17 00:00:00 2001 From: Jason Wang Date: Tue, 2 Apr 2019 14:35:21 -0400 Subject: [PATCH 2/6] cleaning up --- source/adios2/engine/wdm/WdmReader.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/adios2/engine/wdm/WdmReader.h b/source/adios2/engine/wdm/WdmReader.h index 8bfa5f40f6..b561d8ed9c 100644 --- a/source/adios2/engine/wdm/WdmReader.h +++ b/source/adios2/engine/wdm/WdmReader.h @@ -116,7 +116,7 @@ class WdmReader : public Engine void Log(const int level, const std::string &message, const bool mpi, const bool endline); - int m_Verbosity = 5; + int m_Verbosity = 0; }; } // end namespace engine From 666a398ae928cf55de07b90706ee6872b5e770e8 Mon Sep 17 00:00:00 2001 From: Jason Wang Date: Tue, 2 Apr 2019 14:53:08 -0400 Subject: [PATCH 3/6] fixed other step modes for wdm --- source/adios2/engine/wdm/WdmReader.cpp | 31 ++++++++++++++++++-------- 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/source/adios2/engine/wdm/WdmReader.cpp b/source/adios2/engine/wdm/WdmReader.cpp index 0fad4e4d01..d5e1f04799 100644 --- a/source/adios2/engine/wdm/WdmReader.cpp +++ b/source/adios2/engine/wdm/WdmReader.cpp @@ -148,22 +148,35 @@ StepStatus WdmReader::BeginStep(const StepMode stepMode, while (vars == nullptr) { - if (BeginStepIterator(stepMode, vars) == StepStatus::OK) + auto stepStatus = BeginStepIterator(stepMode, vars); + if (stepStatus == StepStatus::OK) { m_RetryTimes = 0; break; } - auto nowTime = std::chrono::system_clock::now(); - auto duration = std::chrono::duration_cast( - nowTime - startTime); - if (duration.count() > timeoutSeconds && timeoutSeconds >= 0) + else if (stepStatus == StepStatus::EndOfStream) { - Log(5, - "WdmReader::BeginStep() returned EndOfStream because of " - "timeout.", - true, true); return StepStatus::EndOfStream; } + if (timeoutSeconds >= 0) + { + auto nowTime = std::chrono::system_clock::now(); + auto duration = std::chrono::duration_cast( + nowTime - startTime); + if (duration.count() > timeoutSeconds) + { + Log(5, + "WdmReader::BeginStep() returned EndOfStream because of " + "timeout.", + true, true); + return StepStatus::EndOfStream; + } + else + { + Log(5, "WdmReader::BeginStep() returned NotReady.", true, true); + return StepStatus::NotReady; + } + } } for (const auto &i : *vars) From 4faba0482b803f188d37de3bc1ec77e66e6a8693 Mon Sep 17 00:00:00 2001 From: Jason Wang Date: Tue, 2 Apr 2019 15:24:09 -0400 Subject: [PATCH 4/6] more bug fixes --- source/adios2/engine/wdm/WdmReader.cpp | 5 ----- testing/adios2/engine/wdm/WdmTest.cpp | 9 ++------- 2 files changed, 2 insertions(+), 12 deletions(-) diff --git a/source/adios2/engine/wdm/WdmReader.cpp b/source/adios2/engine/wdm/WdmReader.cpp index d5e1f04799..689b935620 100644 --- a/source/adios2/engine/wdm/WdmReader.cpp +++ b/source/adios2/engine/wdm/WdmReader.cpp @@ -169,11 +169,6 @@ StepStatus WdmReader::BeginStep(const StepMode stepMode, "WdmReader::BeginStep() returned EndOfStream because of " "timeout.", true, true); - return StepStatus::EndOfStream; - } - else - { - Log(5, "WdmReader::BeginStep() returned NotReady.", true, true); return StepStatus::NotReady; } } diff --git a/testing/adios2/engine/wdm/WdmTest.cpp b/testing/adios2/engine/wdm/WdmTest.cpp index 899e4c84fe..460a70c6ce 100644 --- a/testing/adios2/engine/wdm/WdmTest.cpp +++ b/testing/adios2/engine/wdm/WdmTest.cpp @@ -221,7 +221,7 @@ void Reader(const Dims &shape, const Dims &start, const Dims &count, GenData(myDComplexes, i, start, count, shape); adios2::StepStatus status = - dataManReader.BeginStep(StepMode::NextAvailable, 5); + dataManReader.BeginStep(StepMode::NextAvailable); if (status == adios2::StepStatus::OK) { @@ -296,11 +296,6 @@ void Reader(const Dims &shape, const Dims &start, const Dims &count, VerifyData(myDComplexes.data(), currentStep, start, count, shape); dataManReader.EndStep(); } - else - { - std::cout << "End of stream at Step " << i << std::endl; - break; - } } if (received_steps) { @@ -329,7 +324,7 @@ TEST_F(WdmEngineTest, BaseTest) Dims shape = {10, (size_t)mpiSize * 2}; Dims start = {2, (size_t)mpiRank * 2}; Dims count = {5, 2}; - size_t steps = 1000; + size_t steps = 10000; if (mpiGroup == 0) { From b4d2ee31c417087f6d221d5d0afe7c399a82e960 Mon Sep 17 00:00:00 2001 From: Jason Wang Date: Tue, 2 Apr 2019 16:30:45 -0400 Subject: [PATCH 5/6] fixed a port conflict bug for running multiple apps on a single node --- source/adios2/helper/adiosNetwork.cpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/source/adios2/helper/adiosNetwork.cpp b/source/adios2/helper/adiosNetwork.cpp index b8e6b8d76b..823ff20314 100644 --- a/source/adios2/helper/adiosNetwork.cpp +++ b/source/adios2/helper/adiosNetwork.cpp @@ -15,6 +15,7 @@ #ifndef _WIN32 #if defined(ADIOS2_HAVE_DATAMAN) || defined(ADIOS2_HAVE_WDM) +#include #include #include //AvailableIpAddresses() inet_ntoa @@ -132,6 +133,8 @@ void HandshakeWriter(MPI_Comm mpiComm, size_t &appID, remove(".staging.lock"); } + appID = helper::BroadcastValue(appID, mpiComm); + // Make full addresses for (int i = 0; i < channelsPerRank; ++i) { From d96dad733efb191ebd462a42056575dc4d13c183 Mon Sep 17 00:00:00 2001 From: Ruonan Wang Date: Tue, 2 Apr 2019 20:30:52 -0400 Subject: [PATCH 6/6] fixed another bug in wdm reader which causes dead lock --- source/adios2/engine/wdm/WdmReader.cpp | 25 +++++++++++++++++++++++++ testing/adios2/engine/wdm/WdmTest.cpp | 2 +- 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/source/adios2/engine/wdm/WdmReader.cpp b/source/adios2/engine/wdm/WdmReader.cpp index 689b935620..5222712e01 100644 --- a/source/adios2/engine/wdm/WdmReader.cpp +++ b/source/adios2/engine/wdm/WdmReader.cpp @@ -70,6 +70,11 @@ StepStatus WdmReader::BeginStepIterator(StepMode stepMode, RequestMetadata(); m_MetaDataMap = m_DataManSerializer.GetMetaData(); + if (m_MetaDataMap.empty()) + { + return StepStatus::NotReady; + } + size_t maxStep = std::numeric_limits::min(); size_t minStep = std::numeric_limits::max(); @@ -93,6 +98,12 @@ StepStatus WdmReader::BeginStepIterator(StepMode stepMode, } if (m_CurrentStep > maxStep) { + Log(5, + "WdmReader::BeginStepIterator() returned NotReady because " + "current step is larger than max step in buffer " + + std::to_string(m_CurrentStep) + ">" + + std::to_string(maxStep), + true, true); return StepStatus::NotReady; } } @@ -127,8 +138,14 @@ StepStatus WdmReader::BeginStepIterator(StepMode stepMode, if (vars == nullptr) { + Log(5, + "WdmReader::BeginStepIterator() returned NotReady because vars == " + "nullptr", + true, true); return StepStatus::NotReady; } + + return StepStatus::OK; } StepStatus WdmReader::BeginStep(const StepMode stepMode, @@ -344,6 +361,14 @@ void WdmReader::Init() srand(time(NULL)); InitParameters(); helper::HandshakeReader(m_MPIComm, m_AppID, m_FullAddresses, m_Name); + + if (m_Verbosity >= 5) + { + for (const auto &i : m_FullAddresses) + { + std::cout << i << std::endl; + } + } } void WdmReader::InitParameters() diff --git a/testing/adios2/engine/wdm/WdmTest.cpp b/testing/adios2/engine/wdm/WdmTest.cpp index 460a70c6ce..15a2beea38 100644 --- a/testing/adios2/engine/wdm/WdmTest.cpp +++ b/testing/adios2/engine/wdm/WdmTest.cpp @@ -324,7 +324,7 @@ TEST_F(WdmEngineTest, BaseTest) Dims shape = {10, (size_t)mpiSize * 2}; Dims start = {2, (size_t)mpiRank * 2}; Dims count = {5, 2}; - size_t steps = 10000; + size_t steps = 1000; if (mpiGroup == 0) {