From a126147efad227b3e00ca32e99856e22fda699ba Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 1 Jul 2024 13:44:20 +0300 Subject: [PATCH 01/20] net/litep2p: Update connected peers from network backend sooner Signed-off-by: Alexandru Vasile --- substrate/client/network/src/litep2p/mod.rs | 24 +++++++++++++-------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/substrate/client/network/src/litep2p/mod.rs b/substrate/client/network/src/litep2p/mod.rs index 824f62082cac..12268466fa51 100644 --- a/substrate/client/network/src/litep2p/mod.rs +++ b/substrate/client/network/src/litep2p/mod.rs @@ -259,6 +259,17 @@ impl Litep2pNetworkBackend { Ok((local_identity, local_peer_id)) } + /// Fetch the number of connected peers from the peerset handle and update + /// the atomic `num_connected` shared between the network backend. + fn fetch_connected_peers(&self) -> usize { + let num_connected_peers = self + .peerset_handles + .get(&self.block_announce_protocol) + .map_or(0usize, |handle| handle.connected_peers.load(Ordering::Relaxed)); + self.num_connected.store(num_connected_peers, Ordering::Relaxed); + num_connected_peers + } + /// Configure transport protocols for `Litep2pNetworkBackend`. fn configure_transport( config: &FullNetworkConfiguration, @@ -682,11 +693,7 @@ impl NetworkBackend for Litep2pNetworkBac log::debug!(target: LOG_TARGET, "starting litep2p network backend"); loop { - let num_connected_peers = self - .peerset_handles - .get(&self.block_announce_protocol) - .map_or(0usize, |handle| handle.connected_peers.load(Ordering::Relaxed)); - self.num_connected.store(num_connected_peers, Ordering::Relaxed); + self.fetch_connected_peers(); tokio::select! 
{ command = self.cmd_rx.next() => match command { @@ -707,11 +714,10 @@ impl NetworkBackend for Litep2pNetworkBac self.event_streams.push(tx); } NetworkServiceCommand::Status { tx } => { + let num_connected_peers = self.fetch_connected_peers(); + let _ = tx.send(NetworkStatus { - num_connected_peers: self - .peerset_handles - .get(&self.block_announce_protocol) - .map_or(0usize, |handle| handle.connected_peers.load(Ordering::Relaxed)), + num_connected_peers, total_bytes_inbound: self.litep2p.bandwidth_sink().inbound() as u64, total_bytes_outbound: self.litep2p.bandwidth_sink().outbound() as u64, }); From d9b57ee10878b705b9ca043fe71e24b0c8a6b5c0 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 1 Jul 2024 13:49:12 +0300 Subject: [PATCH 02/20] net/litep2p: Propagate connected peers to the discovery mechanism Signed-off-by: Alexandru Vasile --- substrate/client/network/src/litep2p/discovery.rs | 12 +++++++++++- substrate/client/network/src/litep2p/mod.rs | 5 ++++- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/substrate/client/network/src/litep2p/discovery.rs b/substrate/client/network/src/litep2p/discovery.rs index 6ff05e6af327..f426f52cb7d9 100644 --- a/substrate/client/network/src/litep2p/discovery.rs +++ b/substrate/client/network/src/litep2p/discovery.rs @@ -51,7 +51,7 @@ use std::{ cmp, collections::{HashMap, HashSet, VecDeque}, pin::Pin, - sync::Arc, + sync::{atomic::AtomicUsize, Arc}, task::{Context, Poll}, time::{Duration, Instant}, }; @@ -195,6 +195,9 @@ pub struct Discovery { /// Delay to next `FIND_NODE` query. duration_to_next_find_query: Duration, + + /// Number of connected peers as reported by the blocks announcement protocol. + num_connected_peers: Arc, } /// Legacy (fallback) Kademlia protocol name based on `protocol_id`. 
@@ -229,6 +232,7 @@ impl Discovery { protocol_id: &ProtocolId, known_peers: HashMap>, listen_addresses: Arc>>, + num_connected_peers: Arc, _peerstore_handle: Arc, ) -> (Self, PingConfig, IdentifyConfig, KademliaConfig, Option) { let (ping_config, ping_event_stream) = PingConfig::default(); @@ -282,6 +286,7 @@ impl Discovery { genesis_hash, fork_id, )]), + num_connected_peers, }, ping_config, identify_config, @@ -290,6 +295,11 @@ impl Discovery { ) } + /// Get number of connected peers. + fn num_connected_peers(&self) -> usize { + self.num_connected_peers.load(std::sync::atomic::Ordering::Relaxed) + } + /// Add known peer to `Kademlia`. #[allow(unused)] pub async fn add_known_peer(&mut self, peer: PeerId, addresses: Vec) { diff --git a/substrate/client/network/src/litep2p/mod.rs b/substrate/client/network/src/litep2p/mod.rs index 12268466fa51..a940cfd3ec1f 100644 --- a/substrate/client/network/src/litep2p/mod.rs +++ b/substrate/client/network/src/litep2p/mod.rs @@ -404,6 +404,7 @@ impl NetworkBackend for Litep2pNetworkBac where Self: Sized, { + config.discovery_limit(u64::from(network_config.default_peers_set.out_peers) + 15); let (keypair, local_peer_id) = Self::get_keypair(&params.network_config.network_config.node_key)?; let (cmd_tx, cmd_rx) = tracing_unbounded("mpsc_network_worker", 100_000); @@ -552,6 +553,8 @@ impl NetworkBackend for Litep2pNetworkBac // enable ipfs ping, identify and kademlia, and potentially mdns if user enabled it let listen_addresses = Arc::new(Default::default()); + let num_connected = Arc::new(Default::default()); + let (discovery, ping_config, identify_config, kademlia_config, maybe_mdns_config) = Discovery::new( &network_config, @@ -560,6 +563,7 @@ impl NetworkBackend for Litep2pNetworkBac &params.protocol_id, known_addresses.clone(), Arc::clone(&listen_addresses), + Arc::clone(&num_connected), Arc::clone(&peer_store_handle), ); @@ -603,7 +607,6 @@ impl NetworkBackend for Litep2pNetworkBac )); // register rest of the metrics now that `Litep2p` 
has been created - let num_connected = Arc::new(Default::default()); let bandwidth: Arc = Arc::new(Litep2pBandwidthSink { sink: litep2p.bandwidth_sink() }); From 3b2c5ac6ca5ce039a3181064cb6c2f9e761b8802 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 1 Jul 2024 14:03:22 +0300 Subject: [PATCH 03/20] net/litep2p: Start random walks if under connected peers limit Signed-off-by: Alexandru Vasile --- .../client/network/src/litep2p/discovery.rs | 45 ++++++++++--------- substrate/client/network/src/litep2p/mod.rs | 5 ++- 2 files changed, 28 insertions(+), 22 deletions(-) diff --git a/substrate/client/network/src/litep2p/discovery.rs b/substrate/client/network/src/litep2p/discovery.rs index f426f52cb7d9..f162f5bb9b2c 100644 --- a/substrate/client/network/src/litep2p/discovery.rs +++ b/substrate/client/network/src/litep2p/discovery.rs @@ -198,6 +198,9 @@ pub struct Discovery { /// Number of connected peers as reported by the blocks announcement protocol. num_connected_peers: Arc, + + /// Number of active connections over which we interrupt the discovery process. + discovery_only_if_under_num: usize, } /// Legacy (fallback) Kademlia protocol name based on `protocol_id`. 
@@ -233,6 +236,7 @@ impl Discovery { known_peers: HashMap>, listen_addresses: Arc>>, num_connected_peers: Arc, + discovery_only_if_under_num: usize, _peerstore_handle: Arc, ) -> (Self, PingConfig, IdentifyConfig, KademliaConfig, Option) { let (ping_config, ping_event_stream) = PingConfig::default(); @@ -287,6 +291,7 @@ impl Discovery { fork_id, )]), num_connected_peers, + discovery_only_if_under_num, }, ping_config, identify_config, @@ -457,30 +462,28 @@ impl Stream for Discovery { } if let Some(mut delay) = this.next_kad_query.take() { - match delay.poll_unpin(cx) { - Poll::Pending => { - this.next_kad_query = Some(delay); - }, - Poll::Ready(()) => { + while delay.poll_unpin(cx).is_ready() { + let num_peers = this.num_connected_peers(); + if num_peers < this.discovery_only_if_under_num { let peer = PeerId::random(); + log::info!(target: LOG_TARGET, "start next kademlia query for {peer:?}"); + + let started = this.kademlia_handle.try_find_node(peer).is_ok(); - log::trace!(target: LOG_TARGET, "start next kademlia query for {peer:?}"); - - match this.kademlia_handle.try_find_node(peer) { - Ok(query_id) => { - this.find_node_query_id = Some(query_id); - return Poll::Ready(Some(DiscoveryEvent::RandomKademliaStarted)) - }, - Err(()) => { - this.duration_to_next_find_query = cmp::min( - this.duration_to_next_find_query * 2, - Duration::from_secs(60), - ); - this.next_kad_query = - Some(Delay::new(this.duration_to_next_find_query)); - }, + this.duration_to_next_find_query = + cmp::min(this.duration_to_next_find_query * 2, Duration::from_secs(60)); + this.next_kad_query = Some(Delay::new(this.duration_to_next_find_query)); + + if started { + this.find_node_query_id = None; + return Poll::Ready(Some(DiscoveryEvent::RandomKademliaStarted)) } - }, + } else { + log::info!( + target: LOG_TARGET, + "discovery is disabled as we have {num_peers} connected peers." 
+ ); + } } } diff --git a/substrate/client/network/src/litep2p/mod.rs b/substrate/client/network/src/litep2p/mod.rs index a940cfd3ec1f..f3ecd9e0e84f 100644 --- a/substrate/client/network/src/litep2p/mod.rs +++ b/substrate/client/network/src/litep2p/mod.rs @@ -404,7 +404,6 @@ impl NetworkBackend for Litep2pNetworkBac where Self: Sized, { - config.discovery_limit(u64::from(network_config.default_peers_set.out_peers) + 15); let (keypair, local_peer_id) = Self::get_keypair(&params.network_config.network_config.node_key)?; let (cmd_tx, cmd_rx) = tracing_unbounded("mpsc_network_worker", 100_000); @@ -451,6 +450,9 @@ impl NetworkBackend for Litep2pNetworkBac let peer_store_handle = params.network_config.peer_store_handle(); let executor = Arc::new(Litep2pExecutor { executor: params.executor }); + let limit_discovery_under = + params.network_config.network_config.default_peers_set.out_peers as usize + 15; + let FullNetworkConfiguration { notification_protocols, request_response_protocols, @@ -564,6 +566,7 @@ impl NetworkBackend for Litep2pNetworkBac known_addresses.clone(), Arc::clone(&listen_addresses), Arc::clone(&num_connected), + limit_discovery_under, Arc::clone(&peer_store_handle), ); From b61e97371efffc1866914cecef17abd9edbbf76f Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Wed, 3 Jul 2024 14:56:30 +0300 Subject: [PATCH 04/20] net/kad: Ensure timer moves forward Signed-off-by: Alexandru Vasile --- substrate/client/network/src/litep2p/discovery.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/substrate/client/network/src/litep2p/discovery.rs b/substrate/client/network/src/litep2p/discovery.rs index f162f5bb9b2c..02a204a0ac92 100644 --- a/substrate/client/network/src/litep2p/discovery.rs +++ b/substrate/client/network/src/litep2p/discovery.rs @@ -470,10 +470,6 @@ impl Stream for Discovery { let started = this.kademlia_handle.try_find_node(peer).is_ok(); - this.duration_to_next_find_query = - cmp::min(this.duration_to_next_find_query * 
2, Duration::from_secs(60)); - this.next_kad_query = Some(Delay::new(this.duration_to_next_find_query)); - if started { this.find_node_query_id = None; return Poll::Ready(Some(DiscoveryEvent::RandomKademliaStarted)) @@ -485,6 +481,10 @@ impl Stream for Discovery { ); } } + + this.duration_to_next_find_query = + cmp::min(this.duration_to_next_find_query * 2, Duration::from_secs(60)); + this.next_kad_query = Some(Delay::new(this.duration_to_next_find_query)); } match Pin::new(&mut this.kademlia_handle).poll_next(cx) { From 837f196785b5895e3afcfaa629355c134e9d7895 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Wed, 3 Jul 2024 19:30:35 +0300 Subject: [PATCH 05/20] net/kad: Keep record of inflight kad queries Signed-off-by: Alexandru Vasile --- .../client/network/src/litep2p/discovery.rs | 76 +++++++++++-------- 1 file changed, 43 insertions(+), 33 deletions(-) diff --git a/substrate/client/network/src/litep2p/discovery.rs b/substrate/client/network/src/litep2p/discovery.rs index 02a204a0ac92..cccb88838190 100644 --- a/substrate/client/network/src/litep2p/discovery.rs +++ b/substrate/client/network/src/litep2p/discovery.rs @@ -172,8 +172,8 @@ pub struct Discovery { /// If `None`, there is currently a query pending. next_kad_query: Option, - /// Active `FIND_NODE` query if it exists. - find_node_query_id: Option, + /// Active `FIND_NODE` queries. + find_node_queries: HashMap, /// Pending events. 
pending_events: VecDeque, @@ -279,7 +279,7 @@ impl Discovery { kademlia_handle, _peerstore_handle, listen_addresses, - find_node_query_id: None, + find_node_queries: HashMap::new(), pending_events: VecDeque::new(), duration_to_next_find_query: Duration::from_secs(1), address_confirmations: LruMap::new(ByLength::new(8)), @@ -462,35 +462,45 @@ impl Stream for Discovery { } if let Some(mut delay) = this.next_kad_query.take() { - while delay.poll_unpin(cx).is_ready() { - let num_peers = this.num_connected_peers(); - if num_peers < this.discovery_only_if_under_num { - let peer = PeerId::random(); - log::info!(target: LOG_TARGET, "start next kademlia query for {peer:?}"); - - let started = this.kademlia_handle.try_find_node(peer).is_ok(); - - if started { - this.find_node_query_id = None; - return Poll::Ready(Some(DiscoveryEvent::RandomKademliaStarted)) + match delay.poll_unpin(cx) { + Poll::Ready(()) => { + let num_peers = this.num_connected_peers(); + if num_peers < this.discovery_only_if_under_num { + let peer = PeerId::random(); + log::info!(target: LOG_TARGET, "start next kademlia query for {peer:?}"); + + if let Ok(query_id) = this.kademlia_handle.try_find_node(peer) { + this.find_node_queries.insert(query_id, std::time::Instant::now()); + this.duration_to_next_find_query = cmp::min( + this.duration_to_next_find_query * 2, + Duration::from_secs(60), + ); + this.next_kad_query = + Some(Delay::new(this.duration_to_next_find_query)); + + return Poll::Ready(Some(DiscoveryEvent::RandomKademliaStarted)) + } + } else { + log::info!( + target: LOG_TARGET, + "discovery is disabled as we have {num_peers} connected peers." + ); } - } else { - log::info!( - target: LOG_TARGET, - "discovery is disabled as we have {num_peers} connected peers." 
- ); - } - } - this.duration_to_next_find_query = - cmp::min(this.duration_to_next_find_query * 2, Duration::from_secs(60)); - this.next_kad_query = Some(Delay::new(this.duration_to_next_find_query)); + this.duration_to_next_find_query = + cmp::min(this.duration_to_next_find_query * 2, Duration::from_secs(60)); + this.next_kad_query = Some(Delay::new(this.duration_to_next_find_query)); + }, + Poll::Pending => { + this.next_kad_query = Some(delay); + }, + } } match Pin::new(&mut this.kademlia_handle).poll_next(cx) { Poll::Pending => {}, Poll::Ready(None) => return Poll::Ready(None), - Poll::Ready(Some(KademliaEvent::FindNodeSuccess { peers, .. })) => { + Poll::Ready(Some(KademliaEvent::FindNodeSuccess { peers, query_id, .. })) => { // the addresses are already inserted into the DHT and in `TransportManager` so // there is no need to add them again. The found peers must be registered to // `Peerstore` so other protocols are aware of them through `Peerset`. @@ -498,6 +508,10 @@ impl Stream for Discovery { this.next_kad_query = Some(Delay::new(KADEMLIA_QUERY_INTERVAL)); + if let Some(instant) = this.find_node_queries.remove(&query_id) { + log::info!(target: LOG_TARGET, "dht random walk yielded {} peers {query_id:?} in {:?}", peers.len(), instant.elapsed()); + } + return Poll::Ready(Some(DiscoveryEvent::RoutingTableUpdate { peers: peers.into_iter().map(|(peer, _)| peer).collect(), })) @@ -520,15 +534,11 @@ impl Stream for Discovery { Poll::Ready(Some(KademliaEvent::PutRecordSucess { query_id, key: _ })) => return Poll::Ready(Some(DiscoveryEvent::PutRecordSuccess { query_id })), Poll::Ready(Some(KademliaEvent::QueryFailed { query_id })) => { - match this.find_node_query_id == Some(query_id) { - true => { - this.find_node_query_id = None; - this.duration_to_next_find_query = - cmp::min(this.duration_to_next_find_query * 2, Duration::from_secs(60)); - this.next_kad_query = Some(Delay::new(this.duration_to_next_find_query)); - }, - false => return 
Poll::Ready(Some(DiscoveryEvent::QueryFailed { query_id })), + if let Some(instant) = this.find_node_queries.remove(&query_id) { + log::warn!(target: LOG_TARGET, "`GET_RECORD` failed for {query_id:?} in {:?}", instant.elapsed()); } + + return Poll::Ready(Some(DiscoveryEvent::QueryFailed { query_id })); }, Poll::Ready(Some(KademliaEvent::IncomingRecord { record })) => { log::trace!( From 5610b9532bb8e7bc645eba07c26f459d8f0a22fc Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Wed, 3 Jul 2024 19:38:33 +0300 Subject: [PATCH 06/20] net/kad: Reset kad query timer on failure Signed-off-by: Alexandru Vasile --- .../client/network/src/litep2p/discovery.rs | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/substrate/client/network/src/litep2p/discovery.rs b/substrate/client/network/src/litep2p/discovery.rs index cccb88838190..dcced2130708 100644 --- a/substrate/client/network/src/litep2p/discovery.rs +++ b/substrate/client/network/src/litep2p/discovery.rs @@ -62,6 +62,12 @@ const LOG_TARGET: &str = "sub-libp2p::discovery"; /// Kademlia query interval. const KADEMLIA_QUERY_INTERVAL: Duration = Duration::from_secs(5); +/// The convergence time between 2 `FIND_NODE` queries. +/// +/// The time is exponentially increased after each query until it reaches 120 seconds. +/// The time is reset to `KADEMLIA_QUERY_INTERVAL` after a failed query. +const CONVERGENCE_QUERY_INTERVAL: Duration = Duration::from_secs(120); + /// mDNS query interval. 
const MDNS_QUERY_INTERVAL: Duration = Duration::from_secs(30); @@ -471,9 +477,10 @@ impl Stream for Discovery { if let Ok(query_id) = this.kademlia_handle.try_find_node(peer) { this.find_node_queries.insert(query_id, std::time::Instant::now()); + this.duration_to_next_find_query = cmp::min( this.duration_to_next_find_query * 2, - Duration::from_secs(60), + CONVERGENCE_QUERY_INTERVAL, ); this.next_kad_query = Some(Delay::new(this.duration_to_next_find_query)); @@ -488,7 +495,7 @@ impl Stream for Discovery { } this.duration_to_next_find_query = - cmp::min(this.duration_to_next_find_query * 2, Duration::from_secs(60)); + cmp::min(this.duration_to_next_find_query * 2, CONVERGENCE_QUERY_INTERVAL); this.next_kad_query = Some(Delay::new(this.duration_to_next_find_query)); }, Poll::Pending => { @@ -535,7 +542,10 @@ impl Stream for Discovery { return Poll::Ready(Some(DiscoveryEvent::PutRecordSuccess { query_id })), Poll::Ready(Some(KademliaEvent::QueryFailed { query_id })) => { if let Some(instant) = this.find_node_queries.remove(&query_id) { - log::warn!(target: LOG_TARGET, "`GET_RECORD` failed for {query_id:?} in {:?}", instant.elapsed()); + this.duration_to_next_find_query = KADEMLIA_QUERY_INTERVAL; + this.next_kad_query = Some(Delay::new(this.duration_to_next_find_query)); + + log::warn!(target: LOG_TARGET, "dht random walk failed for {query_id:?} in {:?}", instant.elapsed()); } return Poll::Ready(Some(DiscoveryEvent::QueryFailed { query_id })); From f5aee829b0683cbf9e813568844ffed4bc1ea75f Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Wed, 3 Jul 2024 19:40:23 +0300 Subject: [PATCH 07/20] net/kad: Downgrade logs to trace and debug Signed-off-by: Alexandru Vasile --- substrate/client/network/src/litep2p/discovery.rs | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/substrate/client/network/src/litep2p/discovery.rs b/substrate/client/network/src/litep2p/discovery.rs index dcced2130708..40ca3ef7864a 100644 --- 
a/substrate/client/network/src/litep2p/discovery.rs +++ b/substrate/client/network/src/litep2p/discovery.rs @@ -473,7 +473,7 @@ impl Stream for Discovery { let num_peers = this.num_connected_peers(); if num_peers < this.discovery_only_if_under_num { let peer = PeerId::random(); - log::info!(target: LOG_TARGET, "start next kademlia query for {peer:?}"); + log::debug!(target: LOG_TARGET, "start next kademlia query for {peer:?}"); if let Ok(query_id) = this.kademlia_handle.try_find_node(peer) { this.find_node_queries.insert(query_id, std::time::Instant::now()); @@ -488,7 +488,7 @@ impl Stream for Discovery { return Poll::Ready(Some(DiscoveryEvent::RandomKademliaStarted)) } } else { - log::info!( + log::debug!( target: LOG_TARGET, "discovery is disabled as we have {num_peers} connected peers." ); @@ -511,12 +511,11 @@ impl Stream for Discovery { // the addresses are already inserted into the DHT and in `TransportManager` so // there is no need to add them again. The found peers must be registered to // `Peerstore` so other protocols are aware of them through `Peerset`. 
- log::trace!(target: LOG_TARGET, "dht random walk yielded {} peers", peers.len()); - - this.next_kad_query = Some(Delay::new(KADEMLIA_QUERY_INTERVAL)); if let Some(instant) = this.find_node_queries.remove(&query_id) { - log::info!(target: LOG_TARGET, "dht random walk yielded {} peers {query_id:?} in {:?}", peers.len(), instant.elapsed()); + log::trace!(target: LOG_TARGET, "dht random walk yielded {} peers for {query_id:?} in {:?}", peers.len(), instant.elapsed()); + } else { + log::trace!(target: LOG_TARGET, "dht random walk yielded {} peers for {query_id:?}", peers.len()); } return Poll::Ready(Some(DiscoveryEvent::RoutingTableUpdate { @@ -545,7 +544,7 @@ impl Stream for Discovery { this.duration_to_next_find_query = KADEMLIA_QUERY_INTERVAL; this.next_kad_query = Some(Delay::new(this.duration_to_next_find_query)); - log::warn!(target: LOG_TARGET, "dht random walk failed for {query_id:?} in {:?}", instant.elapsed()); + log::debug!(target: LOG_TARGET, "dht random walk failed for {query_id:?} in {:?}", instant.elapsed()); } return Poll::Ready(Some(DiscoveryEvent::QueryFailed { query_id })); From 6ac1bffc085bf6c26867939aeccd539ada823755 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Wed, 3 Jul 2024 19:44:54 +0300 Subject: [PATCH 08/20] net/kad: Bound maximum number of queries Signed-off-by: Alexandru Vasile --- substrate/client/network/src/litep2p/discovery.rs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/substrate/client/network/src/litep2p/discovery.rs b/substrate/client/network/src/litep2p/discovery.rs index 40ca3ef7864a..18ba1771bf04 100644 --- a/substrate/client/network/src/litep2p/discovery.rs +++ b/substrate/client/network/src/litep2p/discovery.rs @@ -74,6 +74,9 @@ const MDNS_QUERY_INTERVAL: Duration = Duration::from_secs(30); /// Minimum number of confirmations received before an address is verified. const MIN_ADDRESS_CONFIRMATIONS: usize = 5; +/// Maximum number of in-flight `FIND_NODE` queries. 
+const MAX_INFLIGHT_FIND_NODE_QUERIES: usize = 16; + /// Discovery events. #[derive(Debug)] pub enum DiscoveryEvent { @@ -471,7 +474,9 @@ impl Stream for Discovery { match delay.poll_unpin(cx) { Poll::Ready(()) => { let num_peers = this.num_connected_peers(); - if num_peers < this.discovery_only_if_under_num { + if num_peers < this.discovery_only_if_under_num && + this.find_node_queries.len() < MAX_INFLIGHT_FIND_NODE_QUERIES + { let peer = PeerId::random(); log::debug!(target: LOG_TARGET, "start next kademlia query for {peer:?}"); @@ -490,7 +495,9 @@ impl Stream for Discovery { } else { log::debug!( target: LOG_TARGET, - "discovery is disabled as we have {num_peers} connected peers." + "discovery is paused: {num_peers}/{} connected peers and in flight queries: {}/{MAX_INFLIGHT_FIND_NODE_QUERIES}", + this.discovery_only_if_under_num, + this.find_node_queries.len(), ); } From 4983b890dbe98b5a912ff5dc7ebe290b701a0806 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 8 Jul 2024 14:39:32 +0300 Subject: [PATCH 09/20] litep2p: Rename num connected peers to num sync peers Signed-off-by: Alexandru Vasile --- substrate/client/network/src/litep2p/mod.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/substrate/client/network/src/litep2p/mod.rs b/substrate/client/network/src/litep2p/mod.rs index f3ecd9e0e84f..5be8d90ca0a8 100644 --- a/substrate/client/network/src/litep2p/mod.rs +++ b/substrate/client/network/src/litep2p/mod.rs @@ -166,8 +166,8 @@ pub struct Litep2pNetworkBackend { /// Discovery. discovery: Discovery, - /// Number of connected peers. - num_connected: Arc, + /// Number of connected peers over the block announce protocol. + num_sync_connected: Arc, /// Connected peers. peers: HashMap, @@ -262,12 +262,12 @@ impl Litep2pNetworkBackend { /// Fetch the number of connected peers from the peerset handle and update /// the atomic `num_connected` shared between the network backend. 
fn fetch_connected_peers(&self) -> usize { - let num_connected_peers = self + let num_sync_connected = self .peerset_handles .get(&self.block_announce_protocol) .map_or(0usize, |handle| handle.connected_peers.load(Ordering::Relaxed)); - self.num_connected.store(num_connected_peers, Ordering::Relaxed); - num_connected_peers + self.num_sync_connected.store(num_sync_connected, Ordering::Relaxed); + num_sync_connected } /// Configure transport protocols for `Litep2pNetworkBackend`. @@ -555,7 +555,7 @@ impl NetworkBackend for Litep2pNetworkBac // enable ipfs ping, identify and kademlia, and potentially mdns if user enabled it let listen_addresses = Arc::new(Default::default()); - let num_connected = Arc::new(Default::default()); + let num_sync_connected = Arc::new(Default::default()); let (discovery, ping_config, identify_config, kademlia_config, maybe_mdns_config) = Discovery::new( @@ -565,7 +565,7 @@ impl NetworkBackend for Litep2pNetworkBac &params.protocol_id, known_addresses.clone(), Arc::clone(&listen_addresses), - Arc::clone(&num_connected), + Arc::clone(&num_sync_connected), limit_discovery_under, Arc::clone(&peer_store_handle), ); @@ -614,7 +614,7 @@ impl NetworkBackend for Litep2pNetworkBac Arc::new(Litep2pBandwidthSink { sink: litep2p.bandwidth_sink() }); if let Some(registry) = &params.metrics_registry { - MetricSources::register(registry, bandwidth, Arc::clone(&num_connected))?; + MetricSources::register(registry, bandwidth, Arc::clone(&num_sync_connected))?; } Ok(Self { @@ -622,7 +622,7 @@ impl NetworkBackend for Litep2pNetworkBac cmd_rx, metrics, peerset_handles: notif_protocols, - num_connected, + num_sync_connected, discovery, pending_put_values: HashMap::new(), pending_get_values: HashMap::new(), From 32c92575836c12625c910329cce736ee9321e2f6 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 8 Jul 2024 14:40:05 +0300 Subject: [PATCH 10/20] litep2p: Rename function to illustrate sync peers Signed-off-by: Alexandru Vasile --- 
substrate/client/network/src/litep2p/mod.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/substrate/client/network/src/litep2p/mod.rs b/substrate/client/network/src/litep2p/mod.rs index 5be8d90ca0a8..867066d2959b 100644 --- a/substrate/client/network/src/litep2p/mod.rs +++ b/substrate/client/network/src/litep2p/mod.rs @@ -261,7 +261,7 @@ impl Litep2pNetworkBackend { /// Fetch the number of connected peers from the peerset handle and update /// the atomic `num_connected` shared between the network backend. - fn fetch_connected_peers(&self) -> usize { + fn fetch_sync_connected_peers(&self) -> usize { let num_sync_connected = self .peerset_handles .get(&self.block_announce_protocol) @@ -699,7 +699,7 @@ impl NetworkBackend for Litep2pNetworkBac log::debug!(target: LOG_TARGET, "starting litep2p network backend"); loop { - self.fetch_connected_peers(); + self.fetch_sync_connected_peers(); tokio::select! { command = self.cmd_rx.next() => match command { @@ -720,7 +720,7 @@ impl NetworkBackend for Litep2pNetworkBac self.event_streams.push(tx); } NetworkServiceCommand::Status { tx } => { - let num_connected_peers = self.fetch_connected_peers(); + let num_connected_peers = self.fetch_sync_connected_peers(); let _ = tx.send(NetworkStatus { num_connected_peers, From 2a6d231b514643657516590d5255dc355a943863 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 8 Jul 2024 14:41:07 +0300 Subject: [PATCH 11/20] litep2p: Improve comment wrt num_sync_connected Signed-off-by: Alexandru Vasile --- substrate/client/network/src/litep2p/mod.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/substrate/client/network/src/litep2p/mod.rs b/substrate/client/network/src/litep2p/mod.rs index 867066d2959b..d7d043b6dd91 100644 --- a/substrate/client/network/src/litep2p/mod.rs +++ b/substrate/client/network/src/litep2p/mod.rs @@ -167,6 +167,8 @@ pub struct Litep2pNetworkBackend { discovery: Discovery, /// Number of connected peers over the block announce 
protocol. + /// + /// This is used to update metrics and network status. num_sync_connected: Arc, /// Connected peers. From c71fa9ef9586c84b03e1274576d63e1ba1e755ff Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 8 Jul 2024 14:44:56 +0300 Subject: [PATCH 12/20] litep2p: Extract number of distinct connected peers Signed-off-by: Alexandru Vasile --- substrate/client/network/src/litep2p/mod.rs | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/substrate/client/network/src/litep2p/mod.rs b/substrate/client/network/src/litep2p/mod.rs index d7d043b6dd91..7870a02f47c1 100644 --- a/substrate/client/network/src/litep2p/mod.rs +++ b/substrate/client/network/src/litep2p/mod.rs @@ -171,6 +171,11 @@ pub struct Litep2pNetworkBackend { /// This is used to update metrics and network status. num_sync_connected: Arc, + /// Number of uniquely connected peers. + /// + /// This is used to instruct the discovery about the number of connected peers. + num_uniquely_connected: Arc, + /// Connected peers. 
peers: HashMap, @@ -557,7 +562,7 @@ impl NetworkBackend for Litep2pNetworkBac // enable ipfs ping, identify and kademlia, and potentially mdns if user enabled it let listen_addresses = Arc::new(Default::default()); - let num_sync_connected = Arc::new(Default::default()); + let num_uniquely_connected = Arc::new(Default::default()); let (discovery, ping_config, identify_config, kademlia_config, maybe_mdns_config) = Discovery::new( @@ -567,7 +572,7 @@ impl NetworkBackend for Litep2pNetworkBac &params.protocol_id, known_addresses.clone(), Arc::clone(&listen_addresses), - Arc::clone(&num_sync_connected), + Arc::clone(&num_uniquely_connected), limit_discovery_under, Arc::clone(&peer_store_handle), ); @@ -615,6 +620,7 @@ impl NetworkBackend for Litep2pNetworkBac let bandwidth: Arc = Arc::new(Litep2pBandwidthSink { sink: litep2p.bandwidth_sink() }); + let num_sync_connected = Arc::new(Default::default()); if let Some(registry) = &params.metrics_registry { MetricSources::register(registry, bandwidth, Arc::clone(&num_sync_connected))?; } @@ -625,6 +631,7 @@ impl NetworkBackend for Litep2pNetworkBac metrics, peerset_handles: notif_protocols, num_sync_connected, + num_uniquely_connected, discovery, pending_put_values: HashMap::new(), pending_get_values: HashMap::new(), @@ -702,6 +709,7 @@ impl NetworkBackend for Litep2pNetworkBac loop { self.fetch_sync_connected_peers(); + self.num_uniquely_connected.store(self.peers.len(), Ordering::Relaxed); tokio::select! 
{ command = self.cmd_rx.next() => match command { From 24bafe3f177b2f14ce35b614d0564ce65681014f Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 11 Jul 2024 15:29:40 +0300 Subject: [PATCH 13/20] litep2p: Remove fetch sync peers method Signed-off-by: Alexandru Vasile --- substrate/client/network/src/litep2p/mod.rs | 25 +++++---------------- 1 file changed, 5 insertions(+), 20 deletions(-) diff --git a/substrate/client/network/src/litep2p/mod.rs b/substrate/client/network/src/litep2p/mod.rs index 7870a02f47c1..d910d032069b 100644 --- a/substrate/client/network/src/litep2p/mod.rs +++ b/substrate/client/network/src/litep2p/mod.rs @@ -166,11 +166,6 @@ pub struct Litep2pNetworkBackend { /// Discovery. discovery: Discovery, - /// Number of connected peers over the block announce protocol. - /// - /// This is used to update metrics and network status. - num_sync_connected: Arc, - /// Number of uniquely connected peers. /// /// This is used to instruct the discovery about the number of connected peers. @@ -266,17 +261,6 @@ impl Litep2pNetworkBackend { Ok((local_identity, local_peer_id)) } - /// Fetch the number of connected peers from the peerset handle and update - /// the atomic `num_connected` shared between the network backend. - fn fetch_sync_connected_peers(&self) -> usize { - let num_sync_connected = self - .peerset_handles - .get(&self.block_announce_protocol) - .map_or(0usize, |handle| handle.connected_peers.load(Ordering::Relaxed)); - self.num_sync_connected.store(num_sync_connected, Ordering::Relaxed); - num_sync_connected - } - /// Configure transport protocols for `Litep2pNetworkBackend`. 
fn configure_transport( config: &FullNetworkConfiguration, @@ -473,6 +457,7 @@ impl NetworkBackend for Litep2pNetworkBac // to the protocol's `Peerset` together with the protocol name to allow other subsystems // of Polkadot SDK to control connectivity of the notification protocol let block_announce_protocol = params.block_announce_config.protocol_name().clone(); + let num_sync_connected = params.block_announce_config.handle.connected_peers.clone(); let mut notif_protocols = HashMap::from_iter([( params.block_announce_config.protocol_name().clone(), params.block_announce_config.handle, @@ -620,7 +605,6 @@ impl NetworkBackend for Litep2pNetworkBac let bandwidth: Arc = Arc::new(Litep2pBandwidthSink { sink: litep2p.bandwidth_sink() }); - let num_sync_connected = Arc::new(Default::default()); if let Some(registry) = &params.metrics_registry { MetricSources::register(registry, bandwidth, Arc::clone(&num_sync_connected))?; } @@ -630,7 +614,6 @@ impl NetworkBackend for Litep2pNetworkBac cmd_rx, metrics, peerset_handles: notif_protocols, - num_sync_connected, num_uniquely_connected, discovery, pending_put_values: HashMap::new(), @@ -708,7 +691,6 @@ impl NetworkBackend for Litep2pNetworkBac log::debug!(target: LOG_TARGET, "starting litep2p network backend"); loop { - self.fetch_sync_connected_peers(); self.num_uniquely_connected.store(self.peers.len(), Ordering::Relaxed); tokio::select! 
{ @@ -730,7 +712,10 @@ impl NetworkBackend for Litep2pNetworkBac self.event_streams.push(tx); } NetworkServiceCommand::Status { tx } => { - let num_connected_peers = self.fetch_sync_connected_peers(); + let num_connected_peers = self + .peerset_handles + .get(&self.block_announce_protocol) + .map_or(0usize, |handle| handle.connected_peers.load(Ordering::Relaxed)); let _ = tx.send(NetworkStatus { num_connected_peers, From b6c99011ac9cfb2058746c02e79b7c01d4627f77 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 16 Jul 2024 19:41:34 +0300 Subject: [PATCH 14/20] litep2p: Periodically perform kademlia queries instead if under number Signed-off-by: Alexandru Vasile --- .../client/network/src/litep2p/discovery.rs | 26 +++---------------- substrate/client/network/src/litep2p/mod.rs | 19 +------------- 2 files changed, 4 insertions(+), 41 deletions(-) diff --git a/substrate/client/network/src/litep2p/discovery.rs b/substrate/client/network/src/litep2p/discovery.rs index 18ba1771bf04..d579d83025c9 100644 --- a/substrate/client/network/src/litep2p/discovery.rs +++ b/substrate/client/network/src/litep2p/discovery.rs @@ -51,7 +51,7 @@ use std::{ cmp, collections::{HashMap, HashSet, VecDeque}, pin::Pin, - sync::{atomic::AtomicUsize, Arc}, + sync::Arc, task::{Context, Poll}, time::{Duration, Instant}, }; @@ -204,12 +204,6 @@ pub struct Discovery { /// Delay to next `FIND_NODE` query. duration_to_next_find_query: Duration, - - /// Number of connected peers as reported by the blocks announcement protocol. - num_connected_peers: Arc, - - /// Number of active connections over which we interrupt the discovery process. - discovery_only_if_under_num: usize, } /// Legacy (fallback) Kademlia protocol name based on `protocol_id`. 
@@ -244,8 +238,6 @@ impl Discovery { protocol_id: &ProtocolId, known_peers: HashMap>, listen_addresses: Arc>>, - num_connected_peers: Arc, - discovery_only_if_under_num: usize, _peerstore_handle: Arc, ) -> (Self, PingConfig, IdentifyConfig, KademliaConfig, Option) { let (ping_config, ping_event_stream) = PingConfig::default(); @@ -299,8 +291,6 @@ impl Discovery { genesis_hash, fork_id, )]), - num_connected_peers, - discovery_only_if_under_num, }, ping_config, identify_config, @@ -309,11 +299,6 @@ impl Discovery { ) } - /// Get number of connected peers. - fn num_connected_peers(&self) -> usize { - self.num_connected_peers.load(std::sync::atomic::Ordering::Relaxed) - } - /// Add known peer to `Kademlia`. #[allow(unused)] pub async fn add_known_peer(&mut self, peer: PeerId, addresses: Vec) { @@ -473,10 +458,7 @@ impl Stream for Discovery { if let Some(mut delay) = this.next_kad_query.take() { match delay.poll_unpin(cx) { Poll::Ready(()) => { - let num_peers = this.num_connected_peers(); - if num_peers < this.discovery_only_if_under_num && - this.find_node_queries.len() < MAX_INFLIGHT_FIND_NODE_QUERIES - { + if this.find_node_queries.len() < MAX_INFLIGHT_FIND_NODE_QUERIES { let peer = PeerId::random(); log::debug!(target: LOG_TARGET, "start next kademlia query for {peer:?}"); @@ -495,9 +477,7 @@ impl Stream for Discovery { } else { log::debug!( target: LOG_TARGET, - "discovery is paused: {num_peers}/{} connected peers and in flight queries: {}/{MAX_INFLIGHT_FIND_NODE_QUERIES}", - this.discovery_only_if_under_num, - this.find_node_queries.len(), + "discovery is paused; too many in-flight queries", ); } diff --git a/substrate/client/network/src/litep2p/mod.rs b/substrate/client/network/src/litep2p/mod.rs index d910d032069b..ca623a937aed 100644 --- a/substrate/client/network/src/litep2p/mod.rs +++ b/substrate/client/network/src/litep2p/mod.rs @@ -88,10 +88,7 @@ use std::{ future::Future, iter, pin::Pin, - sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, - }, + 
sync::{atomic::Ordering, Arc}, time::{Duration, Instant}, }; @@ -166,11 +163,6 @@ pub struct Litep2pNetworkBackend { /// Discovery. discovery: Discovery, - /// Number of uniquely connected peers. - /// - /// This is used to instruct the discovery about the number of connected peers. - num_uniquely_connected: Arc, - /// Connected peers. peers: HashMap, @@ -441,9 +433,6 @@ impl NetworkBackend for Litep2pNetworkBac let peer_store_handle = params.network_config.peer_store_handle(); let executor = Arc::new(Litep2pExecutor { executor: params.executor }); - let limit_discovery_under = - params.network_config.network_config.default_peers_set.out_peers as usize + 15; - let FullNetworkConfiguration { notification_protocols, request_response_protocols, @@ -547,7 +536,6 @@ impl NetworkBackend for Litep2pNetworkBac // enable ipfs ping, identify and kademlia, and potentially mdns if user enabled it let listen_addresses = Arc::new(Default::default()); - let num_uniquely_connected = Arc::new(Default::default()); let (discovery, ping_config, identify_config, kademlia_config, maybe_mdns_config) = Discovery::new( @@ -557,8 +545,6 @@ impl NetworkBackend for Litep2pNetworkBac &params.protocol_id, known_addresses.clone(), Arc::clone(&listen_addresses), - Arc::clone(&num_uniquely_connected), - limit_discovery_under, Arc::clone(&peer_store_handle), ); @@ -614,7 +600,6 @@ impl NetworkBackend for Litep2pNetworkBac cmd_rx, metrics, peerset_handles: notif_protocols, - num_uniquely_connected, discovery, pending_put_values: HashMap::new(), pending_get_values: HashMap::new(), @@ -691,8 +676,6 @@ impl NetworkBackend for Litep2pNetworkBac log::debug!(target: LOG_TARGET, "starting litep2p network backend"); loop { - self.num_uniquely_connected.store(self.peers.len(), Ordering::Relaxed); - tokio::select! 
{ command = self.cmd_rx.next() => match command { None => return, From 7bf91eba2fb7e9a960aa19f542f36ac4625dd79b Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 17 Sep 2024 16:11:26 +0000 Subject: [PATCH 15/20] Revert "litep2p: Periodically perform kademlia queries instead if under number" This reverts commit b6c99011ac9cfb2058746c02e79b7c01d4627f77. --- .../client/network/src/litep2p/discovery.rs | 26 ++++++++++++++++--- substrate/client/network/src/litep2p/mod.rs | 19 +++++++++++++- 2 files changed, 41 insertions(+), 4 deletions(-) diff --git a/substrate/client/network/src/litep2p/discovery.rs b/substrate/client/network/src/litep2p/discovery.rs index d579d83025c9..18ba1771bf04 100644 --- a/substrate/client/network/src/litep2p/discovery.rs +++ b/substrate/client/network/src/litep2p/discovery.rs @@ -51,7 +51,7 @@ use std::{ cmp, collections::{HashMap, HashSet, VecDeque}, pin::Pin, - sync::Arc, + sync::{atomic::AtomicUsize, Arc}, task::{Context, Poll}, time::{Duration, Instant}, }; @@ -204,6 +204,12 @@ pub struct Discovery { /// Delay to next `FIND_NODE` query. duration_to_next_find_query: Duration, + + /// Number of connected peers as reported by the blocks announcement protocol. + num_connected_peers: Arc, + + /// Number of active connections over which we interrupt the discovery process. + discovery_only_if_under_num: usize, } /// Legacy (fallback) Kademlia protocol name based on `protocol_id`. @@ -238,6 +244,8 @@ impl Discovery { protocol_id: &ProtocolId, known_peers: HashMap>, listen_addresses: Arc>>, + num_connected_peers: Arc, + discovery_only_if_under_num: usize, _peerstore_handle: Arc, ) -> (Self, PingConfig, IdentifyConfig, KademliaConfig, Option) { let (ping_config, ping_event_stream) = PingConfig::default(); @@ -291,6 +299,8 @@ impl Discovery { genesis_hash, fork_id, )]), + num_connected_peers, + discovery_only_if_under_num, }, ping_config, identify_config, @@ -299,6 +309,11 @@ impl Discovery { ) } + /// Get number of connected peers. 
+ fn num_connected_peers(&self) -> usize { + self.num_connected_peers.load(std::sync::atomic::Ordering::Relaxed) + } + /// Add known peer to `Kademlia`. #[allow(unused)] pub async fn add_known_peer(&mut self, peer: PeerId, addresses: Vec) { @@ -458,7 +473,10 @@ impl Stream for Discovery { if let Some(mut delay) = this.next_kad_query.take() { match delay.poll_unpin(cx) { Poll::Ready(()) => { - if this.find_node_queries.len() < MAX_INFLIGHT_FIND_NODE_QUERIES { + let num_peers = this.num_connected_peers(); + if num_peers < this.discovery_only_if_under_num && + this.find_node_queries.len() < MAX_INFLIGHT_FIND_NODE_QUERIES + { let peer = PeerId::random(); log::debug!(target: LOG_TARGET, "start next kademlia query for {peer:?}"); @@ -477,7 +495,9 @@ impl Stream for Discovery { } else { log::debug!( target: LOG_TARGET, - "discovery is paused; too many in-flight queries", + "discovery is paused: {num_peers}/{} connected peers and in flight queries: {}/{MAX_INFLIGHT_FIND_NODE_QUERIES}", + this.discovery_only_if_under_num, + this.find_node_queries.len(), ); } diff --git a/substrate/client/network/src/litep2p/mod.rs b/substrate/client/network/src/litep2p/mod.rs index ca623a937aed..d910d032069b 100644 --- a/substrate/client/network/src/litep2p/mod.rs +++ b/substrate/client/network/src/litep2p/mod.rs @@ -88,7 +88,10 @@ use std::{ future::Future, iter, pin::Pin, - sync::{atomic::Ordering, Arc}, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, time::{Duration, Instant}, }; @@ -163,6 +166,11 @@ pub struct Litep2pNetworkBackend { /// Discovery. discovery: Discovery, + /// Number of uniquely connected peers. + /// + /// This is used to instruct the discovery about the number of connected peers. + num_uniquely_connected: Arc, + /// Connected peers. 
peers: HashMap, @@ -433,6 +441,9 @@ impl NetworkBackend for Litep2pNetworkBac let peer_store_handle = params.network_config.peer_store_handle(); let executor = Arc::new(Litep2pExecutor { executor: params.executor }); + let limit_discovery_under = + params.network_config.network_config.default_peers_set.out_peers as usize + 15; + let FullNetworkConfiguration { notification_protocols, request_response_protocols, @@ -536,6 +547,7 @@ impl NetworkBackend for Litep2pNetworkBac // enable ipfs ping, identify and kademlia, and potentially mdns if user enabled it let listen_addresses = Arc::new(Default::default()); + let num_uniquely_connected = Arc::new(Default::default()); let (discovery, ping_config, identify_config, kademlia_config, maybe_mdns_config) = Discovery::new( @@ -545,6 +557,8 @@ impl NetworkBackend for Litep2pNetworkBac &params.protocol_id, known_addresses.clone(), Arc::clone(&listen_addresses), + Arc::clone(&num_uniquely_connected), + limit_discovery_under, Arc::clone(&peer_store_handle), ); @@ -600,6 +614,7 @@ impl NetworkBackend for Litep2pNetworkBac cmd_rx, metrics, peerset_handles: notif_protocols, + num_uniquely_connected, discovery, pending_put_values: HashMap::new(), pending_get_values: HashMap::new(), @@ -676,6 +691,8 @@ impl NetworkBackend for Litep2pNetworkBac log::debug!(target: LOG_TARGET, "starting litep2p network backend"); loop { + self.num_uniquely_connected.store(self.peers.len(), Ordering::Relaxed); + tokio::select! { command = self.cmd_rx.next() => match command { None => return, From 5d09d4a6c337887fe58420fc4c79e4dc3e074393 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 13 Sep 2024 13:21:46 +0000 Subject: [PATCH 16/20] litep2p: Change heuristic to 3 times the required discovery number This is mainly done to keep a healthy subset of the network in the node's memory and routing table. Otherwise, we may risk trading off discoverability with protocol performance, which is not entirely desirable. 
Signed-off-by: Alexandru Vasile --- substrate/client/network/src/litep2p/discovery.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/substrate/client/network/src/litep2p/discovery.rs b/substrate/client/network/src/litep2p/discovery.rs index 18ba1771bf04..a2f57be7827f 100644 --- a/substrate/client/network/src/litep2p/discovery.rs +++ b/substrate/client/network/src/litep2p/discovery.rs @@ -474,7 +474,7 @@ impl Stream for Discovery { match delay.poll_unpin(cx) { Poll::Ready(()) => { let num_peers = this.num_connected_peers(); - if num_peers < this.discovery_only_if_under_num && + if num_peers < 3 * this.discovery_only_if_under_num && this.find_node_queries.len() < MAX_INFLIGHT_FIND_NODE_QUERIES { let peer = PeerId::random(); From f676446895564764a45c7cc5b347b23b8d5e8a26 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 16 Sep 2024 08:08:40 +0000 Subject: [PATCH 17/20] litep2p/discovery: Fallback to 2x factor for discovery Signed-off-by: Alexandru Vasile --- substrate/client/network/src/litep2p/discovery.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/substrate/client/network/src/litep2p/discovery.rs b/substrate/client/network/src/litep2p/discovery.rs index a2f57be7827f..50b41e94e7ad 100644 --- a/substrate/client/network/src/litep2p/discovery.rs +++ b/substrate/client/network/src/litep2p/discovery.rs @@ -474,11 +474,11 @@ impl Stream for Discovery { match delay.poll_unpin(cx) { Poll::Ready(()) => { let num_peers = this.num_connected_peers(); - if num_peers < 3 * this.discovery_only_if_under_num && + if num_peers < 2 * this.discovery_only_if_under_num && this.find_node_queries.len() < MAX_INFLIGHT_FIND_NODE_QUERIES { let peer = PeerId::random(); - log::debug!(target: LOG_TARGET, "start next kademlia query for {peer:?}"); + log::debug!(target: LOG_TARGET, "start next kademlia query for {peer:?} {num_peers}/2x{} connected peers", this.discovery_only_if_under_num,); if let Ok(query_id) = 
this.kademlia_handle.try_find_node(peer) { this.find_node_queries.insert(query_id, std::time::Instant::now()); @@ -495,7 +495,7 @@ impl Stream for Discovery { } else { log::debug!( target: LOG_TARGET, - "discovery is paused: {num_peers}/{} connected peers and in flight queries: {}/{MAX_INFLIGHT_FIND_NODE_QUERIES}", + "discovery is paused: {num_peers}/2x{} connected peers and in flight queries: {}/{MAX_INFLIGHT_FIND_NODE_QUERIES}", this.discovery_only_if_under_num, this.find_node_queries.len(), ); From a8e1cae200431b3927c9334239ab6685c9b6f4b3 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 16 Sep 2024 15:39:38 +0000 Subject: [PATCH 18/20] discovery: Introduce mandatory query for discovery every 16 minutes Signed-off-by: Alexandru Vasile --- .../client/network/src/litep2p/discovery.rs | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/substrate/client/network/src/litep2p/discovery.rs b/substrate/client/network/src/litep2p/discovery.rs index 73f3ea5429e6..81d1c5d1e18a 100644 --- a/substrate/client/network/src/litep2p/discovery.rs +++ b/substrate/client/network/src/litep2p/discovery.rs @@ -69,6 +69,12 @@ const KADEMLIA_QUERY_INTERVAL: Duration = Duration::from_secs(5); /// The time is reset to `KADEMLIA_QUERY_INTERVAL` after a failed query. const CONVERGENCE_QUERY_INTERVAL: Duration = Duration::from_secs(120); +/// Ensure at least one discovery query is issued every 16 minutes. +/// +/// This has a low impact on the networking backend, while keeping a healthy +/// subset of the network discovered. +const MANDATORY_QUERY_INTERVAL: Duration = Duration::from_secs(16 * 60); + /// mDNS query interval. const MDNS_QUERY_INTERVAL: Duration = Duration::from_secs(30); @@ -215,6 +221,9 @@ pub struct Discovery { /// Delay to next `FIND_NODE` query. duration_to_next_find_query: Duration, + /// Delay to next mandatory `FIND_NODE` query. + next_mandatory_kad_query: Option, + /// Number of connected peers as reported by the blocks announcement protocol. 
num_connected_peers: Arc, @@ -299,6 +308,7 @@ impl Discovery { find_node_queries: HashMap::new(), pending_events: VecDeque::new(), duration_to_next_find_query: Duration::from_secs(1), + next_mandatory_kad_query: Some(Delay::new(MANDATORY_QUERY_INTERVAL)), address_confirmations: LruMap::new(ByLength::new(MAX_EXTERNAL_ADDRESSES)), allow_non_global_addresses: config.allow_non_globals_in_dht, public_addresses: config.public_addresses.iter().cloned().map(Into::into).collect(), @@ -497,6 +507,27 @@ impl Stream for Discovery { return Poll::Ready(Some(event)) } + if let Some(mut delay) = this.next_mandatory_kad_query.take() { + match delay.poll_unpin(cx) { + Poll::Ready(()) => { + if this.find_node_queries.len() < MAX_INFLIGHT_FIND_NODE_QUERIES { + let peer = PeerId::random(); + log::debug!(target: LOG_TARGET, "start next mandatory kademlia query for {peer:?} {num_peers}/2x{} connected peers", this.discovery_only_if_under_num,); + + if let Ok(query_id) = this.kademlia_handle.try_find_node(peer) { + this.next_mandatory_kad_query = + Some(Delay::new(MANDATORY_QUERY_INTERVAL)); + + return Poll::Ready(Some(DiscoveryEvent::RandomKademliaStarted)) + } + } + }, + Poll::Pending => { + this.next_mandatory_kad_query = Some(delay); + }, + } + } + if let Some(mut delay) = this.next_kad_query.take() { match delay.poll_unpin(cx) { Poll::Ready(()) => { From fa3ce932894a12e91459fa819c5316e92af7263b Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Mon, 16 Sep 2024 15:43:14 +0000 Subject: [PATCH 19/20] litep2p/discovery: Better logs for mandatory queries Signed-off-by: Alexandru Vasile --- substrate/client/network/src/litep2p/discovery.rs | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/substrate/client/network/src/litep2p/discovery.rs b/substrate/client/network/src/litep2p/discovery.rs index 81d1c5d1e18a..42e8f8e85f4c 100644 --- a/substrate/client/network/src/litep2p/discovery.rs +++ b/substrate/client/network/src/litep2p/discovery.rs @@ -507,19 +507,26 
@@ impl Stream for Discovery { return Poll::Ready(Some(event)) } + // Mandatory kad random queries issued periodically. if let Some(mut delay) = this.next_mandatory_kad_query.take() { match delay.poll_unpin(cx) { Poll::Ready(()) => { if this.find_node_queries.len() < MAX_INFLIGHT_FIND_NODE_QUERIES { let peer = PeerId::random(); - log::debug!(target: LOG_TARGET, "start next mandatory kademlia query for {peer:?} {num_peers}/2x{} connected peers", this.discovery_only_if_under_num,); + log::debug!(target: LOG_TARGET, "start next mandatory kademlia query for {peer:?}"); if let Ok(query_id) = this.kademlia_handle.try_find_node(peer) { + this.find_node_queries.insert(query_id, std::time::Instant::now()); + this.next_mandatory_kad_query = Some(Delay::new(MANDATORY_QUERY_INTERVAL)); return Poll::Ready(Some(DiscoveryEvent::RandomKademliaStarted)) + } else { + log::error!(target: LOG_TARGET, "Kademlia mandatory query cannot be started: handler channel is full"); } + } else { + log::debug!(target: LOG_TARGET, "discovery is paused: too many in-flight queries"); } }, Poll::Pending => { @@ -528,6 +535,8 @@ impl Stream for Discovery { } } + // Exponential timer that checks more frequently if the node is under a number + // of healthy connected peers reported by the sync protocol. 
if let Some(mut delay) = this.next_kad_query.take() { match delay.poll_unpin(cx) { Poll::Ready(()) => { @@ -549,6 +558,8 @@ impl Stream for Discovery { Some(Delay::new(this.duration_to_next_find_query)); return Poll::Ready(Some(DiscoveryEvent::RandomKademliaStarted)) + } else { + log::error!(target: LOG_TARGET, "Kademlia mandatory query cannot be started: handler channel is full"); } } else { log::debug!( From 2ab14ee0361020b2c52681a784795f9e42ce527c Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 17 Sep 2024 10:55:16 +0000 Subject: [PATCH 20/20] litep2p/discovery: Periodic kad queries every 30 minutes Signed-off-by: Alexandru Vasile --- substrate/client/network/src/litep2p/discovery.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/substrate/client/network/src/litep2p/discovery.rs b/substrate/client/network/src/litep2p/discovery.rs index 42e8f8e85f4c..7bab1ae191ed 100644 --- a/substrate/client/network/src/litep2p/discovery.rs +++ b/substrate/client/network/src/litep2p/discovery.rs @@ -69,11 +69,11 @@ const KADEMLIA_QUERY_INTERVAL: Duration = Duration::from_secs(5); /// The time is reset to `KADEMLIA_QUERY_INTERVAL` after a failed query. const CONVERGENCE_QUERY_INTERVAL: Duration = Duration::from_secs(120); -/// Ensure at least one discovery query is issued every 16 minutes. +/// Ensure at least one discovery query is issued periodically. /// /// This has a low impact on the networking backend, while keeping a healthy /// subset of the network discovered. -const MANDATORY_QUERY_INTERVAL: Duration = Duration::from_secs(16 * 60); +const MANDATORY_QUERY_INTERVAL: Duration = Duration::from_secs(30 * 60); /// mDNS query interval. const MDNS_QUERY_INTERVAL: Duration = Duration::from_secs(30);