Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed a WDM reader blocking issue #1347

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 76 additions & 71 deletions source/adios2/engine/wdm/WdmReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,9 @@ WdmReader::~WdmReader()
}
}

StepStatus WdmReader::BeginStep(const StepMode stepMode,
const float timeoutSeconds)
StepStatus WdmReader::BeginStepIterator(StepMode stepMode,
format::DmvVecPtr &vars)
{

Log(5,
"WdmReader::BeginStep() start. Last step " +
std::to_string(m_CurrentStep),
true, true);

++m_CurrentStep;

if (not m_AttributesSet)
{
RequestMetadata(-3);
Expand All @@ -78,32 +70,9 @@ StepStatus WdmReader::BeginStep(const StepMode stepMode,
RequestMetadata();
m_MetaDataMap = m_DataManSerializer.GetMetaData();

auto startTime = std::chrono::system_clock::now();
while (m_MetaDataMap.empty())
if (m_MetaDataMap.empty())
{
RequestMetadata();
m_MetaDataMap = m_DataManSerializer.GetMetaData();
auto nowTime = std::chrono::system_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::seconds>(
nowTime - startTime);
if (duration.count() > timeoutSeconds)
{
Log(5,
"WdmReader::BeginStep() returned EndOfStream because of "
"timeout.",
true, true);
return StepStatus::EndOfStream;
}
}

if (m_Verbosity >= 5)
{
std::cout << "WdmReader::BeginStep() MetadataMap contains ";
for (const auto &i : m_MetaDataMap)
{
std::cout << i.first << ", ";
}
std::cout << std::endl;
return StepStatus::NotReady;
}

size_t maxStep = std::numeric_limits<size_t>::min();
Expand All @@ -129,20 +98,13 @@ StepStatus WdmReader::BeginStep(const StepMode stepMode,
}
if (m_CurrentStep > maxStep)
{
++m_RetryTimes;
--m_CurrentStep;
if (m_RetryTimes > m_RetryMax)
{
Log(5,
"WdmReader::BeginStep() returned EndOfStream because "
"reaching max try times for waiting next available step.",
true, true);
return StepStatus::EndOfStream;
}
else
{
return StepStatus::NotReady;
}
Log(5,
"WdmReader::BeginStepIterator() returned NotReady because "
"current step is larger than max step in buffer " +
std::to_string(m_CurrentStep) + ">" +
std::to_string(maxStep),
true, true);
return StepStatus::NotReady;
}
}
else if (stepMode == StepMode::LatestAvailable)
Expand All @@ -155,7 +117,6 @@ StepStatus WdmReader::BeginStep(const StepMode stepMode,
"[WdmReader::BeginStep] Step mode is not supported!"));
}

format::DmvVecPtr vars = nullptr;
auto currentStepIt = m_MetaDataMap.find(m_CurrentStep);
if (currentStepIt == m_MetaDataMap.end())
{
Expand All @@ -177,42 +138,78 @@ StepStatus WdmReader::BeginStep(const StepMode stepMode,

if (vars == nullptr)
{
++m_RetryTimes;
if (m_RetryTimes > m_RetryMax)
Log(5,
"WdmReader::BeginStepIterator() returned NotReady because vars == "
"nullptr",
true, true);
return StepStatus::NotReady;
}

return StepStatus::OK;
}

StepStatus WdmReader::BeginStep(const StepMode stepMode,
const float timeoutSeconds)
{

Log(5,
"WdmReader::BeginStep() start. Last step " +
std::to_string(m_CurrentStep),
true, true);

++m_CurrentStep;

format::DmvVecPtr vars = nullptr;

auto startTime = std::chrono::system_clock::now();

while (vars == nullptr)
{
auto stepStatus = BeginStepIterator(stepMode, vars);
if (stepStatus == StepStatus::OK)
{
m_RetryTimes = 0;
break;
}
else if (stepStatus == StepStatus::EndOfStream)
{
Log(5,
"WdmReader::BeginStep() returned EndOfStream because reaching "
"max try times for waiting valid metadata map element.",
true, true);
return StepStatus::EndOfStream;
}
else
if (timeoutSeconds >= 0)
{
return StepStatus::NotReady;
auto nowTime = std::chrono::system_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::seconds>(
nowTime - startTime);
if (duration.count() > timeoutSeconds)
{
Log(5,
"WdmReader::BeginStep() returned EndOfStream because of "
"timeout.",
true, true);
return StepStatus::NotReady;
}
}
}
else

for (const auto &i : *vars)
{
for (const auto &i : *vars)
if (i.step == m_CurrentStep)
{
if (i.step == m_CurrentStep)
if (i.type == "compound")
{
if (i.type == "compound")
{
throw("Compound type is not supported yet.");
}
throw("Compound type is not supported yet.");
}
#define declare_type(T) \
else if (i.type == helper::GetType<T>()) \
{ \
CheckIOVariable<T>(i.name, i.shape, i.start, i.count); \
}
ADIOS2_FOREACH_STDTYPE_1ARG(declare_type)
ADIOS2_FOREACH_STDTYPE_1ARG(declare_type)
#undef declare_type
else
{
throw("Unknown type caught in "
"DataManReader::BeginStepSubscribe.");
}
else
{
throw("Unknown type caught in "
"DataManReader::BeginStepSubscribe.");
}
}
}
Expand Down Expand Up @@ -364,6 +361,14 @@ void WdmReader::Init()
srand(time(NULL));
InitParameters();
helper::HandshakeReader(m_MPIComm, m_AppID, m_FullAddresses, m_Name);

if (m_Verbosity >= 5)
{
for (const auto &i : m_FullAddresses)
{
std::cout << i << std::endl;
}
}
}

void WdmReader::InitParameters()
Expand Down
3 changes: 2 additions & 1 deletion source/adios2/engine/wdm/WdmReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ class WdmReader : public Engine

~WdmReader();
StepStatus BeginStep(
StepMode mode = StepMode::NextAvailable,
StepMode stepMode = StepMode::NextAvailable,
const float timeoutSeconds = std::numeric_limits<float>::max()) final;
StepStatus BeginStepIterator(StepMode stepMode, format::DmvVecPtr &vars);
void PerformGets() final;
size_t CurrentStep() const final;
void EndStep() final;
Expand Down
3 changes: 3 additions & 0 deletions source/adios2/helper/adiosNetwork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#ifndef _WIN32
#if defined(ADIOS2_HAVE_DATAMAN) || defined(ADIOS2_HAVE_WDM)

#include <iostream>
#include <thread>

#include <arpa/inet.h> //AvailableIpAddresses() inet_ntoa
Expand Down Expand Up @@ -132,6 +133,8 @@ void HandshakeWriter(MPI_Comm mpiComm, size_t &appID,
remove(".staging.lock");
}

appID = helper::BroadcastValue(appID, mpiComm);

// Make full addresses
for (int i = 0; i < channelsPerRank; ++i)
{
Expand Down
9 changes: 2 additions & 7 deletions testing/adios2/engine/wdm/WdmTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,13 +221,12 @@ void Reader(const Dims &shape, const Dims &start, const Dims &count,
GenData(myDComplexes, i, start, count, shape);

adios2::StepStatus status =
dataManReader.BeginStep(StepMode::NextAvailable, 5);
dataManReader.BeginStep(StepMode::NextAvailable);

if (status == adios2::StepStatus::OK)
{
received_steps = true;
const auto &vars = dataManIO.AvailableVariables();
ASSERT_EQ(vars.size(), 10);
if (print_lines == 0)
{
std::cout << "All available variables : ";
Expand All @@ -237,6 +236,7 @@ void Reader(const Dims &shape, const Dims &start, const Dims &count,
}
std::cout << std::endl;
}
ASSERT_EQ(vars.size(), 10);
size_t currentStep = dataManReader.CurrentStep();
// ASSERT_EQ(i, currentStep);
adios2::Variable<char> bpChars =
Expand Down Expand Up @@ -296,11 +296,6 @@ void Reader(const Dims &shape, const Dims &start, const Dims &count,
VerifyData(myDComplexes.data(), currentStep, start, count, shape);
dataManReader.EndStep();
}
else
{
std::cout << "End of stream at Step " << i << std::endl;
break;
}
}
if (received_steps)
{
Expand Down