Skip to content

Commit

Permalink
MessageTransport Network Send and Recv Events handled
Browse files Browse the repository at this point in the history
NetworkMessage class has been added

Temporary code for publishloop has been added

unit test for MessageTransport has been added

NetworkMessageProcessor class has been added
  • Loading branch information
upendar25 committed May 29, 2022
1 parent eda5c2d commit ab180e1
Show file tree
Hide file tree
Showing 15 changed files with 1,036 additions and 22 deletions.
50 changes: 50 additions & 0 deletions src/OpcUaStackPubSub/DataSetReader/DataSetReaderBase.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
Copyright 2022 Kai Huebl (kai@huebl-sgh.de)
Lizenziert gemäß Apache Licence Version 2.0 (die „Lizenz“); Nutzung dieser
Datei nur in Übereinstimmung mit der Lizenz erlaubt.
Eine Kopie der Lizenz erhalten Sie auf http://www.apache.org/licenses/LICENSE-2.0.
Sofern nicht gemäß geltendem Recht vorgeschrieben oder schriftlich vereinbart,
erfolgt die Bereitstellung der im Rahmen der Lizenz verbreiteten Software OHNE
GEWÄHR ODER VORBEHALTE – ganz gleich, ob ausdrücklich oder stillschweigend.
Informationen über die jeweiligen Bedingungen für Genehmigungen und Einschränkungen
im Rahmen der Lizenz finden Sie in der Lizenz.
Autor: Kai Huebl (kai@huebl-sgh.de), Aleksey Timin (atimin@gmail.com)
*/

#include "OpcUaStackPubSub/DataSetReader/DataSetReaderBase.h"

namespace OpcUaStackPubSub
{

DataSetReaderBase::DataSetReaderBase(void)
{
}

DataSetReaderBase::~DataSetReaderBase(void)
{
}

void
DataSetReaderBase::dataSetReaderId(uint32_t dataSetReaderId)
{
dataSetReaderId_ = dataSetReaderId;
}

uint32_t
DataSetReaderBase::dataSetReaderId(void)
{
return dataSetReaderId_;
}

bool
DataSetReaderBase::encodeDataSet(boost::asio::streambuf& dataSet, uint16_t& size)
{
// FIXME: todo
return true;
}

}
50 changes: 50 additions & 0 deletions src/OpcUaStackPubSub/DataSetReader/DataSetReaderBase.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
Copyright 2022 Kai Huebl (kai@huebl-sgh.de)
Lizenziert gemäß Apache Licence Version 2.0 (die „Lizenz“); Nutzung dieser
Datei nur in Übereinstimmung mit der Lizenz erlaubt.
Eine Kopie der Lizenz erhalten Sie auf http://www.apache.org/licenses/LICENSE-2.0.
Sofern nicht gemäß geltendem Recht vorgeschrieben oder schriftlich vereinbart,
erfolgt die Bereitstellung der im Rahmen der Lizenz verbreiteten Software OHNE
GEWÄHR ODER VORBEHALTE – ganz gleich, ob ausdrücklich oder stillschweigend.
Informationen über die jeweiligen Bedingungen für Genehmigungen und Einschränkungen
im Rahmen der Lizenz finden Sie in der Lizenz.
Autor: Kai Huebl (kai@huebl-sgh.de), Aleksey Timin (atimin@gmail.com)
*/

#ifndef __OpcUaStackPubSub_DataSetReaderBase_h__
#define __OpcUaStackPubSub_DataSetReaderBase_h__

#include <boost/asio.hpp>

#include <map>

#include "OpcUaStackCore/Base/os.h"

namespace OpcUaStackPubSub
{

class DLLEXPORT DataSetReaderBase
{
public:
typedef boost::shared_ptr<DataSetReaderBase> SPtr;
typedef std::map<uint16_t, SPtr> Map;

DataSetReaderBase(void);
virtual ~DataSetReaderBase(void);

void dataSetReaderId(uint32_t dataSetReaderId);
uint32_t dataSetReaderId(void);

bool encodeDataSet(boost::asio::streambuf& dataSet, uint16_t& size);

public:
uint32_t dataSetReaderId_ = 0; // unique data set writer id
};

}

#endif
56 changes: 49 additions & 7 deletions src/OpcUaStackPubSub/MessageTransport/MessageTransport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/

#include "OpcUaStackPubSub/MessageTransport/MessageTransport.h"
#include "OpcUaStackPubSub/NetworkMessage/NetworkMessage.h"
#include "OpcUaStackCore/Base/Log.h"
#include "OpcUaStackPubSub/Events/NetworkSendEvent.h"
#include "OpcUaStackPubSub/Events/NetworkRecvEvent.h"
Expand All @@ -42,6 +43,7 @@ namespace OpcUaStackPubSub
ServerServiceBase::ioThread_ = ioThread.get();
strand_ = ioThread->createStrand();
messageBus_ = messageBus;
connectionName_ = connectionName;

// register message bus receiver
MessageBusMemberConfig messageBusMemberConfig;
Expand All @@ -50,7 +52,7 @@ namespace OpcUaStackPubSub

// activate receiver
activateReceiver(
[this](const MessageBusMember::WPtr& handleFrom, Message::SPtr& message) {
[this, &messageBusMemberConfig](const MessageBusMember::WPtr& handleFrom, Message::SPtr& message) {
// receive message from internal message bus

auto event = boost::static_pointer_cast<Event>(message);
Expand All @@ -59,13 +61,29 @@ namespace OpcUaStackPubSub
case EventType::NetworkRecvEvent:
{
NetworkRecvEvent::SPtr event = boost::static_pointer_cast<NetworkRecvEvent>(message);
// FIXME: todo

std::iostream ios(&(event->streamBuf()));
NetworkMessage networkMessage;
networkMessage.opcUaBinaryDecode(ios);

auto publisherId = networkMessage.networkMessageHeader()->publisherId();

auto it = networkMessageProcessorMap_.find(publisherId);
if (it != networkMessageProcessorMap_.end()) {
auto readerGroupBusMember = messageBus_->registerMember(it->second, messageBusMemberConfig);
messageBus_->messageSend(messageBusMember_, readerGroupBusMember, event);
} else {
Log(Error, "network message processor does not exist for this networkmessage")
.parameter("PublisherId", publisherId);
}

break;
}
case EventType::NetworkSendEvent:
{
Log(Info, "Recieved NetworkSendEvent");
NetworkSendEvent::SPtr event = boost::static_pointer_cast<NetworkSendEvent>(message);
// FIXME: todo
messageBus_->messageSend(messageBusMember_, connectionBusMember_, event);
break;
}
default:
Expand All @@ -89,14 +107,21 @@ namespace OpcUaStackPubSub
bool
MessageTransport::startup(void)
{
// FIXME: todo
// get reference to connection from message bus
if (!messageBus_->existMember(connectionName_)) {
Log(Error, "udp connection message bus member don't exist")
.parameter("UdpConnectionName", connectionName_);
return false;
}
connectionBusMember_ = messageBus_->getMember(connectionName_);

return true;
}

bool
MessageTransport::shutdown(void)
{
// FIXME: todo
// FIXME: todo
return true;
}

Expand All @@ -106,7 +131,16 @@ namespace OpcUaStackPubSub
const std::string& networkMessageProcessorName // message bus member name
)
{
// FIXME: todo
auto it = networkMessageProcessorMap_.find(publisherId);
if (it != networkMessageProcessorMap_.end()) {
Log(Error, "register network message processor error, because network message processor d already exist")
.parameter("MessageTransport", serviceName_)
.parameter("NetworkMessageProcessor PublisherId", publisherId);
return false;
}

// add network message processor to map
networkMessageProcessorMap_.insert(std::make_pair(publisherId, networkMessageProcessorName));
return true;
}

Expand All @@ -115,7 +149,15 @@ namespace OpcUaStackPubSub
uint32_t publisherId
)
{
// FIXME: todo
auto it = networkMessageProcessorMap_.find(publisherId);
if (it == networkMessageProcessorMap_.end()) {
Log(Error, "deregister network message processor error, because network message processor does not exist")
.parameter("MessageTransport", serviceName_)
.parameter("NetworkMessageProcessor PublisherId", publisherId);
return false;
}

networkMessageProcessorMap_.erase(it);
return true;
}

Expand Down
3 changes: 3 additions & 0 deletions src/OpcUaStackPubSub/MessageTransport/MessageTransport.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ namespace OpcUaStackPubSub

private:
OpcUaStackCore::IOThread::SPtr ioThread_; // smart pointer to io thread
std::string connectionName_;
OpcUaStackCore::MessageBusMember::WPtr connectionBusMember_;
std::map<uint32_t, std::string> networkMessageProcessorMap_;
};

}
Expand Down
84 changes: 84 additions & 0 deletions src/OpcUaStackPubSub/NetworkMessage/GroupHeader.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
#include "OpcUaStackPubSub/NetworkMessage/GroupHeader.h"

namespace OpcUaStackPubSub
{

OpcUaByte& GroupHeader::groupFlags()
{
return groupFlags_;
}
void GroupHeader::groupFlags(OpcUaByte& groupFlags)
{
groupFlags_ = groupFlags;
}

OpcUaUInt16& GroupHeader::writerGroupId()
{
return writerGroupId_;
}
void GroupHeader::writerGroupId(OpcUaUInt16& writerGroupId)
{
writerGroupId_ = writerGroupId;
}

OpcUaByte& GroupHeader::groupVersion()
{
return groupVersion_;
}

void GroupHeader::groupVersion(OpcUaByte& groupVersion)
{
groupVersion_ = groupVersion;
}

OpcUaUInt16& GroupHeader::networkMessageNumber()
{
return networkMessageNumber_;
}
void GroupHeader::networkMessageNumber(OpcUaUInt16& networkMessageNumber)
{
networkMessageNumber_ = networkMessageNumber;
}
OpcUaUInt16& GroupHeader::sequenceNumber()
{
return sequenceNumber_;
}
void GroupHeader::sequenceNumber(OpcUaUInt16& sequenceNumber)
{
sequenceNumber_ = sequenceNumber;
}
// ------------------------------------------------------------------------
// ------------------------------------------------------------------------
//
// GroupHeader Encoding and Decoding
//
// ------------------------------------------------------------------------
// ------------------------------------------------------------------------
bool
GroupHeader::opcUaBinaryEncode(std::ostream& os) const
{
bool rc = true;
rc &= OpcUaNumber::opcUaBinaryEncode(os, groupFlags_);
rc &= OpcUaNumber::opcUaBinaryEncode(os, writerGroupId_);
rc &= OpcUaNumber::opcUaBinaryEncode(os, groupVersion_);
rc &= OpcUaNumber::opcUaBinaryEncode(os, networkMessageNumber_);
rc &= OpcUaNumber::opcUaBinaryEncode(os, sequenceNumber_);

return rc;
}
bool
GroupHeader::opcUaBinaryDecode(std::istream& is)
{
bool rc = true;
rc &= OpcUaNumber::opcUaBinaryDecode(is, groupFlags_);
rc &= OpcUaNumber::opcUaBinaryDecode(is, writerGroupId_);
rc &= OpcUaNumber::opcUaBinaryDecode(is, groupVersion_);
rc &= OpcUaNumber::opcUaBinaryDecode(is, networkMessageNumber_);
rc &= OpcUaNumber::opcUaBinaryDecode(is, sequenceNumber_);
return rc;
}




}
48 changes: 48 additions & 0 deletions src/OpcUaStackPubSub/NetworkMessage/GroupHeader.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#ifndef __OpcUaStackPubSub_GroupHeader_h__
#define __OpcUaStackPubSub_GroupHeader_h__

#include "OpcUaStackCore/Base/os.h"
#include "OpcUaStackCore/BuildInTypes/OpcUaGuid.h"
#include "OpcUaStackCore/Base/Utility.h"
#include <vector>

using namespace OpcUaStackCore;

namespace OpcUaStackPubSub
{

class GroupHeader
{
public:
using SPtr = boost::shared_ptr<GroupHeader>;
GroupHeader() = default;
virtual ~GroupHeader() = default;
bool opcUaBinaryEncode(std::ostream& os) const;
bool opcUaBinaryDecode(std::istream& is);

OpcUaByte& groupFlags();
void groupFlags(OpcUaByte& groupFlags);

OpcUaUInt16& writerGroupId();
void writerGroupId(OpcUaUInt16& writerGroupId);

OpcUaByte& groupVersion();
void groupVersion(OpcUaByte& groupVersion);

OpcUaUInt16& networkMessageNumber();
void networkMessageNumber(OpcUaUInt16& networkMessageNumber);

OpcUaUInt16& sequenceNumber();
void sequenceNumber(OpcUaUInt16& sequenceNumber);

private:
OpcUaByte groupFlags_;
OpcUaUInt16 writerGroupId_;
OpcUaByte groupVersion_;
OpcUaUInt16 networkMessageNumber_;
OpcUaUInt16 sequenceNumber_;
};

}

#endif
Loading

0 comments on commit ab180e1

Please sign in to comment.