Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement GC of ManagerBase::requests_ #143

Merged
merged 2 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 53 additions & 8 deletions src/net/p2p/managerbase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -264,16 +264,17 @@ namespace P2P {
LOGDEBUG("Peer " + toString(nodeId) + " is a discovery node, cannot send request");
return nullptr;
}
std::unique_lock lockRequests(this->requestsMutex_);
requests_[message->id()] = std::make_shared<Request>(message->command(), message->id(), session->hostNodeId(), message);
bool written = session->write(message);
if (written) {
return requests_[message->id()];
} else {
// Delete the request if it was not written
requests_.erase(message->id());
// Track the request
auto request = std::make_shared<Request>(message->command(), message->id(), session->hostNodeId(), message);
insertRequest(request);
// If message was never sent, then no request has been made. Fail and return.
if (!session->write(message)) {
// We know the latest insert went into the active bucket, so we can just remove it.
std::unique_lock lock(this->requestsMutex_);
requests_[activeRequests_].erase(message->id());
return nullptr;
}
return request;
}

// ManagerBase::answerSession doesn't change sessions_ map, but we still need to
Expand Down Expand Up @@ -419,6 +420,13 @@ namespace P2P {
// objects and the entire Boost ASIO engine are fully gone.

LOGDEBUG("Net engine destroyed");

// We don't need the requests_ cache anymore so free up that memory.
std::unique_lock lockRequests(this->requestsMutex_);
requests_[0].clear();
requests_[1].clear();
activeRequests_ = 0;
lockRequests.unlock();
}

bool ManagerBase::isActive() const {
Expand Down Expand Up @@ -724,4 +732,41 @@ namespace P2P {
// It is probably better to have asio::post inside the sessions mutex to sync with ManagerBase::stop().
std::make_shared<Session>(std::move(socket), ConnectionType::INBOUND, *this)->run();
}

void ManagerBase::insertRequest(std::shared_ptr<Request> request) {
// The previous logic would silently overwrite the previous entry in case of an (improbable)
// request ID collision; this logic is retained here, since lookups will be directed to the
// active bucket first (so a colliding entry that is in the older bucket is shadowed anyway).
std::unique_lock lock(this->requestsMutex_);
if (requests_[activeRequests_].size() >= REQUEST_BUCKET_SIZE_LIMIT) {
activeRequests_ = 1 - activeRequests_;
requests_[activeRequests_].clear();
}
// Make sure the latest insertion always ended up in what is the current active bucket
requests_[activeRequests_][request->id()] = std::move(request);
}

void ManagerBase::handleRequestAnswer(const NodeID& nodeId, const std::shared_ptr<const Message>& message) {
RequestID requestId = message->id();
std::unique_lock lock(this->requestsMutex_);
// Search for the request in both buckets, starting with the active bucket.
// If found, set its answer and return.
for (int i = 0; i < 2; ++i) {
auto& bucket = requests_[(activeRequests_ + i) % 2];
auto it = bucket.find(requestId);
if (it != bucket.end()) {
std::shared_ptr<Request>& req = it->second;
req->setAnswer(message);
return;
}
}
// Receiving an answer for a request ID that can't be found locally is always an error.
// This could legitimately happen if the request storage is too small for actual request traffic,
// so make sure request storage is oversized at all times by a factor of at least 10.
// If there is a need to optimize memory usage, buckets can be instead allocated dynamically as
// they are needed (e.g. in a deque) and an expiration timestamp can be associated with each bucket.
lock.unlock(); // Unlock before calling logToDebug to avoid waiting for the lock in the logToDebug function.
LOGERROR("Answer to invalid request from " + toString(nodeId) + " closing session.");
this->disconnectSession(nodeId);
}
}
28 changes: 21 additions & 7 deletions src/net/p2p/managerbase.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ See the LICENSE.txt file in the project root for more information.
#ifndef P2P_MANAGER_BASE
#define P2P_MANAGER_BASE

/// Size of a sync-requests active bucket, in items (requests); there are two buckets.
/// This needs to be large enough to hold the maximum number of requests that can be conceivable generated
/// in the maximum time period that it takes to answer a request.
#define REQUEST_BUCKET_SIZE_LIMIT 100'000

#include "discovery.h"
#include "session.h" // encoding.h -> utils/options.h

Expand Down Expand Up @@ -63,13 +68,9 @@ namespace P2P {
DiscoveryWorker discoveryWorker_; ///< DiscoveryWorker object.
const std::string instanceIdStr_; ///< Instance ID for LOGxxx().
const NodeID nodeId_; ///< This ManagerBase's own NodeID.

/// List of currently active sessions.
boost::unordered_flat_map<NodeID, std::shared_ptr<Session>, SafeHash> sessions_;

// TODO: Somehow find a way to clean up requests_ after a certain time/being used.
/// List of currently active requests.
boost::unordered_flat_map<RequestID, std::shared_ptr<Request>, SafeHash> requests_;
boost::unordered_flat_map<NodeID, std::shared_ptr<Session>, SafeHash> sessions_; ///< List of currently active sessions.
std::array<boost::unordered_flat_map<RequestID, std::shared_ptr<Request>, SafeHash>, 2> requests_; ///< Request buckets.
uint64_t activeRequests_ = 0; ///< Index of the current (active) request bucket (either 0 or 1).

/**
* Send a Request to a given node.
Expand Down Expand Up @@ -142,6 +143,19 @@ namespace P2P {
*/
void trySpawnInboundSession(tcp::socket&& socket);

/**
* Insert a new request into requests_.
* @param request The newly created request to be tracked by requests_.
*/
void insertRequest(std::shared_ptr<Request> request);

/**
* Handles a message that is a generic answer to a synchronous generic remote request.
* @param nodeId The node that is answering the request (so it can be disconnected on error).
* @param message The message that is supposed to be an answer to a request that is tracked in requests_.
*/
void handleRequestAnswer(const NodeID& nodeId, const std::shared_ptr<const Message>& message);

public:
/**
* Constructor.
Expand Down
20 changes: 2 additions & 18 deletions src/net/p2p/managerdiscovery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,29 +102,13 @@ namespace P2P {
void ManagerDiscovery::handlePingAnswer(
const NodeID &nodeId, const std::shared_ptr<const Message>& message
) {
std::unique_lock lock(this->requestsMutex_);
if (!requests_.contains(message->id())) {
lock.unlock(); // Unlock before calling logToDebug to avoid waiting for the lock in the logToDebug function.
LOGERROR("Answer to invalid request from " + toString(nodeId) +
" closing session.");
this->disconnectSession(nodeId);
return;
}
requests_[message->id()]->setAnswer(message);
handleRequestAnswer(nodeId, message);
}

void ManagerDiscovery::handleRequestNodesAnswer(
const NodeID &nodeId, const std::shared_ptr<const Message>& message
) {
std::unique_lock lock(this->requestsMutex_);
if (!requests_.contains(message->id())) {
lock.unlock(); // Unlock before calling logToDebug to avoid waiting for the lock in the logToDebug function.
LOGERROR("Answer to invalid request from " + toString(nodeId) +
" closing session.");
this->disconnectSession(nodeId);
return;
}
requests_[message->id()]->setAnswer(message);
handleRequestAnswer(nodeId, message);
}
};

60 changes: 6 additions & 54 deletions src/net/p2p/managernormal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,85 +221,37 @@ namespace P2P{
void ManagerNormal::handlePingAnswer(
const NodeID &nodeId, const std::shared_ptr<const Message>& message
) {
std::unique_lock lock(this->requestsMutex_);
if (!requests_.contains(message->id())) {
lock.unlock(); // Unlock before doing anything else to avoid waiting for other locks.
LOGDEBUG("Answer to invalid request from " + toString(nodeId) +
" , closing session.");
this->disconnectSession(nodeId);
return;
}
requests_[message->id()]->setAnswer(message);
handleRequestAnswer(nodeId, message);
}

void ManagerNormal::handleInfoAnswer(
const NodeID &nodeId, const std::shared_ptr<const Message>& message
) {
std::unique_lock lock(this->requestsMutex_);
if (!requests_.contains(message->id())) {
lock.unlock(); // Unlock before calling logToDebug to avoid waiting for the lock in the logToDebug function.
LOGDEBUG("Answer to invalid request from " + toString(nodeId) +
" , closing session.");
this->disconnectSession(nodeId);
return;
}
requests_[message->id()]->setAnswer(message);
handleRequestAnswer(nodeId, message);
}

void ManagerNormal::handleRequestNodesAnswer(
const NodeID &nodeId, const std::shared_ptr<const Message>& message
) {
std::unique_lock lock(this->requestsMutex_);
if (!requests_.contains(message->id())) {
lock.unlock(); // Unlock before calling logToDebug to avoid waiting for the lock in the logToDebug function.
LOGDEBUG("Answer to invalid request from " + toString(nodeId) +
" , closing session.");
this->disconnectSession(nodeId);
return;
}
requests_[message->id()]->setAnswer(message);
handleRequestAnswer(nodeId, message);
}

void ManagerNormal::handleTxValidatorAnswer(
const NodeID &nodeId, const std::shared_ptr<const Message>& message
) {
std::unique_lock lock(this->requestsMutex_);
if (!requests_.contains(message->id())) {
lock.unlock(); // Unlock before calling logToDebug to avoid waiting for the lock in the logToDebug function.
LOGDEBUG("Answer to invalid request from " + toString(nodeId) +
" , closing session.");
this->disconnectSession(nodeId);
return;
}
requests_[message->id()]->setAnswer(message);
handleRequestAnswer(nodeId, message);
}

void ManagerNormal::handleTxAnswer(
const NodeID &nodeId, const std::shared_ptr<const Message>& message
) {
std::unique_lock lock(this->requestsMutex_);
if (!requests_.contains(message->id())) {
lock.unlock(); // Unlock before calling logToDebug to avoid waiting for the lock in the logToDebug function.
LOGDEBUG("Answer to invalid request from " + nodeId.first.to_string() + ":" +
std::to_string(nodeId.second) + " , closing session.");
this->disconnectSession(nodeId);
return;
}
requests_[message->id()]->setAnswer(message);
handleRequestAnswer(nodeId, message);
}

void ManagerNormal::handleRequestBlockAnswer(
const NodeID &nodeId, const std::shared_ptr<const Message>& message
) {
std::unique_lock lock(this->requestsMutex_);
if (!requests_.contains(message->id())) {
lock.unlock(); // Unlock before calling logToDebug to avoid waiting for the lock in the logToDebug function.
LOGDEBUG("Answer to invalid request from " + nodeId.first.to_string() + ":" +
std::to_string(nodeId.second) + " , closing session.");
this->disconnectSession(nodeId);
return;
}
requests_[message->id()]->setAnswer(message);
handleRequestAnswer(nodeId, message);
}

void ManagerNormal::handleInfoNotification(
Expand Down