Skip to content

Commit

Permalink
Merge pull request #2167 from JasonRuonanWang/dataman
Browse files Browse the repository at this point in the history
Added options in DataMan for specifying double buffer or direct send / receive
  • Loading branch information
JasonRuonanWang authored Apr 25, 2020
2 parents 31644af + 130c22f commit 5dad838
Show file tree
Hide file tree
Showing 10 changed files with 827 additions and 28 deletions.
5 changes: 5 additions & 0 deletions docs/user_guide/source/engines/dataman.rst
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,18 @@ The DataMan engine takes the following parameters:
A number >1 will cause the writer to wait for more readers, and a value of 0 will allow the writer to proceed without any readers present.
This value is interpreted by DataMan Writer engines only.

5. ``DoubleBuffer``: Default **true** for reader, **false** for writer. Whether to use double buffering to cache send and receive operations.
Enabling double buffering adds overhead for managing threads and buffer queues, but in pub/sub mode it improves the continuity of data steps for the reader.
For general use cases, keep the default values: true for the reader and false for the writer.

=============================== ================== ================================================
**Key** **Value Format** **Default** and Examples
=============================== ================== ================================================
IPAddress string **N/A**, 22.195.18.29
Port integer **50001**, 22000, 33000
Timeout integer **5**, 10, 30
RendezvousReaderCount integer **1**, 0, 3
DoubleBuffer bool **true** for reader, **false** for writer
=============================== ================== ================================================


6 changes: 4 additions & 2 deletions source/adios2/engine/dataman/DataManReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ DataManReader::DataManReader(IO &io, const std::string &name,
m_RendezvousReaderCount);
helper::GetParameter(m_IO.m_Parameters, "RendezvousMilliseconds",
m_RendezvousMilliseconds);
helper::GetParameter(m_IO.m_Parameters, "DoubleBuffer", m_DoubleBuffer);

m_ZmqRequester.OpenRequester(m_Timeout, m_ReceiverBufferSize);

Expand Down Expand Up @@ -67,7 +68,8 @@ DataManReader::DataManReader(IO &io, const std::string &name,
for (const auto &address : m_DataAddresses)
{
auto dataZmq = std::make_shared<adios2::zmq::ZmqPubSub>();
dataZmq->OpenSubscriber(address, m_Timeout, m_ReceiverBufferSize);
dataZmq->OpenSubscriber(address, m_Timeout, m_DoubleBuffer,
m_ReceiverBufferSize);
m_ZmqSubscriberVec.push_back(dataZmq);
}
m_SubscriberThread = std::thread(&DataManReader::SubscriberThread, this);
Expand Down Expand Up @@ -203,7 +205,7 @@ void DataManReader::SubscriberThread()
{
for (auto &z : m_ZmqSubscriberVec)
{
auto buffer = z->PopBufferQueue();
auto buffer = z->Receive();
if (buffer != nullptr && buffer->size() > 0)
{
if (buffer->size() < 64)
Expand Down
14 changes: 8 additions & 6 deletions source/adios2/engine/dataman/DataManReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ class DataManReader : public Engine
void Flush(const int transportIndex = -1) final;

private:
std::string m_IPAddress;
int m_Port = 50001;
int m_Timeout = 5;
int m_RendezvousReaderCount = 1;
int m_RendezvousMilliseconds = 1000;
int m_Verbosity = 0;
bool m_DoubleBuffer = true;

bool m_InitFailed = false;
int64_t m_CurrentStep = -1;
size_t m_FinalStep = std::numeric_limits<size_t>::max();
Expand All @@ -50,12 +58,6 @@ class DataManReader : public Engine
format::DataManSerializer m_Serializer;
int m_MpiRank;
int m_MpiSize;
std::string m_IPAddress;
int m_Port = 50001;
int m_Timeout = 5;
int m_RendezvousReaderCount = 1;
int m_RendezvousMilliseconds = 1000;
int m_Verbosity = 0;
size_t m_ReceiverBufferSize = 128 * 1024 * 1024;
bool m_ThreadActive = true;

Expand Down
11 changes: 8 additions & 3 deletions source/adios2/engine/dataman/DataManWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ DataManWriter::DataManWriter(IO &io, const std::string &name,
: Engine("DataManWriter", io, name, openMode, std::move(comm)),
m_Serializer(m_Comm, helper::IsRowMajor(io.m_HostLanguage))
{

m_MpiRank = m_Comm.Rank();
m_MpiSize = m_Comm.Size();

helper::GetParameter(m_IO.m_Parameters, "IPAddress", m_IPAddress);
helper::GetParameter(m_IO.m_Parameters, "Port", m_Port);
helper::GetParameter(m_IO.m_Parameters, "Timeout", m_Timeout);
Expand All @@ -32,6 +34,7 @@ DataManWriter::DataManWriter(IO &io, const std::string &name,
m_RendezvousReaderCount);
helper::GetParameter(m_IO.m_Parameters, "RendezvousMilliseconds",
m_RendezvousMilliseconds);
helper::GetParameter(m_IO.m_Parameters, "DoubleBuffer", m_DoubleBuffer);

if (m_IPAddress.empty())
{
Expand Down Expand Up @@ -84,7 +87,7 @@ DataManWriter::DataManWriter(IO &io, const std::string &name,
ReplyThread(m_ControlAddress, m_RendezvousReaderCount);
}

m_DataPublisher.OpenPublisher(m_DataAddress, m_Timeout);
m_DataPublisher.OpenPublisher(m_DataAddress, m_Timeout, m_DoubleBuffer);

std::this_thread::sleep_for(
std::chrono::milliseconds(m_RendezvousMilliseconds));
Expand Down Expand Up @@ -137,7 +140,8 @@ void DataManWriter::EndStep()
m_Serializer.AttachAttributesToLocalPack();
const auto buf = m_Serializer.GetLocalPack();
m_SerializerBufferSize = buf->size();
m_DataPublisher.PushBufferQueue(buf);

m_DataPublisher.Send(buf);

if (m_Verbosity >= 5)
{
Expand Down Expand Up @@ -169,7 +173,7 @@ void DataManWriter::DoClose(const int transportIndex)
std::string s = endSignal.dump() + '\0';
auto cvp = std::make_shared<std::vector<char>>(s.size());
std::memcpy(cvp->data(), s.c_str(), s.size());
m_DataPublisher.PushBufferQueue(cvp);
m_DataPublisher.Send(cvp);

m_ThreadActive = false;
if (m_ReplyThread.joinable())
Expand All @@ -181,6 +185,7 @@ void DataManWriter::DoClose(const int transportIndex)
std::cout << "DataManWriter::DoClose() Rank " << m_MpiRank << ", Step "
<< m_CurrentStep << std::endl;
}
m_IsClosed = true;
}

void DataManWriter::ReplyThread(const std::string &address, const int times)
Expand Down
17 changes: 9 additions & 8 deletions source/adios2/engine/dataman/DataManWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,27 +39,28 @@ class DataManWriter : public Engine
void Flush(const int transportIndex = -1) final;

private:
std::string m_DataAddress;
std::string m_ControlAddress;
std::string m_AllAddresses;
int m_CurrentReaderCount = 0;
int m_MpiRank;
int m_MpiSize;
std::string m_IPAddress;
int m_Port = 50001;
int m_RendezvousReaderCount = 1;
int m_RendezvousMilliseconds = 1000;
int m_Timeout = 5;
int m_Verbosity = 0;
bool m_DoubleBuffer = false;

std::string m_AllAddresses;
std::string m_DataAddress;
std::string m_ControlAddress;
int m_MpiRank;
int m_MpiSize;
int64_t m_CurrentStep = -1;
size_t m_SerializerBufferSize = 128 * 1024 * 1024;
bool m_ThreadActive = true;

format::DataManSerializer m_Serializer;
adios2::zmq::ZmqPubSub m_DataPublisher;
zmq::ZmqPubSub m_DataPublisher;

void ReplyThread(const std::string &address, const int times);
std::thread m_ReplyThread;
void ReplyThread(const std::string &address, const int times);

#define declare_type(T) \
void DoPutSync(Variable<T> &, const T *) final; \
Expand Down
56 changes: 51 additions & 5 deletions source/adios2/toolkit/zmq/zmqpubsub/ZmqPubSub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ ZmqPubSub::~ZmqPubSub()
}

m_ThreadActive = false;

if (m_Thread.joinable())
{
m_Thread.join();
Expand All @@ -60,9 +61,11 @@ ZmqPubSub::~ZmqPubSub()
}
}

void ZmqPubSub::OpenPublisher(const std::string &address, const int timeout)
void ZmqPubSub::OpenPublisher(const std::string &address, const int timeout,
const bool useDoubleBuffer)
{
m_Timeout = timeout;
m_DoubleBuffer = useDoubleBuffer;

m_ZmqContext = zmq_ctx_new();
if (not m_ZmqContext)
Expand All @@ -82,13 +85,18 @@ void ZmqPubSub::OpenPublisher(const std::string &address, const int timeout)
throw std::runtime_error("binding zmq socket failed");
}

m_Thread = std::thread(&ZmqPubSub::WriterThread, this);
if (m_DoubleBuffer)
{
m_Thread = std::thread(&ZmqPubSub::SendThread, this);
}
}

void ZmqPubSub::OpenSubscriber(const std::string &address, const int timeout,
const bool useDoubleBuffer,
const size_t bufferSize)
{
m_Timeout = timeout;
m_DoubleBuffer = useDoubleBuffer;

m_ZmqContext = zmq_ctx_new();
if (not m_ZmqContext)
Expand All @@ -112,7 +120,45 @@ void ZmqPubSub::OpenSubscriber(const std::string &address, const int timeout,

m_ReceiverBuffer.resize(bufferSize);

m_Thread = std::thread(&ZmqPubSub::ReaderThread, this);
if (m_DoubleBuffer)
{
m_Thread = std::thread(&ZmqPubSub::ReceiveThread, this);
}
}

/**
 * Publish one buffer through this publisher.
 *
 * Null or empty buffers are ignored. With double buffering enabled the
 * buffer is handed to PushBufferQueue(); otherwise it is written to the
 * ZeroMQ socket right away with ZMQ_DONTWAIT. The result of zmq_send is
 * not inspected.
 *
 * @param buffer shared pointer to the bytes to publish
 */
void ZmqPubSub::Send(std::shared_ptr<std::vector<char>> buffer)
{
    // Nothing to publish.
    if (buffer == nullptr || buffer->size() == 0)
    {
        return;
    }

    if (!m_DoubleBuffer)
    {
        // Direct mode: send on the caller's thread.
        zmq_send(m_ZmqSocket, buffer->data(), buffer->size(), ZMQ_DONTWAIT);
        return;
    }

    // Double-buffer mode: queue the buffer instead of sending it here.
    PushBufferQueue(buffer);
}

/**
 * Fetch the next received message, if any.
 *
 * With double buffering enabled the result of PopBufferQueue() is returned
 * unchanged. Otherwise a single non-blocking zmq_recv is issued into
 * m_ReceiverBuffer and the received bytes are copied into a newly allocated
 * vector.
 *
 * @return the received bytes, or nullptr when nothing was received, the
 *         receive failed, or the message did not fit in the receive buffer
 */
std::shared_ptr<std::vector<char>> ZmqPubSub::Receive()
{
    if (m_DoubleBuffer)
    {
        return PopBufferQueue();
    }

    const int ret = zmq_recv(m_ZmqSocket, m_ReceiverBuffer.data(),
                             m_ReceiverBuffer.size(), ZMQ_DONTWAIT);
    if (ret <= 0)
    {
        // Error, no message pending (ZMQ_DONTWAIT), or an empty message.
        return nullptr;
    }

    const size_t received = static_cast<size_t>(ret);
    if (received > m_ReceiverBuffer.size())
    {
        // zmq_recv reports the full message length even when it had to
        // truncate the message to fit the buffer. Copying `ret` bytes would
        // read past the end of m_ReceiverBuffer, and the truncated payload
        // is incomplete anyway, so drop it.
        return nullptr;
    }

    auto buff = std::make_shared<std::vector<char>>(received);
    std::memcpy(buff->data(), m_ReceiverBuffer.data(), received);
    return buff;
}

void ZmqPubSub::PushBufferQueue(std::shared_ptr<std::vector<char>> buffer)
Expand All @@ -136,7 +182,7 @@ std::shared_ptr<std::vector<char>> ZmqPubSub::PopBufferQueue()
}
}

void ZmqPubSub::WriterThread()
void ZmqPubSub::SendThread()
{
while (m_ThreadActive)
{
Expand All @@ -148,7 +194,7 @@ void ZmqPubSub::WriterThread()
}
}

void ZmqPubSub::ReaderThread()
void ZmqPubSub::ReceiveThread()
{
while (m_ThreadActive)
{
Expand Down
14 changes: 10 additions & 4 deletions source/adios2/toolkit/zmq/zmqpubsub/ZmqPubSub.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,26 @@ class ZmqPubSub
ZmqPubSub();
~ZmqPubSub();

void OpenPublisher(const std::string &address, const int timeout);
void OpenPublisher(const std::string &address, const int timeout,
const bool useDoubleBuffer);
void OpenSubscriber(const std::string &address, const int timeout,
const bool useDoubleBuffer,
const size_t receiveBufferSize);

void Send(std::shared_ptr<std::vector<char>> buffer);
std::shared_ptr<std::vector<char>> Receive();

private:
void PushBufferQueue(std::shared_ptr<std::vector<char>> buffer);
std::shared_ptr<std::vector<char>> PopBufferQueue();

private:
// For buffer queue
std::queue<std::shared_ptr<std::vector<char>>> m_BufferQueue;
std::mutex m_BufferQueueMutex;

// For threads
void WriterThread();
void ReaderThread();
void SendThread();
void ReceiveThread();
std::thread m_Thread;
bool m_ThreadActive = true;

Expand All @@ -53,6 +58,7 @@ class ZmqPubSub
// parameters
int m_Timeout = 10;
int m_Verbosity = 0;
bool m_DoubleBuffer;
};

} // end namespace zmq
Expand Down
2 changes: 2 additions & 0 deletions testing/adios2/engine/dataman/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,6 @@ if(ADIOS2_HAVE_MPI)
gtest_add_tests_helper(1D NOEXEC DataMan Engine.DataMan. "")
gtest_add_tests_helper(2DMemSelect NOEXEC DataMan Engine.DataMan. "")
gtest_add_tests_helper(3DMemSelect NOEXEC DataMan Engine.DataMan. "")
gtest_add_tests_helper(WriterDoubleBuffer NOEXEC DataMan Engine.DataMan. "")
gtest_add_tests_helper(ReaderDirectReceive NOEXEC DataMan Engine.DataMan. "")
endif()
Loading

0 comments on commit 5dad838

Please sign in to comment.