From 64f7c3d0728e59310114a9b17130a8dee81fc4fa Mon Sep 17 00:00:00 2001 From: Fabiana Cecin Date: Fri, 23 Feb 2024 06:19:00 -0300 Subject: [PATCH] Internal refactor of P2P::Session vs. P2P::ManagerXXX classes (#93) * Replace weak_ptr with NodeId in Manager* APIs * Fix SonarQube issue * Remove retrieving session object for logging errors * Harden P2P networking shutdown sequence - Rule out any possibility that ManagerBase::sessions_ will grow (or shrink) after ManagerBase::close_ is set to true, so that ManagerBase::stop() can close all registered sessions - On ManagerBase::stop(), join the Session thread pool after Server and ClientFactory are stopped (possibly more correct) * P2P::ManagerBase: - removed closed_; replaced by started_ (ManagerBase should be able to start() and stop() repeatedly and safely) - added stateMutex_ to synchronize start(), stop() and thread pool - start() creates threadPool_ - stop() destroys threadPool_ (waits for all tasks and joins all threads, guaranteeing that there is no session/socket message handling code active when it is time to e.g. destroy the underlying io_context) - threadPool_ is now isolated and not forwarded to other objects; to use the ManagerBase thread pool, other objects have to call a method on ManagerBase that will service the request, such as ManagerBase::asyncHandleMessage() state.cpp (test): - increased timeout from 5s to 120s in a couple of futures to avoid random, performance-related failures (logs to cout if they exceed 5s, which in any case is rare) * add this-> --- src/net/p2p/client.cpp | 3 +- src/net/p2p/client.h | 9 +- src/net/p2p/managerbase.cpp | 49 +++-- src/net/p2p/managerbase.h | 32 +-- src/net/p2p/managerdiscovery.cpp | 136 ++++--------- src/net/p2p/managerdiscovery.h | 14 +- src/net/p2p/managernormal.cpp | 331 ++++++++++--------------------- src/net/p2p/managernormal.h | 34 ++-- src/net/p2p/server.cpp | 5 +- src/net/p2p/server.h | 23 +-- src/net/p2p/session.cpp | 5 +- src/net/p2p/session.h | 10 +- tests/core/state.cpp | 14 +- 13 files changed, 253 insertions(+), 412 deletions(-) diff --git a/src/net/p2p/client.cpp b/src/net/p2p/client.cpp index b1d21e1c..25ea46f8 100644 --- a/src/net/p2p/client.cpp +++ b/src/net/p2p/client.cpp @@ -10,11 +10,10 @@ See the LICENSE.txt file in the project root for more information. namespace P2P { void ClientFactory::createClientSession(const boost::asio::ip::address &address, const unsigned short &port) { tcp::socket socket(io_context_); - auto session = std::make_shared(std::move(socket), ConnectionType::OUTBOUND, manager_, this->threadPool_, address, port); + auto session = std::make_shared(std::move(socket), ConnectionType::OUTBOUND, manager_, address, port); session->run(); } - bool ClientFactory::run() { Logger::logToDebug(LogType::INFO, Log::P2PClientFactory, __func__, "Starting P2P Client Factory " diff --git a/src/net/p2p/client.h b/src/net/p2p/client.h index 6d274cf3..de3fe06c 100644 --- a/src/net/p2p/client.h +++ b/src/net/p2p/client.h @@ -43,9 +43,6 @@ namespace P2P { /// Reference to the manager. ManagerBase& manager_; - /// Reference to the thread pool. - BS::thread_pool_light& threadPool_; - /// Internal function for creating a new client session. void createClientSession(const boost::asio::ip::address &address, const unsigned short &port); @@ -55,14 +52,12 @@ namespace P2P { * Constructor for the ClientFactory. * @param manager Reference to the manager. * @param threadCount Number of threads to use. - * @param threadPool Reference to the thread pool. */ - ClientFactory(ManagerBase& manager, const uint8_t &threadCount, BS::thread_pool_light& threadPool) : + ClientFactory(ManagerBase& manager, const uint8_t &threadCount) : work_guard_(boost::asio::make_work_guard(io_context_)), connectorStrand_(io_context_.get_executor()), threadCount_(threadCount), - manager_(manager), - threadPool_(threadPool) {} + manager_(manager) {} /// Start the Factory. bool start(); diff --git a/src/net/p2p/managerbase.cpp b/src/net/p2p/managerbase.cpp index 3c0321c7..ea8ba0f6 100644 --- a/src/net/p2p/managerbase.cpp +++ b/src/net/p2p/managerbase.cpp @@ -10,10 +10,10 @@ See the LICENSE.txt file in the project root for more information. namespace P2P { bool ManagerBase::registerSessionInternal(const std::shared_ptr& session) { - if (this->closed_) { + std::unique_lock lockSession(this->sessionsMutex_); // ManagerBase::registerSessionInternal can change sessions_ map. + if (!this->started_) { return false; } - std::unique_lock lockSession(this->sessionsMutex_); // ManagerBase::registerSessionInternal can change sessions_ map. // The NodeID of a session is made by the host IP and his server port. // That means, it is possible for us to receive a inbound connection for someone that we already have a outbound connection. // In this case, we will keep the oldest connection alive and close the new one. @@ -31,10 +31,10 @@ namespace P2P { } bool ManagerBase::unregisterSessionInternal(const std::shared_ptr &session) { - if (this->closed_) { + std::unique_lock lockSession(this->sessionsMutex_); // ManagerBase::unregisterSessionInternal can change sessions_ map. + if (!this->started_) { return false; } - std::unique_lock lockSession(this->sessionsMutex_); // ManagerBase::unregisterSessionInternal can change sessions_ map. if (!sessions_.contains(session->hostNodeId())) { lockSession.unlock(); // Unlock before calling logToDebug to avoid waiting for the lock in the logToDebug function. Logger::logToDebug(LogType::ERROR, Log::P2PManager, __func__, "Session does not exist at " + @@ -47,6 +47,9 @@ namespace P2P { bool ManagerBase::disconnectSessionInternal(const NodeID& nodeId) { std::unique_lock lockSession(this->sessionsMutex_); // ManagerBase::disconnectSessionInternal can change sessions_ map. + if (!this->started_) { + return false; + } if (!sessions_.contains(nodeId)) { lockSession.unlock(); // Unlock before calling logToDebug to avoid waiting for the lock in the logToDebug function. Logger::logToDebug(LogType::ERROR, Log::P2PManager, __func__, "Session does not exist at " + nodeId.first.to_string() + ":" + std::to_string(nodeId.second)); @@ -60,7 +63,7 @@ namespace P2P { } std::shared_ptr ManagerBase::sendRequestTo(const NodeID &nodeId, const std::shared_ptr& message) { - if (this->closed_) return nullptr; + if (!this->started_) return nullptr; std::shared_lock lockSession(this->sessionsMutex_); // ManagerBase::sendRequestTo doesn't change sessions_ map. if(!sessions_.contains(nodeId)) { lockSession.unlock(); // Unlock before calling logToDebug to avoid waiting for the lock in the logToDebug function. @@ -84,24 +87,30 @@ namespace P2P { // ManagerBase::answerSession doesn't change sessions_ map, but we still need to // be sure that the session io_context doesn't get deleted while we are using it. - void ManagerBase::answerSession(std::weak_ptr session, const std::shared_ptr& message) { - if (this->closed_) return; - std::shared_lock lockSession(this->sessionsMutex_); - if (auto ptr = session.lock()) { - ptr->write(message); - } else { - Logger::logToDebug(LogType::ERROR, Log::P2PManager, __func__, "Session is no longer valid"); + void ManagerBase::answerSession(const NodeID &nodeId, const std::shared_ptr& message) { + std::shared_lock lockSession(this->sessionsMutex_); + if (!this->started_) return; + auto it = sessions_.find(nodeId); + if (it == sessions_.end()) { + Logger::logToDebug(LogType::ERROR, Log::P2PManager, __func__, "Cannot find session for " + nodeId.first.to_string() + ":" + std::to_string(nodeId.second)); + return; } + it->second->write(message); } void ManagerBase::start() { - this->closed_ = false; + std::scoped_lock lock(this->stateMutex_); + if (this->started_) return; + this->started_ = true; + this->threadPool_ = std::make_unique(std::thread::hardware_concurrency() * 4); this->server_.start(); this->clientfactory_.start(); } void ManagerBase::stop() { - this->closed_ = true; + std::scoped_lock lock(this->stateMutex_); + if (! this->started_) return; + this->started_ = false; { std::unique_lock lock(this->sessionsMutex_); for (auto it = sessions_.begin(); it != sessions_.end();) { @@ -110,9 +119,16 @@ namespace P2P { if (auto sessionPtr = session.lock()) sessionPtr->close(); } } - this->threadPool_.wait_for_tasks(); this->server_.stop(); this->clientfactory_.stop(); + this->threadPool_.reset(); + } + + void ManagerBase::asyncHandleMessage(const NodeID &nodeId, const std::shared_ptr message) { + std::shared_lock lock(this->stateMutex_); + if (this->threadPool_) { + this->threadPool_->push_task(&ManagerBase::handleMessage, this, nodeId, message); + } } std::vector ManagerBase::getSessionsIDs() const { @@ -142,7 +158,7 @@ namespace P2P { } void ManagerBase::connectToServer(const boost::asio::ip::address& address, uint16_t port) { - if (this->closed_) return; + if (!this->started_) return; if (address == this->server_.getLocalAddress() && port == this->serverPort_) return; /// Cannot connect to itself. { std::shared_lock lock(this->sessionsMutex_); @@ -188,4 +204,3 @@ namespace P2P { } } } - diff --git a/src/net/p2p/managerbase.h b/src/net/p2p/managerbase.h index 9a9ac09c..eb8b18e9 100644 --- a/src/net/p2p/managerbase.h +++ b/src/net/p2p/managerbase.h @@ -35,11 +35,14 @@ namespace P2P { /// Minimum number of simultaneous connections. See DiscoveryWorker for more information. const unsigned int minConnections_ = 11; - /// Indicates whether the manager is closed to new connections. - std::atomic closed_ = true; + /// Check if manager is in the start() state (stop() not called yet). + std::atomic started_ = false; + + /// Core mutex for serializing start(), stop(), and threadPool_. + mutable std::shared_mutex stateMutex_; /// Pointer to the thread pool. - BS::thread_pool_light threadPool_; + std::unique_ptr threadPool_; /// Pointer to the options singleton. const Options& options_; @@ -88,7 +91,7 @@ namespace P2P { * @param session The session to answer to. * @param message The message to answer. */ - void answerSession(std::weak_ptr session, const std::shared_ptr& message); + void answerSession(const NodeID &nodeId, const std::shared_ptr& message); // TODO: There is a bug with handleRequest that throws std::system_error. // I believe that this is related with the std::shared_ptr getting deleted or @@ -98,7 +101,7 @@ namespace P2P { * @param session The session that sent the message. * @param message The message to handle. */ - virtual void handleRequest(std::weak_ptr session, const std::shared_ptr& message) { + virtual void handleRequest(const NodeID &nodeId, const std::shared_ptr& message) { // Do nothing by default, child classes are meant to override this } @@ -107,7 +110,7 @@ namespace P2P { * @param session The session that sent the message. * @param message The message to handle. */ - virtual void handleAnswer(std::weak_ptr session, const std::shared_ptr& message) { + virtual void handleAnswer(const NodeID &nodeId, const std::shared_ptr& message) { // Do nothing by default, child classes are meant to override this } @@ -123,9 +126,8 @@ namespace P2P { const net::ip::address& hostIp, NodeType nodeType, unsigned int maxConnections, const Options& options ) : serverPort_(options.getP2PPort()), nodeType_(nodeType), maxConnections_(maxConnections), options_(options), - threadPool_(std::thread::hardware_concurrency() * 4), - server_(hostIp, options.getP2PPort(), 4, *this, this->threadPool_), - clientfactory_(*this, 4, this->threadPool_), + server_(hostIp, options.getP2PPort(), 4, *this), + clientfactory_(*this, 4), discoveryWorker_(*this) {}; /// Destructor. Automatically stops the manager. @@ -164,9 +166,6 @@ namespace P2P { /// Getter for `minConnections_`. unsigned int minConnections() const { return this->minConnections_; } - /// Getter for `closed_`. - const std::atomic& isClosed() const { return this->closed_; } - /// Get the size of the session list. uint64_t getPeerCount() const { std::shared_lock lock(this->sessionsMutex_); return this->sessions_.size(); } @@ -200,6 +199,13 @@ namespace P2P { */ void connectToServer(const boost::asio::ip::address& address, uint16_t port); + /** + * Entrust the internal thread pool to call handleMessage() with the supplied arguments. + * @param session The session to send an answer to. + * @param message The message to handle. + */ + void asyncHandleMessage(const NodeID &nodeId, const std::shared_ptr message); + /** * Handle a message from a session. * The pointer is a weak_ptr because the parser doesn't need to own the session. @@ -209,7 +215,7 @@ namespace P2P { * @param session The session to send an answer to. * @param message The message to handle. */ - virtual void handleMessage(std::weak_ptr session, const std::shared_ptr message) { + virtual void handleMessage(const NodeID &nodeId, const std::shared_ptr message) { // Do nothing by default, child classes are meant to override this } diff --git a/src/net/p2p/managerdiscovery.cpp b/src/net/p2p/managerdiscovery.cpp index 863a1d9d..61d31f93 100644 --- a/src/net/p2p/managerdiscovery.cpp +++ b/src/net/p2p/managerdiscovery.cpp @@ -9,121 +9,85 @@ See the LICENSE.txt file in the project root for more information. namespace P2P { void ManagerDiscovery::handleMessage( - std::weak_ptr session, const std::shared_ptr message + const NodeID &nodeId, const std::shared_ptr message ) { - if (this->closed_) return; + if (!this->started_) return; switch (message->type()) { case Requesting: - handleRequest(session, message); + handleRequest(nodeId, message); break; case Answering: - handleAnswer(session, message); + handleAnswer(nodeId, message); break; default: - if (auto sessionPtr = session.lock()) { - Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, - "Invalid message type from " + sessionPtr->hostNodeId().first.to_string() + ":" + - std::to_string(sessionPtr->hostNodeId().second) + " closing session." - ); - this->disconnectSession(sessionPtr->hostNodeId()); - } else { - Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, - "Invalid message type from unknown session, the session ran away." - ); - } + Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, + "Invalid message type from " + nodeId.first.to_string() + ":" + std::to_string(nodeId.second) + + " closing session."); + this->disconnectSession(nodeId); break; } } void ManagerDiscovery::handleRequest( - std::weak_ptr session, const std::shared_ptr& message + const NodeID &nodeId, const std::shared_ptr& message ) { switch (message->command()) { case Ping: - handlePingRequest(session, message); + handlePingRequest(nodeId, message); break; case RequestNodes: - handleRequestNodesRequest(session, message); + handleRequestNodesRequest(nodeId, message); break; default: - if (auto sessionPtr = session.lock()) { - Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, - "Invalid Request Command Type from " + sessionPtr->hostNodeId().first.to_string() + ":" + - std::to_string(sessionPtr->hostNodeId().second) + ", closing session." - ); - this->disconnectSession(sessionPtr->hostNodeId()); - } else { - Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, - "Invalid Request Command Type from unknown session, closing session." - ); - } + Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, + "Invalid Request Command Type from " + nodeId.first.to_string() + ":" + std::to_string(nodeId.second) + + ", closing session."); + this->disconnectSession(nodeId); break; } } void ManagerDiscovery::handleAnswer( - std::weak_ptr session, const std::shared_ptr& message + const NodeID &nodeId, const std::shared_ptr& message ) { switch (message->command()) { case Ping: - handlePingAnswer(session, message); + handlePingAnswer(nodeId, message); break; case Info: break; case RequestNodes: - handleRequestNodesAnswer(session, message); + handleRequestNodesAnswer(nodeId, message); break; default: - if (auto sessionPtr = session.lock()) { - Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, - "Invalid Answer Command Type from " + sessionPtr->hostNodeId().first.to_string() + ":" + - std::to_string(sessionPtr->hostNodeId().second) + ", closing session." - ); - this->disconnectSession(sessionPtr->hostNodeId()); - } else { - Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, - "Invalid Answer Command Type from unknown session, closing session." - ); - } + Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, + "Invalid Answer Command Type from " + nodeId.first.to_string() + ":" + std::to_string(nodeId.second) + + ", closing session."); + this->disconnectSession(nodeId); break; } } void ManagerDiscovery::handlePingRequest( - std::weak_ptr session, const std::shared_ptr& message + const NodeID &nodeId, const std::shared_ptr& message ) { if (!RequestDecoder::ping(*message)) { - if (auto sessionPtr = session.lock()) { - Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, - "Invalid ping request from " + sessionPtr->hostNodeId().first.to_string() + ":" + - std::to_string(sessionPtr->hostNodeId().second) + " closing session." - ); - this->disconnectSession(sessionPtr->hostNodeId()); - } else { - Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, - "Invalid ping request from unknown session, closing session." - ); - } + Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, + "Invalid ping request from " + nodeId.first.to_string() + ":" + std::to_string(nodeId.second) + + " closing session."); + this->disconnectSession(nodeId); return; } - this->answerSession(session, std::make_shared(AnswerEncoder::ping(*message))); + this->answerSession(nodeId, std::make_shared(AnswerEncoder::ping(*message))); } void ManagerDiscovery::handleRequestNodesRequest( - std::weak_ptr session, const std::shared_ptr& message + const NodeID &nodeId, const std::shared_ptr& message ) { if (!RequestDecoder::requestNodes(*message)) { - if (auto sessionPtr = session.lock()) { - Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, - "Invalid requestNodes request from " + sessionPtr->hostNodeId().first.to_string() + ":" + - std::to_string(sessionPtr->hostNodeId().second) + " closing session." - ); - this->disconnectSession(sessionPtr->hostNodeId()); - } else { - Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, - "Invalid requestNodes request from unknown session, closing session." - ); - } + Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, + "Invalid requestNodes request from " + nodeId.first.to_string() + ":" + std::to_string(nodeId.second) + " closing session."); + this->disconnectSession(nodeId); return; } @@ -137,48 +101,34 @@ namespace P2P { } ); } - this->answerSession(session, std::make_shared(AnswerEncoder::requestNodes(*message, nodes))); + this->answerSession(nodeId, std::make_shared(AnswerEncoder::requestNodes(*message, nodes))); } void ManagerDiscovery::handlePingAnswer( - std::weak_ptr session, const std::shared_ptr& message + const NodeID &nodeId, const std::shared_ptr& message ) { std::unique_lock lock(this->requestsMutex_); if (!requests_.contains(message->id())) { lock.unlock(); // Unlock before calling logToDebug to avoid waiting for the lock in the logToDebug function. - if (auto sessionPtr = session.lock()) { - Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, - "Answer to invalid request from " + sessionPtr->hostNodeId().first.to_string() + ":" + - std::to_string(sessionPtr->hostNodeId().second) + " closing session." - ); - this->disconnectSession(sessionPtr->hostNodeId()); - } else { - Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, - "Answer to invalid request from unknown session, closing session." - ); - } + Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, + "Answer to invalid request from " + nodeId.first.to_string() + ":" + std::to_string(nodeId.second) + + " closing session."); + this->disconnectSession(nodeId); return; } requests_[message->id()]->setAnswer(message); } void ManagerDiscovery::handleRequestNodesAnswer( - std::weak_ptr session, const std::shared_ptr& message + const NodeID &nodeId, const std::shared_ptr& message ) { std::unique_lock lock(this->requestsMutex_); if (!requests_.contains(message->id())) { lock.unlock(); // Unlock before calling logToDebug to avoid waiting for the lock in the logToDebug function. - if (auto sessionPtr = session.lock()) { - Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, - "Answer to invalid request from " + sessionPtr->hostNodeId().first.to_string() + ":" + - std::to_string(sessionPtr->hostNodeId().second) + " closing session." - ); - this->disconnectSession(sessionPtr->hostNodeId()); - } else { - Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, - "Answer to invalid request from unknown session, closing session." - ); - } + Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, + "Answer to invalid request from " + nodeId.first.to_string() + ":" + std::to_string(nodeId.second) + + " closing session."); + this->disconnectSession(nodeId); return; } requests_[message->id()]->setAnswer(message); diff --git a/src/net/p2p/managerdiscovery.h b/src/net/p2p/managerdiscovery.h index 60514473..b2a0a31a 100644 --- a/src/net/p2p/managerdiscovery.h +++ b/src/net/p2p/managerdiscovery.h @@ -22,14 +22,14 @@ namespace P2P { * @param session The session that sent the request. * @param message The request message to handle. */ - void handleRequest(std::weak_ptr session, const std::shared_ptr& message) override; + void handleRequest(const NodeID &nodeId, const std::shared_ptr& message) override; /** * Handle an answer from a server. * @param session The session that sent the answer. * @param message The answer message to handle. */ - void handleAnswer(std::weak_ptr session, const std::shared_ptr& message) override; + void handleAnswer(const NodeID &nodeId, const std::shared_ptr& message) override; private: /** @@ -37,28 +37,28 @@ namespace P2P { * @param session The session that sent the request. * @param message The request message to handle. */ - void handlePingRequest(std::weak_ptr session, const std::shared_ptr& message); + void handlePingRequest(const NodeID &nodeId, const std::shared_ptr& message); /** * Handle a `RequestNodes` request. * @param session The session that sent the request. * @param message The request message to handle. */ - void handleRequestNodesRequest(std::weak_ptr session, const std::shared_ptr& message); + void handleRequestNodesRequest(const NodeID &nodeId, const std::shared_ptr& message); /** * Handle a `Ping` answer. * @param session The session that sent the answer. * @param message The answer message to handle. */ - void handlePingAnswer(std::weak_ptr session, const std::shared_ptr& message); + void handlePingAnswer(const NodeID &nodeId, const std::shared_ptr& message); /** * Handle a `RequestNodes` answer. * @param session The session that sent the answer. * @param message The answer message to handle. */ - void handleRequestNodesAnswer(std::weak_ptr session, const std::shared_ptr& message); + void handleRequestNodesAnswer(const NodeID &nodeId, const std::shared_ptr& message); public: /** @@ -78,7 +78,7 @@ namespace P2P { * @param session The session that sent the message. * @param message The message to handle. */ - void handleMessage(std::weak_ptr session, const std::shared_ptr message) override; + void handleMessage(const NodeID &nodeId, const std::shared_ptr message) override; }; }; diff --git a/src/net/p2p/managernormal.cpp b/src/net/p2p/managernormal.cpp index b19bf2b5..9611f460 100644 --- a/src/net/p2p/managernormal.cpp +++ b/src/net/p2p/managernormal.cpp @@ -12,7 +12,7 @@ See the LICENSE.txt file in the project root for more information. namespace P2P{ void ManagerNormal::broadcastMessage(const std::shared_ptr message) { - if (this->closed_) return; + if (!this->started_) return; { std::unique_lock broadcastLock(this->broadcastMutex_); if (broadcastedMessages_[message->id().toUint64()] > 0) { @@ -36,114 +36,89 @@ namespace P2P{ } void ManagerNormal::handleMessage( - std::weak_ptr session, const std::shared_ptr message + const NodeID &nodeId, const std::shared_ptr message ) { - if (this->closed_) return; + if (!this->started_) return; switch (message->type()) { case Requesting: - handleRequest(session, message); + handleRequest(nodeId, message); break; case Answering: - handleAnswer(session, message); + handleAnswer(nodeId, message); break; case Broadcasting: - handleBroadcast(session, message); + handleBroadcast(nodeId, message); break; default: - if (auto sessionPtr = session.lock()) { - Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, - "Invalid message type from " + sessionPtr->hostNodeId().first.to_string() + ":" + - std::to_string(sessionPtr->hostNodeId().second) + " , closing session." - ); - - this->disconnectSession(sessionPtr->hostNodeId()); - } else { - Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, - "Invalid message type from unknown session, closing session." - ); - } + Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, + "Invalid message type from " + nodeId.first.to_string() + ":" + std::to_string(nodeId.second) + " , closing session."); + this->disconnectSession(nodeId); break; } } void ManagerNormal::handleRequest( - std::weak_ptr session, const std::shared_ptr& message + const NodeID &nodeId, const std::shared_ptr& message ) { switch (message->command()) { case Ping: - handlePingRequest(session, message); + handlePingRequest(nodeId, message); break; case Info: - handleInfoRequest(session, message); + handleInfoRequest(nodeId, message); break; case RequestNodes: - handleRequestNodesRequest(session, message); + handleRequestNodesRequest(nodeId, message); break; case RequestValidatorTxs: - handleTxValidatorRequest(session, message); + handleTxValidatorRequest(nodeId, message); break; case RequestTxs: - handleTxRequest(session, message); + handleTxRequest(nodeId, message); break; default: - if (auto sessionPtr = session.lock()) { - Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, - "Invalid Request Command Type: " + std::to_string(message->command()) - + " from: " + sessionPtr->hostNodeId().first.to_string() + ":" + - std::to_string(sessionPtr->hostNodeId().second) + " , closing session." - ); - this->disconnectSession(sessionPtr->hostNodeId()); - } else { - Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, - "Invalid Request Command Type: " + std::to_string(message->command()) - + " from unknown session, closing session." - ); - } + Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, + "Invalid Request Command Type: " + std::to_string(message->command()) + + " from: " + nodeId.first.to_string() + ":" + std::to_string(nodeId.second) + + ", closing session."); + this->disconnectSession(nodeId); break; } } void ManagerNormal::handleAnswer( - std::weak_ptr session, const std::shared_ptr& message + const NodeID &nodeId, const std::shared_ptr& message ) { switch (message->command()) { case Ping: - handlePingAnswer(session, message); + handlePingAnswer(nodeId, message); break; case Info: - handleInfoAnswer(session, message); + handleInfoAnswer(nodeId, message); break; case RequestNodes: - handleRequestNodesAnswer(session, message); + handleRequestNodesAnswer(nodeId, message); break; case RequestValidatorTxs: - handleTxValidatorAnswer(session, message); + handleTxValidatorAnswer(nodeId, message); break; case RequestTxs: - handleTxAnswer(session, message); + handleTxAnswer(nodeId, message); break; default: - if (auto sessionPtr = session.lock()) { - Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, - "Invalid Answer Command Type: " + std::to_string(message->command()) - + " from: " + sessionPtr->hostNodeId().first.to_string() + ":" + - std::to_string(sessionPtr->hostNodeId().second) + " , closing session." - ); - this->disconnectSession(sessionPtr->hostNodeId()); - } else { - Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, - "Invalid Answer Command Type: " + std::to_string(message->command()) - + " from unknown session, closing session." - ); - } + Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, + "Invalid Answer Command Type: " + std::to_string(message->command()) + + " from: " + nodeId.first.to_string() + ":" + std::to_string(nodeId.second) + + " , closing session."); + this->disconnectSession(nodeId); break; } } void ManagerNormal::handleBroadcast( - std::weak_ptr session, const std::shared_ptr& message + const NodeID &nodeId, const std::shared_ptr& message ) { - if (this->closed_) return; + if (!this->started_) return; { std::shared_lock broadcastLock(this->broadcastMutex_); auto it = broadcastedMessages_.find(message->id().toUint64()); @@ -158,76 +133,54 @@ namespace P2P{ } switch (message->command()) { case BroadcastValidatorTx: - handleTxValidatorBroadcast(session, message); + handleTxValidatorBroadcast(nodeId, message); break; case BroadcastTx: - handleTxBroadcast(session, message); + handleTxBroadcast(nodeId, message); break; case BroadcastBlock: - handleBlockBroadcast(session, message); + handleBlockBroadcast(nodeId, message); break; default: - if (auto sessionPtr = session.lock()) { - Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, - "Invalid Broadcast Command Type: " + std::to_string(message->command()) - + " from: " + sessionPtr->hostNodeId().first.to_string() + ":" + - std::to_string(sessionPtr->hostNodeId().second) + " , closing session." - ); - this->disconnectSession(sessionPtr->hostNodeId()); - } else { - Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, - "Invalid Broadcast Command Type: " + std::to_string(message->command()) - + " from unknown session, closing session." - ); - } + Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, + "Invalid Broadcast Command Type: " + std::to_string(message->command()) + + " from: " + nodeId.first.to_string() + ":" + std::to_string(nodeId.second) + + " , closing session."); + this->disconnectSession(nodeId); break; } } void ManagerNormal::handlePingRequest( - std::weak_ptr session, const std::shared_ptr& message + const NodeID &nodeId, const std::shared_ptr& message ) { if (!RequestDecoder::ping(*message)) { - if (auto sessionPtr = session.lock()) { - Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, - "Invalid ping request from " + sessionPtr->hostNodeId().first.to_string() + ":" + - std::to_string(sessionPtr->hostNodeId().second) + " , closing session." - ); - this->disconnectSession(sessionPtr->hostNodeId()); - } else { - Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, - "Invalid ping request from unknown session, closing session." - ); - } + Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, + "Invalid ping request from " + nodeId.first.to_string() + ":" + std::to_string(nodeId.second) + + " , closing session."); + this->disconnectSession(nodeId); return; } - this->answerSession(session, std::make_shared(AnswerEncoder::ping(*message))); + this->answerSession(nodeId, std::make_shared(AnswerEncoder::ping(*message))); } void ManagerNormal::handleInfoRequest( - std::weak_ptr session, const std::shared_ptr& message + const NodeID &nodeId, const std::shared_ptr& message ) { RequestDecoder::info(*message); - this->answerSession(session, std::make_shared(AnswerEncoder::info( + this->answerSession(nodeId, std::make_shared(AnswerEncoder::info( *message, this->storage_.latest(), this->options_ ))); } void ManagerNormal::handleRequestNodesRequest( - std::weak_ptr session, const std::shared_ptr& message + const NodeID &nodeId, const std::shared_ptr& message ) { if (!RequestDecoder::requestNodes(*message)) { - if (auto sessionPtr = session.lock()) { - Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, - "Invalid requestNodes request from " + sessionPtr->hostNodeId().first.to_string() + ":" + - std::to_string(sessionPtr->hostNodeId().second) + " , closing session." - ); - this->disconnectSession(sessionPtr->hostNodeId()); - } else { - Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, - "Invalid requestNodes request from unknown session, closing session." - ); - } + Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, + "Invalid requestNodes request from " + nodeId.first.to_string() + ":" + std::to_string(nodeId.second) + + " , closing session."); + this->disconnectSession(nodeId); return; } @@ -241,203 +194,140 @@ namespace P2P{ } ); } - this->answerSession(session, std::make_shared(AnswerEncoder::requestNodes(*message, nodes))); + this->answerSession(nodeId, std::make_shared(AnswerEncoder::requestNodes(*message, nodes))); } void ManagerNormal::handleTxValidatorRequest( - std::weak_ptr session, const std::shared_ptr& message + const NodeID &nodeId, const std::shared_ptr& message ) { if (!RequestDecoder::requestValidatorTxs(*message)) { - if (auto sessionPtr = session.lock()) { - Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, - "Invalid requestValidatorTxs request from " + sessionPtr->hostNodeId().first.to_string() + ":" + - std::to_string(sessionPtr->hostNodeId().second) + " , closing session." - ); - this->disconnectSession(sessionPtr->hostNodeId()); - } else { - Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, - "Invalid requestValidatorTxs request from unknown session, closing session." - ); - } + Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, + "Invalid requestValidatorTxs request from " + nodeId.first.to_string() + ":" + std::to_string(nodeId.second) + + " , closing session."); + this->disconnectSession(nodeId); return; } - this->answerSession(session, std::make_shared(AnswerEncoder::requestValidatorTxs(*message, this->rdpos_.getMempool()))); + this->answerSession(nodeId, std::make_shared(AnswerEncoder::requestValidatorTxs(*message, this->rdpos_.getMempool()))); } void ManagerNormal::handleTxRequest( - std::weak_ptr session, const std::shared_ptr& message + const NodeID &nodeId, const std::shared_ptr& message ) { if (!RequestDecoder::requestTxs(*message)) { - if (auto sessionPtr = session.lock()) { - Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, - "Invalid requestTxs request from " + sessionPtr->hostNodeId().first.to_string() + ":" + - std::to_string(sessionPtr->hostNodeId().second) + " , closing session." - ); - this->disconnectSession(sessionPtr->hostNodeId()); - } else { - Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, - "Invalid requestTxs request from unknown session, closing session." - ); - } + Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, + "Invalid requestTxs request from " + nodeId.first.to_string() + ":" + std::to_string(nodeId.second) + + " , closing session."); + this->disconnectSession(nodeId); return; } - this->answerSession(session, std::make_shared(AnswerEncoder::requestTxs(*message, this->state_.getMempool()))); + this->answerSession(nodeId, std::make_shared(AnswerEncoder::requestTxs(*message, this->state_.getMempool()))); } void ManagerNormal::handlePingAnswer( - std::weak_ptr session, const std::shared_ptr& message + const NodeID &nodeId, const std::shared_ptr& message ) { std::unique_lock lock(this->requestsMutex_); if (!requests_.contains(message->id())) { lock.unlock(); // Unlock before doing anything else to avoid waiting for other locks. - if (auto sessionPtr = session.lock()) { - Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, - "Answer to invalid request from " + sessionPtr->hostNodeId().first.to_string() + ":" + - std::to_string(sessionPtr->hostNodeId().second) + " , closing session." - ); - this->disconnectSession(sessionPtr->hostNodeId()); - } else { - Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, - "Answer to invalid request from unknown session, closing session." - ); - } + Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, + "Answer to invalid request from " + nodeId.first.to_string() + ":" + std::to_string(nodeId.second) + + " , closing session."); + this->disconnectSession(nodeId); return; } requests_[message->id()]->setAnswer(message); } void ManagerNormal::handleInfoAnswer( - std::weak_ptr session, const std::shared_ptr& message + const NodeID &nodeId, const std::shared_ptr& message ) { std::unique_lock lock(this->requestsMutex_); if (!requests_.contains(message->id())) { lock.unlock(); // Unlock before calling logToDebug to avoid waiting for the lock in the logToDebug function. - if (auto sessionPtr = session.lock()) { - Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, - "Answer to invalid request from " + sessionPtr->hostNodeId().first.to_string() + ":" + - std::to_string(sessionPtr->hostNodeId().second) + " , closing session." - ); - this->disconnectSession(sessionPtr->hostNodeId()); - } else { - Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, - "Answer to invalid request from unknown session, closing session." - ); - } + Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, + "Answer to invalid request from " + nodeId.first.to_string() + ":" + std::to_string(nodeId.second) + + " , closing session."); + this->disconnectSession(nodeId); return; } requests_[message->id()]->setAnswer(message); } void ManagerNormal::handleRequestNodesAnswer( - std::weak_ptr session, const std::shared_ptr& message + const NodeID &nodeId, const std::shared_ptr& message ) { std::unique_lock lock(this->requestsMutex_); if (!requests_.contains(message->id())) { lock.unlock(); // Unlock before calling logToDebug to avoid waiting for the lock in the logToDebug function. - if (auto sessionPtr = session.lock()) { - Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, - "Answer to invalid request from " + sessionPtr->hostNodeId().first.to_string() + ":" + - std::to_string(sessionPtr->hostNodeId().second) + " , closing session." - ); - this->disconnectSession(sessionPtr->hostNodeId()); - } else { - Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, - "Answer to invalid request from unknown session, closing session." - ); - } + Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, + "Answer to invalid request from " + nodeId.first.to_string() + ":" + std::to_string(nodeId.second) + + " , closing session."); + this->disconnectSession(nodeId); return; } requests_[message->id()]->setAnswer(message); } void ManagerNormal::handleTxValidatorAnswer( - std::weak_ptr session, const std::shared_ptr& message + const NodeID &nodeId, const std::shared_ptr& message ) { std::unique_lock lock(this->requestsMutex_); if (!requests_.contains(message->id())) { lock.unlock(); // Unlock before calling logToDebug to avoid waiting for the lock in the logToDebug function. - if (auto sessionPtr = session.lock()) { - Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, - "Answer to invalid request from " + sessionPtr->hostNodeId().first.to_string() + ":" + - std::to_string(sessionPtr->hostNodeId().second) + " , closing session." - ); - this->disconnectSession(sessionPtr->hostNodeId()); - } else { - Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, - "Answer to invalid request from unknown session, closing session." - ); - } + Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, + "Answer to invalid request from " + nodeId.first.to_string() + ":" + std::to_string(nodeId.second) + + " , closing session."); + this->disconnectSession(nodeId); return; } requests_[message->id()]->setAnswer(message); } void ManagerNormal::handleTxAnswer( - std::weak_ptr session, const std::shared_ptr& message + const NodeID &nodeId, const std::shared_ptr& message ) { std::unique_lock lock(this->requestsMutex_); if (!requests_.contains(message->id())) { lock.unlock(); // Unlock before calling logToDebug to avoid waiting for the lock in the logToDebug function. - if (auto sessionPtr = session.lock()) { - Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, - "Answer to invalid request from " + sessionPtr->hostNodeId().first.to_string() + ":" + - std::to_string(sessionPtr->hostNodeId().second) + " , closing session." - ); - this->disconnectSession(sessionPtr->hostNodeId()); - } else { - Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, - "Answer to invalid request from unknown session, closing session." - ); - } + Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, + "Answer to invalid request from " + nodeId.first.to_string() + ":" + + std::to_string(nodeId.second) + " , closing session."); + this->disconnectSession(nodeId); return; } requests_[message->id()]->setAnswer(message); } void ManagerNormal::handleTxValidatorBroadcast( - std::weak_ptr session, const std::shared_ptr& message + const NodeID &nodeId, const std::shared_ptr& message ) { try { auto tx = BroadcastDecoder::broadcastValidatorTx(*message, this->options_.getChainID()); if (this->state_.addValidatorTx(tx)) this->broadcastMessage(message); } catch (std::exception &e) { - if (auto sessionPtr = session.lock()) { - Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, - "Invalid txValidatorBroadcast from " + sessionPtr->hostNodeId().first.to_string() + ":" + - std::to_string(sessionPtr->hostNodeId().second) + " , error: " + e.what() + " closing session." - ); - this->disconnectSession(sessionPtr->hostNodeId()); - } else { - Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, - std::string("Invalid txValidatorBroadcast from unknown session, error: ") + e.what() + " closing session." - ); - } + Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, + "Invalid txValidatorBroadcast from " + nodeId.first.to_string() + ":" + std::to_string(nodeId.second) + + " , error: " + e.what() + " closing session."); + this->disconnectSession(nodeId); } } void ManagerNormal::handleTxBroadcast( - std::weak_ptr session, const std::shared_ptr& message + const NodeID &nodeId, const std::shared_ptr& message ) { try { auto tx = BroadcastDecoder::broadcastTx(*message, this->options_.getChainID()); if (!this->state_.addTx(std::move(tx))) this->broadcastMessage(message); } catch (std::exception &e) { - if (auto sessionPtr = session.lock()) { - Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, - "Invalid txBroadcast from " + sessionPtr->hostNodeId().first.to_string() + ":" + - std::to_string(sessionPtr->hostNodeId().second) + " , error: " + e.what() + " closing session." - ); - this->disconnectSession(sessionPtr->hostNodeId()); - } else { - Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, - std::string("Invalid txBroadcast from unknown session, error: ") + e.what() + " closing session." - ); - } + Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, + "Invalid txBroadcast from " + nodeId.first.to_string() + ":" + std::to_string(nodeId.second) + + " , error: " + e.what() + " closing session."); + this->disconnectSession(nodeId); } } void ManagerNormal::handleBlockBroadcast( - std::weak_ptr session, const std::shared_ptr& message + const NodeID &nodeId, const std::shared_ptr& message ) { // We require a lock here because validateNextBlock **throws** if the block is invalid. // The reason for locking because for that a processNextBlock race condition can occur, @@ -456,17 +346,10 @@ namespace P2P{ rebroadcast = true; } } catch (std::exception &e) { - if (auto sessionPtr = session.lock()) { - Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, - "Invalid blockBroadcast from " + sessionPtr->hostNodeId().first.to_string() + ":" + - std::to_string(sessionPtr->hostNodeId().second) + " , error: " + e.what() + " closing session." - ); - this->disconnectSession(sessionPtr->hostNodeId()); - } else { - Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, - std::string("Invalid blockBroadcast from unknown session, error: ") + e.what() + " closing session." - ); - } + Logger::logToDebug(LogType::ERROR, Log::P2PParser, __func__, + "Invalid blockBroadcast from " + nodeId.first.to_string() + ":" + std::to_string(nodeId.second) + + " , error: " + e.what() + " closing session."); + this->disconnectSession(nodeId); return; } if (rebroadcast) this->broadcastMessage(message); diff --git a/src/net/p2p/managernormal.h b/src/net/p2p/managernormal.h index c54f9ec6..882ceee0 100644 --- a/src/net/p2p/managernormal.h +++ b/src/net/p2p/managernormal.h @@ -24,21 +24,21 @@ namespace P2P { * @param session The session that sent the request. * @param message The request message to handle. */ - void handleRequest(std::weak_ptr session, const std::shared_ptr& message) override; + void handleRequest(const NodeID &nodeId, const std::shared_ptr& message) override; /** * Handle an answer from a server. * @param session The session that sent the answer. * @param message The answer message to handle. */ - void handleAnswer(std::weak_ptr session, const std::shared_ptr& message) override; + void handleAnswer(const NodeID &nodeId, const std::shared_ptr& message) override; /** * Handle a broadcast from a node. * @param session The session that sent the broadcast. * @param message The broadcast message to handle. */ - void handleBroadcast(std::weak_ptr session, const std::shared_ptr& message); + void handleBroadcast(const NodeID &nodeId, const std::shared_ptr& message); private: /// Reference to the rdPoS object. @@ -73,91 +73,91 @@ namespace P2P { * @param session The session that sent the request. * @param message The request message to handle. */ - void handlePingRequest(std::weak_ptr session, const std::shared_ptr& message); + void handlePingRequest(const NodeID &nodeId, const std::shared_ptr& message); /** * Handle a `Info` request. * @param session The session that sent the request. * @param message The request message to handle. */ - void handleInfoRequest(std::weak_ptr session, const std::shared_ptr& message); + void handleInfoRequest(const NodeID &nodeId, const std::shared_ptr& message); /** * Handle a `RequestNodes` request. * @param session The session that sent the request. * @param message The request message to handle. */ - void handleRequestNodesRequest(std::weak_ptr session, const std::shared_ptr& message); + void handleRequestNodesRequest(const NodeID &nodeId, const std::shared_ptr& message); /** * Handle a `RequestValidatorTxs` request. * @param session The session that sent the request. * @param message The request message to handle. */ - void handleTxValidatorRequest(std::weak_ptr session, const std::shared_ptr& message); + void handleTxValidatorRequest(const NodeID &nodeId, const std::shared_ptr& message); /** * Handle a `RequestTxs` request. * @param session The session that sent the request. * @param message The request message to handle. */ - void handleTxRequest(std::weak_ptr session, const std::shared_ptr& message); + void handleTxRequest(const NodeID &nodeId, const std::shared_ptr& message); /** * Handle a `Ping` answer. * @param session The session that sent the answer. * @param message The answer message to handle. */ - void handlePingAnswer(std::weak_ptr session, const std::shared_ptr& message); + void handlePingAnswer(const NodeID &nodeId, const std::shared_ptr& message); /** * Handle a `Info` answer. * @param session The session that sent the answer. * @param message The answer message to handle. */ - void handleInfoAnswer(std::weak_ptr session, const std::shared_ptr& message); + void handleInfoAnswer(const NodeID &nodeId, const std::shared_ptr& message); /** * Handle a `RequestNodes` answer. * @param session The session that sent the answer. * @param message The answer message to handle. */ - void handleRequestNodesAnswer(std::weak_ptr session, const std::shared_ptr& message); + void handleRequestNodesAnswer(const NodeID &nodeId, const std::shared_ptr& message); /** * Handle a `RequestValidatorTxs` answer. * @param session The session that sent the answer. * @param message The answer message to handle. */ - void handleTxValidatorAnswer(std::weak_ptr session, const std::shared_ptr& message); + void handleTxValidatorAnswer(const NodeID &nodeId, const std::shared_ptr& message); /** * Handle a `RequestTxs` answer. * @param session The session that sent the answer. * @param message The answer message to handle. */ - void handleTxAnswer(std::weak_ptr session, const std::shared_ptr& message); + void handleTxAnswer(const NodeID &nodeId, const std::shared_ptr& message); /** * Handle a Validator transaction broadcast message. * @param session The node that sent the broadcast. * @param message The message that was broadcast. */ - void handleTxValidatorBroadcast(std::weak_ptr session, const std::shared_ptr& message); + void handleTxValidatorBroadcast(const NodeID &nodeId, const std::shared_ptr& message); /** * Handle a block transaction broadcast message. * @param session The node that sent the broadcast. * @param message The message that was broadcast. */ - void handleTxBroadcast(std::weak_ptr session, const std::shared_ptr& message); + void handleTxBroadcast(const NodeID &nodeId, const std::shared_ptr& message); /** * Handle a block broadcast message. * @param session The node that sent the broadcast. * @param message The message that was broadcast. */ - void handleBlockBroadcast(std::weak_ptr session, const std::shared_ptr& message); + void handleBlockBroadcast(const NodeID &nodeId, const std::shared_ptr& message); public: /** @@ -184,7 +184,7 @@ namespace P2P { * @param session The session that sent the message. * @param message The message to handle. */ - void handleMessage(std::weak_ptr session, const std::shared_ptr message) override; + void handleMessage(const NodeID &nodeId, const std::shared_ptr message) override; /** * Request Validator transactions from a given node. diff --git a/src/net/p2p/server.cpp b/src/net/p2p/server.cpp index 4b2965c6..db6c9a92 100644 --- a/src/net/p2p/server.cpp +++ b/src/net/p2p/server.cpp @@ -6,6 +6,7 @@ See the LICENSE.txt file in the project root for more information. */ #include "server.h" +#include "managerbase.h" namespace P2P { void ServerListener::do_accept() { @@ -25,7 +26,7 @@ namespace P2P { /// TODO: Handle error return; } else { - std::make_shared(std::move(socket), ConnectionType::INBOUND, this->manager_, this->threadPool_)->run(); + std::make_shared(std::move(socket), ConnectionType::INBOUND, this->manager_)->run(); } this->do_accept(); } @@ -58,7 +59,7 @@ namespace P2P { io_context_.restart(); Logger::logToDebug(LogType::DEBUG, Log::P2PServer, __func__, "Starting listener."); this->listener_ = std::make_shared( - io_context_, tcp::endpoint{this->localAddress_, this->localPort_}, this->manager_, this->threadPool_ + io_context_, tcp::endpoint{this->localAddress_, this->localPort_}, this->manager_ ); this->listener_->run(); Logger::logToDebug(LogType::DEBUG, Log::P2PServer, __func__, "Listener started."); diff --git a/src/net/p2p/server.h b/src/net/p2p/server.h index 00982008..cab8cf50 100644 --- a/src/net/p2p/server.h +++ b/src/net/p2p/server.h @@ -7,6 +7,9 @@ See the LICENSE.txt file in the project root for more information. #include "session.h" +#ifndef P2P_SERVER +#define P2P_SERVER + namespace P2P { /** * ServerListener class @@ -25,24 +28,19 @@ namespace P2P { void on_accept(boost::system::error_code ec, net::ip::tcp::socket socket); /// Pointer back to the Manager object. ManagerBase& manager_; - /// Reference to the thread pool. - BS::thread_pool_light& threadPool_; public: /** * Constructor for the ServerListener. * @param io_context Reference to the server io_context. * @param endpoint The endpoint to listen on. * @param manager Reference to the manager. - * @param threadPool Reference to the thread pool. */ ServerListener(net::io_context& io_context, tcp::endpoint endpoint, - ManagerBase& manager, - BS::thread_pool_light& threadPool) : + ManagerBase& manager) : io_context_(io_context), acceptor_(io_context), - manager_(manager), - threadPool_(threadPool) { + manager_(manager) { boost::system::error_code ec; acceptor_.open(endpoint.protocol(), ec); // Open the acceptor if (ec) { Logger::logToDebug(LogType::ERROR, Log::P2PServerListener, __func__, "Open Acceptor: " + ec.message()); return; } @@ -85,9 +83,6 @@ namespace P2P { /// Pointer to the manager. ManagerBase& manager_; - /// Reference to the thread pool. - BS::thread_pool_light& threadPool_; - public: /** * Constructor for the server. @@ -95,18 +90,15 @@ namespace P2P { * @param localPort The local port. * @param threadCount Reference to the thread count. * @param manager Reference to the manager. - * @param threadPool Reference to the thread pool. */ Server(const net::ip::address &localAddress, const uint16_t &localPort, const uint8_t& threadCount, - ManagerBase& manager, - BS::thread_pool_light& threadPool) : + ManagerBase& manager) : localAddress_(localAddress), localPort_(localPort), threadCount_(threadCount), - manager_(manager), - threadPool_(threadPool) + manager_(manager) {} /// Start the Server. @@ -123,3 +115,4 @@ namespace P2P { }; } +#endif diff --git a/src/net/p2p/session.cpp b/src/net/p2p/session.cpp index d918cc0e..22cffde6 100644 --- a/src/net/p2p/session.cpp +++ b/src/net/p2p/session.cpp @@ -111,10 +111,7 @@ namespace P2P { void Session::on_read_message(boost::system::error_code ec, std::size_t) { if (ec && this->handle_error(__func__, ec)) return; - // Make it a unique_ptr so that we can pass it to the thread pool. - this->threadPool_.push_task( - &ManagerBase::handleMessage, &this->manager_, weak_from_this(), this->inboundMessage_ - ); + this->manager_.asyncHandleMessage(this->nodeId_, this->inboundMessage_); this->inboundMessage_ = nullptr; this->do_read_header(); } diff --git a/src/net/p2p/session.h b/src/net/p2p/session.h index 524a69aa..17c18956 100644 --- a/src/net/p2p/session.h +++ b/src/net/p2p/session.h @@ -19,7 +19,6 @@ See the LICENSE.txt file in the project root for more information. #include #include "../../utils/utils.h" -#include "../../libs/BS_thread_pool_light.hpp" #include "encoding.h" using boost::asio::ip::tcp; @@ -60,9 +59,6 @@ namespace P2P { /// Reference back to the Manager object. ManagerBase& manager_; - /// Reference to the thread pool. - BS::thread_pool_light& threadPool_; - net::strand readStrand_; ///< Strand for read operations. net::strand writeStrand_; ///< Strand for write operations. @@ -139,13 +135,11 @@ namespace P2P { /// Construct a session with the given socket. (Used by the server) explicit Session(tcp::socket &&socket, ConnectionType connectionType, - ManagerBase& manager, - BS::thread_pool_light& threadPool) + ManagerBase& manager) : socket_(std::move(socket)), readStrand_(socket_.get_executor()), writeStrand_(socket_.get_executor()), manager_(manager), - threadPool_(threadPool), address_(socket_.remote_endpoint().address()), port_(socket_.remote_endpoint().port()), connectionType_(connectionType) @@ -160,7 +154,6 @@ namespace P2P { explicit Session(tcp::socket &&socket, ConnectionType connectionType, ManagerBase& manager, - BS::thread_pool_light& threadPool, const net::ip::address& address, unsigned short port ) @@ -168,7 +161,6 @@ namespace P2P { readStrand_(socket_.get_executor()), writeStrand_(socket_.get_executor()), manager_(manager), - threadPool_(threadPool), address_(address), port_(port), connectionType_(connectionType) diff --git a/tests/core/state.cpp b/tests/core/state.cpp index 4c8fced2..d27d6edc 100644 --- a/tests/core/state.cpp +++ b/tests/core/state.cpp @@ -465,7 +465,12 @@ namespace TState { } }); - REQUIRE(discoveryFuture.wait_for(std::chrono::seconds(5)) != std::future_status::timeout); + // TODO: This had a 5s timeout, but this is temporarily increased to avoid random failures. + auto start = std::chrono::high_resolution_clock::now(); + REQUIRE(discoveryFuture.wait_for(std::chrono::seconds(120)) != std::future_status::timeout); + auto end = std::chrono::high_resolution_clock::now(); + auto duration = std::chrono::duration_cast(end - start).count(); + if (duration > 5000) std::cout << "WARNING ([state]): discoveryFuture elapsed time: " << duration << " ms" << std::endl; REQUIRE(p2pDiscovery.getSessionsIDs().size() == 8); REQUIRE(blockchainWrapper1.p2p.getSessionsIDs().size() == 1); @@ -1500,7 +1505,12 @@ namespace TState { }); // Sleep for blocks to be broadcasted and accepted. - REQUIRE(broadcastBlockFuture.wait_for(std::chrono::seconds(5)) != std::future_status::timeout); + // TODO: This had a 5s timeout, but this is temporarily increased to avoid random failures. + auto start = std::chrono::high_resolution_clock::now(); + REQUIRE(broadcastBlockFuture.wait_for(std::chrono::seconds(120)) != std::future_status::timeout); + auto end = std::chrono::high_resolution_clock::now(); + auto duration = std::chrono::duration_cast(end - start).count(); + if (duration > 5000) std::cout << "WARNING ([state]): broadcastBlockFuture elapsed time: " << duration << " ms" << std::endl; // Check if the block was accepted by all nodes. REQUIRE(blockchainWrapper1.storage.latest()->hash() == blockchainWrapper2.storage.latest()->hash());