Skip to content

Commit

Permalink
Merge pull request #1347 from JasonRuonanWang/wdm-reader-blocking
Browse files Browse the repository at this point in the history
Fixed a WDM reader blocking issue
  • Loading branch information
JasonRuonanWang authored Apr 3, 2019
2 parents 2fb4be7 + d96dad7 commit 052a9c7
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 79 deletions.
147 changes: 76 additions & 71 deletions source/adios2/engine/wdm/WdmReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,9 @@ WdmReader::~WdmReader()
}
}

StepStatus WdmReader::BeginStep(const StepMode stepMode,
const float timeoutSeconds)
StepStatus WdmReader::BeginStepIterator(StepMode stepMode,
format::DmvVecPtr &vars)
{

Log(5,
"WdmReader::BeginStep() start. Last step " +
std::to_string(m_CurrentStep),
true, true);

++m_CurrentStep;

if (not m_AttributesSet)
{
RequestMetadata(-3);
Expand All @@ -78,32 +70,9 @@ StepStatus WdmReader::BeginStep(const StepMode stepMode,
RequestMetadata();
m_MetaDataMap = m_DataManSerializer.GetMetaData();

auto startTime = std::chrono::system_clock::now();
while (m_MetaDataMap.empty())
if (m_MetaDataMap.empty())
{
RequestMetadata();
m_MetaDataMap = m_DataManSerializer.GetMetaData();
auto nowTime = std::chrono::system_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::seconds>(
nowTime - startTime);
if (duration.count() > timeoutSeconds)
{
Log(5,
"WdmReader::BeginStep() returned EndOfStream because of "
"timeout.",
true, true);
return StepStatus::EndOfStream;
}
}

if (m_Verbosity >= 5)
{
std::cout << "WdmReader::BeginStep() MetadataMap contains ";
for (const auto &i : m_MetaDataMap)
{
std::cout << i.first << ", ";
}
std::cout << std::endl;
return StepStatus::NotReady;
}

size_t maxStep = std::numeric_limits<size_t>::min();
Expand All @@ -129,20 +98,13 @@ StepStatus WdmReader::BeginStep(const StepMode stepMode,
}
if (m_CurrentStep > maxStep)
{
++m_RetryTimes;
--m_CurrentStep;
if (m_RetryTimes > m_RetryMax)
{
Log(5,
"WdmReader::BeginStep() returned EndOfStream because "
"reaching max try times for waiting next available step.",
true, true);
return StepStatus::EndOfStream;
}
else
{
return StepStatus::NotReady;
}
Log(5,
"WdmReader::BeginStepIterator() returned NotReady because "
"current step is larger than max step in buffer " +
std::to_string(m_CurrentStep) + ">" +
std::to_string(maxStep),
true, true);
return StepStatus::NotReady;
}
}
else if (stepMode == StepMode::LatestAvailable)
Expand All @@ -155,7 +117,6 @@ StepStatus WdmReader::BeginStep(const StepMode stepMode,
"[WdmReader::BeginStep] Step mode is not supported!"));
}

format::DmvVecPtr vars = nullptr;
auto currentStepIt = m_MetaDataMap.find(m_CurrentStep);
if (currentStepIt == m_MetaDataMap.end())
{
Expand All @@ -177,42 +138,78 @@ StepStatus WdmReader::BeginStep(const StepMode stepMode,

if (vars == nullptr)
{
++m_RetryTimes;
if (m_RetryTimes > m_RetryMax)
Log(5,
"WdmReader::BeginStepIterator() returned NotReady because vars == "
"nullptr",
true, true);
return StepStatus::NotReady;
}

return StepStatus::OK;
}

StepStatus WdmReader::BeginStep(const StepMode stepMode,
const float timeoutSeconds)
{

Log(5,
"WdmReader::BeginStep() start. Last step " +
std::to_string(m_CurrentStep),
true, true);

++m_CurrentStep;

format::DmvVecPtr vars = nullptr;

auto startTime = std::chrono::system_clock::now();

while (vars == nullptr)
{
auto stepStatus = BeginStepIterator(stepMode, vars);
if (stepStatus == StepStatus::OK)
{
m_RetryTimes = 0;
break;
}
else if (stepStatus == StepStatus::EndOfStream)
{
Log(5,
"WdmReader::BeginStep() returned EndOfStream because reaching "
"max try times for waiting valid metadata map element.",
true, true);
return StepStatus::EndOfStream;
}
else
if (timeoutSeconds >= 0)
{
return StepStatus::NotReady;
auto nowTime = std::chrono::system_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::seconds>(
nowTime - startTime);
if (duration.count() > timeoutSeconds)
{
Log(5,
"WdmReader::BeginStep() returned EndOfStream because of "
"timeout.",
true, true);
return StepStatus::NotReady;
}
}
}
else

for (const auto &i : *vars)
{
for (const auto &i : *vars)
if (i.step == m_CurrentStep)
{
if (i.step == m_CurrentStep)
if (i.type == "compound")
{
if (i.type == "compound")
{
throw("Compound type is not supported yet.");
}
throw("Compound type is not supported yet.");
}
#define declare_type(T) \
else if (i.type == helper::GetType<T>()) \
{ \
CheckIOVariable<T>(i.name, i.shape, i.start, i.count); \
}
ADIOS2_FOREACH_STDTYPE_1ARG(declare_type)
ADIOS2_FOREACH_STDTYPE_1ARG(declare_type)
#undef declare_type
else
{
throw("Unknown type caught in "
"DataManReader::BeginStepSubscribe.");
}
else
{
throw("Unknown type caught in "
"DataManReader::BeginStepSubscribe.");
}
}
}
Expand Down Expand Up @@ -364,6 +361,14 @@ void WdmReader::Init()
srand(time(NULL));
InitParameters();
helper::HandshakeReader(m_MPIComm, m_AppID, m_FullAddresses, m_Name);

if (m_Verbosity >= 5)
{
for (const auto &i : m_FullAddresses)
{
std::cout << i << std::endl;
}
}
}

void WdmReader::InitParameters()
Expand Down
3 changes: 2 additions & 1 deletion source/adios2/engine/wdm/WdmReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ class WdmReader : public Engine

~WdmReader();
StepStatus BeginStep(
StepMode mode = StepMode::NextAvailable,
StepMode stepMode = StepMode::NextAvailable,
const float timeoutSeconds = std::numeric_limits<float>::max()) final;
StepStatus BeginStepIterator(StepMode stepMode, format::DmvVecPtr &vars);
void PerformGets() final;
size_t CurrentStep() const final;
void EndStep() final;
Expand Down
3 changes: 3 additions & 0 deletions source/adios2/helper/adiosNetwork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#ifndef _WIN32
#if defined(ADIOS2_HAVE_DATAMAN) || defined(ADIOS2_HAVE_WDM)

#include <iostream>
#include <thread>

#include <arpa/inet.h> //AvailableIpAddresses() inet_ntoa
Expand Down Expand Up @@ -132,6 +133,8 @@ void HandshakeWriter(MPI_Comm mpiComm, size_t &appID,
remove(".staging.lock");
}

appID = helper::BroadcastValue(appID, mpiComm);

// Make full addresses
for (int i = 0; i < channelsPerRank; ++i)
{
Expand Down
9 changes: 2 additions & 7 deletions testing/adios2/engine/wdm/WdmTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,13 +221,12 @@ void Reader(const Dims &shape, const Dims &start, const Dims &count,
GenData(myDComplexes, i, start, count, shape);

adios2::StepStatus status =
dataManReader.BeginStep(StepMode::NextAvailable, 5);
dataManReader.BeginStep(StepMode::NextAvailable);

if (status == adios2::StepStatus::OK)
{
received_steps = true;
const auto &vars = dataManIO.AvailableVariables();
ASSERT_EQ(vars.size(), 10);
if (print_lines == 0)
{
std::cout << "All available variables : ";
Expand All @@ -237,6 +236,7 @@ void Reader(const Dims &shape, const Dims &start, const Dims &count,
}
std::cout << std::endl;
}
ASSERT_EQ(vars.size(), 10);
size_t currentStep = dataManReader.CurrentStep();
// ASSERT_EQ(i, currentStep);
adios2::Variable<char> bpChars =
Expand Down Expand Up @@ -296,11 +296,6 @@ void Reader(const Dims &shape, const Dims &start, const Dims &count,
VerifyData(myDComplexes.data(), currentStep, start, count, shape);
dataManReader.EndStep();
}
else
{
std::cout << "End of stream at Step " << i << std::endl;
break;
}
}
if (received_steps)
{
Expand Down

0 comments on commit 052a9c7

Please sign in to comment.