Skip to content

Commit

Permalink
Merge pull request #1685 from JasonRuonanWang/dataman
Browse files Browse the repository at this point in the history
Refine DataMan to allow N-to-M decomposition
  • Loading branch information
JasonRuonanWang authored Aug 20, 2019
2 parents e3ae2f9 + 2ed2b80 commit 2598482
Show file tree
Hide file tree
Showing 40 changed files with 962 additions and 1,531 deletions.
7 changes: 2 additions & 5 deletions source/adios2/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,8 @@ endif()

if(ADIOS2_HAVE_ZeroMQ)
target_sources(adios2 PRIVATE
toolkit/transport/socket/SocketZmq.cpp
toolkit/transport/socket/SocketZmqReqRep.cpp
toolkit/transport/socket/SocketZmqPubSub.cpp
toolkit/transportman/wanman/WANMan.cpp
toolkit/transportman/stagingman/StagingMan.cpp
toolkit/zmq/zmqreqrep/ZmqReqRep.cpp
toolkit/zmq/zmqpubsub/ZmqPubSub.cpp
)
target_link_libraries(adios2 PRIVATE ZeroMQ::ZMQ)
endif()
Expand Down
57 changes: 26 additions & 31 deletions source/adios2/engine/dataman/DataManCommon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Distributed under the OSI-approved Apache License, Version 2.0. See
* accompanying file Copyright.txt for details.
*
* DataManReader.cpp
* DataManCommon.cpp
*
* Created on: Feb 12, 2018
* Author: Jason Wang
Expand All @@ -21,39 +21,22 @@ DataManCommon::DataManCommon(const std::string engineType, IO &io,
const std::string &name, const Mode mode,
MPI_Comm mpiComm)
: Engine(engineType, io, name, mode, mpiComm),
m_FileTransport(mpiComm, m_DebugMode)
m_IsRowMajor(helper::IsRowMajor(io.m_HostLanguage)),
m_DataManSerializer(mpiComm, m_IsRowMajor)
{

// initialize parameters
MPI_Comm_rank(mpiComm, &m_MpiRank);
MPI_Comm_size(mpiComm, &m_MpiSize);
m_IsLittleEndian = helper::IsLittleEndian();
m_IsRowMajor = helper::IsRowMajor(io.m_HostLanguage);
GetStringParameter(m_IO.m_Parameters, "WorkflowMode", m_WorkflowMode);
GetBoolParameter(m_IO.m_Parameters, "AlwaysProvideLatestTimestep",
m_ProvideLatest);
if (m_WorkflowMode != "file" && m_WorkflowMode != "stream")
{
throw(std::invalid_argument(
"WorkflowMode parameter for DataMan must be File or Stream"));
}
m_Channels = m_IO.m_TransportsParameters.size();
if (m_Channels == 0)
{
m_Channels = 1;
m_IO.m_TransportsParameters.push_back({{"Library", "ZMQ"},
{"IPAddress", "127.0.0.1"},
{"Port", "12306"},
{"Name", m_Name}});
}
for (size_t i = 0; i < m_Channels; ++i)
{
m_IO.m_TransportsParameters[i]["Name"] = m_Name + std::to_string(i);
}
GetParameter(m_IO.m_Parameters, "IPAddress", m_IPAddress);
GetParameter(m_IO.m_Parameters, "Port", m_Port);
GetParameter(m_IO.m_Parameters, "StagingMode", m_StagingMode);
GetParameter(m_IO.m_Parameters, "Timeout", m_Timeout);
GetParameter(m_IO.m_Parameters, "Verbose", m_Verbosity);
}

bool DataManCommon::GetStringParameter(Params &params, std::string key,
std::string &value)
DataManCommon::~DataManCommon() {}

bool DataManCommon::GetParameter(const Params &params, const std::string &key,
std::string &value)
{
auto it = params.find(key);
if (it != params.end())
Expand All @@ -65,8 +48,20 @@ bool DataManCommon::GetStringParameter(Params &params, std::string key,
return false;
}

bool DataManCommon::GetBoolParameter(Params &params, std::string key,
bool &value)
bool DataManCommon::GetParameter(const Params &params, const std::string &key,
int &value)
{
auto it = params.find(key);
if (it != params.end())
{
value = stoi(it->second);
return true;
}
return false;
}

bool DataManCommon::GetParameter(const Params &params, const std::string &key,
bool &value)
{
auto it = params.find(key);
if (it != params.end())
Expand Down
43 changes: 20 additions & 23 deletions source/adios2/engine/dataman/DataManCommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
#include "adios2/helper/adiosSystem.h"
#include "adios2/toolkit/format/dataman/DataManSerializer.h"
#include "adios2/toolkit/format/dataman/DataManSerializer.tcc"
#include "adios2/toolkit/transport/file/FileFStream.h"
#include "adios2/toolkit/transportman/wanman/WANMan.h"
#include "adios2/toolkit/zmq/zmqpubsub/ZmqPubSub.h"
#include "adios2/toolkit/zmq/zmqreqrep/ZmqReqRep.h"

namespace adios2
{
Expand All @@ -33,35 +33,32 @@ class DataManCommon : public Engine
public:
DataManCommon(const std::string engineType, IO &io, const std::string &name,
const Mode mode, MPI_Comm mpiComm);

virtual ~DataManCommon() = default;
virtual ~DataManCommon();

protected:
// external paremeters
int m_Verbosity = 0;
size_t m_SerializerBufferSize = 128 * 1024 * 1024;
size_t m_ReceiverBufferSize = 128 * 1024 * 1024;
std::string m_StagingMode = "wide";
int m_Timeout = 5;

// internal variables
int m_MpiRank;
int m_MpiSize;
int m_Channels;
std::string m_WorkflowMode = "stream";
bool m_ProvideLatest = false;
size_t m_BufferSize = 1024 * 1024 * 1024;
bool m_DoMonitor = false;
int64_t m_CurrentStep = -1;

bool m_IsLittleEndian;
bool m_ThreadActive = true;
bool m_IsRowMajor;
bool m_ContiguousMajor = true;

int m_Verbosity = 0;

transport::FileFStream m_FileTransport;

std::vector<std::string> m_StreamNames;
std::string m_IPAddress;
int m_Port = 50001;

std::shared_ptr<transportman::WANMan> m_WANMan;
std::shared_ptr<std::thread> m_DataThread;
format::DataManSerializer m_DataManSerializer;

bool GetStringParameter(Params &params, std::string key,
std::string &value);
bool GetBoolParameter(Params &params, std::string key, bool &value);
bool GetParameter(const Params &params, const std::string &key,
bool &value);
bool GetParameter(const Params &params, const std::string &key, int &value);
bool GetParameter(const Params &params, const std::string &key,
std::string &value);

}; // end class DataManCommon

Expand Down
Loading

0 comments on commit 2598482

Please sign in to comment.