Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refine DataMan to allow N-to-M decomposition #1685

Merged
merged 7 commits into from
Aug 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions source/adios2/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,8 @@ endif()

if(ADIOS2_HAVE_ZeroMQ)
target_sources(adios2 PRIVATE
toolkit/transport/socket/SocketZmq.cpp
toolkit/transport/socket/SocketZmqReqRep.cpp
toolkit/transport/socket/SocketZmqPubSub.cpp
toolkit/transportman/wanman/WANMan.cpp
toolkit/transportman/stagingman/StagingMan.cpp
toolkit/zmq/zmqreqrep/ZmqReqRep.cpp
toolkit/zmq/zmqpubsub/ZmqPubSub.cpp
)
target_link_libraries(adios2 PRIVATE ZeroMQ::ZMQ)
endif()
Expand Down
57 changes: 26 additions & 31 deletions source/adios2/engine/dataman/DataManCommon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Distributed under the OSI-approved Apache License, Version 2.0. See
* accompanying file Copyright.txt for details.
*
* DataManReader.cpp
* DataManCommon.cpp
*
* Created on: Feb 12, 2018
* Author: Jason Wang
Expand All @@ -21,39 +21,22 @@ DataManCommon::DataManCommon(const std::string engineType, IO &io,
const std::string &name, const Mode mode,
MPI_Comm mpiComm)
: Engine(engineType, io, name, mode, mpiComm),
m_FileTransport(mpiComm, m_DebugMode)
m_IsRowMajor(helper::IsRowMajor(io.m_HostLanguage)),
m_DataManSerializer(mpiComm, m_IsRowMajor)
{

// initialize parameters
MPI_Comm_rank(mpiComm, &m_MpiRank);
MPI_Comm_size(mpiComm, &m_MpiSize);
m_IsLittleEndian = helper::IsLittleEndian();
m_IsRowMajor = helper::IsRowMajor(io.m_HostLanguage);
GetStringParameter(m_IO.m_Parameters, "WorkflowMode", m_WorkflowMode);
GetBoolParameter(m_IO.m_Parameters, "AlwaysProvideLatestTimestep",
m_ProvideLatest);
if (m_WorkflowMode != "file" && m_WorkflowMode != "stream")
{
throw(std::invalid_argument(
"WorkflowMode parameter for DataMan must be File or Stream"));
}
m_Channels = m_IO.m_TransportsParameters.size();
if (m_Channels == 0)
{
m_Channels = 1;
m_IO.m_TransportsParameters.push_back({{"Library", "ZMQ"},
{"IPAddress", "127.0.0.1"},
{"Port", "12306"},
{"Name", m_Name}});
}
for (size_t i = 0; i < m_Channels; ++i)
{
m_IO.m_TransportsParameters[i]["Name"] = m_Name + std::to_string(i);
}
GetParameter(m_IO.m_Parameters, "IPAddress", m_IPAddress);
GetParameter(m_IO.m_Parameters, "Port", m_Port);
GetParameter(m_IO.m_Parameters, "StagingMode", m_StagingMode);
GetParameter(m_IO.m_Parameters, "Timeout", m_Timeout);
GetParameter(m_IO.m_Parameters, "Verbose", m_Verbosity);
}

bool DataManCommon::GetStringParameter(Params &params, std::string key,
std::string &value)
DataManCommon::~DataManCommon() {}

bool DataManCommon::GetParameter(const Params &params, const std::string &key,
std::string &value)
{
auto it = params.find(key);
if (it != params.end())
Expand All @@ -65,8 +48,20 @@ bool DataManCommon::GetStringParameter(Params &params, std::string key,
return false;
}

bool DataManCommon::GetBoolParameter(Params &params, std::string key,
bool &value)
bool DataManCommon::GetParameter(const Params &params, const std::string &key,
int &value)
{
auto it = params.find(key);
if (it != params.end())
{
value = stoi(it->second);
return true;
}
return false;
}

bool DataManCommon::GetParameter(const Params &params, const std::string &key,
bool &value)
{
auto it = params.find(key);
if (it != params.end())
Expand Down
43 changes: 20 additions & 23 deletions source/adios2/engine/dataman/DataManCommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
#include "adios2/helper/adiosSystem.h"
#include "adios2/toolkit/format/dataman/DataManSerializer.h"
#include "adios2/toolkit/format/dataman/DataManSerializer.tcc"
#include "adios2/toolkit/transport/file/FileFStream.h"
#include "adios2/toolkit/transportman/wanman/WANMan.h"
#include "adios2/toolkit/zmq/zmqpubsub/ZmqPubSub.h"
#include "adios2/toolkit/zmq/zmqreqrep/ZmqReqRep.h"

namespace adios2
{
Expand All @@ -33,35 +33,32 @@ class DataManCommon : public Engine
public:
DataManCommon(const std::string engineType, IO &io, const std::string &name,
const Mode mode, MPI_Comm mpiComm);

virtual ~DataManCommon() = default;
virtual ~DataManCommon();

protected:
// external paremeters
int m_Verbosity = 0;
size_t m_SerializerBufferSize = 128 * 1024 * 1024;
size_t m_ReceiverBufferSize = 128 * 1024 * 1024;
std::string m_StagingMode = "wide";
int m_Timeout = 5;

// internal variables
int m_MpiRank;
int m_MpiSize;
int m_Channels;
std::string m_WorkflowMode = "stream";
bool m_ProvideLatest = false;
size_t m_BufferSize = 1024 * 1024 * 1024;
bool m_DoMonitor = false;
int64_t m_CurrentStep = -1;

bool m_IsLittleEndian;
bool m_ThreadActive = true;
bool m_IsRowMajor;
bool m_ContiguousMajor = true;

int m_Verbosity = 0;

transport::FileFStream m_FileTransport;

std::vector<std::string> m_StreamNames;
std::string m_IPAddress;
int m_Port = 50001;

std::shared_ptr<transportman::WANMan> m_WANMan;
std::shared_ptr<std::thread> m_DataThread;
format::DataManSerializer m_DataManSerializer;

bool GetStringParameter(Params &params, std::string key,
std::string &value);
bool GetBoolParameter(Params &params, std::string key, bool &value);
bool GetParameter(const Params &params, const std::string &key,
bool &value);
bool GetParameter(const Params &params, const std::string &key, int &value);
bool GetParameter(const Params &params, const std::string &key,
std::string &value);

}; // end class DataManCommon

Expand Down
Loading