diff --git a/VERSION b/VERSION index 9e11b32..d15723f 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.3.1 +0.3.2 diff --git a/src/Router.cpp b/src/Router.cpp index e81a6fc..16c05c8 100644 --- a/src/Router.cpp +++ b/src/Router.cpp @@ -118,7 +118,7 @@ void Router::onProcess(std::unique_ptr unit) { Utils::Configuration::get().tcp.port } ); - *mLog.info() << u->getFrom()->toString() << " -> " << to.toString(); + *mLog.debug() << u->getFrom()->toString() << " -> " << to.toString(); Application::get().getTcpNet().send(u->getFrom(), &to, u->popData()); } break; @@ -128,7 +128,7 @@ void Router::onProcess(std::unique_ptr unit) { if (!u) { throw Utils::Error("Wrong unit type"); } - *mLog.info() << u->getFrom()->toString() << " -> " << u->getTo()->toString(); + *mLog.debug() << u->getFrom()->toString() << " -> " << u->getTo()->toString(); Application::get().getXBeeNet().to(u->getFrom(), u->getTo(), u->popData()); } break; diff --git a/src/TcpNet.cpp b/src/TcpNet.cpp index 0e805b4..1cc6f04 100644 --- a/src/TcpNet.cpp +++ b/src/TcpNet.cpp @@ -156,6 +156,11 @@ void TcpNet::onSend(std::unique_ptr from, std::unique_ptrsize(); try { TcpNetConnection* connection = mCtx->db.get(*from, *to); + if (connection && isMqttConnect(*buffer)) { + *mLog.info() << "Reestablish Connection"; + connection->close(); + connection = NULL; + } if (!connection) { *mLog.info() << "Connecting, " << from->toString() << " <-> " << to->toString(); // prepare parameters @@ -176,6 +181,47 @@ void TcpNet::onSend(std::unique_ptr from, std::unique_ptr 1) { + uint8_t maxLengthCursor = 4; + uint8_t cursor = 0; + uint8_t header = buffer[cursor++]; + // check header + *mLog.debug() << UTILS_STR_FUNCTION << ", MQTT Header" << Utils::putByte(header); + if ((((header & 0xF0) >> 4) & 0x0F) == 1) { + *mLog.debug() << UTILS_STR_FUNCTION << ", MQTT Connect header" << Utils::putByte(header); + // skip length: skip all bytes with most significant bit == '1' and next one with most significant bit == '0' + do { + if (cursor > maxLengthCursor ) { + *mLog.debug() << UTILS_STR_FUNCTION << "Max length cursor value reached"; + return false; + } + } while ((buffer[cursor++] & 128) != 0); + *mLog.debug() << UTILS_STR_FUNCTION << ", length size: " << (int)cursor; + // skip 2 bytes header length + cursor+=2; + if ((int)size > (cursor + 6)) { + *mLog.debug() << UTILS_STR_FUNCTION << ", MQTT Connect protocol name:" << buffer[cursor]; + *mLog.debug() << UTILS_STR_FUNCTION << ", MQTT Connect protocol name:" << buffer[cursor+1]; + *mLog.debug() << UTILS_STR_FUNCTION << ", MQTT Connect protocol name:" << buffer[cursor+2]; + *mLog.debug() << UTILS_STR_FUNCTION << ", MQTT Connect protocol name:" << buffer[cursor+3]; + if (buffer[cursor] == 'M' && buffer[cursor+1] == 'Q') { + if (buffer[cursor+2] == 'T') { + return (buffer[cursor+3] == 'T'); + } else if (buffer[cursor+2] == 'I') { + *mLog.debug() << UTILS_STR_FUNCTION << ", MQTT Connect protocol name:" << buffer[cursor+4]; + *mLog.debug() << UTILS_STR_FUNCTION << ", MQTT Connect protocol name:" << buffer[cursor+5]; + return (buffer[cursor+3] == 's' && buffer[cursor+4] == 'd' && buffer[cursor+5] == 'p'); + } + } + } + } + } + + return false; +} + ///////////////////// TcpNet::Internal Interface ///////////////////// boost::asio::io_service& TcpNet::getIo() const { return mCtx->ioService.getIoService(); diff --git a/src/TcpNet.h b/src/TcpNet.h index b2b2f1b..91260f5 100644 --- a/src/TcpNet.h +++ b/src/TcpNet.h @@ -77,6 +77,7 @@ class TcpNet { // Methods void onSend(std::unique_ptr, std::unique_ptr, std::unique_ptr); + bool isMqttConnect(const Networking::Buffer&) const; // Internal friend class TcpNetCommand; diff --git a/src/TcpNetConnection.cpp b/src/TcpNetConnection.cpp index 5b561b0..2cbad23 100644 --- a/src/TcpNetConnection.cpp +++ b/src/TcpNetConnection.cpp @@ -94,6 +94,11 @@ void TcpNetConnection::send(std::unique_ptr buffer) { } } +void TcpNetConnection::close() { + std::lock_guard locker(mMtx); + destroy(); +} + ///////////////////// TcpNetConnection::Internal ///////////////////// void TcpNetConnection::setState(State v) { mState = v; diff --git a/src/TcpNetConnection.h b/src/TcpNetConnection.h index 6f0424d..febe2d4 100644 --- a/src/TcpNetConnection.h +++ b/src/TcpNetConnection.h @@ -43,10 +43,12 @@ class TcpNetConnection { const Networking::AddressTcp* getTo() const {return mTo.get();} void send(std::unique_ptr buffer); + void close(); + bool isOpen() const {std::lock_guard locker(mMtx); return isAlive();} private: static Utils::IdGen mIdGen; Utils::Logger mLog; - std::mutex mMtx; + mutable std::mutex mMtx; enum State { STATE_NEW, STATE_CONNECTED, diff --git a/src/TcpNetDb.cpp b/src/TcpNetDb.cpp index b480d10..e9cd2d0 100644 --- a/src/TcpNetDb.cpp +++ b/src/TcpNetDb.cpp @@ -36,7 +36,8 @@ TcpNetDb::~TcpNetDb() { TcpNetConnection* TcpNetDb::get(const Networking::Address& from, const Networking::Address& to) { for (auto i: mConnections) { - if (from.isEqual(*(i->getFrom())) + if (i->isOpen() + && from.isEqual(*(i->getFrom())) && to.isEqual(*(i->getTo()))) { return i; @@ -48,16 +49,6 @@ TcpNetConnection* TcpNetDb::get(const Networking::Address& from, const Networkin void TcpNetDb::put(std::unique_ptr connection) { assert(connection.get()); *mLog.debug() << UTILS_STR_FUNCTION << ", Id: " << connection->getId(); - for (auto it = mConnections.begin(); it!=mConnections.end(); it++) { - if (connection->getFrom()->isEqual(*((*it)->getFrom())) - && connection->getTo()->isEqual(*((*it)->getTo()))) - { - // replace old by new one - destroy(*it); - *it = connection.release(); - return; - } - } // add new one mConnections.push_back(connection.get()); connection.release();