Skip to content

Commit

Permalink
net: use an interface class rather than signals for message processing
Browse files Browse the repository at this point in the history
Drop boost signals in favor of a stateful class. This will allow the message
processing loop to actually move to net_processing in a future step.

Adapted from btc@8ad663c1fa88d68843e45580deced56112343183
  • Loading branch information
furszy committed Jun 12, 2021
1 parent 67757cd commit 50853a2
Show file tree
Hide file tree
Showing 8 changed files with 93 additions and 117 deletions.
5 changes: 2 additions & 3 deletions src/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -243,13 +243,12 @@ void PrepareShutdown()

// After the threads that potentially access these pointers have been stopped,
// destruct and reset all to nullptr.
peerLogic.reset();
g_connman.reset();
peerLogic.reset();

DumpMasternodes();
DumpBudgets(g_budgetman);
DumpMasternodePayments();
UnregisterNodeSignals(GetNodeSignals());
if (::mempool.IsLoaded() && gArgs.GetBoolArg("-persistmempool", DEFAULT_PERSIST_MEMPOOL)) {
DumpMempool(::mempool);
}
Expand Down Expand Up @@ -1349,7 +1348,6 @@ bool AppInitMain()

peerLogic.reset(new PeerLogicValidation(&connman));
RegisterValidationInterface(peerLogic.get());
RegisterNodeSignals(GetNodeSignals());

// sanitize comments per BIP-0014, format user agent and check total size
std::vector<std::string> uacomments;
Expand Down Expand Up @@ -1952,6 +1950,7 @@ bool AppInitMain()
connOptions.nMaxFeeler = 1;
connOptions.nBestHeight = chainActive.Height();
connOptions.uiInterface = &uiInterface;
connOptions.m_msgproc = peerLogic.get();
connOptions.nSendBufferMaxSize = 1000*gArgs.GetArg("-maxsendbuffer", DEFAULT_MAXSENDBUFFER);
connOptions.nReceiveFloodSize = 1000*gArgs.GetArg("-maxreceivebuffer", DEFAULT_MAXRECEIVEBUFFER);

Expand Down
22 changes: 11 additions & 11 deletions src/net.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,6 @@ std::string strSubVersion;

limitedmap<CInv, int64_t> mapAlreadyAskedFor(MAX_INV_SZ);

// Signals for message handling
static CNodeSignals g_signals;
CNodeSignals& GetNodeSignals() { return g_signals; }

void CConnman::AddOneShot(const std::string& strDest)
{
LOCK(cs_vOneShots);
Expand Down Expand Up @@ -1066,7 +1062,7 @@ void CConnman::AcceptConnection(const ListenSocket& hListenSocket) {
CNode* pnode = new CNode(id, nLocalServices, GetBestHeight(), hSocket, addr, CalculateKeyedNetGroup(addr), nonce, "", true);
pnode->AddRef();
pnode->fWhitelisted = whitelisted;
GetNodeSignals().InitializeNode(pnode, this);
m_msgproc->InitializeNode(pnode, this);

LogPrint(BCLog::NET, "connection from %s accepted\n", addr.ToString());

Expand Down Expand Up @@ -1828,7 +1824,7 @@ bool CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai
if (fFeeler)
pnode->fFeeler = true;

GetNodeSignals().InitializeNode(pnode, this);
m_msgproc->InitializeNode(pnode, this);
{
LOCK(cs_vNodes);
vNodes.push_back(pnode);
Expand Down Expand Up @@ -1856,16 +1852,17 @@ void CConnman::ThreadMessageHandler()
continue;

// Receive messages
bool fMoreNodeWork = GetNodeSignals().ProcessMessages(pnode, this, flagInterruptMsgProc);
bool fMoreNodeWork = m_msgproc->ProcessMessages(pnode, this, flagInterruptMsgProc);
fMoreWork |= (fMoreNodeWork && !pnode->fPauseSend);
if (flagInterruptMsgProc)
return;

// Send messages
{
LOCK(pnode->cs_sendProcessing);
GetNodeSignals().SendMessages(pnode, this, flagInterruptMsgProc);
m_msgproc->SendMessages(pnode, this, flagInterruptMsgProc);
}

if (flagInterruptMsgProc)
return;
}
Expand Down Expand Up @@ -2055,6 +2052,7 @@ bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options c
clientInterface = connOptions.uiInterface;
if (clientInterface)
clientInterface->InitMessage(_("Loading addresses..."));
m_msgproc = connOptions.m_msgproc;
// Load addresses from peers.dat
int64_t nStart = GetTimeMillis();
{
Expand Down Expand Up @@ -2105,12 +2103,13 @@ bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options c
uint64_t nonce = GetDeterministicRandomizer(RANDOMIZER_ID_LOCALHOSTNONCE).Write(id).Finalize();

pnodeLocalHost = new CNode(id, nLocalServices, GetBestHeight(), INVALID_SOCKET, CAddress(CService(local, 0), nLocalServices), 0, nonce);
GetNodeSignals().InitializeNode(pnodeLocalHost, this);
m_msgproc->InitializeNode(pnodeLocalHost, this);
}

//
// Start threads
//
assert(m_msgproc);
InterruptSocks5(false);
interruptNet.reset();
flagInterruptMsgProc = false;
Expand Down Expand Up @@ -2231,9 +2230,10 @@ void CConnman::DeleteNode(CNode* pnode)
{
assert(pnode);
bool fUpdateConnectionTime = false;
GetNodeSignals().FinalizeNode(pnode->GetId(), fUpdateConnectionTime);
if(fUpdateConnectionTime)
m_msgproc->FinalizeNode(pnode->GetId(), fUpdateConnectionTime);
if (fUpdateConnectionTime) {
addrman.Connected(pnode->addr);
}
delete pnode;
}

Expand Down
24 changes: 12 additions & 12 deletions src/net.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@
#include <arpa/inet.h>
#endif

#include <boost/signals2/signal.hpp>

class CAddrMan;
class CBlockIndex;
class CScheduler;
Expand Down Expand Up @@ -121,7 +119,7 @@ struct CSerializedNetMsg
std::string command;
};


class NetEventsInterface;
class CConnman
{
public:
Expand All @@ -142,6 +140,7 @@ class CConnman
int nMaxFeeler = 0;
int nBestHeight = 0;
CClientUIInterface* uiInterface = nullptr;
NetEventsInterface* m_msgproc = nullptr;
unsigned int nSendBufferMaxSize = 0;
unsigned int nReceiveFloodSize = 0;
};
Expand Down Expand Up @@ -368,6 +367,7 @@ class CConnman
int nMaxFeeler{0};
std::atomic<int> nBestHeight;
CClientUIInterface* clientInterface{nullptr};
NetEventsInterface* m_msgproc{nullptr};

/** SipHasher seeds for deterministic randomness */
const uint64_t nSeed0{0}, nSeed1{0};
Expand Down Expand Up @@ -410,19 +410,19 @@ struct CombinerAll {
}
};

// Signals for message handling
struct CNodeSignals
/**
* Interface for message handling
*/
class NetEventsInterface
{
boost::signals2::signal<bool (CNode*, CConnman*, std::atomic<bool>&), CombinerAll> ProcessMessages;
boost::signals2::signal<bool (CNode*, CConnman*, std::atomic<bool>&), CombinerAll> SendMessages;
boost::signals2::signal<void (CNode*, CConnman*)> InitializeNode;
boost::signals2::signal<void (NodeId, bool&)> FinalizeNode;
public:
virtual bool ProcessMessages(CNode* pnode, CConnman* connman, std::atomic<bool>& interrupt) = 0;
virtual bool SendMessages(CNode* pnode, CConnman* connman, std::atomic<bool>& interrupt) = 0;
virtual void InitializeNode(CNode* pnode, CConnman* connman) = 0;
virtual void FinalizeNode(NodeId id, bool& update_connection_time) = 0;
};


CNodeSignals& GetNodeSignals();


enum {
LOCAL_NONE, // unknown
LOCAL_IF, // address a local interface listens on
Expand Down
93 changes: 35 additions & 58 deletions src/net_processing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,6 @@ int nPreferredDownload = 0;

} // anon namespace


//////////////////////////////////////////////////////////////////////////////
//
// Registration of network node signals.
//

namespace
{
struct CBlockReject {
Expand Down Expand Up @@ -282,39 +276,6 @@ void PushNodeVersion(CNode* pnode, CConnman* connman, int64_t nTime)
LogPrint(BCLog::NET, "send version message: version %d, blocks=%d, us=%s, peer=%d\n", PROTOCOL_VERSION, nNodeStartingHeight, addrMe.ToString(), nodeid);
}

void InitializeNode(CNode *pnode, CConnman* connman) {
CAddress addr = pnode->addr;
std::string addrName = pnode->GetAddrName();
NodeId nodeid = pnode->GetId();
{
LOCK(cs_main);
mapNodeState.emplace_hint(mapNodeState.end(), std::piecewise_construct, std::forward_as_tuple(nodeid), std::forward_as_tuple(addr, std::move(addrName)));
}
if(!pnode->fInbound)
PushNodeVersion(pnode, connman, GetTime());
}

void FinalizeNode(NodeId nodeid, bool& fUpdateConnectionTime)
{
fUpdateConnectionTime = false;
LOCK(cs_main);
CNodeState* state = State(nodeid);

if (state->fSyncStarted)
nSyncStarted--;

if (state->nMisbehavior == 0 && state->fCurrentlyConnected) {
fUpdateConnectionTime = true;
}

for (const QueuedBlock& entry : state->vBlocksInFlight)
mapBlocksInFlight.erase(entry.hash);
EraseOrphansFor(nodeid);
nPreferredDownload -= state->fPreferredDownload;

mapNodeState.erase(nodeid);
}

// Requires cs_main.
void MarkBlockAsReceived(const uint256& hash)
{
Expand Down Expand Up @@ -466,6 +427,39 @@ void FindNextBlocksToDownload(NodeId nodeid, unsigned int count, std::vector<con

} // anon namespace

void PeerLogicValidation::InitializeNode(CNode *pnode, CConnman* connman) {
CAddress addr = pnode->addr;
std::string addrName = pnode->GetAddrName();
NodeId nodeid = pnode->GetId();
{
LOCK(cs_main);
mapNodeState.emplace_hint(mapNodeState.end(), std::piecewise_construct, std::forward_as_tuple(nodeid), std::forward_as_tuple(addr, std::move(addrName)));
}
if(!pnode->fInbound)
PushNodeVersion(pnode, connman, GetTime());
}

void PeerLogicValidation::FinalizeNode(NodeId nodeid, bool& fUpdateConnectionTime)
{
fUpdateConnectionTime = false;
LOCK(cs_main);
CNodeState* state = State(nodeid);

if (state->fSyncStarted)
nSyncStarted--;

if (state->nMisbehavior == 0 && state->fCurrentlyConnected) {
fUpdateConnectionTime = true;
}

for (const QueuedBlock& entry : state->vBlocksInFlight)
mapBlocksInFlight.erase(entry.hash);
EraseOrphansFor(nodeid);
nPreferredDownload -= state->fPreferredDownload;

mapNodeState.erase(nodeid);
}

bool GetNodeStateStats(NodeId nodeid, CNodeStateStats& stats)
{
LOCK(cs_main);
Expand All @@ -482,23 +476,6 @@ bool GetNodeStateStats(NodeId nodeid, CNodeStateStats& stats)
return true;
}

void RegisterNodeSignals(CNodeSignals& nodeSignals)
{
nodeSignals.ProcessMessages.connect(&ProcessMessages);
nodeSignals.SendMessages.connect(&SendMessages);
nodeSignals.InitializeNode.connect(&InitializeNode);
nodeSignals.FinalizeNode.connect(&FinalizeNode);
}

void UnregisterNodeSignals(CNodeSignals& nodeSignals)
{
nodeSignals.ProcessMessages.disconnect(&ProcessMessages);
nodeSignals.SendMessages.disconnect(&SendMessages);
nodeSignals.InitializeNode.disconnect(&InitializeNode);
nodeSignals.FinalizeNode.disconnect(&FinalizeNode);
}


//////////////////////////////////////////////////////////////////////////////
//
// mapOrphanTransactions
Expand Down Expand Up @@ -1973,7 +1950,7 @@ bool static ProcessMessage(CNode* pfrom, std::string strCommand, CDataStream& vR
}


bool ProcessMessages(CNode* pfrom, CConnman* connman, std::atomic<bool>& interruptMsgProc)
bool PeerLogicValidation::ProcessMessages(CNode* pfrom, CConnman* connman, std::atomic<bool>& interruptMsgProc)
{
// Message format
// (4) message start
Expand Down Expand Up @@ -2089,7 +2066,7 @@ class CompareInvMempoolOrder
}
};

bool SendMessages(CNode* pto, CConnman* connman, std::atomic<bool>& interruptMsgProc)
bool PeerLogicValidation::SendMessages(CNode* pto, CConnman* connman, std::atomic<bool>& interruptMsgProc)
{
{
// Don't send anything until the version handshake is complete
Expand Down
36 changes: 17 additions & 19 deletions src/net_processing.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,22 +32,32 @@ static const unsigned int INVENTORY_BROADCAST_INTERVAL = 5;
* Limits the impact of low-fee transaction floods. */
static const unsigned int INVENTORY_BROADCAST_MAX = 7 * INVENTORY_BROADCAST_INTERVAL;

/** Register with a network node to receive its signals */
void RegisterNodeSignals(CNodeSignals& nodeSignals);
/** Unregister a network node */
void UnregisterNodeSignals(CNodeSignals& nodeSignals);

class PeerLogicValidation : public CValidationInterface {
class PeerLogicValidation : public CValidationInterface, public NetEventsInterface {
private:
CConnman* connman;

public:
PeerLogicValidation(CConnman* connmanIn);
PeerLogicValidation(CConnman* connman);
~PeerLogicValidation() = default;

void BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindex) override;
void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override;
void BlockChecked(const CBlock& block, const CValidationState& state) override;


void InitializeNode(CNode* pnode, CConnman* connman) override;
void FinalizeNode(NodeId nodeid, bool& fUpdateConnectionTime) override;
/** Process protocol messages received from a given node */
bool ProcessMessages(CNode* pfrom, CConnman* connman, std::atomic<bool>& interrupt) override;
/**
* Send queued protocol messages to be sent to a give node.
*
* @param[in] pto The node which we are sending messages to.
* @param[in] connman The connection manager for that node.
* @param[in] interrupt Interrupt condition for processing threads
* @return True if there is more work to be done
*/
bool SendMessages(CNode* pto, CConnman* connman, std::atomic<bool>& interrupt) override;
};

struct CNodeStateStats {
Expand All @@ -61,17 +71,5 @@ struct CNodeStateStats {
bool GetNodeStateStats(NodeId nodeid, CNodeStateStats& stats);
/** Increase a node's misbehavior score. */
void Misbehaving(NodeId nodeid, int howmuch) EXCLUSIVE_LOCKS_REQUIRED(cs_main);
/** Process protocol messages received from a given node */
bool ProcessMessages(CNode* pfrom, CConnman* connman, std::atomic<bool>& interrupt);
/**
* Send queued protocol messages to be sent to a give node.
*
* @param[in] pto The node which we are sending messages to.
* @param[in] connman The connection manager for that node.
* @param[in] interrupt Interrupt condition for processing threads
* @return True if there is more work to be done
*/
bool SendMessages(CNode* pto, CConnman* connman, std::atomic<bool>& interrupt);


#endif // BITCOIN_NET_PROCESSING_H
Loading

0 comments on commit 50853a2

Please sign in to comment.