diff --git a/src/init.cpp b/src/init.cpp index 47dd50e99bf26..d963be55be9c1 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -243,13 +243,12 @@ void PrepareShutdown() // After the threads that potentially access these pointers have been stopped, // destruct and reset all to nullptr. - peerLogic.reset(); g_connman.reset(); + peerLogic.reset(); DumpMasternodes(); DumpBudgets(g_budgetman); DumpMasternodePayments(); - UnregisterNodeSignals(GetNodeSignals()); if (::mempool.IsLoaded() && gArgs.GetBoolArg("-persistmempool", DEFAULT_PERSIST_MEMPOOL)) { DumpMempool(::mempool); } @@ -1349,7 +1348,6 @@ bool AppInitMain() peerLogic.reset(new PeerLogicValidation(&connman)); RegisterValidationInterface(peerLogic.get()); - RegisterNodeSignals(GetNodeSignals()); // sanitize comments per BIP-0014, format user agent and check total size std::vector uacomments; @@ -1952,6 +1950,7 @@ bool AppInitMain() connOptions.nMaxFeeler = 1; connOptions.nBestHeight = chainActive.Height(); connOptions.uiInterface = &uiInterface; + connOptions.m_msgproc = peerLogic.get(); connOptions.nSendBufferMaxSize = 1000*gArgs.GetArg("-maxsendbuffer", DEFAULT_MAXSENDBUFFER); connOptions.nReceiveFloodSize = 1000*gArgs.GetArg("-maxreceivebuffer", DEFAULT_MAXRECEIVEBUFFER); diff --git a/src/net.cpp b/src/net.cpp index dd869d19854e6..22524db9fd58a 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -85,10 +85,6 @@ std::string strSubVersion; limitedmap mapAlreadyAskedFor(MAX_INV_SZ); -// Signals for message handling -static CNodeSignals g_signals; -CNodeSignals& GetNodeSignals() { return g_signals; } - void CConnman::AddOneShot(const std::string& strDest) { LOCK(cs_vOneShots); @@ -1066,7 +1062,7 @@ void CConnman::AcceptConnection(const ListenSocket& hListenSocket) { CNode* pnode = new CNode(id, nLocalServices, GetBestHeight(), hSocket, addr, CalculateKeyedNetGroup(addr), nonce, "", true); pnode->AddRef(); pnode->fWhitelisted = whitelisted; - GetNodeSignals().InitializeNode(pnode, this); + m_msgproc->InitializeNode(pnode, this); LogPrint(BCLog::NET, "connection from %s accepted\n", addr.ToString()); @@ -1828,7 +1824,7 @@ bool CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai if (fFeeler) pnode->fFeeler = true; - GetNodeSignals().InitializeNode(pnode, this); + m_msgproc->InitializeNode(pnode, this); { LOCK(cs_vNodes); vNodes.push_back(pnode); @@ -1856,7 +1852,7 @@ void CConnman::ThreadMessageHandler() continue; // Receive messages - bool fMoreNodeWork = GetNodeSignals().ProcessMessages(pnode, this, flagInterruptMsgProc); + bool fMoreNodeWork = m_msgproc->ProcessMessages(pnode, this, flagInterruptMsgProc); fMoreWork |= (fMoreNodeWork && !pnode->fPauseSend); if (flagInterruptMsgProc) return; @@ -1864,8 +1860,9 @@ void CConnman::ThreadMessageHandler() // Send messages { LOCK(pnode->cs_sendProcessing); - GetNodeSignals().SendMessages(pnode, this, flagInterruptMsgProc); + m_msgproc->SendMessages(pnode, this, flagInterruptMsgProc); } + if (flagInterruptMsgProc) return; } @@ -2055,6 +2052,7 @@ bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options c clientInterface = connOptions.uiInterface; if (clientInterface) clientInterface->InitMessage(_("Loading addresses...")); + m_msgproc = connOptions.m_msgproc; // Load addresses from peers.dat int64_t nStart = GetTimeMillis(); { @@ -2105,12 +2103,13 @@ bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options c uint64_t nonce = GetDeterministicRandomizer(RANDOMIZER_ID_LOCALHOSTNONCE).Write(id).Finalize(); pnodeLocalHost = new CNode(id, nLocalServices, GetBestHeight(), INVALID_SOCKET, CAddress(CService(local, 0), nLocalServices), 0, nonce); - GetNodeSignals().InitializeNode(pnodeLocalHost, this); + m_msgproc->InitializeNode(pnodeLocalHost, this); } // // Start threads // + assert(m_msgproc); InterruptSocks5(false); interruptNet.reset(); flagInterruptMsgProc = false; @@ -2231,9 +2230,10 @@ void CConnman::DeleteNode(CNode* pnode) { assert(pnode); bool fUpdateConnectionTime = false; - GetNodeSignals().FinalizeNode(pnode->GetId(), fUpdateConnectionTime); - if(fUpdateConnectionTime) + m_msgproc->FinalizeNode(pnode->GetId(), fUpdateConnectionTime); + if (fUpdateConnectionTime) { addrman.Connected(pnode->addr); + } delete pnode; } diff --git a/src/net.h b/src/net.h index c589ac15b18da..22f4c1a72aba1 100644 --- a/src/net.h +++ b/src/net.h @@ -34,8 +34,6 @@ #include #endif -#include - class CAddrMan; class CBlockIndex; class CScheduler; @@ -121,7 +119,7 @@ struct CSerializedNetMsg std::string command; }; - +class NetEventsInterface; class CConnman { public: @@ -142,6 +140,7 @@ class CConnman int nMaxFeeler = 0; int nBestHeight = 0; CClientUIInterface* uiInterface = nullptr; + NetEventsInterface* m_msgproc = nullptr; unsigned int nSendBufferMaxSize = 0; unsigned int nReceiveFloodSize = 0; }; @@ -368,6 +367,7 @@ class CConnman int nMaxFeeler{0}; std::atomic nBestHeight; CClientUIInterface* clientInterface{nullptr}; + NetEventsInterface* m_msgproc{nullptr}; /** SipHasher seeds for deterministic randomness */ const uint64_t nSeed0{0}, nSeed1{0}; @@ -410,19 +410,19 @@ struct CombinerAll { } }; -// Signals for message handling -struct CNodeSignals +/** + * Interface for message handling + */ +class NetEventsInterface { - boost::signals2::signal&), CombinerAll> ProcessMessages; - boost::signals2::signal&), CombinerAll> SendMessages; - boost::signals2::signal InitializeNode; - boost::signals2::signal FinalizeNode; +public: + virtual bool ProcessMessages(CNode* pnode, CConnman* connman, std::atomic& interrupt) = 0; + virtual bool SendMessages(CNode* pnode, CConnman* connman, std::atomic& interrupt) = 0; + virtual void InitializeNode(CNode* pnode, CConnman* connman) = 0; + virtual void FinalizeNode(NodeId id, bool& update_connection_time) = 0; }; -CNodeSignals& GetNodeSignals(); - - enum { LOCAL_NONE, // unknown LOCAL_IF, // address a local interface listens on diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 09f2f693796e5..71cdb3db99ea3 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -99,12 +99,6 @@ int nPreferredDownload = 0; } // anon namespace - -////////////////////////////////////////////////////////////////////////////// -// -// Registration of network node signals. -// - namespace { struct CBlockReject { @@ -282,39 +276,6 @@ void PushNodeVersion(CNode* pnode, CConnman* connman, int64_t nTime) LogPrint(BCLog::NET, "send version message: version %d, blocks=%d, us=%s, peer=%d\n", PROTOCOL_VERSION, nNodeStartingHeight, addrMe.ToString(), nodeid); } -void InitializeNode(CNode *pnode, CConnman* connman) { - CAddress addr = pnode->addr; - std::string addrName = pnode->GetAddrName(); - NodeId nodeid = pnode->GetId(); - { - LOCK(cs_main); - mapNodeState.emplace_hint(mapNodeState.end(), std::piecewise_construct, std::forward_as_tuple(nodeid), std::forward_as_tuple(addr, std::move(addrName))); - } - if(!pnode->fInbound) - PushNodeVersion(pnode, connman, GetTime()); -} - -void FinalizeNode(NodeId nodeid, bool& fUpdateConnectionTime) -{ - fUpdateConnectionTime = false; - LOCK(cs_main); - CNodeState* state = State(nodeid); - - if (state->fSyncStarted) - nSyncStarted--; - - if (state->nMisbehavior == 0 && state->fCurrentlyConnected) { - fUpdateConnectionTime = true; - } - - for (const QueuedBlock& entry : state->vBlocksInFlight) - mapBlocksInFlight.erase(entry.hash); - EraseOrphansFor(nodeid); - nPreferredDownload -= state->fPreferredDownload; - - mapNodeState.erase(nodeid); -} - // Requires cs_main. void MarkBlockAsReceived(const uint256& hash) { @@ -466,6 +427,39 @@ void FindNextBlocksToDownload(NodeId nodeid, unsigned int count, std::vectoraddr; + std::string addrName = pnode->GetAddrName(); + NodeId nodeid = pnode->GetId(); + { + LOCK(cs_main); + mapNodeState.emplace_hint(mapNodeState.end(), std::piecewise_construct, std::forward_as_tuple(nodeid), std::forward_as_tuple(addr, std::move(addrName))); + } + if(!pnode->fInbound) + PushNodeVersion(pnode, connman, GetTime()); +} + +void PeerLogicValidation::FinalizeNode(NodeId nodeid, bool& fUpdateConnectionTime) +{ + fUpdateConnectionTime = false; + LOCK(cs_main); + CNodeState* state = State(nodeid); + + if (state->fSyncStarted) + nSyncStarted--; + + if (state->nMisbehavior == 0 && state->fCurrentlyConnected) { + fUpdateConnectionTime = true; + } + + for (const QueuedBlock& entry : state->vBlocksInFlight) + mapBlocksInFlight.erase(entry.hash); + EraseOrphansFor(nodeid); + nPreferredDownload -= state->fPreferredDownload; + + mapNodeState.erase(nodeid); +} + bool GetNodeStateStats(NodeId nodeid, CNodeStateStats& stats) { LOCK(cs_main); @@ -482,23 +476,6 @@ bool GetNodeStateStats(NodeId nodeid, CNodeStateStats& stats) return true; } -void RegisterNodeSignals(CNodeSignals& nodeSignals) -{ - nodeSignals.ProcessMessages.connect(&ProcessMessages); - nodeSignals.SendMessages.connect(&SendMessages); - nodeSignals.InitializeNode.connect(&InitializeNode); - nodeSignals.FinalizeNode.connect(&FinalizeNode); -} - -void UnregisterNodeSignals(CNodeSignals& nodeSignals) -{ - nodeSignals.ProcessMessages.disconnect(&ProcessMessages); - nodeSignals.SendMessages.disconnect(&SendMessages); - nodeSignals.InitializeNode.disconnect(&InitializeNode); - nodeSignals.FinalizeNode.disconnect(&FinalizeNode); -} - - ////////////////////////////////////////////////////////////////////////////// // // mapOrphanTransactions @@ -1973,7 +1950,7 @@ bool static ProcessMessage(CNode* pfrom, std::string strCommand, CDataStream& vR } -bool ProcessMessages(CNode* pfrom, CConnman* connman, std::atomic& interruptMsgProc) +bool PeerLogicValidation::ProcessMessages(CNode* pfrom, CConnman* connman, std::atomic& interruptMsgProc) { // Message format // (4) message start @@ -2089,7 +2066,7 @@ class CompareInvMempoolOrder } }; -bool SendMessages(CNode* pto, CConnman* connman, std::atomic& interruptMsgProc) +bool PeerLogicValidation::SendMessages(CNode* pto, CConnman* connman, std::atomic& interruptMsgProc) { { // Don't send anything until the version handshake is complete diff --git a/src/net_processing.h b/src/net_processing.h index de5e6ba5534f1..c76063458ab2e 100644 --- a/src/net_processing.h +++ b/src/net_processing.h @@ -32,22 +32,32 @@ static const unsigned int INVENTORY_BROADCAST_INTERVAL = 5; * Limits the impact of low-fee transaction floods. */ static const unsigned int INVENTORY_BROADCAST_MAX = 7 * INVENTORY_BROADCAST_INTERVAL; -/** Register with a network node to receive its signals */ -void RegisterNodeSignals(CNodeSignals& nodeSignals); -/** Unregister a network node */ -void UnregisterNodeSignals(CNodeSignals& nodeSignals); - -class PeerLogicValidation : public CValidationInterface { +class PeerLogicValidation : public CValidationInterface, public NetEventsInterface { private: CConnman* connman; public: - PeerLogicValidation(CConnman* connmanIn); + PeerLogicValidation(CConnman* connman); ~PeerLogicValidation() = default; void BlockConnected(const std::shared_ptr& pblock, const CBlockIndex* pindex) override; void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override; void BlockChecked(const CBlock& block, const CValidationState& state) override; + + + void InitializeNode(CNode* pnode, CConnman* connman) override; + void FinalizeNode(NodeId nodeid, bool& fUpdateConnectionTime) override; + /** Process protocol messages received from a given node */ + bool ProcessMessages(CNode* pfrom, CConnman* connman, std::atomic& interrupt) override; + /** + * Send queued protocol messages to be sent to a give node. + * + * @param[in] pto The node which we are sending messages to. + * @param[in] connman The connection manager for that node. + * @param[in] interrupt Interrupt condition for processing threads + * @return True if there is more work to be done + */ + bool SendMessages(CNode* pto, CConnman* connman, std::atomic& interrupt) override; }; struct CNodeStateStats { @@ -61,17 +71,5 @@ struct CNodeStateStats { bool GetNodeStateStats(NodeId nodeid, CNodeStateStats& stats); /** Increase a node's misbehavior score. */ void Misbehaving(NodeId nodeid, int howmuch) EXCLUSIVE_LOCKS_REQUIRED(cs_main); -/** Process protocol messages received from a given node */ -bool ProcessMessages(CNode* pfrom, CConnman* connman, std::atomic& interrupt); -/** - * Send queued protocol messages to be sent to a give node. - * - * @param[in] pto The node which we are sending messages to. - * @param[in] connman The connection manager for that node. - * @param[in] interrupt Interrupt condition for processing threads - * @return True if there is more work to be done - */ -bool SendMessages(CNode* pto, CConnman* connman, std::atomic& interrupt); - #endif // BITCOIN_NET_PROCESSING_H diff --git a/src/test/DoS_tests.cpp b/src/test/DoS_tests.cpp index 36146b6b47d81..915f51b2386d9 100644 --- a/src/test/DoS_tests.cpp +++ b/src/test/DoS_tests.cpp @@ -60,26 +60,26 @@ BOOST_AUTO_TEST_CASE(DoS_banning) CAddress addr1(ip(0xa0b0c001), NODE_NONE); CNode dummyNode1(id++, NODE_NETWORK, 0, INVALID_SOCKET, addr1, 0, 0, "", true); dummyNode1.SetSendVersion(PROTOCOL_VERSION); - GetNodeSignals().InitializeNode(&dummyNode1, *connman); + peerLogic->InitializeNode(&dummyNode1, connman); dummyNode1.nVersion = 1; dummyNode1.fSuccessfullyConnected = true; misbehave(dummyNode1.GetId(), 100); // Should get banned - SendMessages(&dummyNode1, *connman, interruptDummy); + peerLogic->SendMessages(&dummyNode1, connman, interruptDummy); BOOST_CHECK(connman->IsBanned(addr1)); BOOST_CHECK(!connman->IsBanned(ip(0xa0b0c001|0x0000ff00))); // Different IP, not banned CAddress addr2(ip(0xa0b0c002), NODE_NONE); CNode dummyNode2(id++, NODE_NETWORK, 0, INVALID_SOCKET, addr2, 1, 1, "", true); dummyNode2.SetSendVersion(PROTOCOL_VERSION); - GetNodeSignals().InitializeNode(&dummyNode2, *connman); + peerLogic->InitializeNode(&dummyNode2, connman); dummyNode2.nVersion = 1; dummyNode2.fSuccessfullyConnected = true; misbehave(dummyNode2.GetId(), 50); - SendMessages(&dummyNode2, *connman, interruptDummy); + peerLogic->SendMessages(&dummyNode2, connman, interruptDummy); BOOST_CHECK(!connman->IsBanned(addr2)); // 2 not banned yet... BOOST_CHECK(connman->IsBanned(addr1)); // ... but 1 still should be misbehave(dummyNode2.GetId(), 50); - SendMessages(&dummyNode2, *connman, interruptDummy); + peerLogic->SendMessages(&dummyNode2, connman, interruptDummy); BOOST_CHECK(connman->IsBanned(addr2)); } @@ -92,17 +92,17 @@ BOOST_AUTO_TEST_CASE(DoS_banscore) CAddress addr1(ip(0xa0b0c001), NODE_NONE); CNode dummyNode1(id++, NODE_NETWORK, 0, INVALID_SOCKET, addr1, 3, 1, "", true); dummyNode1.SetSendVersion(PROTOCOL_VERSION); - GetNodeSignals().InitializeNode(&dummyNode1, *connman); + peerLogic->InitializeNode(&dummyNode1, connman); dummyNode1.nVersion = 1; dummyNode1.fSuccessfullyConnected = true; misbehave(dummyNode1.GetId(), 100); - SendMessages(&dummyNode1, *connman, interruptDummy); + peerLogic->SendMessages(&dummyNode1, connman, interruptDummy); BOOST_CHECK(!connman->IsBanned(addr1)); misbehave(dummyNode1.GetId(), 10); - SendMessages(&dummyNode1, *connman, interruptDummy); + peerLogic->SendMessages(&dummyNode1, connman, interruptDummy); BOOST_CHECK(!connman->IsBanned(addr1)); misbehave(dummyNode1.GetId(), 1); - SendMessages(&dummyNode1, *connman, interruptDummy); + peerLogic->SendMessages(&dummyNode1, connman, interruptDummy); BOOST_CHECK(connman->IsBanned(addr1)); gArgs.ForceSetArg("-banscore", std::to_string(DEFAULT_BANSCORE_THRESHOLD)); } @@ -118,12 +118,12 @@ BOOST_AUTO_TEST_CASE(DoS_bantime) CAddress addr(ip(0xa0b0c001), NODE_NONE); CNode dummyNode(id++, NODE_NETWORK, 0, INVALID_SOCKET, addr, 4, 4, "", true); dummyNode.SetSendVersion(PROTOCOL_VERSION); - GetNodeSignals().InitializeNode(&dummyNode, *connman); + peerLogic->InitializeNode(&dummyNode, connman); dummyNode.nVersion = 1; dummyNode.fSuccessfullyConnected = true; misbehave(dummyNode.GetId(), 100); - SendMessages(&dummyNode, *connman, interruptDummy); + peerLogic->SendMessages(&dummyNode, connman, interruptDummy); BOOST_CHECK(connman->IsBanned(addr)); SetMockTime(nStartTime+60*60); diff --git a/src/test/test_pivx.cpp b/src/test/test_pivx.cpp index 20eb499ff1e45..13007091abd18 100644 --- a/src/test/test_pivx.cpp +++ b/src/test/test_pivx.cpp @@ -55,7 +55,6 @@ BasicTestingSetup::~BasicTestingSetup() { fs::remove_all(m_path_root); ECC_Stop(); - g_connman.reset(); deterministicMNManager.reset(); evoDb.reset(); } @@ -107,17 +106,18 @@ TestingSetup::TestingSetup(const std::string& chainName) : BasicTestingSetup(cha nScriptCheckThreads = 3; for (int i=0; i < nScriptCheckThreads-1; i++) threadGroup.create_thread(&ThreadScriptCheck); - RegisterNodeSignals(GetNodeSignals()); + peerLogic.reset(new PeerLogicValidation(connman)); } TestingSetup::~TestingSetup() { - UnregisterNodeSignals(GetNodeSignals()); threadGroup.interrupt_all(); threadGroup.join_all(); GetMainSignals().FlushBackgroundCallbacks(); UnregisterAllValidationInterfaces(); GetMainSignals().UnregisterBackgroundSignalScheduler(); + g_connman.reset(); + peerLogic.reset(); UnloadBlockIndex(); delete pEvoNotificationInterface; delete pcoinsTip; diff --git a/src/test/test_pivx.h b/src/test/test_pivx.h index 9f05ca95f5360..7da9901cedd85 100644 --- a/src/test/test_pivx.h +++ b/src/test/test_pivx.h @@ -50,6 +50,7 @@ struct BasicTestingSetup { * and wallet (if enabled) setup. */ class CConnman; +class PeerLogicValidation; class EvoNotificationInterface; struct TestingSetup: public BasicTestingSetup { @@ -58,6 +59,7 @@ struct TestingSetup: public BasicTestingSetup CConnman* connman; EvoNotificationInterface* pEvoNotificationInterface; CScheduler scheduler; + std::unique_ptr peerLogic; TestingSetup(const std::string& chainName = CBaseChainParams::MAIN); ~TestingSetup();