diff --git a/src/net/p2p/managerbase.cpp b/src/net/p2p/managerbase.cpp index cdc20cf9..303499b5 100644 --- a/src/net/p2p/managerbase.cpp +++ b/src/net/p2p/managerbase.cpp @@ -264,16 +264,17 @@ namespace P2P { LOGDEBUG("Peer " + toString(nodeId) + " is a discovery node, cannot send request"); return nullptr; } - std::unique_lock lockRequests(this->requestsMutex_); - requests_[message->id()] = std::make_shared(message->command(), message->id(), session->hostNodeId(), message); - bool written = session->write(message); - if (written) { - return requests_[message->id()]; - } else { - // Delete the request if it was not written - requests_.erase(message->id()); + // Track the request + auto request = std::make_shared(message->command(), message->id(), session->hostNodeId(), message); + insertRequest(request); + // If message was never sent, then no request has been made. Fail and return. + if (!session->write(message)) { + // We know the latest insert went into the active bucket, so we can just remove it. + std::unique_lock lock(this->requestsMutex_); + requests_[activeRequests_].erase(message->id()); return nullptr; } + return request; } // ManagerBase::answerSession doesn't change sessions_ map, but we still need to @@ -419,6 +420,13 @@ namespace P2P { // objects and the entire Boost ASIO engine are fully gone. LOGDEBUG("Net engine destroyed"); + + // We don't need the requests_ cache anymore so free up that memory. + std::unique_lock lockRequests(this->requestsMutex_); + requests_[0].clear(); + requests_[1].clear(); + activeRequests_ = 0; + lockRequests.unlock(); } bool ManagerBase::isActive() const { @@ -724,4 +732,41 @@ namespace P2P { // It is probably better to have asio::post inside the sessions mutex to sync with ManagerBase::stop(). std::make_shared(std::move(socket), ConnectionType::INBOUND, *this)->run(); } + + void ManagerBase::insertRequest(std::shared_ptr request) { + // The previous logic would silently overwrite the previous entry in case of an (improbable) + // request ID collision; this logic is retained here, since lookups will be directed to the + // active bucket first (so a colliding entry that is in the older bucket is shadowed anyway). + std::unique_lock lock(this->requestsMutex_); + if (requests_[activeRequests_].size() >= REQUEST_BUCKET_SIZE_LIMIT) { + activeRequests_ = 1 - activeRequests_; + requests_[activeRequests_].clear(); + } + // Make sure the latest insertion always ended up in what is the current active bucket + requests_[activeRequests_][request->id()] = std::move(request); + } + + void ManagerBase::handleRequestAnswer(const NodeID& nodeId, const std::shared_ptr& message) { + RequestID requestId = message->id(); + std::unique_lock lock(this->requestsMutex_); + // Search for the request in both buckets, starting with the active bucket. + // If found, set its answer and return. + for (int i = 0; i < 2; ++i) { + auto& bucket = requests_[(activeRequests_ + i) % 2]; + auto it = bucket.find(requestId); + if (it != bucket.end()) { + std::shared_ptr& req = it->second; + req->setAnswer(message); + return; + } + } + // Receiving an answer for a request ID that can't be found locally is always an error. + // This could legitimately happen if the request storage is too small for actual request traffic, + // so make sure request storage is oversized at all times by a factor of at least 10. + // If there is a need to optimize memory usage, buckets can be instead allocated dynamically as + // they are needed (e.g. in a deque) and an expiration timestamp can be associated with each bucket. + lock.unlock(); // Unlock before calling logToDebug to avoid waiting for the lock in the logToDebug function. + LOGERROR("Answer to invalid request from " + toString(nodeId) + " closing session."); + this->disconnectSession(nodeId); + } } diff --git a/src/net/p2p/managerbase.h b/src/net/p2p/managerbase.h index 7a5fc5a3..b1f08cdb 100644 --- a/src/net/p2p/managerbase.h +++ b/src/net/p2p/managerbase.h @@ -8,6 +8,11 @@ See the LICENSE.txt file in the project root for more information. #ifndef P2P_MANAGER_BASE #define P2P_MANAGER_BASE +/// Size of a sync-requests active bucket, in items (requests); there are two buckets. +/// This needs to be large enough to hold the maximum number of requests that can be conceivable generated +/// in the maximum time period that it takes to answer a request. +#define REQUEST_BUCKET_SIZE_LIMIT 100'000 + #include "discovery.h" #include "session.h" // encoding.h -> utils/options.h @@ -63,13 +68,9 @@ namespace P2P { DiscoveryWorker discoveryWorker_; ///< DiscoveryWorker object. const std::string instanceIdStr_; ///< Instance ID for LOGxxx(). const NodeID nodeId_; ///< This ManagerBase's own NodeID. - - /// List of currently active sessions. - boost::unordered_flat_map, SafeHash> sessions_; - - // TODO: Somehow find a way to clean up requests_ after a certain time/being used. - /// List of currently active requests. - boost::unordered_flat_map, SafeHash> requests_; + boost::unordered_flat_map, SafeHash> sessions_; ///< List of currently active sessions. + std::array, SafeHash>, 2> requests_; ///< Request buckets. + uint64_t activeRequests_ = 0; ///< Index of the current (active) request bucket (either 0 or 1). /** * Send a Request to a given node. @@ -142,6 +143,19 @@ namespace P2P { */ void trySpawnInboundSession(tcp::socket&& socket); + /** + * Insert a new request into requests_. + * @param request The newly created request to be tracked by requests_. + */ + void insertRequest(std::shared_ptr request); + + /** + * Handles a message that is a generic answer to a synchronous generic remote request. + * @param nodeId The node that is answering the request (so it can be disconnected on error). + * @param message The message that is supposed to be an answer to a request that is tracked in requests_. + */ + void handleRequestAnswer(const NodeID& nodeId, const std::shared_ptr& message); + public: /** * Constructor. diff --git a/src/net/p2p/managerdiscovery.cpp b/src/net/p2p/managerdiscovery.cpp index fd5b5fa8..758b38bd 100644 --- a/src/net/p2p/managerdiscovery.cpp +++ b/src/net/p2p/managerdiscovery.cpp @@ -102,29 +102,13 @@ namespace P2P { void ManagerDiscovery::handlePingAnswer( const NodeID &nodeId, const std::shared_ptr& message ) { - std::unique_lock lock(this->requestsMutex_); - if (!requests_.contains(message->id())) { - lock.unlock(); // Unlock before calling logToDebug to avoid waiting for the lock in the logToDebug function. - LOGERROR("Answer to invalid request from " + toString(nodeId) + - " closing session."); - this->disconnectSession(nodeId); - return; - } - requests_[message->id()]->setAnswer(message); + handleRequestAnswer(nodeId, message); } void ManagerDiscovery::handleRequestNodesAnswer( const NodeID &nodeId, const std::shared_ptr& message ) { - std::unique_lock lock(this->requestsMutex_); - if (!requests_.contains(message->id())) { - lock.unlock(); // Unlock before calling logToDebug to avoid waiting for the lock in the logToDebug function. - LOGERROR("Answer to invalid request from " + toString(nodeId) + - " closing session."); - this->disconnectSession(nodeId); - return; - } - requests_[message->id()]->setAnswer(message); + handleRequestAnswer(nodeId, message); } }; diff --git a/src/net/p2p/managernormal.cpp b/src/net/p2p/managernormal.cpp index 290c06ba..801a0f4f 100644 --- a/src/net/p2p/managernormal.cpp +++ b/src/net/p2p/managernormal.cpp @@ -221,85 +221,37 @@ namespace P2P{ void ManagerNormal::handlePingAnswer( const NodeID &nodeId, const std::shared_ptr& message ) { - std::unique_lock lock(this->requestsMutex_); - if (!requests_.contains(message->id())) { - lock.unlock(); // Unlock before doing anything else to avoid waiting for other locks. - LOGDEBUG("Answer to invalid request from " + toString(nodeId) + - " , closing session."); - this->disconnectSession(nodeId); - return; - } - requests_[message->id()]->setAnswer(message); + handleRequestAnswer(nodeId, message); } void ManagerNormal::handleInfoAnswer( const NodeID &nodeId, const std::shared_ptr& message ) { - std::unique_lock lock(this->requestsMutex_); - if (!requests_.contains(message->id())) { - lock.unlock(); // Unlock before calling logToDebug to avoid waiting for the lock in the logToDebug function. - LOGDEBUG("Answer to invalid request from " + toString(nodeId) + - " , closing session."); - this->disconnectSession(nodeId); - return; - } - requests_[message->id()]->setAnswer(message); + handleRequestAnswer(nodeId, message); } void ManagerNormal::handleRequestNodesAnswer( const NodeID &nodeId, const std::shared_ptr& message ) { - std::unique_lock lock(this->requestsMutex_); - if (!requests_.contains(message->id())) { - lock.unlock(); // Unlock before calling logToDebug to avoid waiting for the lock in the logToDebug function. - LOGDEBUG("Answer to invalid request from " + toString(nodeId) + - " , closing session."); - this->disconnectSession(nodeId); - return; - } - requests_[message->id()]->setAnswer(message); + handleRequestAnswer(nodeId, message); } void ManagerNormal::handleTxValidatorAnswer( const NodeID &nodeId, const std::shared_ptr& message ) { - std::unique_lock lock(this->requestsMutex_); - if (!requests_.contains(message->id())) { - lock.unlock(); // Unlock before calling logToDebug to avoid waiting for the lock in the logToDebug function. - LOGDEBUG("Answer to invalid request from " + toString(nodeId) + - " , closing session."); - this->disconnectSession(nodeId); - return; - } - requests_[message->id()]->setAnswer(message); + handleRequestAnswer(nodeId, message); } void ManagerNormal::handleTxAnswer( const NodeID &nodeId, const std::shared_ptr& message ) { - std::unique_lock lock(this->requestsMutex_); - if (!requests_.contains(message->id())) { - lock.unlock(); // Unlock before calling logToDebug to avoid waiting for the lock in the logToDebug function. - LOGDEBUG("Answer to invalid request from " + nodeId.first.to_string() + ":" + - std::to_string(nodeId.second) + " , closing session."); - this->disconnectSession(nodeId); - return; - } - requests_[message->id()]->setAnswer(message); + handleRequestAnswer(nodeId, message); } void ManagerNormal::handleRequestBlockAnswer( const NodeID &nodeId, const std::shared_ptr& message ) { - std::unique_lock lock(this->requestsMutex_); - if (!requests_.contains(message->id())) { - lock.unlock(); // Unlock before calling logToDebug to avoid waiting for the lock in the logToDebug function. - LOGDEBUG("Answer to invalid request from " + nodeId.first.to_string() + ":" + - std::to_string(nodeId.second) + " , closing session."); - this->disconnectSession(nodeId); - return; - } - requests_[message->id()]->setAnswer(message); + handleRequestAnswer(nodeId, message); } void ManagerNormal::handleInfoNotification(