Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

backport: merge bitcoin#22817, #23042, #22777, #23774, #25443, #26138, #26854, #27128, #27761, #27863, #28287, #30118, partial bitcoin#22778 (auxiliary backports: part 16) #6276

Merged
merged 14 commits into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions src/net.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1998,7 +1998,6 @@ bool CConnman::AddConnection(const std::string& address, ConnectionType conn_typ
switch (conn_type) {
case ConnectionType::INBOUND:
case ConnectionType::MANUAL:
case ConnectionType::FEELER:
return false;
case ConnectionType::OUTBOUND_FULL_RELAY:
max_connections = m_max_outbound_full_relay;
Expand All @@ -2009,6 +2008,9 @@ bool CConnman::AddConnection(const std::string& address, ConnectionType conn_typ
// no limit for ADDR_FETCH because -seednode has no limit either
case ConnectionType::ADDR_FETCH:
break;
// no limit for FEELER connections since they're short-lived
case ConnectionType::FEELER:
break;
} // no default case, so the compiler can warn about missing cases

// Count existing connections
Expand Down Expand Up @@ -2253,6 +2255,7 @@ bool CConnman::GenerateSelectSet(const std::vector<CNode*>& nodes,
for (CNode* pnode : nodes) {
bool select_recv = !pnode->fHasRecvData;
bool select_send = !pnode->fCanSendData;
if (!select_recv && !select_send) continue;

LOCK(pnode->m_sock_mutex);
if (!pnode->m_sock) {
Expand Down Expand Up @@ -2623,9 +2626,7 @@ void CConnman::SocketHandlerConnected(const std::set<SOCKET>& recv_set,
// receiving data. This means properly utilizing TCP flow control signalling.
// * Otherwise, if there is space left in the receive buffer (!fPauseRecv), try
// receiving data (which should succeed as the socket signalled as receivable).
const auto& [to_send, more, _msg_type] = it->second->m_transport->GetBytesToSend(it->second->nSendMsgSize != 0);
const bool queue_is_empty{to_send.empty() && !more};
if (!it->second->fPauseRecv && !it->second->fDisconnect && queue_is_empty) {
if (!it->second->fPauseRecv && !it->second->fDisconnect && it->second->nSendMsgSize == 0) {
it->second->AddRef();
vReceivableNodes.emplace(it->second);
}
Expand Down Expand Up @@ -3301,7 +3302,7 @@ void CConnman::ThreadOpenConnections(const std::vector<std::string> connect, CDe

// Require outbound IPv4/IPv6 connections, other than feelers, to be to distinct network groups
if (!fFeeler && outbound_ipv46_peer_netgroups.count(m_netgroupman.GetGroup(addr))) {
break;
continue;
}

// if we selected an invalid address, restart
Expand Down
4 changes: 2 additions & 2 deletions src/net.h
Original file line number Diff line number Diff line change
Expand Up @@ -1433,8 +1433,8 @@ friend class CNode;
* Attempts to open a connection. Currently only used from tests.
*
* @param[in] address Address of node to try connecting to
* @param[in] conn_type ConnectionType::OUTBOUND or ConnectionType::BLOCK_RELAY
* or ConnectionType::ADDR_FETCH
* @param[in] conn_type ConnectionType::OUTBOUND, ConnectionType::BLOCK_RELAY,
* ConnectionType::ADDR_FETCH or ConnectionType::FEELER
* @return bool Returns false if there are no available
* slots for this connection:
* - conn_type not a supported ConnectionType
Expand Down
51 changes: 31 additions & 20 deletions src/net_processing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -272,25 +272,34 @@ struct Peer {

struct TxRelay {
mutable RecursiveMutex m_bloom_filter_mutex;
// We use m_relay_txs for two purposes -
// a) it allows us to not relay tx invs before receiving the peer's version message
// b) the peer may tell us in its version message that we should not relay tx invs
// unless it loads a bloom filter.
/** Whether the peer wishes to receive transaction announcements.
*
* This is initially set based on the fRelay flag in the received
* `version` message. If initially set to false, it can only be flipped
* to true if we have offered the peer NODE_BLOOM services and it sends
* us a `filterload` or `filterclear` message. See BIP37. */
bool m_relay_txs GUARDED_BY(m_bloom_filter_mutex){false};
/** A bloom filter for which transactions to announce to the peer. See BIP37. */
std::unique_ptr<CBloomFilter> m_bloom_filter PT_GUARDED_BY(m_bloom_filter_mutex) GUARDED_BY(m_bloom_filter_mutex){nullptr};

mutable RecursiveMutex m_tx_inventory_mutex;
// inventory based relay
/** A filter of all the txids that the peer has announced to
* us or we have announced to the peer. We use this to avoid announcing
* the same txid to a peer that already has the transaction. */
CRollingBloomFilter m_tx_inventory_known_filter GUARDED_BY(m_tx_inventory_mutex){50000, 0.000001};
// Set of transaction ids we still have to announce.
// They are sorted by the mempool before relay, so the order is not important.
/** Set of transaction ids we still have to announce. We use the
* mempool to sort transactions in dependency order before relay, so
* this does not have to be sorted. */
std::set<uint256> m_tx_inventory_to_send GUARDED_BY(m_tx_inventory_mutex);
// List of non-tx/non-block inventory items
/** List of non-tx/non-block inventory items */
std::vector<CInv> vInventoryOtherToSend GUARDED_BY(m_tx_inventory_mutex);
// Used for BIP35 mempool sending, also protected by m_tx_inventory_mutex
/** Whether the peer has requested us to send our complete mempool. Only
* permitted if the peer has NetPermissionFlags::Mempool. See BIP35. */
bool m_send_mempool GUARDED_BY(m_tx_inventory_mutex){false};
// Last time a "MEMPOOL" request was serviced.
/** The last time a BIP35 `mempool` request was serviced. */
std::atomic<std::chrono::seconds> m_last_mempool_req{0s};
/** The next time after which we will send an `inv` message containing
* transaction announcements to this peer. */
std::chrono::microseconds m_next_inv_send_time GUARDED_BY(NetEventsInterface::g_msgproc_mutex){0};
};

Expand Down Expand Up @@ -1384,7 +1393,7 @@ void PeerManagerImpl::PushNodeVersion(CNode& pnode, const Peer& peer)
nProtocolVersion = gArgs.GetArg("-pushversion", PROTOCOL_VERSION);
}

const bool tx_relay = !m_ignore_incoming_txs && !pnode.IsBlockOnlyConn();
const bool tx_relay = !m_ignore_incoming_txs && !pnode.IsBlockOnlyConn() && !pnode.IsFeelerConn();
m_connman.PushMessage(&pnode, CNetMsgMaker(INIT_PROTO_VERSION).Make(NetMsgType::VERSION, nProtocolVersion, my_services, nTime,
your_services, addr_you, // Together the pre-version-31402 serialization of CAddress "addrYou" (without nTime)
my_services, CService(), // Together the pre-version-31402 serialization of CAddress "addrMe" (without nTime)
Expand Down Expand Up @@ -3831,6 +3840,12 @@ void PeerManagerImpl::ProcessMessage(
best_block = &inv.hash;
}
} else {
if (fBlocksOnly && NetMessageViolatesBlocksOnly(inv.GetCommand())) {
LogPrint(BCLog::NET, "%s (%s) inv sent in violation of protocol, disconnecting peer=%d\n", inv.GetCommand(), inv.hash.ToString(), pfrom.GetId());
pfrom.fDisconnect = true;
return;
}

const bool fAlreadyHave = AlreadyHave(inv);
LogPrint(BCLog::NET, "got inv: %s %s peer=%d\n", inv.ToString(), fAlreadyHave ? "have" : "new", pfrom.GetId());
::g_stats_client->inc(strprintf("message.received.inv_%s", inv.GetCommand()), 1.0f);
Expand All @@ -3840,11 +3855,7 @@ void PeerManagerImpl::ProcessMessage(
};

AddKnownInv(*peer, inv.hash);
if (fBlocksOnly && NetMessageViolatesBlocksOnly(inv.GetCommand())) {
LogPrint(BCLog::NET, "%s (%s) inv sent in violation of protocol, disconnecting peer=%d\n", inv.GetCommand(), inv.hash.ToString(), pfrom.GetId());
pfrom.fDisconnect = true;
return;
} else if (!fAlreadyHave) {
if (!fAlreadyHave) {
if (fBlocksOnly && inv.type == MSG_ISDLOCK) {
if (pfrom.GetCommonVersion() <= ADDRV2_PROTO_VERSION) {
// It's ok to receive these invs, we just ignore them
Expand Down Expand Up @@ -5889,7 +5900,7 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
// Stalling only triggers when the block download window cannot move. During normal steady state,
// the download window should be much larger than the to-be-downloaded set of blocks, so disconnection
// should only happen during initial block download.
LogPrintf("Peer=%d is stalling block download, disconnecting\n", pto->GetId());
LogPrintf("Peer=%d%s is stalling block download, disconnecting\n", pto->GetId(), fLogIPs ? strprintf(" peeraddr=%s", pto->addr.ToStringAddrPort()) : "");
pto->fDisconnect = true;
// Increase timeout for the next peer so that we don't disconnect multiple peers if our own
// bandwidth is insufficient.
Expand All @@ -5908,7 +5919,7 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
QueuedBlock &queuedBlock = state.vBlocksInFlight.front();
int nOtherPeersWithValidatedDownloads = m_peers_downloading_from - 1;
if (current_time > state.m_downloading_since + std::chrono::seconds{consensusParams.nPowTargetSpacing} * (BLOCK_DOWNLOAD_TIMEOUT_BASE + BLOCK_DOWNLOAD_TIMEOUT_PER_PEER * nOtherPeersWithValidatedDownloads)) {
LogPrintf("Timeout downloading block %s from peer=%d, disconnecting\n", queuedBlock.pindex->GetBlockHash().ToString(), pto->GetId());
LogPrintf("Timeout downloading block %s from peer=%d%s, disconnecting\n", queuedBlock.pindex->GetBlockHash().ToString(), pto->GetId(), fLogIPs ? strprintf(" peeraddr=%s", pto->addr.ToStringAddrPort()) : "");
pto->fDisconnect = true;
return true;
}
Expand All @@ -5924,11 +5935,11 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
// disconnect our sync peer for stalling; we have bigger
// problems if we can't get any outbound peers.
if (!pto->HasPermission(NetPermissionFlags::NoBan)) {
LogPrintf("Timeout downloading headers from peer=%d, disconnecting\n", pto->GetId());
LogPrintf("Timeout downloading headers from peer=%d%s, disconnecting\n", pto->GetId(), fLogIPs ? strprintf(" peeraddr=%s", pto->addr.ToStringAddrPort()) : "");
pto->fDisconnect = true;
return true;
} else {
LogPrintf("Timeout downloading headers from noban peer=%d, not disconnecting\n", pto->GetId());
LogPrintf("Timeout downloading headers from noban peer=%d%s, not disconnecting\n", pto->GetId(), fLogIPs ? strprintf(" peeraddr=%s", pto->addr.ToStringAddrPort()) : "");
// Reset the headers sync state so that we have a
// chance to try downloading from a different peer.
// Note: this will also result in at least one more
Expand Down
1 change: 1 addition & 0 deletions src/rpc/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ static const CRPCConvertParam vRPCConvertParams[] =
{ "getnodeaddresses", 0, "count"},
{ "addpeeraddress", 1, "port"},
{ "addpeeraddress", 2, "tried"},
{ "sendmsgtopeer", 0, "peer_id" },
{ "stop", 0, "wait" },
{ "verifychainlock", 2, "blockHeight" },
{ "verifyislock", 3, "maxHeight" },
Expand Down
52 changes: 51 additions & 1 deletion src/rpc/net.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ static RPCHelpMan addconnection()
"\nOpen an outbound connection to a specified node. This RPC is for testing only.\n",
{
{"address", RPCArg::Type::STR, RPCArg::Optional::NO, "The IP address and port to attempt connecting to."},
{"connection_type", RPCArg::Type::STR, RPCArg::Optional::NO, "Type of connection to open (\"outbound-full-relay\", \"block-relay-only\" or \"addr-fetch\")."},
{"connection_type", RPCArg::Type::STR, RPCArg::Optional::NO, "Type of connection to open (\"outbound-full-relay\", \"block-relay-only\", \"addr-fetch\" or \"feeler\")."},
},
RPCResult{
RPCResult::Type::OBJ, "", "",
Expand Down Expand Up @@ -380,6 +380,8 @@ static RPCHelpMan addconnection()
conn_type = ConnectionType::BLOCK_RELAY;
} else if (conn_type_in == "addr-fetch") {
conn_type = ConnectionType::ADDR_FETCH;
} else if (conn_type_in == "feeler") {
conn_type = ConnectionType::FEELER;
} else {
throw JSONRPCError(RPC_INVALID_PARAMETER, self.ToString());
}
Expand Down Expand Up @@ -1019,6 +1021,53 @@ static RPCHelpMan addpeeraddress()
};
}

static RPCHelpMan sendmsgtopeer()
{
return RPCHelpMan{
"sendmsgtopeer",
"Send a p2p message to a peer specified by id.\n"
"The message type and body must be provided, the message header will be generated.\n"
"This RPC is for testing only.",
{
{"peer_id", RPCArg::Type::NUM, RPCArg::Optional::NO, "The peer to send the message to."},
{"msg_type", RPCArg::Type::STR, RPCArg::Optional::NO, strprintf("The message type (maximum length %i)", CMessageHeader::COMMAND_SIZE)},
{"msg", RPCArg::Type::STR_HEX, RPCArg::Optional::NO, "The serialized message body to send, in hex, without a message header"},
},
RPCResult{RPCResult::Type::NONE, "", ""},
RPCExamples{
HelpExampleCli("sendmsgtopeer", "0 \"addr\" \"ffffff\"") + HelpExampleRpc("sendmsgtopeer", "0 \"addr\" \"ffffff\"")},
[&](const RPCHelpMan& self, const JSONRPCRequest& request) -> UniValue {
const NodeId peer_id{request.params[0].get_int()};
const std::string& msg_type{request.params[1].get_str()};
if (msg_type.size() > CMessageHeader::COMMAND_SIZE) {
throw JSONRPCError(RPC_INVALID_PARAMETER, strprintf("Error: msg_type too long, max length is %i", CMessageHeader::COMMAND_SIZE));
}
const std::string& msg{request.params[2].get_str()};
if (!msg.empty() && !IsHex(msg)) {
throw JSONRPCError(RPC_INVALID_PARAMETER, "Error parsing input for msg");
}

NodeContext& node = EnsureAnyNodeContext(request.context);
CConnman& connman = EnsureConnman(node);

CSerializedNetMsg msg_ser;
msg_ser.data = ParseHex(msg);
msg_ser.m_type = msg_type;

bool success = connman.ForNode(peer_id, [&](CNode* node) {
connman.PushMessage(node, std::move(msg_ser));
return true;
});

if (!success) {
throw JSONRPCError(RPC_MISC_ERROR, "Error: Could not send message to peer");
}

return NullUniValue;
},
};
}

static RPCHelpMan setmnthreadactive()
{
return RPCHelpMan{"setmnthreadactive",
Expand Down Expand Up @@ -1068,6 +1117,7 @@ static const CRPCCommand commands[] =

{ "hidden", &addconnection, },
{ "hidden", &addpeeraddress, },
{ "hidden", &sendmsgtopeer },
{ "hidden", &setmnthreadactive },
};
// clang-format on
Expand Down
1 change: 1 addition & 0 deletions src/test/fuzz/rpc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ const std::vector<std::string> RPC_COMMANDS_SAFE_FOR_FUZZING{
"pruneblockchain",
"reconsiderblock",
"scantxoutset",
"sendmsgtopeer", // when no peers are connected, no p2p message is sent
"sendrawtransaction",
"setmnthreadactive",
"setmocktime",
Expand Down
24 changes: 22 additions & 2 deletions test/functional/p2p_add_connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,18 @@

from test_framework.p2p import P2PInterface
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import check_node_connections

from test_framework.util import (
assert_equal,
check_node_connections,
)

class P2PFeelerReceiver(P2PInterface):
def on_version(self, message):
# The bitcoind node closes feeler connections as soon as a version
# message is received from the test framework. Don't send any responses
# to the node's version message since the connection will already be
# closed.
pass

class P2PAddConnections(BitcoinTestFramework):
def set_test_params(self):
Expand Down Expand Up @@ -86,6 +96,16 @@ def run_test(self):

check_node_connections(node=self.nodes[1], num_in=5, num_out=10)

self.log.info("Add 1 feeler connection to node 0")
feeler_conn = self.nodes[0].add_outbound_p2p_connection(P2PFeelerReceiver(), p2p_idx=6, connection_type="feeler")

# Feeler connection is closed
assert not feeler_conn.is_connected

# Verify version message received
assert_equal(feeler_conn.message_count["version"], 1)
# Feeler connections do not request tx relay
assert_equal(feeler_conn.last_message["version"].relay, 0)

if __name__ == '__main__':
P2PAddConnections().main()
4 changes: 2 additions & 2 deletions test/functional/p2p_disconnect_ban.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def run_test(self):
self.log.info("disconnectnode: successfully disconnect node by address")
address1 = self.nodes[0].getpeerinfo()[0]['addr']
self.nodes[0].disconnectnode(address=address1)
self.wait_until(lambda: len(self.nodes[0].getpeerinfo()) == 1, timeout=10)
self.wait_until(lambda: len(self.nodes[1].getpeerinfo()) == 1, timeout=10)
assert not [node for node in self.nodes[0].getpeerinfo() if node['addr'] == address1]

self.log.info("disconnectnode: successfully reconnect node")
Expand All @@ -102,7 +102,7 @@ def run_test(self):
self.log.info("disconnectnode: successfully disconnect node by node id")
id1 = self.nodes[0].getpeerinfo()[0]['id']
self.nodes[0].disconnectnode(nodeid=id1)
self.wait_until(lambda: len(self.nodes[0].getpeerinfo()) == 1, timeout=10)
self.wait_until(lambda: len(self.nodes[1].getpeerinfo()) == 1, timeout=10)
assert not [node for node in self.nodes[0].getpeerinfo() if node['id'] == id1]

if __name__ == '__main__':
Expand Down
37 changes: 37 additions & 0 deletions test/functional/p2p_net_deadlock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#!/usr/bin/env python3
# Copyright (c) 2023-present The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.

import threading
from test_framework.messages import MAX_PROTOCOL_MESSAGE_LENGTH
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import random_bytes

class NetDeadlockTest(BitcoinTestFramework):
def set_test_params(self):
self.setup_clean_chain = True
self.num_nodes = 2

def run_test(self):
node0 = self.nodes[0]
node1 = self.nodes[1]

self.log.info("Simultaneously send a large message on both sides")
rand_msg = random_bytes(MAX_PROTOCOL_MESSAGE_LENGTH).hex()

thread0 = threading.Thread(target=node0.sendmsgtopeer, args=(0, "unknown", rand_msg))
thread1 = threading.Thread(target=node1.sendmsgtopeer, args=(0, "unknown", rand_msg))

thread0.start()
thread1.start()
thread0.join()
thread1.join()

self.log.info("Check whether a deadlock happened")
self.nodes[0].generate(1)
self.sync_blocks()


if __name__ == '__main__':
NetDeadlockTest().main()
Loading
Loading