diff --git a/src/net.cpp b/src/net.cpp index 27c2eff4bb9e6..958df9df026c9 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -706,7 +706,7 @@ bool CNode::ReceiveMsgBytes(const char* pch, unsigned int nBytes, bool& complete return false; if (msg.in_data && msg.hdr.nMessageSize > MAX_PROTOCOL_MESSAGE_LENGTH) { - LogPrint(BCLog::NET, "Oversized message from peer=%i, disconnecting", GetId()); + LogPrint(BCLog::NET, "Oversized message from peer=%i, disconnecting\n", GetId()); return false; } @@ -797,12 +797,20 @@ int CNetMessage::readData(const char* pch, unsigned int nBytes) vRecv.resize(std::min(hdr.nMessageSize, nDataPos + nCopy + 256 * 1024)); } + hasher.Write((const unsigned char*)pch, nCopy); memcpy(&vRecv[nDataPos], pch, nCopy); nDataPos += nCopy; return nCopy; } +const uint256& CNetMessage::GetMessageHash() const +{ + assert(complete()); + if (data_hash.IsNull()) + hasher.Finalize(data_hash.begin()); + return data_hash; +} // requires LOCK(cs_vSend) size_t CConnman::SocketSendData(CNode* pnode) diff --git a/src/net.h b/src/net.h index f0b6db6ec2c55..056c703e2c9d9 100644 --- a/src/net.h +++ b/src/net.h @@ -486,6 +486,9 @@ class CNodeStats class CNetMessage { +private: + mutable CHash256 hasher; + mutable uint256 data_hash; public: bool in_data; // parsing header (false) or data (true) @@ -513,6 +516,8 @@ class CNetMessage return (hdr.nMessageSize == nDataPos); } + const uint256& GetMessageHash() const; + void SetVersion(int nVersionIn) { hdrbuf.SetVersion(nVersionIn); diff --git a/src/net_processing.cpp b/src/net_processing.cpp index f98d29056fb66..788124e453fd5 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -1352,7 +1352,7 @@ bool static ProcessMessage(CNode* pfrom, std::string strCommand, CDataStream& vR if (vInv.size() > MAX_INV_SZ) { LOCK(cs_main); Misbehaving(pfrom->GetId(), 20); - return error("message inv size() = %u", vInv.size()); + return error("peer=%d message inv size() = %u", pfrom->GetId(), vInv.size()); } LOCK(cs_main); @@ -1402,7 +1402,7 @@ bool static ProcessMessage(CNode* pfrom, std::string strCommand, CDataStream& vR if (vInv.size() > MAX_INV_SZ) { LOCK(cs_main); Misbehaving(pfrom->GetId(), 20); - return error("message getdata size() = %u", vInv.size()); + return error("peer=%d message getdata size() = %u", pfrom->GetId(), vInv.size()); } if (vInv.size() != 1) @@ -2021,7 +2021,7 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic& interru // Read header CMessageHeader& hdr = msg.hdr; if (!hdr.IsValid(Params().MessageStart())) { - LogPrint(BCLog::NET, "PROCESSMESSAGE: ERRORS IN HEADER %s peer=%d\n", SanitizeString(hdr.GetCommand()), pfrom->id); + LogPrint(BCLog::NET, "PROCESSMESSAGE: ERRORS IN HEADER '%s' peer=%d\n", SanitizeString(hdr.GetCommand()), pfrom->id); return fMoreWork; } std::string strCommand = hdr.GetCommand(); @@ -2029,17 +2029,17 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic& interru // Message size unsigned int nMessageSize = hdr.nMessageSize; - // Checksum - CDataStream& vRecv = msg.vRecv; - uint256 hash = Hash(vRecv.begin(), vRecv.begin() + nMessageSize); - if (memcmp(hash.begin(), hdr.pchChecksum, CMessageHeader::CHECKSUM_SIZE) != 0) - { - LogPrint(BCLog::NET, "%s(%s, %u bytes): CHECKSUM ERROR expected %s was %s\n", __func__, - SanitizeString(strCommand), nMessageSize, - HexStr(hash.begin(), hash.begin()+CMessageHeader::CHECKSUM_SIZE), - HexStr(hdr.pchChecksum, hdr.pchChecksum+CMessageHeader::CHECKSUM_SIZE)); - return fMoreWork; - } + // Checksum + CDataStream& vRecv = msg.vRecv; + uint256 hash = msg.GetMessageHash(); + if (memcmp(hash.begin(), hdr.pchChecksum, CMessageHeader::CHECKSUM_SIZE) != 0) + { + LogPrint(BCLog::NET, "%s(%s, %u bytes): CHECKSUM ERROR expected %s was %s\n", __func__, + SanitizeString(strCommand), nMessageSize, + HexStr(hash.begin(), hash.begin()+CMessageHeader::CHECKSUM_SIZE), + HexStr(hdr.pchChecksum, hdr.pchChecksum+CMessageHeader::CHECKSUM_SIZE)); + return fMoreWork; + } // Process message bool fRet = false; diff --git a/test/functional/example_test.py b/test/functional/example_test.py index 757a065dd2b6e..a57a64d190ff5 100755 --- a/test/functional/example_test.py +++ b/test/functional/example_test.py @@ -21,8 +21,6 @@ mininode_lock, msg_block, msg_getdata, - network_thread_join, - network_thread_start, ) from test_framework.test_framework import PivxTestFramework from test_framework.util import ( @@ -135,9 +133,6 @@ def run_test(self): # Create P2P connections to two of the nodes self.nodes[0].add_p2p_connection(BaseNode()) - # Start up network handling in another thread. This needs to be called - # after the P2P connections have been created. - network_thread_start() # wait_for_verack ensures that the P2P connection is fully up. self.nodes[0].p2p.wait_for_verack() @@ -189,14 +184,9 @@ def run_test(self): connect_nodes(self.nodes[1], 2) self.log.info("Add P2P connection to node2") - # We can't add additional P2P connections once the network thread has started. Disconnect the connection - # to node0, wait for the network thread to terminate, then connect to node2. This is specific to - # the current implementation of the network thread and may be improved in future. self.nodes[0].disconnect_p2ps() - network_thread_join() self.nodes[2].add_p2p_connection(BaseNode()) - network_thread_start() self.nodes[2].p2p.wait_for_verack() self.log.info("Wait for node2 reach current tip. Test that it has propagated all the blocks to us") diff --git a/test/functional/feature_block.py b/test/functional/feature_block.py index 5c813e451b59f..b1ec5a48672fd 100755 --- a/test/functional/feature_block.py +++ b/test/functional/feature_block.py @@ -21,7 +21,7 @@ uint256_from_compact, uint256_from_str, ) -from test_framework.mininode import P2PDataStore, network_thread_start, network_thread_join +from test_framework.mininode import P2PDataStore from test_framework.script import ( CScript, MAX_SCRIPT_ELEMENT_SIZE, @@ -84,7 +84,6 @@ def run_test(self): node = self.nodes[0] # convenience reference to the node # reconnect_p2p() expects the network thread to be running self.log.info("Starting network thread...") - network_thread_start() self.reconnect_p2p() self.block_heights = {} @@ -1207,10 +1206,8 @@ def reconnect_p2p(self): The node gets disconnected several times in this test. This helper method reconnects the p2p and restarts the network thread.""" - network_thread_join() self.nodes[0].disconnect_p2ps() self.nodes[0].add_p2p_connection(P2PDataStore()) - network_thread_start() self.nodes[0].p2p.wait_for_verack() def send_blocks(self, blocks, success=True, reject_reason=None, reconnect=False, timeout=60): diff --git a/test/functional/feature_cltv.py b/test/functional/feature_cltv.py index d80121c703a23..a0250fe6e7e5a 100755 --- a/test/functional/feature_cltv.py +++ b/test/functional/feature_cltv.py @@ -67,10 +67,6 @@ def set_test_params(self): def run_test(self): self.nodes[0].add_p2p_connection(P2PInterface()) - - network_thread_start() - - # wait_for_verack ensures that the P2P connection is fully up. self.nodes[0].p2p.wait_for_verack() self.log.info("Mining %d blocks", CLTV_HEIGHT - 2) diff --git a/test/functional/feature_nulldummy.py b/test/functional/feature_nulldummy.py index ba420bda02b36..56a40c1217bc7 100755 --- a/test/functional/feature_nulldummy.py +++ b/test/functional/feature_nulldummy.py @@ -15,7 +15,7 @@ from test_framework.test_framework import PivxTestFramework from test_framework.util import * -from test_framework.mininode import CTransaction, network_thread_start +from test_framework.messages import CTransaction from test_framework.blocktools import create_coinbase, create_block, add_witness_commitment from test_framework.script import CScript from io import BytesIO @@ -48,7 +48,6 @@ def run_test(self): self.address = self.nodes[0].getnewaddress() self.ms_address = self.nodes[0].addmultisigaddress(1,[self.address]) - network_thread_start() self.coinbase_blocks = self.nodes[0].generate(2) # Block 2 coinbase_txid = [] for i in self.coinbase_blocks: diff --git a/test/functional/mining_pos_coldStaking.py b/test/functional/mining_pos_coldStaking.py index fdb93a65daa6b..05aaa41924de6 100755 --- a/test/functional/mining_pos_coldStaking.py +++ b/test/functional/mining_pos_coldStaking.py @@ -8,15 +8,12 @@ from time import sleep from test_framework.messages import CTransaction, CTxIn, CTxOut, COIN, COutPoint -from test_framework.mininode import network_thread_start -from test_framework.pivx_node import PivxTestNode from test_framework.script import CScript, OP_CHECKSIG from test_framework.test_framework import PivxTestFramework from test_framework.util import ( assert_equal, assert_greater_than, assert_raises_rpc_error, - p2p_port, bytes_to_hex_str, set_node_times, ) @@ -34,30 +31,8 @@ def set_test_params(self): self.num_nodes = 3 self.extra_args = [['-nuparams=v5_shield:201']] * self.num_nodes self.extra_args[0].append('-sporkkey=932HEevBSujW2ud7RfB1YF91AFygbBRQj3de3LyaCRqNzKKgWXi') - - def setup_chain(self): - # Start with PoW cache: 200 blocks - self.log.info("Initializing test directory " + self.options.tmpdir) - self._initialize_chain() self.enable_mocktime() - def init_test(self): - title = "*** Starting %s ***" % self.__class__.__name__ - underline = "-" * len(title) - self.log.info("\n\n%s\n%s\n%s\n", title, underline, self.description) - self.DEFAULT_FEE = 0.05 - # Setup the p2p connections and start up the network thread. - self.test_nodes = [] - for i in range(self.num_nodes): - self.test_nodes.append(PivxTestNode()) - self.test_nodes[i].peer_connect('127.0.0.1', p2p_port(i)) - - network_thread_start() # Start up network handling in another thread - - # Let the test nodes get in sync - for i in range(self.num_nodes): - self.test_nodes[i].wait_for_verack() - def setColdStakingEnforcement(self, fEnable=True): sporkName = "SPORK_19_COLDSTAKING_MAINTENANCE" # update spork 19 with node[0] @@ -77,11 +52,12 @@ def isColdStakingEnforced(self): # verify from node[1] return not self.is_spork_active(1, "SPORK_19_COLDSTAKING_MAINTENANCE") - - def run_test(self): self.description = "Performs tests on the Cold Staking P2CS implementation" - self.init_test() + title = "*** Starting %s ***" % self.__class__.__name__ + underline = "-" * len(title) + self.log.info("\n\n%s\n%s\n%s\n", title, underline, self.description) + self.DEFAULT_FEE = 0.05 NUM_OF_INPUTS = 20 INPUT_VALUE = 249 diff --git a/test/functional/p2p_feefilter.py b/test/functional/p2p_feefilter.py index e7e77429a638b..4821f1d6c1546 100755 --- a/test/functional/p2p_feefilter.py +++ b/test/functional/p2p_feefilter.py @@ -47,9 +47,8 @@ def run_test(self): node1.generate(1) self.sync_blocks() - # Setup the p2p connections and start up the network thread. - self.nodes[0].add_p2p_connection(TestNode()) - network_thread_start() + # Setup the p2p connections + self.nodes[0].add_p2p_connection(TestP2PConn()) self.nodes[0].p2p.wait_for_verack() # Test that invs are received for all txs at feerate of 20 sat/byte diff --git a/test/functional/p2p_fingerprint.py b/test/functional/p2p_fingerprint.py index d95c653bfae34..90279d9a91562 100755 --- a/test/functional/p2p_fingerprint.py +++ b/test/functional/p2p_fingerprint.py @@ -18,7 +18,6 @@ msg_block, msg_getdata, msg_getheaders, - network_thread_start, wait_until, ) from test_framework.test_framework import PivxTestFramework @@ -76,8 +75,6 @@ def last_header_equals(self, expected_hash, node): # last month but that have over a month's worth of work are also withheld. def run_test(self): node0 = self.nodes[0].add_p2p_connection(P2PInterface()) - - network_thread_start() node0.wait_for_verack() # Set node time to 60 days ago diff --git a/test/functional/p2p_invalid_block.py b/test/functional/p2p_invalid_block.py index 6aef35f0e35b8..a7de6e29b3431 100755 --- a/test/functional/p2p_invalid_block.py +++ b/test/functional/p2p_invalid_block.py @@ -14,7 +14,7 @@ from test_framework.blocktools import create_block, create_coinbase, create_transaction from test_framework.messages import COIN -from test_framework.mininode import network_thread_start, P2PDataStore +from test_framework.mininode import P2PDataStore from test_framework.script import ( CScript, OP_TRUE, @@ -35,7 +35,6 @@ def run_test(self): # Add p2p connection to node0 node = self.nodes[0] # convenience reference to the node node.add_p2p_connection(P2PDataStore()) - network_thread_start() node.p2p.wait_for_verack() best_block = node.getblock(node.getbestblockhash()) diff --git a/test/functional/p2p_invalid_messages.py b/test/functional/p2p_invalid_messages.py new file mode 100755 index 0000000000000..bc513307cc4e0 --- /dev/null +++ b/test/functional/p2p_invalid_messages.py @@ -0,0 +1,122 @@ +#!/usr/bin/env python3 +# Copyright (c) 2015-2019 The Bitcoin Core developers +# Distributed under the MIT software license, see the accompanying +# file COPYING or http://www.opensource.org/licenses/mit-license.php. +"""Test node responses to invalid network messages.""" +from test_framework import messages +from test_framework.mininode import ( + P2PDataStore, + P2PInterface, +) +from test_framework.test_framework import PivxTestFramework + +MSG_LIMIT = 2 * 1024 * 1024 # 2MB, per MAX_PROTOCOL_MESSAGE_LENGTH +VALID_DATA_LIMIT = MSG_LIMIT - 5 # Account for the 5-byte length prefix + +class msg_unrecognized: + """Nonsensical message. Modeled after similar types in test_framework.messages.""" + + command = b'badmsg' + + def __init__(self, *, str_data): + self.str_data = str_data.encode() if not isinstance(str_data, bytes) else str_data + + def serialize(self): + return messages.ser_string(self.str_data) + + def __repr__(self): + return "{}(data={})".format(self.command, self.str_data) + + +class InvalidMessagesTest(PivxTestFramework): + def set_test_params(self): + self.num_nodes = 1 + self.setup_clean_chain = True + + def run_test(self): + self.test_magic_bytes() + self.test_checksum() + self.test_size() + self.test_command() + self.test_large_inv() + self.test_resource_exhaustion() + + def test_magic_bytes(self): + conn = self.nodes[0].add_p2p_connection(P2PDataStore()) + with self.nodes[0].assert_debug_log(['PROCESSMESSAGE: INVALID MESSAGESTART badmsg']): + msg = conn.build_message(msg_unrecognized(str_data="d")) + # modify magic bytes + msg = b'\xff' * 4 + msg[4:] + conn.send_raw_message(msg) + conn.wait_for_disconnect(timeout=1) + self.nodes[0].disconnect_p2ps() + + def test_checksum(self): + conn = self.nodes[0].add_p2p_connection(P2PDataStore()) + with self.nodes[0].assert_debug_log(['ProcessMessages(badmsg, 2 bytes): CHECKSUM ERROR expected 78df0a04 was ffffffff']): + msg = conn.build_message(msg_unrecognized(str_data="d")) + # Checksum is after start bytes (4B), message type (12B), len (4B) + cut_len = 4 + 12 + 4 + # modify checksum + msg = msg[:cut_len] + b'\xff' * 4 + msg[cut_len + 4:] + self.nodes[0].p2p.send_raw_message(msg) + conn.sync_with_ping(timeout=1) + self.nodes[0].disconnect_p2ps() + + def test_size(self): + conn = self.nodes[0].add_p2p_connection(P2PDataStore()) + with self.nodes[0].assert_debug_log(['']): + # Create a message with oversized payload + msg = msg_unrecognized(str_data="d" * (VALID_DATA_LIMIT + 1)) + msg = conn.build_message(msg) + self.nodes[0].p2p.send_raw_message(msg) + conn.wait_for_disconnect(timeout=1) + self.nodes[0].disconnect_p2ps() + + def test_command(self): + conn = self.nodes[0].add_p2p_connection(P2PDataStore()) + with self.nodes[0].assert_debug_log(['PROCESSMESSAGE: ERRORS IN HEADER']): + msg = msg_unrecognized(str_data="d") + msg = conn.build_message(msg) + # Modify command + msg = msg[:7] + b'\x00' + msg[7 + 1:] + self.nodes[0].p2p.send_raw_message(msg) + conn.sync_with_ping(timeout=1) + self.nodes[0].disconnect_p2ps() + + def test_large_inv(self): # future: add Misbehaving value check, first invalid message raise it to 20, second to 40. + conn = self.nodes[0].add_p2p_connection(P2PInterface()) + with self.nodes[0].assert_debug_log(['ERROR: peer=5 message inv size() = 50001']): + msg = messages.msg_inv([messages.CInv(1, 1)] * 50001) + conn.send_and_ping(msg) + with self.nodes[0].assert_debug_log(['ERROR: peer=5 message getdata size() = 50001']): + msg = messages.msg_getdata([messages.CInv(1, 1)] * 50001) + conn.send_and_ping(msg) + self.nodes[0].disconnect_p2ps() + + def test_resource_exhaustion(self): + conn = self.nodes[0].add_p2p_connection(P2PDataStore()) + conn2 = self.nodes[0].add_p2p_connection(P2PDataStore()) + msg_at_size = msg_unrecognized(str_data="b" * VALID_DATA_LIMIT) + assert len(msg_at_size.serialize()) == MSG_LIMIT + + self.log.info("Sending a bunch of large, junk messages to test memory exhaustion. May take a bit...") + + # Run a bunch of times to test for memory exhaustion. + for _ in range(80): + conn.send_message(msg_at_size) + + # Check that, even though the node is being hammered by nonsense from one + # connection, it can still service other peers in a timely way. + for _ in range(20): + conn2.sync_with_ping(timeout=2) + + # Peer 1, despite being served up a bunch of nonsense, should still be connected. + self.log.info("Waiting for node to drop junk messages.") + conn.sync_with_ping(timeout=400) + assert conn.is_connected + self.nodes[0].disconnect_p2ps() + + +if __name__ == '__main__': + InvalidMessagesTest().main() diff --git a/test/functional/p2p_invalid_tx.py b/test/functional/p2p_invalid_tx.py index bb15f25d82d5d..e59ba6b260a3e 100755 --- a/test/functional/p2p_invalid_tx.py +++ b/test/functional/p2p_invalid_tx.py @@ -13,11 +13,12 @@ CTxIn, CTxOut, ) -from test_framework.mininode import network_thread_start, P2PDataStore, network_thread_join +from test_framework.mininode import P2PDataStore from test_framework.script import ( CScript, OP_NOTIF, OP_TRUE, + OP_DROP ) from test_framework.test_framework import PivxTestFramework from test_framework.util import ( @@ -37,7 +38,6 @@ def bootstrap_p2p(self, *, num_connections=1): Helper to connect and wait for version handshake.""" for _ in range(num_connections): self.nodes[0].add_p2p_connection(P2PDataStore()) - network_thread_start() self.nodes[0].p2p.wait_for_verack() def reconnect_p2p(self, **kwargs): @@ -46,18 +46,13 @@ def reconnect_p2p(self, **kwargs): The node gets disconnected several times in this test. This helper method reconnects the p2p and restarts the network thread.""" self.nodes[0].disconnect_p2ps() - network_thread_join() self.bootstrap_p2p(**kwargs) - def new_spend_tx(self, prev_hash, prev_n, values): - """Create a CTransaction spending COutPoint(prev_hash, prev_n) - - each amount specified in the 'values' list is sent to an - anyone-can-spend script""" + def new_spend_tx(self, prev_hash, prev_n, tx_outs): + """Create a CTransaction spending COutPoint(prev_hash, prev_n) to the CTxOut-list tx_outs.""" tx = CTransaction() tx.vin.append(CTxIn(outpoint=COutPoint(prev_hash, prev_n))) - for value in values: - tx.vout.append(CTxOut(nValue=value, scriptPubKey=CScript([OP_TRUE]))) + tx.vout = tx_outs tx.calc_sha256() return tx @@ -95,21 +90,22 @@ def run_test(self): self.reconnect_p2p(num_connections=2) self.log.info('Test orphan transaction handling ... ') + SCRIPT_PUB_KEY_OP_TRUE = CScript([OP_TRUE, OP_DROP] * 15 + [OP_TRUE]) # Create a root transaction that we withhold until all dependend transactions # are sent out and in the orphan cache - tx_withhold = self.new_spend_tx(block1.vtx[0].sha256, 0, [50 * COIN - 12000]) + tx_withhold = self.new_spend_tx(block1.vtx[0].sha256, 0, [CTxOut(nValue=50 * COIN - 12000, scriptPubKey=SCRIPT_PUB_KEY_OP_TRUE)]) # Our first orphan tx with 3 outputs to create further orphan txs - tx_orphan_1 = self.new_spend_tx(tx_withhold.sha256, 0, [10 * COIN] * 3) + tx_orphan_1 = self.new_spend_tx(tx_withhold.sha256, 0, [CTxOut(nValue=10 * COIN, scriptPubKey=SCRIPT_PUB_KEY_OP_TRUE)] * 3) # A valid transaction with low fee - tx_orphan_2_no_fee = self.new_spend_tx(tx_orphan_1.sha256, 0, [10 * COIN]) + tx_orphan_2_no_fee = self.new_spend_tx(tx_orphan_1.sha256, 0, [CTxOut(nValue=10 * COIN, scriptPubKey=SCRIPT_PUB_KEY_OP_TRUE)]) # A valid transaction with sufficient fee - tx_orphan_2_valid = self.new_spend_tx(tx_orphan_1.sha256, 1, [10 * COIN - 12000]) + tx_orphan_2_valid = self.new_spend_tx(tx_orphan_1.sha256, 1, [CTxOut(nValue=10 * COIN - 12000, scriptPubKey=SCRIPT_PUB_KEY_OP_TRUE)]) # An invalid transaction with negative fee - tx_orphan_2_invalid = self.new_spend_tx(tx_orphan_1.sha256, 2, [11 * COIN]) + tx_orphan_2_invalid = self.new_spend_tx(tx_orphan_1.sha256, 2, [CTxOut(nValue=11 * COIN, scriptPubKey=SCRIPT_PUB_KEY_OP_TRUE)]) self.log.info('Send the orphans ... ') # Send valid orphan txs from p2ps[0] @@ -141,6 +137,22 @@ def run_test(self): #wait_until(lambda: 1 == len(node.getpeerinfo()), timeout=12) # p2ps[1] is no longer connected assert_equal(expected_mempool, set(node.getrawmempool())) + self.log.info('Test orphan pool overflow') + orphan_tx_pool = [CTransaction() for _ in range(101)] + for i in range(len(orphan_tx_pool)): + orphan_tx_pool[i].vin.append(CTxIn(outpoint=COutPoint(i, 333))) + orphan_tx_pool[i].vout.append(CTxOut(nValue=11 * COIN, scriptPubKey=SCRIPT_PUB_KEY_OP_TRUE)) + + with node.assert_debug_log(['mapOrphan overflow, removed 1 tx']): + node.p2p.send_txs_and_test(orphan_tx_pool, node, success=False) + + rejected_parent = CTransaction() + rejected_parent.vin.append(CTxIn(outpoint=COutPoint(tx_orphan_2_invalid.sha256, 0))) + rejected_parent.vout.append(CTxOut(nValue=11 * COIN, scriptPubKey=SCRIPT_PUB_KEY_OP_TRUE)) + rejected_parent.rehash() + with node.assert_debug_log(['not keeping orphan with rejected parents {}'.format(rejected_parent.hash)]): + node.p2p.send_txs_and_test([rejected_parent], node, success=False) + if __name__ == '__main__': InvalidTxRequestTest().main() diff --git a/test/functional/p2p_leak.py b/test/functional/p2p_leak.py index 2d9ff066cd859..10b4b071c4db0 100755 --- a/test/functional/p2p_leak.py +++ b/test/functional/p2p_leak.py @@ -103,8 +103,6 @@ def run_test(self): unsupported_service_bit5_node = self.nodes[0].add_p2p_connection(CLazyNode(), services=NODE_NETWORK) unsupported_service_bit7_node = self.nodes[0].add_p2p_connection(CLazyNode(), services=NODE_NETWORK) - network_thread_start() - wait_until(lambda: no_version_bannode.ever_connected, timeout=10, lock=mininode_lock) wait_until(lambda: no_version_idlenode.ever_connected, timeout=10, lock=mininode_lock) wait_until(lambda: no_verack_idlenode.version_received, timeout=10, lock=mininode_lock) @@ -118,17 +116,16 @@ def run_test(self): time.sleep(5) #This node should have been banned - assert no_version_bannode.state != "connected" + assert not no_version_bannode.is_connected # These nodes should have been disconnected - assert unsupported_service_bit5_node.state != "connected" - assert unsupported_service_bit7_node.state != "connected" + assert not unsupported_service_bit5_node.is_connected + assert not unsupported_service_bit7_node.is_connected self.nodes[0].disconnect_p2ps() - # Wait until all connections are closed and the network thread has terminated + # Wait until all connections are closed wait_until(lambda: len(self.nodes[0].getpeerinfo()) == 0) - network_thread_join() # Make sure no unexpected messages came in assert(no_version_bannode.unexpected_msg == False) @@ -143,11 +140,9 @@ def run_test(self): allowed_service_bit5_node = self.nodes[0].add_p2p_connection(P2PInterface(), services=NODE_NETWORK) allowed_service_bit7_node = self.nodes[0].add_p2p_connection(P2PInterface(), services=NODE_NETWORK) - # Network thread stopped when all previous P2PInterfaces disconnected. Restart it - network_thread_start() - wait_until(lambda: allowed_service_bit5_node.message_count["verack"], lock=mininode_lock) wait_until(lambda: allowed_service_bit7_node.message_count["verack"], lock=mininode_lock) + if __name__ == '__main__': P2PLeakTest().main() diff --git a/test/functional/p2p_mempool.py b/test/functional/p2p_mempool.py index 9f05a560fa5db..ab204483b10f2 100755 --- a/test/functional/p2p_mempool.py +++ b/test/functional/p2p_mempool.py @@ -21,7 +21,6 @@ def set_test_params(self): def run_test(self): # Add a p2p connection self.nodes[0].add_p2p_connection(P2PInterface()) - network_thread_start() self.nodes[0].p2p.wait_for_verack() #request mempool diff --git a/test/functional/p2p_timeouts.py b/test/functional/p2p_timeouts.py index 23a252499a897..dc4cd9ab91c61 100755 --- a/test/functional/p2p_timeouts.py +++ b/test/functional/p2p_timeouts.py @@ -38,18 +38,16 @@ def set_test_params(self): self.num_nodes = 1 def run_test(self): - # Setup the p2p connections and start up the network thread. + # Setup the p2p connections no_verack_node = self.nodes[0].add_p2p_connection(TestNode()) no_version_node = self.nodes[0].add_p2p_connection(TestNode(), send_version=False) no_send_node = self.nodes[0].add_p2p_connection(TestNode(), send_version=False) - network_thread_start() - sleep(1) - assert no_verack_node.connected - assert no_version_node.connected - assert no_send_node.connected + assert no_verack_node.is_connected + assert no_version_node.is_connected + assert no_send_node.is_connected no_verack_node.send_message(msg_ping()) no_version_node.send_message(msg_ping()) @@ -58,18 +56,18 @@ def run_test(self): assert "version" in no_verack_node.last_message - assert no_verack_node.connected - assert no_version_node.connected - assert no_send_node.connected + assert no_verack_node.is_connected + assert no_version_node.is_connected + assert no_send_node.is_connected no_verack_node.send_message(msg_ping()) no_version_node.send_message(msg_ping()) sleep(31) - assert not no_verack_node.connected - assert not no_version_node.connected - assert not no_send_node.connected + assert not no_verack_node.is_connected + assert not no_version_node.is_connected + assert not no_send_node.is_connected if __name__ == '__main__': TimeoutsTest().main() diff --git a/test/functional/p2p_unrequested_blocks.py b/test/functional/p2p_unrequested_blocks.py index 3b873d4efed54..cb47981ec48c8 100755 --- a/test/functional/p2p_unrequested_blocks.py +++ b/test/functional/p2p_unrequested_blocks.py @@ -77,15 +77,11 @@ def setup_network(self): self.setup_nodes() def run_test(self): - # Setup the p2p connections and start up the network thread. + # Setup the p2p connections # test_node connects to node0 (not whitelisted) test_node = self.nodes[0].add_p2p_connection(P2PInterface()) # min_work_node connects to node1 (whitelisted) min_work_node = self.nodes[1].add_p2p_connection(P2PInterface()) - - network_thread_start() - - # Test logic begins here test_node.wait_for_verack() min_work_node.wait_for_verack() @@ -208,10 +204,8 @@ def run_test(self): self.nodes[0].disconnect_p2ps() self.nodes[1].disconnect_p2ps() - network_thread_join() test_node = self.nodes[0].add_p2p_connection(P2PInterface()) - network_thread_start() test_node.wait_for_verack() test_node.send_message(msg_block(block_h1f)) @@ -297,8 +291,6 @@ def run_test(self): self.nodes[0].disconnect_p2ps() test_node = self.nodes[0].add_p2p_connection(P2PInterface()) - - network_thread_start() test_node.wait_for_verack() # We should have failed reorg and switched back to 290 (but have block 291) diff --git a/test/functional/test_framework/messages.py b/test/functional/test_framework/messages.py index 7ff905495495a..8133197533493 100644 --- a/test/functional/test_framework/messages.py +++ b/test/functional/test_framework/messages.py @@ -28,7 +28,7 @@ MIN_VERSION_SUPPORTED = 60001 MY_VERSION = 70922 -MY_SUBVERSION = b"/python-mininode-tester:0.0.3/" +MY_SUBVERSION = "/python-mininode-tester:0.0.3/" MY_RELAY = 1 # from version 70001 onwards, fRelay should be appended to version messages (BIP37) MAX_INV_SZ = 50000 @@ -927,7 +927,7 @@ def deserialize(self, f): self.addrFrom = CAddress() self.addrFrom.deserialize(f) self.nNonce = struct.unpack(" 0: self.recvbuf += t self._on_data() @@ -148,7 +138,7 @@ def _on_data(self): while True: if len(self.recvbuf) < 4: return - if self.recvbuf[:4] != MAGIC_BYTES[self.network]: + if self.recvbuf[:4] != self.magic_bytes: raise ValueError("got garbage %s" % repr(self.recvbuf)) if len(self.recvbuf) < 4 + 12 + 4 + 4: return @@ -181,42 +171,37 @@ def on_message(self, message): # Socket write methods - def writable(self): - """asyncore method to determine whether the handle_write() callback should be called on the next loop.""" - with mininode_lock: - pre_connection = self.state == "connecting" - length = len(self.sendbuf) - return (length > 0 or pre_connection) - - def handle_write(self): - """asyncore callback when data should be written to the socket.""" - with mininode_lock: - # asyncore does not expose socket connection, only the first read/write - # event, thus we must check connection manually here to know when we - # actually connect - if self.state == "connecting": - self.handle_connect() - if not self.writable(): - return - - try: - sent = self.send(self.sendbuf) - except: - self.handle_close() - return - self.sendbuf = self.sendbuf[sent:] - - def send_message(self, message, pushbuf=False): + def send_message(self, message): """Send a P2P message over the socket. This method takes a P2P payload, builds the P2P header and adds the message to the send buffer to be sent over the socket.""" - if self.state != "connected" and not pushbuf: - raise IOError('Not connected, no pushbuf') + tmsg = self.build_message(message) self._log_message("send", message) + return self.send_raw_message(tmsg) + + def send_raw_message(self, raw_message_bytes): + if not self.is_connected: + raise IOError('Not connected') + + def maybe_write(): + if not self._transport: + return + # Python <3.4.4 does not have is_closing, so we have to check for + # its existence explicitly as long as Bitcoin Core supports all + # Python 3.4 versions. + if hasattr(self._transport, 'is_closing') and self._transport.is_closing(): + return + self._transport.write(raw_message_bytes) + NetworkThread.network_event_loop.call_soon_threadsafe(maybe_write) + + # Class utility methods + + def build_message(self, message): + """Build a serialized P2P message""" command = message.command data = message.serialize() - tmsg = MAGIC_BYTES[self.network] + tmsg = self.magic_bytes tmsg += command tmsg += b"\x00" * (12 - len(command)) tmsg += struct.pack("