Skip to content

Commit

Permalink
Internal refactor of P2P::Session vs. P2P::ManagerXXX classes (#93)
Browse files Browse the repository at this point in the history
* Replace weak_ptr<Session> with NodeId in Manager* APIs

* Fix SonarQube issue

* Remove retrieving session object for logging errors

* Harden P2P networking shutdown sequence

- Rule out any possibility that ManagerBase::sessions_ will
grow (or shrink) after ManagerBase::close_ is set to true, so that
ManagerBase::stop() can close all registered sessions

- On ManagerBase::stop(), join the Session thread pool after
Server and ClientFactory are stopped (possibly more correct)

* P2P::ManagerBase:

- removed closed_; replaced by started_ (ManagerBase should be
able to start() and stop() repeatedly and safely)
- added stateMutex_ to synchronize start(), stop() and thread pool
- start() creates threadPool_
- stop() destroys threadPool_ (waits for all tasks and joins all
threads, guaranteeing that there is no session/socket message
handling code active when it is time to e.g. destroy the
underlying io_context)
- threadPool_ is now isolated and not forwarded to other objects;
to use the ManagerBase thread pool, other objects have to call a
method on ManagerBase that will service the request, such as
ManagerBase::asyncHandleMessage()

state.cpp (test):

- increased timeout from 5s to 120s in a couple of futures to
avoid random, performance-related failures (logs to cout if they
exceed 5s, which in any case is rare)

* add this->
  • Loading branch information
fcecin authored Feb 23, 2024
1 parent 3e68fc0 commit 64f7c3d
Show file tree
Hide file tree
Showing 13 changed files with 253 additions and 412 deletions.
3 changes: 1 addition & 2 deletions src/net/p2p/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,10 @@ See the LICENSE.txt file in the project root for more information.
namespace P2P {
void ClientFactory::createClientSession(const boost::asio::ip::address &address, const unsigned short &port) {
tcp::socket socket(io_context_);
auto session = std::make_shared<Session>(std::move(socket), ConnectionType::OUTBOUND, manager_, this->threadPool_, address, port);
auto session = std::make_shared<Session>(std::move(socket), ConnectionType::OUTBOUND, manager_, address, port);
session->run();
}


bool ClientFactory::run() {
Logger::logToDebug(LogType::INFO, Log::P2PClientFactory, __func__,
"Starting P2P Client Factory "
Expand Down
9 changes: 2 additions & 7 deletions src/net/p2p/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,6 @@ namespace P2P {
/// Reference to the manager.
ManagerBase& manager_;

/// Reference to the thread pool.
BS::thread_pool_light& threadPool_;

/// Internal function for creating a new client session.
void createClientSession(const boost::asio::ip::address &address, const unsigned short &port);

Expand All @@ -55,14 +52,12 @@ namespace P2P {
* Constructor for the ClientFactory.
* @param manager Reference to the manager.
* @param threadCount Number of threads to use.
* @param threadPool Reference to the thread pool.
*/
ClientFactory(ManagerBase& manager, const uint8_t &threadCount, BS::thread_pool_light& threadPool) :
ClientFactory(ManagerBase& manager, const uint8_t &threadCount) :
work_guard_(boost::asio::make_work_guard(io_context_)),
connectorStrand_(io_context_.get_executor()),
threadCount_(threadCount),
manager_(manager),
threadPool_(threadPool) {}
manager_(manager) {}

/// Start the Factory.
bool start();
Expand Down
49 changes: 32 additions & 17 deletions src/net/p2p/managerbase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ See the LICENSE.txt file in the project root for more information.
namespace P2P {

bool ManagerBase::registerSessionInternal(const std::shared_ptr<Session>& session) {
if (this->closed_) {
std::unique_lock lockSession(this->sessionsMutex_); // ManagerBase::registerSessionInternal can change sessions_ map.
if (!this->started_) {
return false;
}
std::unique_lock lockSession(this->sessionsMutex_); // ManagerBase::registerSessionInternal can change sessions_ map.
// The NodeID of a session is made by the host IP and his server port.
// That means, it is possible for us to receive a inbound connection for someone that we already have a outbound connection.
// In this case, we will keep the oldest connection alive and close the new one.
Expand All @@ -31,10 +31,10 @@ namespace P2P {
}

bool ManagerBase::unregisterSessionInternal(const std::shared_ptr<Session> &session) {
if (this->closed_) {
std::unique_lock lockSession(this->sessionsMutex_); // ManagerBase::unregisterSessionInternal can change sessions_ map.
if (!this->started_) {
return false;
}
std::unique_lock lockSession(this->sessionsMutex_); // ManagerBase::unregisterSessionInternal can change sessions_ map.
if (!sessions_.contains(session->hostNodeId())) {
lockSession.unlock(); // Unlock before calling logToDebug to avoid waiting for the lock in the logToDebug function.
Logger::logToDebug(LogType::ERROR, Log::P2PManager, __func__, "Session does not exist at " +
Expand All @@ -47,6 +47,9 @@ namespace P2P {

bool ManagerBase::disconnectSessionInternal(const NodeID& nodeId) {
std::unique_lock lockSession(this->sessionsMutex_); // ManagerBase::disconnectSessionInternal can change sessions_ map.
if (!this->started_) {
return false;
}
if (!sessions_.contains(nodeId)) {
lockSession.unlock(); // Unlock before calling logToDebug to avoid waiting for the lock in the logToDebug function.
Logger::logToDebug(LogType::ERROR, Log::P2PManager, __func__, "Session does not exist at " + nodeId.first.to_string() + ":" + std::to_string(nodeId.second));
Expand All @@ -60,7 +63,7 @@ namespace P2P {
}

std::shared_ptr<Request> ManagerBase::sendRequestTo(const NodeID &nodeId, const std::shared_ptr<const Message>& message) {
if (this->closed_) return nullptr;
if (!this->started_) return nullptr;
std::shared_lock<std::shared_mutex> lockSession(this->sessionsMutex_); // ManagerBase::sendRequestTo doesn't change sessions_ map.
if(!sessions_.contains(nodeId)) {
lockSession.unlock(); // Unlock before calling logToDebug to avoid waiting for the lock in the logToDebug function.
Expand All @@ -84,24 +87,30 @@ namespace P2P {

// ManagerBase::answerSession doesn't change sessions_ map, but we still need to
// be sure that the session io_context doesn't get deleted while we are using it.
void ManagerBase::answerSession(std::weak_ptr<Session> session, const std::shared_ptr<const Message>& message) {
if (this->closed_) return;
std::shared_lock<std::shared_mutex> lockSession(this->sessionsMutex_);
if (auto ptr = session.lock()) {
ptr->write(message);
} else {
Logger::logToDebug(LogType::ERROR, Log::P2PManager, __func__, "Session is no longer valid");
void ManagerBase::answerSession(const NodeID &nodeId, const std::shared_ptr<const Message>& message) {
std::shared_lock lockSession(this->sessionsMutex_);
if (!this->started_) return;
auto it = sessions_.find(nodeId);
if (it == sessions_.end()) {
Logger::logToDebug(LogType::ERROR, Log::P2PManager, __func__, "Cannot find session for " + nodeId.first.to_string() + ":" + std::to_string(nodeId.second));
return;
}
it->second->write(message);
}

void ManagerBase::start() {
this->closed_ = false;
std::scoped_lock lock(this->stateMutex_);
if (this->started_) return;
this->started_ = true;
this->threadPool_ = std::make_unique<BS::thread_pool_light>(std::thread::hardware_concurrency() * 4);
this->server_.start();
this->clientfactory_.start();
}

void ManagerBase::stop() {
this->closed_ = true;
std::scoped_lock lock(this->stateMutex_);
if (! this->started_) return;
this->started_ = false;
{
std::unique_lock lock(this->sessionsMutex_);
for (auto it = sessions_.begin(); it != sessions_.end();) {
Expand All @@ -110,9 +119,16 @@ namespace P2P {
if (auto sessionPtr = session.lock()) sessionPtr->close();
}
}
this->threadPool_.wait_for_tasks();
this->server_.stop();
this->clientfactory_.stop();
this->threadPool_.reset();
}

void ManagerBase::asyncHandleMessage(const NodeID &nodeId, const std::shared_ptr<const Message> message) {
std::shared_lock lock(this->stateMutex_);
if (this->threadPool_) {
this->threadPool_->push_task(&ManagerBase::handleMessage, this, nodeId, message);
}
}

std::vector<NodeID> ManagerBase::getSessionsIDs() const {
Expand Down Expand Up @@ -142,7 +158,7 @@ namespace P2P {
}

void ManagerBase::connectToServer(const boost::asio::ip::address& address, uint16_t port) {
if (this->closed_) return;
if (!this->started_) return;
if (address == this->server_.getLocalAddress() && port == this->serverPort_) return; /// Cannot connect to itself.
{
std::shared_lock<std::shared_mutex> lock(this->sessionsMutex_);
Expand Down Expand Up @@ -188,4 +204,3 @@ namespace P2P {
}
}
}

32 changes: 19 additions & 13 deletions src/net/p2p/managerbase.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,14 @@ namespace P2P {
/// Minimum number of simultaneous connections. See DiscoveryWorker for more information.
const unsigned int minConnections_ = 11;

/// Indicates whether the manager is closed to new connections.
std::atomic<bool> closed_ = true;
/// Check if manager is in the start() state (stop() not called yet).
std::atomic<bool> started_ = false;

/// Core mutex for serializing start(), stop(), and threadPool_.
mutable std::shared_mutex stateMutex_;

/// Pointer to the thread pool.
BS::thread_pool_light threadPool_;
std::unique_ptr<BS::thread_pool_light> threadPool_;

/// Pointer to the options singleton.
const Options& options_;
Expand Down Expand Up @@ -88,7 +91,7 @@ namespace P2P {
* @param session The session to answer to.
* @param message The message to answer.
*/
void answerSession(std::weak_ptr<Session> session, const std::shared_ptr<const Message>& message);
void answerSession(const NodeID &nodeId, const std::shared_ptr<const Message>& message);

// TODO: There is a bug with handleRequest that throws std::system_error.
// I believe that this is related with the std::shared_ptr<Session> getting deleted or
Expand All @@ -98,7 +101,7 @@ namespace P2P {
* @param session The session that sent the message.
* @param message The message to handle.
*/
virtual void handleRequest(std::weak_ptr<Session> session, const std::shared_ptr<const Message>& message) {
virtual void handleRequest(const NodeID &nodeId, const std::shared_ptr<const Message>& message) {
// Do nothing by default, child classes are meant to override this
}

Expand All @@ -107,7 +110,7 @@ namespace P2P {
* @param session The session that sent the message.
* @param message The message to handle.
*/
virtual void handleAnswer(std::weak_ptr<Session> session, const std::shared_ptr<const Message>& message) {
virtual void handleAnswer(const NodeID &nodeId, const std::shared_ptr<const Message>& message) {
// Do nothing by default, child classes are meant to override this
}

Expand All @@ -123,9 +126,8 @@ namespace P2P {
const net::ip::address& hostIp, NodeType nodeType,
unsigned int maxConnections, const Options& options
) : serverPort_(options.getP2PPort()), nodeType_(nodeType), maxConnections_(maxConnections), options_(options),
threadPool_(std::thread::hardware_concurrency() * 4),
server_(hostIp, options.getP2PPort(), 4, *this, this->threadPool_),
clientfactory_(*this, 4, this->threadPool_),
server_(hostIp, options.getP2PPort(), 4, *this),
clientfactory_(*this, 4),
discoveryWorker_(*this) {};

/// Destructor. Automatically stops the manager.
Expand Down Expand Up @@ -164,9 +166,6 @@ namespace P2P {
/// Getter for `minConnections_`.
unsigned int minConnections() const { return this->minConnections_; }

/// Getter for `closed_`.
const std::atomic<bool>& isClosed() const { return this->closed_; }

/// Get the size of the session list.
uint64_t getPeerCount() const { std::shared_lock lock(this->sessionsMutex_); return this->sessions_.size(); }

Expand Down Expand Up @@ -200,6 +199,13 @@ namespace P2P {
*/
void connectToServer(const boost::asio::ip::address& address, uint16_t port);

/**
* Entrust the internal thread pool to call handleMessage() with the supplied arguments.
* @param session The session to send an answer to.
* @param message The message to handle.
*/
void asyncHandleMessage(const NodeID &nodeId, const std::shared_ptr<const Message> message);

/**
* Handle a message from a session.
* The pointer is a weak_ptr because the parser doesn't need to own the session.
Expand All @@ -209,7 +215,7 @@ namespace P2P {
* @param session The session to send an answer to.
* @param message The message to handle.
*/
virtual void handleMessage(std::weak_ptr<Session> session, const std::shared_ptr<const Message> message) {
virtual void handleMessage(const NodeID &nodeId, const std::shared_ptr<const Message> message) {
// Do nothing by default, child classes are meant to override this
}

Expand Down
Loading

0 comments on commit 64f7c3d

Please sign in to comment.