diff --git a/substrate/client/network/src/litep2p/discovery.rs b/substrate/client/network/src/litep2p/discovery.rs index bf2005df34d7..7bab1ae191ed 100644 --- a/substrate/client/network/src/litep2p/discovery.rs +++ b/substrate/client/network/src/litep2p/discovery.rs @@ -52,7 +52,7 @@ use std::{ collections::{HashMap, HashSet, VecDeque}, num::NonZeroUsize, pin::Pin, - sync::Arc, + sync::{atomic::AtomicUsize, Arc}, task::{Context, Poll}, time::{Duration, Instant}, }; @@ -63,6 +63,18 @@ const LOG_TARGET: &str = "sub-libp2p::discovery"; /// Kademlia query interval. const KADEMLIA_QUERY_INTERVAL: Duration = Duration::from_secs(5); +/// The convergence time between 2 `FIND_NODE` queries. +/// +/// The time is exponentially increased after each query until it reaches 120 seconds. +/// The time is reset to `KADEMLIA_QUERY_INTERVAL` after a failed query. +const CONVERGENCE_QUERY_INTERVAL: Duration = Duration::from_secs(120); + +/// Ensure at least one discovery query is issued periodically. +/// +/// This has a low impact on the networking backend, while keeping a healthy +/// subset of the network discovered. +const MANDATORY_QUERY_INTERVAL: Duration = Duration::from_secs(30 * 60); + /// mDNS query interval. const MDNS_QUERY_INTERVAL: Duration = Duration::from_secs(30); @@ -78,6 +90,9 @@ const MAX_EXTERNAL_ADDRESSES: u32 = 32; /// addresses a bit more robust. const MIN_ADDRESS_CONFIRMATIONS: usize = 2; +/// Maximum number of in-flight `FIND_NODE` queries. +const MAX_INFLIGHT_FIND_NODE_QUERIES: usize = 16; + /// Discovery events. #[derive(Debug)] pub enum DiscoveryEvent { @@ -182,8 +197,8 @@ pub struct Discovery { /// If `None`, there is currently a query pending. next_kad_query: Option<Delay>, - /// Active `FIND_NODE` query if it exists. - find_node_query_id: Option<QueryId>, + /// Active `FIND_NODE` queries. + find_node_queries: HashMap<QueryId, Instant>, /// Pending events. pending_events: VecDeque<DiscoveryEvent>, @@ -205,6 +220,15 @@ pub struct Discovery { /// Delay to next `FIND_NODE` query.
duration_to_next_find_query: Duration, + + /// Delay to next mandatory `FIND_NODE` query. + next_mandatory_kad_query: Option<Delay>, + + /// Number of connected peers as reported by the blocks announcement protocol. + num_connected_peers: Arc<AtomicUsize>, + + /// Number of active connections over which we interrupt the discovery process. + discovery_only_if_under_num: usize, } /// Legacy (fallback) Kademlia protocol name based on `protocol_id`. @@ -239,6 +263,8 @@ impl Discovery { protocol_id: &ProtocolId, known_peers: HashMap<PeerId, Vec<Multiaddr>>, listen_addresses: Arc<RwLock<HashSet<Multiaddr>>>, + num_connected_peers: Arc<AtomicUsize>, + discovery_only_if_under_num: usize, _peerstore_handle: Arc<dyn PeerStoreProvider>, ) -> (Self, PingConfig, IdentifyConfig, KademliaConfig, Option<MdnsConfig>) { let (ping_config, ping_event_stream) = PingConfig::default(); @@ -279,9 +305,10 @@ impl Discovery { kademlia_handle, _peerstore_handle, listen_addresses, - find_node_query_id: None, + find_node_queries: HashMap::new(), pending_events: VecDeque::new(), duration_to_next_find_query: Duration::from_secs(1), + next_mandatory_kad_query: Some(Delay::new(MANDATORY_QUERY_INTERVAL)), address_confirmations: LruMap::new(ByLength::new(MAX_EXTERNAL_ADDRESSES)), allow_non_global_addresses: config.allow_non_globals_in_dht, public_addresses: config.public_addresses.iter().cloned().map(Into::into).collect(), @@ -290,6 +317,8 @@ impl Discovery { genesis_hash, fork_id, )]), + num_connected_peers, + discovery_only_if_under_num, }, ping_config, identify_config, @@ -298,6 +327,11 @@ impl Discovery { ) } + /// Get number of connected peers. + fn num_connected_peers(&self) -> usize { + self.num_connected_peers.load(std::sync::atomic::Ordering::Relaxed) + } + /// Add known peer to `Kademlia`. #[allow(unused)] pub async fn add_known_peer(&mut self, peer: PeerId, addresses: Vec<Multiaddr>) { @@ -473,30 +507,75 @@ impl Stream for Discovery { return Poll::Ready(Some(event)) } - if let Some(mut delay) = this.next_kad_query.take() { + // Mandatory kad random queries issued periodically.
+ if let Some(mut delay) = this.next_mandatory_kad_query.take() { match delay.poll_unpin(cx) { + Poll::Ready(()) => { + if this.find_node_queries.len() < MAX_INFLIGHT_FIND_NODE_QUERIES { + let peer = PeerId::random(); + log::debug!(target: LOG_TARGET, "start next mandatory kademlia query for {peer:?}"); + + if let Ok(query_id) = this.kademlia_handle.try_find_node(peer) { + this.find_node_queries.insert(query_id, std::time::Instant::now()); + + this.next_mandatory_kad_query = + Some(Delay::new(MANDATORY_QUERY_INTERVAL)); + + return Poll::Ready(Some(DiscoveryEvent::RandomKademliaStarted)) + } else { + log::error!(target: LOG_TARGET, "Kademlia mandatory query cannot be started: handler channel is full"); + } + } else { + log::debug!(target: LOG_TARGET, "discovery is paused: too many in-flight queries"); + } + }, Poll::Pending => { - this.next_kad_query = Some(delay); + this.next_mandatory_kad_query = Some(delay); }, + } + } + + // Exponential timer that checks more frequently if the node is under a number + // of healthy connected peers reported by the sync protocol. 
+ if let Some(mut delay) = this.next_kad_query.take() { + match delay.poll_unpin(cx) { Poll::Ready(()) => { - let peer = PeerId::random(); + let num_peers = this.num_connected_peers(); + if num_peers < 2 * this.discovery_only_if_under_num && + this.find_node_queries.len() < MAX_INFLIGHT_FIND_NODE_QUERIES + { + let peer = PeerId::random(); + log::debug!(target: LOG_TARGET, "start next kademlia query for {peer:?} {num_peers}/2x{} connected peers", this.discovery_only_if_under_num,); - log::trace!(target: LOG_TARGET, "start next kademlia query for {peer:?}"); + if let Ok(query_id) = this.kademlia_handle.try_find_node(peer) { + this.find_node_queries.insert(query_id, std::time::Instant::now()); - match this.kademlia_handle.try_find_node(peer) { - Ok(query_id) => { - this.find_node_query_id = Some(query_id); - return Poll::Ready(Some(DiscoveryEvent::RandomKademliaStarted)) - }, - Err(()) => { this.duration_to_next_find_query = cmp::min( this.duration_to_next_find_query * 2, - Duration::from_secs(60), + CONVERGENCE_QUERY_INTERVAL, ); this.next_kad_query = Some(Delay::new(this.duration_to_next_find_query)); - }, + + return Poll::Ready(Some(DiscoveryEvent::RandomKademliaStarted)) + } else { + log::error!(target: LOG_TARGET, "Kademlia mandatory query cannot be started: handler channel is full"); + } + } else { + log::debug!( + target: LOG_TARGET, + "discovery is paused: {num_peers}/2x{} connected peers and in flight queries: {}/{MAX_INFLIGHT_FIND_NODE_QUERIES}", + this.discovery_only_if_under_num, + this.find_node_queries.len(), + ); } + + this.duration_to_next_find_query = + cmp::min(this.duration_to_next_find_query * 2, CONVERGENCE_QUERY_INTERVAL); + this.next_kad_query = Some(Delay::new(this.duration_to_next_find_query)); + }, + Poll::Pending => { + this.next_kad_query = Some(delay); }, } } @@ -504,13 +583,16 @@ impl Stream for Discovery { match Pin::new(&mut this.kademlia_handle).poll_next(cx) { Poll::Pending => {}, Poll::Ready(None) => return Poll::Ready(None), - 
Poll::Ready(Some(KademliaEvent::FindNodeSuccess { peers, .. })) => { + Poll::Ready(Some(KademliaEvent::FindNodeSuccess { peers, query_id, .. })) => { // the addresses are already inserted into the DHT and in `TransportManager` so // there is no need to add them again. The found peers must be registered to // `Peerstore` so other protocols are aware of them through `Peerset`. - log::trace!(target: LOG_TARGET, "dht random walk yielded {} peers", peers.len()); - this.next_kad_query = Some(Delay::new(KADEMLIA_QUERY_INTERVAL)); + if let Some(instant) = this.find_node_queries.remove(&query_id) { + log::trace!(target: LOG_TARGET, "dht random walk yielded {} peers for {query_id:?} in {:?}", peers.len(), instant.elapsed()); + } else { + log::trace!(target: LOG_TARGET, "dht random walk yielded {} peers for {query_id:?}", peers.len()); + } return Poll::Ready(Some(DiscoveryEvent::RoutingTableUpdate { peers: peers.into_iter().map(|(peer, _)| peer).collect(), @@ -534,15 +616,14 @@ impl Stream for Discovery { Poll::Ready(Some(KademliaEvent::PutRecordSucess { query_id, key: _ })) => return Poll::Ready(Some(DiscoveryEvent::PutRecordSuccess { query_id })), Poll::Ready(Some(KademliaEvent::QueryFailed { query_id })) => { - match this.find_node_query_id == Some(query_id) { - true => { - this.find_node_query_id = None; - this.duration_to_next_find_query = - cmp::min(this.duration_to_next_find_query * 2, Duration::from_secs(60)); - this.next_kad_query = Some(Delay::new(this.duration_to_next_find_query)); - }, - false => return Poll::Ready(Some(DiscoveryEvent::QueryFailed { query_id })), + if let Some(instant) = this.find_node_queries.remove(&query_id) { + this.duration_to_next_find_query = KADEMLIA_QUERY_INTERVAL; + this.next_kad_query = Some(Delay::new(this.duration_to_next_find_query)); + + log::debug!(target: LOG_TARGET, "dht random walk failed for {query_id:?} in {:?}", instant.elapsed()); } + + return Poll::Ready(Some(DiscoveryEvent::QueryFailed { query_id })); }, 
Poll::Ready(Some(KademliaEvent::IncomingRecord { record })) => { log::trace!( diff --git a/substrate/client/network/src/litep2p/mod.rs b/substrate/client/network/src/litep2p/mod.rs index 277f0759729c..0a8ff913da5e 100644 --- a/substrate/client/network/src/litep2p/mod.rs +++ b/substrate/client/network/src/litep2p/mod.rs @@ -166,8 +166,10 @@ pub struct Litep2pNetworkBackend { /// Discovery. discovery: Discovery, - /// Number of connected peers. - num_connected: Arc<AtomicUsize>, + /// Number of uniquely connected peers. + /// + /// This is used to instruct the discovery about the number of connected peers. + num_uniquely_connected: Arc<AtomicUsize>, /// Connected peers. peers: HashMap, @@ -436,6 +438,9 @@ impl NetworkBackend for Litep2pNetworkBac let peer_store_handle = params.network_config.peer_store_handle(); let executor = Arc::new(Litep2pExecutor { executor: params.executor }); + let limit_discovery_under = + params.network_config.network_config.default_peers_set.out_peers as usize + 15; + let FullNetworkConfiguration { notification_protocols, request_response_protocols, @@ -449,6 +454,7 @@ // to the protocol's `Peerset` together with the protocol name to allow other subsystems // of Polkadot SDK to control connectivity of the notification protocol let block_announce_protocol = params.block_announce_config.protocol_name().clone(); + let num_sync_connected = params.block_announce_config.handle.connected_peers.clone(); let mut notif_protocols = HashMap::from_iter([( params.block_announce_config.protocol_name().clone(), params.block_announce_config.handle, @@ -538,6 +544,8 @@ impl NetworkBackend for Litep2pNetworkBac // enable ipfs ping, identify and kademlia, and potentially mdns if user enabled it let listen_addresses = Arc::new(Default::default()); + let num_uniquely_connected = Arc::new(Default::default()); + let (discovery, ping_config, identify_config, kademlia_config, maybe_mdns_config) = Discovery::new( &network_config, @@ -546,6 +554,8 @@
impl NetworkBackend for Litep2pNetworkBac &params.protocol_id, known_addresses.clone(), Arc::clone(&listen_addresses), + Arc::clone(&num_uniquely_connected), + limit_discovery_under, Arc::clone(&peer_store_handle), ); @@ -599,12 +609,11 @@ impl NetworkBackend for Litep2pNetworkBac )); // register rest of the metrics now that `Litep2p` has been created - let num_connected = Arc::new(Default::default()); let bandwidth: Arc<dyn BandwidthSink> = Arc::new(Litep2pBandwidthSink { sink: litep2p.bandwidth_sink() }); if let Some(registry) = &params.metrics_registry { - MetricSources::register(registry, bandwidth, Arc::clone(&num_connected))?; + MetricSources::register(registry, bandwidth, Arc::clone(&num_sync_connected))?; } Ok(Self { @@ -612,7 +621,7 @@ impl NetworkBackend for Litep2pNetworkBac cmd_rx, metrics, peerset_handles: notif_protocols, - num_connected, + num_uniquely_connected, discovery, pending_put_values: HashMap::new(), pending_get_values: HashMap::new(), @@ -691,11 +700,7 @@ impl NetworkBackend for Litep2pNetworkBac log::debug!(target: LOG_TARGET, "starting litep2p network backend"); loop { - let num_connected_peers = self - .peerset_handles - .get(&self.block_announce_protocol) - .map_or(0usize, |handle| handle.connected_peers.load(Ordering::Relaxed)); - self.num_connected.store(num_connected_peers, Ordering::Relaxed); + self.num_uniquely_connected.store(self.peers.len(), Ordering::Relaxed); tokio::select!
{ command = self.cmd_rx.next() => match command { @@ -722,11 +727,13 @@ impl NetworkBackend for Litep2pNetworkBac self.event_streams.push(tx); } NetworkServiceCommand::Status { tx } => { + let num_connected_peers = self + .peerset_handles + .get(&self.block_announce_protocol) + .map_or(0usize, |handle| handle.connected_peers.load(Ordering::Relaxed)); + let _ = tx.send(NetworkStatus { - num_connected_peers: self - .peerset_handles - .get(&self.block_announce_protocol) - .map_or(0usize, |handle| handle.connected_peers.load(Ordering::Relaxed)), + num_connected_peers, total_bytes_inbound: self.litep2p.bandwidth_sink().inbound() as u64, total_bytes_outbound: self.litep2p.bandwidth_sink().outbound() as u64, });