Skip to content

Commit

Permalink
Merge pull request #143 from AppLayerLabs/gcrequests
Browse files Browse the repository at this point in the history
Implement GC of ManagerBase::requests_
  • Loading branch information
fcecin authored Dec 20, 2024
2 parents 5db0c19 + 44f805f commit fec35ba
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 87 deletions.
61 changes: 53 additions & 8 deletions src/net/p2p/managerbase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -264,16 +264,17 @@ namespace P2P {
LOGDEBUG("Peer " + toString(nodeId) + " is a discovery node, cannot send request");
return nullptr;
}
std::unique_lock lockRequests(this->requestsMutex_);
requests_[message->id()] = std::make_shared<Request>(message->command(), message->id(), session->hostNodeId(), message);
bool written = session->write(message);
if (written) {
return requests_[message->id()];
} else {
// Delete the request if it was not written
requests_.erase(message->id());
// Track the request
auto request = std::make_shared<Request>(message->command(), message->id(), session->hostNodeId(), message);
insertRequest(request);
// If message was never sent, then no request has been made. Fail and return.
if (!session->write(message)) {
// We know the latest insert went into the active bucket, so we can just remove it.
std::unique_lock lock(this->requestsMutex_);
requests_[activeRequests_].erase(message->id());
return nullptr;
}
return request;
}
// ManagerBase::answerSession doesn't change sessions_ map, but we still need to
Expand Down Expand Up @@ -419,6 +420,13 @@ namespace P2P {
// objects and the entire Boost ASIO engine are fully gone.

LOGDEBUG("Net engine destroyed");

// We don't need the requests_ cache anymore so free up that memory.
std::unique_lock lockRequests(this->requestsMutex_);
requests_[0].clear();
requests_[1].clear();
activeRequests_ = 0;
lockRequests.unlock();
}

bool ManagerBase::isActive() const {
Expand Down Expand Up @@ -724,4 +732,41 @@ namespace P2P {
// It is probably better to have asio::post inside the sessions mutex to sync with ManagerBase::stop().
std::make_shared<Session>(std::move(socket), ConnectionType::INBOUND, *this)->run();
}

void ManagerBase::insertRequest(std::shared_ptr<Request> request) {
// The previous logic would silently overwrite the previous entry in case of an (improbable)
// request ID collision; this logic is retained here, since lookups will be directed to the
// active bucket first (so a colliding entry that is in the older bucket is shadowed anyway).
std::unique_lock lock(this->requestsMutex_);
if (requests_[activeRequests_].size() >= REQUEST_BUCKET_SIZE_LIMIT) {
activeRequests_ = 1 - activeRequests_;
requests_[activeRequests_].clear();
}
// Make sure the latest insertion always ended up in what is the current active bucket
requests_[activeRequests_][request->id()] = std::move(request);
}

void ManagerBase::handleRequestAnswer(const NodeID& nodeId, const std::shared_ptr<const Message>& message) {
RequestID requestId = message->id();
std::unique_lock lock(this->requestsMutex_);
// Search for the request in both buckets, starting with the active bucket.
// If found, set its answer and return.
for (int i = 0; i < 2; ++i) {
auto& bucket = requests_[(activeRequests_ + i) % 2];
auto it = bucket.find(requestId);
if (it != bucket.end()) {
std::shared_ptr<Request>& req = it->second;
req->setAnswer(message);
return;
}
}
// Receiving an answer for a request ID that can't be found locally is always an error.
// This could legitimately happen if the request storage is too small for actual request traffic,
// so make sure request storage is oversized at all times by a factor of at least 10.
// If there is a need to optimize memory usage, buckets can be instead allocated dynamically as
// they are needed (e.g. in a deque) and an expiration timestamp can be associated with each bucket.
lock.unlock(); // Unlock before calling logToDebug to avoid waiting for the lock in the logToDebug function.
LOGERROR("Answer to invalid request from " + toString(nodeId) + " closing session.");
this->disconnectSession(nodeId);
}
}
28 changes: 21 additions & 7 deletions src/net/p2p/managerbase.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ See the LICENSE.txt file in the project root for more information.
#ifndef P2P_MANAGER_BASE
#define P2P_MANAGER_BASE

/// Size of a sync-requests active bucket, in items (requests); there are two buckets.
/// This needs to be large enough to hold the maximum number of requests that can be conceivable generated
/// in the maximum time period that it takes to answer a request.
#define REQUEST_BUCKET_SIZE_LIMIT 100'000

#include "discovery.h"
#include "session.h" // encoding.h -> utils/options.h

Expand Down Expand Up @@ -63,13 +68,9 @@ namespace P2P {
DiscoveryWorker discoveryWorker_; ///< DiscoveryWorker object.
const std::string instanceIdStr_; ///< Instance ID for LOGxxx().
const NodeID nodeId_; ///< This ManagerBase's own NodeID.

/// List of currently active sessions.
boost::unordered_flat_map<NodeID, std::shared_ptr<Session>, SafeHash> sessions_;

// TODO: Somehow find a way to clean up requests_ after a certain time/being used.
/// List of currently active requests.
boost::unordered_flat_map<RequestID, std::shared_ptr<Request>, SafeHash> requests_;
boost::unordered_flat_map<NodeID, std::shared_ptr<Session>, SafeHash> sessions_; ///< List of currently active sessions.
std::array<boost::unordered_flat_map<RequestID, std::shared_ptr<Request>, SafeHash>, 2> requests_; ///< Request buckets.
uint64_t activeRequests_ = 0; ///< Index of the current (active) request bucket (either 0 or 1).

/**
* Send a Request to a given node.
Expand Down Expand Up @@ -142,6 +143,19 @@ namespace P2P {
*/
void trySpawnInboundSession(tcp::socket&& socket);

/**
* Insert a new request into requests_.
* @param request The newly created request to be tracked by requests_.
*/
void insertRequest(std::shared_ptr<Request> request);

/**
* Handles a message that is a generic answer to a synchronous generic remote request.
* @param nodeId The node that is answering the request (so it can be disconnected on error).
* @param message The message that is supposed to be an answer to a request that is tracked in requests_.
*/
void handleRequestAnswer(const NodeID& nodeId, const std::shared_ptr<const Message>& message);

public:
/**
* Constructor.
Expand Down
20 changes: 2 additions & 18 deletions src/net/p2p/managerdiscovery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,29 +102,13 @@ namespace P2P {
void ManagerDiscovery::handlePingAnswer(
const NodeID &nodeId, const std::shared_ptr<const Message>& message
) {
std::unique_lock lock(this->requestsMutex_);
if (!requests_.contains(message->id())) {
lock.unlock(); // Unlock before calling logToDebug to avoid waiting for the lock in the logToDebug function.
LOGERROR("Answer to invalid request from " + toString(nodeId) +
" closing session.");
this->disconnectSession(nodeId);
return;
}
requests_[message->id()]->setAnswer(message);
handleRequestAnswer(nodeId, message);
}

void ManagerDiscovery::handleRequestNodesAnswer(
const NodeID &nodeId, const std::shared_ptr<const Message>& message
) {
std::unique_lock lock(this->requestsMutex_);
if (!requests_.contains(message->id())) {
lock.unlock(); // Unlock before calling logToDebug to avoid waiting for the lock in the logToDebug function.
LOGERROR("Answer to invalid request from " + toString(nodeId) +
" closing session.");
this->disconnectSession(nodeId);
return;
}
requests_[message->id()]->setAnswer(message);
handleRequestAnswer(nodeId, message);
}
};

60 changes: 6 additions & 54 deletions src/net/p2p/managernormal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,85 +221,37 @@ namespace P2P{
void ManagerNormal::handlePingAnswer(
const NodeID &nodeId, const std::shared_ptr<const Message>& message
) {
std::unique_lock lock(this->requestsMutex_);
if (!requests_.contains(message->id())) {
lock.unlock(); // Unlock before doing anything else to avoid waiting for other locks.
LOGDEBUG("Answer to invalid request from " + toString(nodeId) +
" , closing session.");
this->disconnectSession(nodeId);
return;
}
requests_[message->id()]->setAnswer(message);
handleRequestAnswer(nodeId, message);
}

void ManagerNormal::handleInfoAnswer(
const NodeID &nodeId, const std::shared_ptr<const Message>& message
) {
std::unique_lock lock(this->requestsMutex_);
if (!requests_.contains(message->id())) {
lock.unlock(); // Unlock before calling logToDebug to avoid waiting for the lock in the logToDebug function.
LOGDEBUG("Answer to invalid request from " + toString(nodeId) +
" , closing session.");
this->disconnectSession(nodeId);
return;
}
requests_[message->id()]->setAnswer(message);
handleRequestAnswer(nodeId, message);
}

void ManagerNormal::handleRequestNodesAnswer(
const NodeID &nodeId, const std::shared_ptr<const Message>& message
) {
std::unique_lock lock(this->requestsMutex_);
if (!requests_.contains(message->id())) {
lock.unlock(); // Unlock before calling logToDebug to avoid waiting for the lock in the logToDebug function.
LOGDEBUG("Answer to invalid request from " + toString(nodeId) +
" , closing session.");
this->disconnectSession(nodeId);
return;
}
requests_[message->id()]->setAnswer(message);
handleRequestAnswer(nodeId, message);
}

void ManagerNormal::handleTxValidatorAnswer(
const NodeID &nodeId, const std::shared_ptr<const Message>& message
) {
std::unique_lock lock(this->requestsMutex_);
if (!requests_.contains(message->id())) {
lock.unlock(); // Unlock before calling logToDebug to avoid waiting for the lock in the logToDebug function.
LOGDEBUG("Answer to invalid request from " + toString(nodeId) +
" , closing session.");
this->disconnectSession(nodeId);
return;
}
requests_[message->id()]->setAnswer(message);
handleRequestAnswer(nodeId, message);
}

void ManagerNormal::handleTxAnswer(
const NodeID &nodeId, const std::shared_ptr<const Message>& message
) {
std::unique_lock lock(this->requestsMutex_);
if (!requests_.contains(message->id())) {
lock.unlock(); // Unlock before calling logToDebug to avoid waiting for the lock in the logToDebug function.
LOGDEBUG("Answer to invalid request from " + nodeId.first.to_string() + ":" +
std::to_string(nodeId.second) + " , closing session.");
this->disconnectSession(nodeId);
return;
}
requests_[message->id()]->setAnswer(message);
handleRequestAnswer(nodeId, message);
}

void ManagerNormal::handleRequestBlockAnswer(
const NodeID &nodeId, const std::shared_ptr<const Message>& message
) {
std::unique_lock lock(this->requestsMutex_);
if (!requests_.contains(message->id())) {
lock.unlock(); // Unlock before calling logToDebug to avoid waiting for the lock in the logToDebug function.
LOGDEBUG("Answer to invalid request from " + nodeId.first.to_string() + ":" +
std::to_string(nodeId.second) + " , closing session.");
this->disconnectSession(nodeId);
return;
}
requests_[message->id()]->setAnswer(message);
handleRequestAnswer(nodeId, message);
}

void ManagerNormal::handleInfoNotification(
Expand Down

0 comments on commit fec35ba

Please sign in to comment.