Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Internal refactor of P2P::Session vs. P2P::ManagerXXX classes #93

Merged
merged 6 commits into from
Feb 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/net/p2p/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,10 @@ See the LICENSE.txt file in the project root for more information.
namespace P2P {
void ClientFactory::createClientSession(const boost::asio::ip::address &address, const unsigned short &port) {
tcp::socket socket(io_context_);
auto session = std::make_shared<Session>(std::move(socket), ConnectionType::OUTBOUND, manager_, this->threadPool_, address, port);
auto session = std::make_shared<Session>(std::move(socket), ConnectionType::OUTBOUND, manager_, address, port);
session->run();
}


bool ClientFactory::run() {
Logger::logToDebug(LogType::INFO, Log::P2PClientFactory, __func__,
"Starting P2P Client Factory "
Expand Down
9 changes: 2 additions & 7 deletions src/net/p2p/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,6 @@ namespace P2P {
/// Reference to the manager.
ManagerBase& manager_;

/// Reference to the thread pool.
BS::thread_pool_light& threadPool_;

/// Internal function for creating a new client session.
void createClientSession(const boost::asio::ip::address &address, const unsigned short &port);

Expand All @@ -55,14 +52,12 @@ namespace P2P {
* Constructor for the ClientFactory.
* @param manager Reference to the manager.
* @param threadCount Number of threads to use.
* @param threadPool Reference to the thread pool.
*/
ClientFactory(ManagerBase& manager, const uint8_t &threadCount, BS::thread_pool_light& threadPool) :
ClientFactory(ManagerBase& manager, const uint8_t &threadCount) :
work_guard_(boost::asio::make_work_guard(io_context_)),
connectorStrand_(io_context_.get_executor()),
threadCount_(threadCount),
manager_(manager),
threadPool_(threadPool) {}
manager_(manager) {}

/// Start the Factory.
bool start();
Expand Down
49 changes: 32 additions & 17 deletions src/net/p2p/managerbase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ See the LICENSE.txt file in the project root for more information.
namespace P2P {

bool ManagerBase::registerSessionInternal(const std::shared_ptr<Session>& session) {
if (this->closed_) {
std::unique_lock lockSession(this->sessionsMutex_); // ManagerBase::registerSessionInternal can change sessions_ map.
if (!this->started_) {
return false;
}
std::unique_lock lockSession(this->sessionsMutex_); // ManagerBase::registerSessionInternal can change sessions_ map.
// The NodeID of a session is made by the host IP and his server port.
// That means, it is possible for us to receive a inbound connection for someone that we already have a outbound connection.
// In this case, we will keep the oldest connection alive and close the new one.
Expand All @@ -31,10 +31,10 @@ namespace P2P {
}

bool ManagerBase::unregisterSessionInternal(const std::shared_ptr<Session> &session) {
if (this->closed_) {
std::unique_lock lockSession(this->sessionsMutex_); // ManagerBase::unregisterSessionInternal can change sessions_ map.
if (!this->started_) {
return false;
}
std::unique_lock lockSession(this->sessionsMutex_); // ManagerBase::unregisterSessionInternal can change sessions_ map.
if (!sessions_.contains(session->hostNodeId())) {
lockSession.unlock(); // Unlock before calling logToDebug to avoid waiting for the lock in the logToDebug function.
Logger::logToDebug(LogType::ERROR, Log::P2PManager, __func__, "Session does not exist at " +
Expand All @@ -47,6 +47,9 @@ namespace P2P {

bool ManagerBase::disconnectSessionInternal(const NodeID& nodeId) {
std::unique_lock lockSession(this->sessionsMutex_); // ManagerBase::disconnectSessionInternal can change sessions_ map.
if (!this->started_) {
return false;
}
if (!sessions_.contains(nodeId)) {
lockSession.unlock(); // Unlock before calling logToDebug to avoid waiting for the lock in the logToDebug function.
Logger::logToDebug(LogType::ERROR, Log::P2PManager, __func__, "Session does not exist at " + nodeId.first.to_string() + ":" + std::to_string(nodeId.second));
Expand All @@ -60,7 +63,7 @@ namespace P2P {
}

std::shared_ptr<Request> ManagerBase::sendRequestTo(const NodeID &nodeId, const std::shared_ptr<const Message>& message) {
if (this->closed_) return nullptr;
if (!this->started_) return nullptr;
std::shared_lock<std::shared_mutex> lockSession(this->sessionsMutex_); // ManagerBase::sendRequestTo doesn't change sessions_ map.
if(!sessions_.contains(nodeId)) {
lockSession.unlock(); // Unlock before calling logToDebug to avoid waiting for the lock in the logToDebug function.
Expand All @@ -84,24 +87,30 @@ namespace P2P {

// ManagerBase::answerSession doesn't change sessions_ map, but we still need to
// be sure that the session io_context doesn't get deleted while we are using it.
void ManagerBase::answerSession(std::weak_ptr<Session> session, const std::shared_ptr<const Message>& message) {
if (this->closed_) return;
std::shared_lock<std::shared_mutex> lockSession(this->sessionsMutex_);
if (auto ptr = session.lock()) {
ptr->write(message);
} else {
Logger::logToDebug(LogType::ERROR, Log::P2PManager, __func__, "Session is no longer valid");
void ManagerBase::answerSession(const NodeID &nodeId, const std::shared_ptr<const Message>& message) {
std::shared_lock lockSession(this->sessionsMutex_);
if (!this->started_) return;
auto it = sessions_.find(nodeId);
if (it == sessions_.end()) {
Logger::logToDebug(LogType::ERROR, Log::P2PManager, __func__, "Cannot find session for " + nodeId.first.to_string() + ":" + std::to_string(nodeId.second));
return;
}
it->second->write(message);
}

void ManagerBase::start() {
this->closed_ = false;
std::scoped_lock lock(this->stateMutex_);
if (this->started_) return;
this->started_ = true;
this->threadPool_ = std::make_unique<BS::thread_pool_light>(std::thread::hardware_concurrency() * 4);
this->server_.start();
this->clientfactory_.start();
}

void ManagerBase::stop() {
this->closed_ = true;
std::scoped_lock lock(this->stateMutex_);
if (! this->started_) return;
this->started_ = false;
{
std::unique_lock lock(this->sessionsMutex_);
for (auto it = sessions_.begin(); it != sessions_.end();) {
Expand All @@ -110,9 +119,16 @@ namespace P2P {
if (auto sessionPtr = session.lock()) sessionPtr->close();
}
}
this->threadPool_.wait_for_tasks();
this->server_.stop();
this->clientfactory_.stop();
this->threadPool_.reset();
}

void ManagerBase::asyncHandleMessage(const NodeID &nodeId, const std::shared_ptr<const Message> message) {
std::shared_lock lock(this->stateMutex_);
if (this->threadPool_) {
this->threadPool_->push_task(&ManagerBase::handleMessage, this, nodeId, message);
}
}

std::vector<NodeID> ManagerBase::getSessionsIDs() const {
Expand Down Expand Up @@ -142,7 +158,7 @@ namespace P2P {
}

void ManagerBase::connectToServer(const boost::asio::ip::address& address, uint16_t port) {
if (this->closed_) return;
if (!this->started_) return;
if (address == this->server_.getLocalAddress() && port == this->serverPort_) return; /// Cannot connect to itself.
{
std::shared_lock<std::shared_mutex> lock(this->sessionsMutex_);
Expand Down Expand Up @@ -188,4 +204,3 @@ namespace P2P {
}
}
}

32 changes: 19 additions & 13 deletions src/net/p2p/managerbase.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,14 @@ namespace P2P {
/// Minimum number of simultaneous connections. See DiscoveryWorker for more information.
const unsigned int minConnections_ = 11;

/// Indicates whether the manager is closed to new connections.
std::atomic<bool> closed_ = true;
/// Check if manager is in the start() state (stop() not called yet).
std::atomic<bool> started_ = false;

/// Core mutex for serializing start(), stop(), and threadPool_.
mutable std::shared_mutex stateMutex_;

/// Pointer to the thread pool.
BS::thread_pool_light threadPool_;
std::unique_ptr<BS::thread_pool_light> threadPool_;

/// Pointer to the options singleton.
const Options& options_;
Expand Down Expand Up @@ -88,7 +91,7 @@ namespace P2P {
* @param session The session to answer to.
* @param message The message to answer.
*/
void answerSession(std::weak_ptr<Session> session, const std::shared_ptr<const Message>& message);
void answerSession(const NodeID &nodeId, const std::shared_ptr<const Message>& message);

// TODO: There is a bug with handleRequest that throws std::system_error.
// I believe that this is related with the std::shared_ptr<Session> getting deleted or
Expand All @@ -98,7 +101,7 @@ namespace P2P {
* @param session The session that sent the message.
* @param message The message to handle.
*/
virtual void handleRequest(std::weak_ptr<Session> session, const std::shared_ptr<const Message>& message) {
virtual void handleRequest(const NodeID &nodeId, const std::shared_ptr<const Message>& message) {
// Do nothing by default, child classes are meant to override this
}

Expand All @@ -107,7 +110,7 @@ namespace P2P {
* @param session The session that sent the message.
* @param message The message to handle.
*/
virtual void handleAnswer(std::weak_ptr<Session> session, const std::shared_ptr<const Message>& message) {
virtual void handleAnswer(const NodeID &nodeId, const std::shared_ptr<const Message>& message) {
// Do nothing by default, child classes are meant to override this
}

Expand All @@ -123,9 +126,8 @@ namespace P2P {
const net::ip::address& hostIp, NodeType nodeType,
unsigned int maxConnections, const Options& options
) : serverPort_(options.getP2PPort()), nodeType_(nodeType), maxConnections_(maxConnections), options_(options),
threadPool_(std::thread::hardware_concurrency() * 4),
server_(hostIp, options.getP2PPort(), 4, *this, this->threadPool_),
clientfactory_(*this, 4, this->threadPool_),
server_(hostIp, options.getP2PPort(), 4, *this),
clientfactory_(*this, 4),
discoveryWorker_(*this) {};

/// Destructor. Automatically stops the manager.
Expand Down Expand Up @@ -164,9 +166,6 @@ namespace P2P {
/// Getter for `minConnections_`.
unsigned int minConnections() const { return this->minConnections_; }

/// Getter for `closed_`.
const std::atomic<bool>& isClosed() const { return this->closed_; }

/// Get the size of the session list.
uint64_t getPeerCount() const { std::shared_lock lock(this->sessionsMutex_); return this->sessions_.size(); }

Expand Down Expand Up @@ -200,6 +199,13 @@ namespace P2P {
*/
void connectToServer(const boost::asio::ip::address& address, uint16_t port);

/**
* Entrust the internal thread pool to call handleMessage() with the supplied arguments.
* @param session The session to send an answer to.
* @param message The message to handle.
*/
void asyncHandleMessage(const NodeID &nodeId, const std::shared_ptr<const Message> message);

/**
* Handle a message from a session.
* The pointer is a weak_ptr because the parser doesn't need to own the session.
Expand All @@ -209,7 +215,7 @@ namespace P2P {
* @param session The session to send an answer to.
* @param message The message to handle.
*/
virtual void handleMessage(std::weak_ptr<Session> session, const std::shared_ptr<const Message> message) {
virtual void handleMessage(const NodeID &nodeId, const std::shared_ptr<const Message> message) {
// Do nothing by default, child classes are meant to override this
}

Expand Down
Loading
Loading