Skip to content

Commit

Permalink
Merge pull request #2168 from JasonRuonanWang/dataman
Browse files Browse the repository at this point in the history
fixed a problem in dataman where readers may miss some initial steps
  • Loading branch information
JasonRuonanWang authored Apr 26, 2020
2 parents 5dad838 + 12c0cb6 commit 150b5fe
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 14 deletions.
6 changes: 2 additions & 4 deletions source/adios2/engine/dataman/DataManReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,6 @@ DataManReader::DataManReader(IO &io, const std::string &name,
helper::GetParameter(m_IO.m_Parameters, "Port", m_Port);
helper::GetParameter(m_IO.m_Parameters, "Timeout", m_Timeout);
helper::GetParameter(m_IO.m_Parameters, "Verbose", m_Verbosity);
helper::GetParameter(m_IO.m_Parameters, "RendezvousReaderCount",
m_RendezvousReaderCount);
helper::GetParameter(m_IO.m_Parameters, "RendezvousMilliseconds",
m_RendezvousMilliseconds);
helper::GetParameter(m_IO.m_Parameters, "DoubleBuffer", m_DoubleBuffer);

m_ZmqRequester.OpenRequester(m_Timeout, m_ReceiverBufferSize);
Expand Down Expand Up @@ -74,6 +70,8 @@ DataManReader::DataManReader(IO &io, const std::string &name,
}
m_SubscriberThread = std::thread(&DataManReader::SubscriberThread, this);

m_ZmqRequester.Request("Ready", 5, address);

if (m_Verbosity >= 5)
{
std::cout << "DataManReader::DataManReader() Rank " << m_MpiRank
Expand Down
2 changes: 0 additions & 2 deletions source/adios2/engine/dataman/DataManReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ class DataManReader : public Engine
std::string m_IPAddress;
int m_Port = 50001;
int m_Timeout = 5;
int m_RendezvousReaderCount = 1;
int m_RendezvousMilliseconds = 1000;
int m_Verbosity = 0;
bool m_DoubleBuffer = true;

Expand Down
13 changes: 6 additions & 7 deletions source/adios2/engine/dataman/DataManWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ DataManWriter::DataManWriter(IO &io, const std::string &name,
helper::GetParameter(m_IO.m_Parameters, "Verbose", m_Verbosity);
helper::GetParameter(m_IO.m_Parameters, "RendezvousReaderCount",
m_RendezvousReaderCount);
helper::GetParameter(m_IO.m_Parameters, "RendezvousMilliseconds",
m_RendezvousMilliseconds);
helper::GetParameter(m_IO.m_Parameters, "DoubleBuffer", m_DoubleBuffer);

if (m_IPAddress.empty())
Expand Down Expand Up @@ -77,6 +75,8 @@ DataManWriter::DataManWriter(IO &io, const std::string &name,
addJson["ControlAddresses"] = caVec;
m_AllAddresses = addJson.dump() + '\0';

m_DataPublisher.OpenPublisher(m_DataAddress, m_Timeout, m_DoubleBuffer);

if (m_RendezvousReaderCount == 0)
{
m_ReplyThread = std::thread(&DataManWriter::ReplyThread, this,
Expand All @@ -87,11 +87,6 @@ DataManWriter::DataManWriter(IO &io, const std::string &name,
ReplyThread(m_ControlAddress, m_RendezvousReaderCount);
}

m_DataPublisher.OpenPublisher(m_DataAddress, m_Timeout, m_DoubleBuffer);

std::this_thread::sleep_for(
std::chrono::milliseconds(m_RendezvousMilliseconds));

if (m_Verbosity >= 5)
{
std::cout << "DataManWriter::DataManWriter() Rank " << m_MpiRank
Expand Down Expand Up @@ -202,6 +197,10 @@ void DataManWriter::ReplyThread(const std::string &address, const int times)
if (r == "Address")
{
replier.SendReply(m_AllAddresses.data(), m_AllAddresses.size());
}
else if (r == "Ready")
{
replier.SendReply("OK", 2);
++count;
}
}
Expand Down
1 change: 0 additions & 1 deletion source/adios2/engine/dataman/DataManWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ class DataManWriter : public Engine
std::string m_IPAddress;
int m_Port = 50001;
int m_RendezvousReaderCount = 1;
int m_RendezvousMilliseconds = 1000;
int m_Timeout = 5;
int m_Verbosity = 0;
bool m_DoubleBuffer = false;
Expand Down

0 comments on commit 150b5fe

Please sign in to comment.