Skip to content

Commit

Permalink
Add MQTT optimization.
Browse files Browse the repository at this point in the history
Reestablish TCP connection on new MQTT-CONNECT message.
Version: 0.3.2
  • Loading branch information
monstrenyatko committed Jan 2, 2018
1 parent 5cc64a8 commit 33b111b
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 15 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.3.1
0.3.2
4 changes: 2 additions & 2 deletions src/Router.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ void Router::onProcess(std::unique_ptr<Networking::DataUnit> unit) {
Utils::Configuration::get().tcp.port
}
);
*mLog.info() << u->getFrom()->toString() << " -> " << to.toString();
*mLog.debug() << u->getFrom()->toString() << " -> " << to.toString();
Application::get().getTcpNet().send(u->getFrom(), &to, u->popData());
}
break;
Expand All @@ -128,7 +128,7 @@ void Router::onProcess(std::unique_ptr<Networking::DataUnit> unit) {
if (!u) {
throw Utils::Error("Wrong unit type");
}
*mLog.info() << u->getFrom()->toString() << " -> " << u->getTo()->toString();
*mLog.debug() << u->getFrom()->toString() << " -> " << u->getTo()->toString();
Application::get().getXBeeNet().to(u->getFrom(), u->getTo(), u->popData());
}
break;
Expand Down
46 changes: 46 additions & 0 deletions src/TcpNet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,11 @@ void TcpNet::onSend(std::unique_ptr<Networking::Address> from, std::unique_ptr<N
*mLog.debug() << UTILS_STR_FUNCTION << ", size:" << buffer->size();
try {
TcpNetConnection* connection = mCtx->db.get(*from, *to);
if (connection && isMqttConnect(*buffer)) {
*mLog.info() << "Reestablish Connection";
connection->close();
connection = NULL;
}
if (!connection) {
*mLog.info() << "Connecting, " << from->toString() << " <-> " << to->toString();
// prepare parameters
Expand All @@ -176,6 +181,47 @@ void TcpNet::onSend(std::unique_ptr<Networking::Address> from, std::unique_ptr<N
}
}

bool TcpNet::isMqttConnect(const Networking::Buffer& buffer) const {
uint32_t size = buffer.size();
if (size > 1) {
uint8_t maxLengthCursor = 4;
uint8_t cursor = 0;
uint8_t header = buffer[cursor++];
// check header
*mLog.debug() << UTILS_STR_FUNCTION << ", MQTT Header" << Utils::putByte(header);
if ((((header & 0xF0) >> 4) & 0x0F) == 1) {
*mLog.debug() << UTILS_STR_FUNCTION << ", MQTT Connect header" << Utils::putByte(header);
// skip length: skip all bytes with most significant bit == '1' and next one with most significant bit == '0'
do {
if (cursor > maxLengthCursor ) {
*mLog.debug() << UTILS_STR_FUNCTION << "Max length cursor value reached";
return false;
}
} while ((buffer[cursor++] & 128) != 0);
*mLog.debug() << UTILS_STR_FUNCTION << ", length size: " << (int)cursor;
// skip 2 bytes header length
cursor+=2;
if ((int)size > (cursor + 6)) {
*mLog.debug() << UTILS_STR_FUNCTION << ", MQTT Connect protocol name:" << buffer[cursor];
*mLog.debug() << UTILS_STR_FUNCTION << ", MQTT Connect protocol name:" << buffer[cursor+1];
*mLog.debug() << UTILS_STR_FUNCTION << ", MQTT Connect protocol name:" << buffer[cursor+2];
*mLog.debug() << UTILS_STR_FUNCTION << ", MQTT Connect protocol name:" << buffer[cursor+3];
if (buffer[cursor] == 'M' && buffer[cursor+1] == 'Q') {
if (buffer[cursor+2] == 'T') {
return (buffer[cursor+3] == 'T');
} else if (buffer[cursor+2] == 'I') {
*mLog.debug() << UTILS_STR_FUNCTION << ", MQTT Connect protocol name:" << buffer[cursor+4];
*mLog.debug() << UTILS_STR_FUNCTION << ", MQTT Connect protocol name:" << buffer[cursor+5];
return (buffer[cursor+3] == 's' && buffer[cursor+4] == 'd' && buffer[cursor+5] == 'p');
}
}
}
}
}

return false;
}

///////////////////// TcpNet::Internal Interface /////////////////////
boost::asio::io_service& TcpNet::getIo() const {
return mCtx->ioService.getIoService();
Expand Down
1 change: 1 addition & 0 deletions src/TcpNet.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ class TcpNet {
// Methods
void onSend(std::unique_ptr<Networking::Address>, std::unique_ptr<Networking::Address>,
std::unique_ptr<Networking::Buffer>);
bool isMqttConnect(const Networking::Buffer&) const;

// Internal
friend class TcpNetCommand;
Expand Down
5 changes: 5 additions & 0 deletions src/TcpNetConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ void TcpNetConnection::send(std::unique_ptr<Networking::Buffer> buffer) {
}
}

void TcpNetConnection::close() {
std::lock_guard<std::mutex> locker(mMtx);
destroy();
}

///////////////////// TcpNetConnection::Internal /////////////////////
void TcpNetConnection::setState(State v) {
mState = v;
Expand Down
4 changes: 3 additions & 1 deletion src/TcpNetConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,12 @@ class TcpNetConnection {
const Networking::AddressTcp* getTo() const {return mTo.get();}

void send(std::unique_ptr<Networking::Buffer> buffer);
void close();
bool isOpen() const {std::lock_guard<std::mutex> locker(mMtx); return isAlive();}
private:
static Utils::IdGen mIdGen;
Utils::Logger mLog;
std::mutex mMtx;
mutable std::mutex mMtx;
enum State {
STATE_NEW,
STATE_CONNECTED,
Expand Down
13 changes: 2 additions & 11 deletions src/TcpNetDb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ TcpNetDb::~TcpNetDb() {

TcpNetConnection* TcpNetDb::get(const Networking::Address& from, const Networking::Address& to) {
for (auto i: mConnections) {
if (from.isEqual(*(i->getFrom()))
if (i->isOpen()
&& from.isEqual(*(i->getFrom()))
&& to.isEqual(*(i->getTo())))
{
return i;
Expand All @@ -48,16 +49,6 @@ TcpNetConnection* TcpNetDb::get(const Networking::Address& from, const Networkin
void TcpNetDb::put(std::unique_ptr<TcpNetConnection> connection) {
assert(connection.get());
*mLog.debug() << UTILS_STR_FUNCTION << ", Id: " << connection->getId();
for (auto it = mConnections.begin(); it!=mConnections.end(); it++) {
if (connection->getFrom()->isEqual(*((*it)->getFrom()))
&& connection->getTo()->isEqual(*((*it)->getTo())))
{
// replace old by new one
destroy(*it);
*it = connection.release();
return;
}
}
// add new one
mConnections.push_back(connection.get());
connection.release();
Expand Down

0 comments on commit 33b111b

Please sign in to comment.