diff --git a/source/adios2/engine/wdm/WdmReader.cpp b/source/adios2/engine/wdm/WdmReader.cpp index 2143775f41..5222712e01 100644 --- a/source/adios2/engine/wdm/WdmReader.cpp +++ b/source/adios2/engine/wdm/WdmReader.cpp @@ -57,17 +57,9 @@ WdmReader::~WdmReader() } } -StepStatus WdmReader::BeginStep(const StepMode stepMode, - const float timeoutSeconds) +StepStatus WdmReader::BeginStepIterator(StepMode stepMode, + format::DmvVecPtr &vars) { - - Log(5, - "WdmReader::BeginStep() start. Last step " + - std::to_string(m_CurrentStep), - true, true); - - ++m_CurrentStep; - if (not m_AttributesSet) { RequestMetadata(-3); @@ -78,32 +70,9 @@ StepStatus WdmReader::BeginStep(const StepMode stepMode, RequestMetadata(); m_MetaDataMap = m_DataManSerializer.GetMetaData(); - auto startTime = std::chrono::system_clock::now(); - while (m_MetaDataMap.empty()) + if (m_MetaDataMap.empty()) { - RequestMetadata(); - m_MetaDataMap = m_DataManSerializer.GetMetaData(); - auto nowTime = std::chrono::system_clock::now(); - auto duration = std::chrono::duration_cast( - nowTime - startTime); - if (duration.count() > timeoutSeconds) - { - Log(5, - "WdmReader::BeginStep() returned EndOfStream because of " - "timeout.", - true, true); - return StepStatus::EndOfStream; - } - } - - if (m_Verbosity >= 5) - { - std::cout << "WdmReader::BeginStep() MetadataMap contains "; - for (const auto &i : m_MetaDataMap) - { - std::cout << i.first << ", "; - } - std::cout << std::endl; + return StepStatus::NotReady; } size_t maxStep = std::numeric_limits::min(); @@ -129,20 +98,13 @@ StepStatus WdmReader::BeginStep(const StepMode stepMode, } if (m_CurrentStep > maxStep) { - ++m_RetryTimes; - --m_CurrentStep; - if (m_RetryTimes > m_RetryMax) - { - Log(5, - "WdmReader::BeginStep() returned EndOfStream because " - "reaching max try times for waiting next available step.", - true, true); - return StepStatus::EndOfStream; - } - else - { - return StepStatus::NotReady; - } + Log(5, + "WdmReader::BeginStepIterator() returned NotReady because " + "current step is larger than max step in buffer " + + std::to_string(m_CurrentStep) + ">" + + std::to_string(maxStep), + true, true); + return StepStatus::NotReady; } } else if (stepMode == StepMode::LatestAvailable) @@ -155,7 +117,6 @@ StepStatus WdmReader::BeginStep(const StepMode stepMode, "[WdmReader::BeginStep] Step mode is not supported!")); } - format::DmvVecPtr vars = nullptr; auto currentStepIt = m_MetaDataMap.find(m_CurrentStep); if (currentStepIt == m_MetaDataMap.end()) { @@ -177,42 +138,78 @@ StepStatus WdmReader::BeginStep(const StepMode stepMode, if (vars == nullptr) { - ++m_RetryTimes; - if (m_RetryTimes > m_RetryMax) + Log(5, + "WdmReader::BeginStepIterator() returned NotReady because vars == " + "nullptr", + true, true); + return StepStatus::NotReady; + } + + return StepStatus::OK; +} + +StepStatus WdmReader::BeginStep(const StepMode stepMode, + const float timeoutSeconds) +{ + + Log(5, + "WdmReader::BeginStep() start. Last step " + + std::to_string(m_CurrentStep), + true, true); + + ++m_CurrentStep; + + format::DmvVecPtr vars = nullptr; + + auto startTime = std::chrono::system_clock::now(); + + while (vars == nullptr) + { + auto stepStatus = BeginStepIterator(stepMode, vars); + if (stepStatus == StepStatus::OK) + { + m_RetryTimes = 0; + break; + } + else if (stepStatus == StepStatus::EndOfStream) { - Log(5, - "WdmReader::BeginStep() returned EndOfStream because reaching " - "max try times for waiting valid metadata map element.", - true, true); return StepStatus::EndOfStream; } - else + if (timeoutSeconds >= 0) { - return StepStatus::NotReady; + auto nowTime = std::chrono::system_clock::now(); + auto duration = std::chrono::duration_cast( + nowTime - startTime); + if (duration.count() > timeoutSeconds) + { + Log(5, + "WdmReader::BeginStep() returned EndOfStream because of " + "timeout.", + true, true); + return StepStatus::NotReady; + } } } - else + + for (const auto &i : *vars) { - for (const auto &i : *vars) + if (i.step == m_CurrentStep) { - if (i.step == m_CurrentStep) + if (i.type == "compound") { - if (i.type == "compound") - { - throw("Compound type is not supported yet."); - } + throw("Compound type is not supported yet."); + } #define declare_type(T) \ else if (i.type == helper::GetType()) \ { \ CheckIOVariable(i.name, i.shape, i.start, i.count); \ } - ADIOS2_FOREACH_STDTYPE_1ARG(declare_type) + ADIOS2_FOREACH_STDTYPE_1ARG(declare_type) #undef declare_type - else - { - throw("Unknown type caught in " - "DataManReader::BeginStepSubscribe."); - } + else + { + throw("Unknown type caught in " + "DataManReader::BeginStepSubscribe."); } } } @@ -364,6 +361,14 @@ void WdmReader::Init() srand(time(NULL)); InitParameters(); helper::HandshakeReader(m_MPIComm, m_AppID, m_FullAddresses, m_Name); + + if (m_Verbosity >= 5) + { + for (const auto &i : m_FullAddresses) + { + std::cout << i << std::endl; + } + } } void WdmReader::InitParameters() diff --git a/source/adios2/engine/wdm/WdmReader.h b/source/adios2/engine/wdm/WdmReader.h index fd9e4ee057..b561d8ed9c 100644 --- a/source/adios2/engine/wdm/WdmReader.h +++ b/source/adios2/engine/wdm/WdmReader.h @@ -37,8 +37,9 @@ class WdmReader : public Engine ~WdmReader(); StepStatus BeginStep( - StepMode mode = StepMode::NextAvailable, + StepMode stepMode = StepMode::NextAvailable, const float timeoutSeconds = std::numeric_limits::max()) final; + StepStatus BeginStepIterator(StepMode stepMode, format::DmvVecPtr &vars); void PerformGets() final; size_t CurrentStep() const final; void EndStep() final; diff --git a/source/adios2/helper/adiosNetwork.cpp b/source/adios2/helper/adiosNetwork.cpp index b8e6b8d76b..823ff20314 100644 --- a/source/adios2/helper/adiosNetwork.cpp +++ b/source/adios2/helper/adiosNetwork.cpp @@ -15,6 +15,7 @@ #ifndef _WIN32 #if defined(ADIOS2_HAVE_DATAMAN) || defined(ADIOS2_HAVE_WDM) +#include #include #include //AvailableIpAddresses() inet_ntoa @@ -132,6 +133,8 @@ void HandshakeWriter(MPI_Comm mpiComm, size_t &appID, remove(".staging.lock"); } + appID = helper::BroadcastValue(appID, mpiComm); + // Make full addresses for (int i = 0; i < channelsPerRank; ++i) { diff --git a/testing/adios2/engine/wdm/WdmTest.cpp b/testing/adios2/engine/wdm/WdmTest.cpp index 7df6560888..15a2beea38 100644 --- a/testing/adios2/engine/wdm/WdmTest.cpp +++ b/testing/adios2/engine/wdm/WdmTest.cpp @@ -221,13 +221,12 @@ void Reader(const Dims &shape, const Dims &start, const Dims &count, GenData(myDComplexes, i, start, count, shape); adios2::StepStatus status = - dataManReader.BeginStep(StepMode::NextAvailable, 5); + dataManReader.BeginStep(StepMode::NextAvailable); if (status == adios2::StepStatus::OK) { received_steps = true; const auto &vars = dataManIO.AvailableVariables(); - ASSERT_EQ(vars.size(), 10); if (print_lines == 0) { std::cout << "All available variables : "; @@ -237,6 +236,7 @@ void Reader(const Dims &shape, const Dims &start, const Dims &count, } std::cout << std::endl; } + ASSERT_EQ(vars.size(), 10); size_t currentStep = dataManReader.CurrentStep(); // ASSERT_EQ(i, currentStep); adios2::Variable bpChars = @@ -296,11 +296,6 @@ void Reader(const Dims &shape, const Dims &start, const Dims &count, VerifyData(myDComplexes.data(), currentStep, start, count, shape); dataManReader.EndStep(); } - else - { - std::cout << "End of stream at Step " << i << std::endl; - break; - } } if (received_steps) {