Implement GC of ManagerBase::requests_ #143

Merged
merged 2 commits on Dec 20, 2024
58 changes: 50 additions & 8 deletions src/net/p2p/managerbase.cpp
@@ -264,16 +264,15 @@ namespace P2P {
LOGDEBUG("Peer " + toString(nodeId) + " is a discovery node, cannot send request");
return nullptr;
}
std::unique_lock lockRequests(this->requestsMutex_);
requests_[message->id()] = std::make_shared<Request>(message->command(), message->id(), session->hostNodeId(), message);
bool written = session->write(message);
if (written) {
return requests_[message->id()];
} else {
// Delete the request if it was not written
requests_.erase(message->id());
if (!session->write(message)) {
// If message was never sent, then no request has been made. Fail and return.
return nullptr;
}
// Message was sent, so create a pending request entry and insert it in the active bucket.
// If the active bucket is full, clear the old bucket and make it the new active bucket.
auto request = std::make_shared<Request>(message->command(), message->id(), session->hostNodeId(), message);
insertRequest(request);
return request;
}

// ManagerBase::answerSession doesn't change sessions_ map, but we still need to
@@ -419,6 +418,13 @@ namespace P2P {
// objects and the entire Boost ASIO engine are fully gone.

LOGDEBUG("Net engine destroyed");

// We don't need the requests_ cache anymore so free up that memory.
std::unique_lock lockRequests(this->requestsMutex_);
requests_[0].clear();
requests_[1].clear();
activeRequests_ = 0;
lockRequests.unlock();
}

bool ManagerBase::isActive() const {
@@ -724,4 +730,40 @@ namespace P2P {
// It is probably better to have asio::post inside the sessions mutex to sync with ManagerBase::stop().
std::make_shared<Session>(std::move(socket), ConnectionType::INBOUND, *this)->run();
}

void ManagerBase::insertRequest(const std::shared_ptr<Request>& request) {
// The previous logic would silently overwrite the previous entry in case of an (improbable)
// request ID collision; this logic is retained here, since lookups will be directed to the
// active bucket first (so a colliding entry that is in the older bucket is shadowed anyway).
std::unique_lock lock(this->requestsMutex_);
requests_[activeRequests_][request->id()] = request;
if (requests_[activeRequests_].size() >= REQUEST_BUCKET_SIZE_LIMIT) {
activeRequests_ = 1 - activeRequests_;
requests_[activeRequests_].clear();
}
}

void ManagerBase::handleRequestAnswer(const NodeID& nodeId, const std::shared_ptr<const Message>& message) {
RequestID requestId = message->id();
std::unique_lock lock(this->requestsMutex_);
// Search for the request in both buckets, starting with the active bucket.
// If found, set its answer and return.
for (int i = 0; i < 2; ++i) {
auto& bucket = requests_[(activeRequests_ + i) % 2];
auto it = bucket.find(requestId);
if (it != bucket.end()) {
std::shared_ptr<Request>& req = it->second;
req->setAnswer(message);
return;
}
}
// Receiving an answer for a request ID that can't be found locally is always an error.
// This could legitimately happen if the request storage is too small for actual request traffic,
// so make sure request storage is oversized at all times by a factor of at least 10.
// If there is a need to optimize memory usage, buckets can be instead allocated dynamically as
// they are needed (e.g. in a deque) and an expiration timestamp can be associated with each bucket.
lock.unlock(); // Unlock before calling logToDebug to avoid waiting for the lock in the logToDebug function.
LOGERROR("Answer to invalid request from " + toString(nodeId) + " closing session.");
this->disconnectSession(nodeId);
}
}
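
For reference, the rotation scheme implemented by insertRequest() and handleRequestAnswer() above can be reduced to the following self-contained sketch. It uses plain `std::unordered_map`, a hypothetical `RequestStore` wrapper, and simplified `Request`/ID types instead of the project's `RequestID`/`SafeHash` types, and it omits the locking that the real code does under `requestsMutex_`, so it is an illustration of the idea rather than the actual implementation.

```cpp
#include <array>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <unordered_map>
#include <utility>

// Hypothetical simplified stand-ins for the project's Request/RequestID types.
struct Request { uint64_t id = 0; };
using RequestPtr = std::shared_ptr<Request>;

class RequestStore {
public:
  explicit RequestStore(std::size_t bucketLimit) : limit_(bucketLimit) {}

  // Same idea as insertRequest(): write into the active bucket; once it fills
  // up, the other bucket (holding the oldest entries) is wiped and becomes active.
  void insert(RequestPtr req) {
    const uint64_t id = req->id;
    buckets_[active_][id] = std::move(req);
    if (buckets_[active_].size() >= limit_) {
      active_ = 1 - active_;
      buckets_[active_].clear();
    }
  }

  // Same idea as handleRequestAnswer(): check the active bucket first, then the older one.
  RequestPtr find(uint64_t id) const {
    for (int i = 0; i < 2; ++i) {
      const auto& bucket = buckets_[(active_ + i) % 2];
      if (auto it = bucket.find(id); it != bucket.end()) return it->second;
    }
    return nullptr; // the caller treats a miss as a protocol error and disconnects
  }

private:
  std::array<std::unordered_map<uint64_t, RequestPtr>, 2> buckets_;
  std::size_t active_ = 0;
  std::size_t limit_;
};
```

With this layout the store never holds more than roughly twice the bucket limit, and an entry stays findable for at least one full bucket's worth of later insertions, which is why the limit has to comfortably exceed the number of requests that can be issued while a single answer is still in flight.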
28 changes: 21 additions & 7 deletions src/net/p2p/managerbase.h
@@ -8,6 +8,11 @@ See the LICENSE.txt file in the project root for more information.
#ifndef P2P_MANAGER_BASE
#define P2P_MANAGER_BASE

/// Size of a sync-requests active bucket, in items (requests); there are two buckets.
/// This needs to be large enough to hold the maximum number of requests that can be conceivably generated
/// in the maximum time period that it takes to answer a request.
#define REQUEST_BUCKET_SIZE_LIMIT 100000
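
As a rough, purely illustrative sizing check (the figures here are assumptions, not measurements): at a hypothetical peak of 1,000 outbound requests per second with a 10-second worst-case answer time, about 10,000 requests could be awaiting answers at once, so a 100,000-entry bucket preserves the ~10x safety margin referred to in handleRequestAnswer().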
Collaborator:

What do you think about this?

constexpr size_t REQUEST_BUCKET_SIZE_LIMIT = 100'000;
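
(The two forms are interchangeable at the use sites in this PR; the usual argument for the `constexpr` variant is that it is a typed, scoped constant rather than a textual macro substitution.)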


#include "discovery.h"
#include "session.h" // encoding.h -> utils/options.h

@@ -63,13 +68,9 @@ namespace P2P {
DiscoveryWorker discoveryWorker_; ///< DiscoveryWorker object.
const std::string instanceIdStr_; ///< Instance ID for LOGxxx().
const NodeID nodeId_; ///< This ManagerBase's own NodeID.

/// List of currently active sessions.
boost::unordered_flat_map<NodeID, std::shared_ptr<Session>, SafeHash> sessions_;

// TODO: Somehow find a way to clean up requests_ after a certain time/being used.
/// List of currently active requests.
boost::unordered_flat_map<RequestID, std::shared_ptr<Request>, SafeHash> requests_;
boost::unordered_flat_map<NodeID, std::shared_ptr<Session>, SafeHash> sessions_; ///< List of currently active sessions.
std::array<boost::unordered_flat_map<RequestID, std::shared_ptr<Request>, SafeHash>, 2> requests_; ///< Request buckets.
uint64_t activeRequests_ = 0; ///< Index of the current (active) request bucket (either 0 or 1).

/**
* Send a Request to a given node.
@@ -142,6 +143,19 @@
*/
void trySpawnInboundSession(tcp::socket&& socket);

/**
* Insert a new request into requests_.
* @param request The newly created request to be tracked by requests_.
*/
void insertRequest(const std::shared_ptr<Request>& request);
Collaborator:

Since this function intends to take ownership of the std::shared_ptr copy, it would be a little bit better to receive it by value instead of const-ref.

Like that:

void insertRequest(std::shared_ptr<Request> request) {
  requests_[activeRequests_][request->id()] = std::move(request); // note the move
  // ...
}

Moving a shared_ptr is cheaper than copying it. At least in this way, you delegate to the caller the decision of copying or moving it (instead of always copying).

I'd suggest also making such change in other places too, where a std::shared_ptr parameter is received and will be stored somewhere, but such change could fall out of the scope of this PR. No problem at all if you decide not to do it.
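
A minimal sketch of the difference the reviewer describes, using a hypothetical `Request` type and a free-standing map rather than the project's headers: with a by-value parameter the caller chooses between copying and moving, and moving a `shared_ptr` only transfers ownership of the control block without touching the atomic reference count.

```cpp
#include <cstdint>
#include <memory>
#include <unordered_map>
#include <utility>

// Hypothetical stand-in for the project's Request type.
struct Request { uint64_t id = 0; };

std::unordered_map<uint64_t, std::shared_ptr<Request>> store;

// By-value parameter: any copy (or move) happens at the call site,
// and the body can move the parameter straight into the container.
void insertByValue(std::shared_ptr<Request> req) {
  const uint64_t id = req->id;
  store[id] = std::move(req); // no reference-count increment here
}

int main() {
  auto a = std::make_shared<Request>(); a->id = 1;
  auto b = std::make_shared<Request>(); b->id = 2;
  insertByValue(a);            // caller keeps a: exactly one refcount bump
  insertByValue(std::move(b)); // caller is done with b: no refcount traffic
  return 0;
}
```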


/**
* Handles a message that is a generic answer to a synchronous generic remote request.
* @param nodeId The node that is answering the request (so it can be disconnected on error).
* @param message The message that is supposed to be an answer to a request that is tracked in requests_.
*/
void handleRequestAnswer(const NodeID& nodeId, const std::shared_ptr<const Message>& message);

public:
/**
* Constructor.
20 changes: 2 additions & 18 deletions src/net/p2p/managerdiscovery.cpp
@@ -102,29 +102,13 @@ namespace P2P {
void ManagerDiscovery::handlePingAnswer(
const NodeID &nodeId, const std::shared_ptr<const Message>& message
) {
std::unique_lock lock(this->requestsMutex_);
if (!requests_.contains(message->id())) {
lock.unlock(); // Unlock before calling logToDebug to avoid waiting for the lock in the logToDebug function.
LOGERROR("Answer to invalid request from " + toString(nodeId) +
" closing session.");
this->disconnectSession(nodeId);
return;
}
requests_[message->id()]->setAnswer(message);
handleRequestAnswer(nodeId, message);
}

void ManagerDiscovery::handleRequestNodesAnswer(
const NodeID &nodeId, const std::shared_ptr<const Message>& message
) {
std::unique_lock lock(this->requestsMutex_);
if (!requests_.contains(message->id())) {
lock.unlock(); // Unlock before calling logToDebug to avoid waiting for the lock in the logToDebug function.
LOGERROR("Answer to invalid request from " + toString(nodeId) +
" closing session.");
this->disconnectSession(nodeId);
return;
}
requests_[message->id()]->setAnswer(message);
handleRequestAnswer(nodeId, message);
}
};

60 changes: 6 additions & 54 deletions src/net/p2p/managernormal.cpp
@@ -221,85 +221,37 @@ namespace P2P{
void ManagerNormal::handlePingAnswer(
const NodeID &nodeId, const std::shared_ptr<const Message>& message
) {
std::unique_lock lock(this->requestsMutex_);
if (!requests_.contains(message->id())) {
lock.unlock(); // Unlock before doing anything else to avoid waiting for other locks.
LOGDEBUG("Answer to invalid request from " + toString(nodeId) +
" , closing session.");
this->disconnectSession(nodeId);
return;
}
requests_[message->id()]->setAnswer(message);
handleRequestAnswer(nodeId, message);
}

void ManagerNormal::handleInfoAnswer(
const NodeID &nodeId, const std::shared_ptr<const Message>& message
) {
std::unique_lock lock(this->requestsMutex_);
if (!requests_.contains(message->id())) {
lock.unlock(); // Unlock before calling logToDebug to avoid waiting for the lock in the logToDebug function.
LOGDEBUG("Answer to invalid request from " + toString(nodeId) +
" , closing session.");
this->disconnectSession(nodeId);
return;
}
requests_[message->id()]->setAnswer(message);
handleRequestAnswer(nodeId, message);
}

void ManagerNormal::handleRequestNodesAnswer(
const NodeID &nodeId, const std::shared_ptr<const Message>& message
) {
std::unique_lock lock(this->requestsMutex_);
if (!requests_.contains(message->id())) {
lock.unlock(); // Unlock before calling logToDebug to avoid waiting for the lock in the logToDebug function.
LOGDEBUG("Answer to invalid request from " + toString(nodeId) +
" , closing session.");
this->disconnectSession(nodeId);
return;
}
requests_[message->id()]->setAnswer(message);
handleRequestAnswer(nodeId, message);
}

void ManagerNormal::handleTxValidatorAnswer(
const NodeID &nodeId, const std::shared_ptr<const Message>& message
) {
std::unique_lock lock(this->requestsMutex_);
if (!requests_.contains(message->id())) {
lock.unlock(); // Unlock before calling logToDebug to avoid waiting for the lock in the logToDebug function.
LOGDEBUG("Answer to invalid request from " + toString(nodeId) +
" , closing session.");
this->disconnectSession(nodeId);
return;
}
requests_[message->id()]->setAnswer(message);
handleRequestAnswer(nodeId, message);
}

void ManagerNormal::handleTxAnswer(
const NodeID &nodeId, const std::shared_ptr<const Message>& message
) {
std::unique_lock lock(this->requestsMutex_);
if (!requests_.contains(message->id())) {
lock.unlock(); // Unlock before calling logToDebug to avoid waiting for the lock in the logToDebug function.
LOGDEBUG("Answer to invalid request from " + nodeId.first.to_string() + ":" +
std::to_string(nodeId.second) + " , closing session.");
this->disconnectSession(nodeId);
return;
}
requests_[message->id()]->setAnswer(message);
handleRequestAnswer(nodeId, message);
}

void ManagerNormal::handleRequestBlockAnswer(
const NodeID &nodeId, const std::shared_ptr<const Message>& message
) {
std::unique_lock lock(this->requestsMutex_);
if (!requests_.contains(message->id())) {
lock.unlock(); // Unlock before calling logToDebug to avoid waiting for the lock in the logToDebug function.
LOGDEBUG("Answer to invalid request from " + nodeId.first.to_string() + ":" +
std::to_string(nodeId.second) + " , closing session.");
this->disconnectSession(nodeId);
return;
}
requests_[message->id()]->setAnswer(message);
handleRequestAnswer(nodeId, message);
}

void ManagerNormal::handleInfoNotification(