litep2p/kad: Configure periodic network bootstrap for better throughput #4942

Open

wants to merge 21 commits into base: master

Commits (21)
a126147
net/litep2p: Update connected peers from network backend sooner
lexnv Jul 1, 2024
d9b57ee
net/litep2p: Propagate connected peers to the discovery mechanism
lexnv Jul 1, 2024
3b2c5ac
net/litep2p: Start random walks if under connected peers limit
lexnv Jul 1, 2024
b61e973
net/kad: Ensure timer moves forward
lexnv Jul 3, 2024
837f196
net/kad: Keep record of inflight kad queries
lexnv Jul 3, 2024
5610b95
net/kad: Reset kad query timer on failure
lexnv Jul 3, 2024
f5aee82
net/kad: Downgrade logs to trace and debug
lexnv Jul 3, 2024
6ac1bff
net/kad: Bound maximum number of queries
lexnv Jul 3, 2024
4983b89
litep2p: Rename num connected peers to num sync peers
lexnv Jul 8, 2024
32c9257
litep2p: Rename function to illustrate sync peers
lexnv Jul 8, 2024
2a6d231
litep2p: Improve comment wrt num_sync_connected
lexnv Jul 8, 2024
c71fa9e
litep2p: Extract number of distinct connected peers
lexnv Jul 8, 2024
24bafe3
litep2p: Remove fetch sync peers method
lexnv Jul 11, 2024
b6c9901
litep2p: Periodically perform kademlia queries instead if under number
lexnv Jul 16, 2024
7bf91eb
Revert "litep2p: Periodically perform kademlia queries instead if und…
lexnv Sep 17, 2024
5d09d4a
litep2p: Change heuristic to 3 times the required discovery number
lexnv Sep 13, 2024
f676446
litep2p/discovery: Fallback to 2x factor for discovery
lexnv Sep 16, 2024
0124309
Merge remote-tracking branch 'origin/master' into lexnv/litep2p-aggr-…
lexnv Sep 17, 2024
a8e1cae
discovery: Introduce mandatory query for discovery every 16 minutes
lexnv Sep 16, 2024
fa3ce93
litep2p/discovery: Better logs for mandatory queries
lexnv Sep 16, 2024
2ab14ee
litep2p/discovery: Periodic kad queries every 30 minutes
lexnv Sep 17, 2024
135 changes: 108 additions & 27 deletions substrate/client/network/src/litep2p/discovery.rs
@@ -52,7 +52,7 @@ use std::{
collections::{HashMap, HashSet, VecDeque},
num::NonZeroUsize,
pin::Pin,
sync::Arc,
sync::{atomic::AtomicUsize, Arc},
task::{Context, Poll},
time::{Duration, Instant},
};
@@ -63,6 +63,18 @@ const LOG_TARGET: &str = "sub-libp2p::discovery";
/// Kademlia query interval.
const KADEMLIA_QUERY_INTERVAL: Duration = Duration::from_secs(5);

/// The convergence time between 2 `FIND_NODE` queries.
///
/// The time is exponentially increased after each query until it reaches 120 seconds.
/// The time is reset to `KADEMLIA_QUERY_INTERVAL` after a failed query.
const CONVERGENCE_QUERY_INTERVAL: Duration = Duration::from_secs(120);

/// Ensure at least one discovery query is issued periodically.
///
/// This has a low impact on the networking backend, while keeping a healthy
/// subset of the network discovered.
const MANDATORY_QUERY_INTERVAL: Duration = Duration::from_secs(30 * 60);

/// mDNS query interval.
const MDNS_QUERY_INTERVAL: Duration = Duration::from_secs(30);

@@ -78,6 +90,9 @@ const MAX_EXTERNAL_ADDRESSES: u32 = 32;
/// addresses a bit more robust.
const MIN_ADDRESS_CONFIRMATIONS: usize = 2;

/// Maximum number of in-flight `FIND_NODE` queries.
const MAX_INFLIGHT_FIND_NODE_QUERIES: usize = 16;

/// Discovery events.
#[derive(Debug)]
pub enum DiscoveryEvent {
@@ -182,8 +197,8 @@ pub struct Discovery {
/// If `None`, there is currently a query pending.
next_kad_query: Option<Delay>,

/// Active `FIND_NODE` query if it exists.
find_node_query_id: Option<QueryId>,
/// Active `FIND_NODE` queries.
find_node_queries: HashMap<QueryId, std::time::Instant>,

/// Pending events.
pending_events: VecDeque<DiscoveryEvent>,
@@ -205,6 +220,15 @@ pub struct Discovery {

/// Delay to next `FIND_NODE` query.
duration_to_next_find_query: Duration,

/// Delay to next mandatory `FIND_NODE` query.
next_mandatory_kad_query: Option<Delay>,

/// Number of connected peers as reported by the block announcement protocol.
num_connected_peers: Arc<AtomicUsize>,

/// Number of active connections over which we interrupt the discovery process.
discovery_only_if_under_num: usize,
}

/// Legacy (fallback) Kademlia protocol name based on `protocol_id`.
@@ -239,6 +263,8 @@ impl Discovery {
protocol_id: &ProtocolId,
known_peers: HashMap<PeerId, Vec<Multiaddr>>,
listen_addresses: Arc<RwLock<HashSet<Multiaddr>>>,
num_connected_peers: Arc<AtomicUsize>,
discovery_only_if_under_num: usize,
_peerstore_handle: Arc<dyn PeerStoreProvider>,
) -> (Self, PingConfig, IdentifyConfig, KademliaConfig, Option<MdnsConfig>) {
let (ping_config, ping_event_stream) = PingConfig::default();
@@ -279,9 +305,10 @@ impl Discovery {
kademlia_handle,
_peerstore_handle,
listen_addresses,
find_node_query_id: None,
find_node_queries: HashMap::new(),
pending_events: VecDeque::new(),
duration_to_next_find_query: Duration::from_secs(1),
next_mandatory_kad_query: Some(Delay::new(MANDATORY_QUERY_INTERVAL)),
address_confirmations: LruMap::new(ByLength::new(MAX_EXTERNAL_ADDRESSES)),
allow_non_global_addresses: config.allow_non_globals_in_dht,
public_addresses: config.public_addresses.iter().cloned().map(Into::into).collect(),
@@ -290,6 +317,8 @@ impl Discovery {
genesis_hash,
fork_id,
)]),
num_connected_peers,
discovery_only_if_under_num,
},
ping_config,
identify_config,
@@ -298,6 +327,11 @@ impl Discovery {
)
}

/// Get number of connected peers.
fn num_connected_peers(&self) -> usize {
self.num_connected_peers.load(std::sync::atomic::Ordering::Relaxed)
}

/// Add known peer to `Kademlia`.
#[allow(unused)]
pub async fn add_known_peer(&mut self, peer: PeerId, addresses: Vec<Multiaddr>) {
@@ -473,44 +507,92 @@ impl Stream for Discovery {
return Poll::Ready(Some(event))
}

if let Some(mut delay) = this.next_kad_query.take() {
// Mandatory kad random queries issued periodically.
if let Some(mut delay) = this.next_mandatory_kad_query.take() {
match delay.poll_unpin(cx) {
Poll::Ready(()) => {
if this.find_node_queries.len() < MAX_INFLIGHT_FIND_NODE_QUERIES {
let peer = PeerId::random();
log::debug!(target: LOG_TARGET, "start next mandatory kademlia query for {peer:?}");

if let Ok(query_id) = this.kademlia_handle.try_find_node(peer) {
this.find_node_queries.insert(query_id, std::time::Instant::now());

this.next_mandatory_kad_query =
Some(Delay::new(MANDATORY_QUERY_INTERVAL));

return Poll::Ready(Some(DiscoveryEvent::RandomKademliaStarted))
} else {
log::error!(target: LOG_TARGET, "Kademlia mandatory query cannot be started: handler channel is full");
}
} else {
log::debug!(target: LOG_TARGET, "discovery is paused: too many in-flight queries");
}
},
Poll::Pending => {
this.next_kad_query = Some(delay);
this.next_mandatory_kad_query = Some(delay);
},
}
}

// Exponential timer that checks more frequently if the node is under a number
// of healthy connected peers reported by the sync protocol.
if let Some(mut delay) = this.next_kad_query.take() {
match delay.poll_unpin(cx) {
Poll::Ready(()) => {
let peer = PeerId::random();
let num_peers = this.num_connected_peers();
if num_peers < 2 * this.discovery_only_if_under_num &&
this.find_node_queries.len() < MAX_INFLIGHT_FIND_NODE_QUERIES
{
let peer = PeerId::random();
log::debug!(target: LOG_TARGET, "start next kademlia query for {peer:?} {num_peers}/2x{} connected peers", this.discovery_only_if_under_num,);

log::trace!(target: LOG_TARGET, "start next kademlia query for {peer:?}");
if let Ok(query_id) = this.kademlia_handle.try_find_node(peer) {
this.find_node_queries.insert(query_id, std::time::Instant::now());

match this.kademlia_handle.try_find_node(peer) {
Ok(query_id) => {
this.find_node_query_id = Some(query_id);
return Poll::Ready(Some(DiscoveryEvent::RandomKademliaStarted))
},
Err(()) => {
this.duration_to_next_find_query = cmp::min(
this.duration_to_next_find_query * 2,
Duration::from_secs(60),
CONVERGENCE_QUERY_INTERVAL,
);
this.next_kad_query =
Some(Delay::new(this.duration_to_next_find_query));
},

return Poll::Ready(Some(DiscoveryEvent::RandomKademliaStarted))
} else {
log::error!(target: LOG_TARGET, "Kademlia query cannot be started: handler channel is full");
}
} else {
log::debug!(
target: LOG_TARGET,
"discovery is paused: {num_peers}/2x{} connected peers and in flight queries: {}/{MAX_INFLIGHT_FIND_NODE_QUERIES}",
this.discovery_only_if_under_num,
this.find_node_queries.len(),
);
}

this.duration_to_next_find_query =
cmp::min(this.duration_to_next_find_query * 2, CONVERGENCE_QUERY_INTERVAL);
this.next_kad_query = Some(Delay::new(this.duration_to_next_find_query));
},
Poll::Pending => {
this.next_kad_query = Some(delay);
},
}
}

match Pin::new(&mut this.kademlia_handle).poll_next(cx) {
Poll::Pending => {},
Poll::Ready(None) => return Poll::Ready(None),
Poll::Ready(Some(KademliaEvent::FindNodeSuccess { peers, .. })) => {
Poll::Ready(Some(KademliaEvent::FindNodeSuccess { peers, query_id, .. })) => {
// the addresses are already inserted into the DHT and in `TransportManager` so
// there is no need to add them again. The found peers must be registered to
// `Peerstore` so other protocols are aware of them through `Peerset`.
log::trace!(target: LOG_TARGET, "dht random walk yielded {} peers", peers.len());

this.next_kad_query = Some(Delay::new(KADEMLIA_QUERY_INTERVAL));
if let Some(instant) = this.find_node_queries.remove(&query_id) {
log::trace!(target: LOG_TARGET, "dht random walk yielded {} peers for {query_id:?} in {:?}", peers.len(), instant.elapsed());
} else {
log::trace!(target: LOG_TARGET, "dht random walk yielded {} peers for {query_id:?}", peers.len());
}

return Poll::Ready(Some(DiscoveryEvent::RoutingTableUpdate {
peers: peers.into_iter().map(|(peer, _)| peer).collect(),
@@ -534,15 +616,14 @@ impl Stream for Discovery {
Poll::Ready(Some(KademliaEvent::PutRecordSucess { query_id, key: _ })) =>
return Poll::Ready(Some(DiscoveryEvent::PutRecordSuccess { query_id })),
Poll::Ready(Some(KademliaEvent::QueryFailed { query_id })) => {
match this.find_node_query_id == Some(query_id) {
true => {
this.find_node_query_id = None;
this.duration_to_next_find_query =
cmp::min(this.duration_to_next_find_query * 2, Duration::from_secs(60));
this.next_kad_query = Some(Delay::new(this.duration_to_next_find_query));
},
false => return Poll::Ready(Some(DiscoveryEvent::QueryFailed { query_id })),
if let Some(instant) = this.find_node_queries.remove(&query_id) {
this.duration_to_next_find_query = KADEMLIA_QUERY_INTERVAL;
this.next_kad_query = Some(Delay::new(this.duration_to_next_find_query));

log::debug!(target: LOG_TARGET, "dht random walk failed for {query_id:?} in {:?}", instant.elapsed());
}

return Poll::Ready(Some(DiscoveryEvent::QueryFailed { query_id }));
},
Poll::Ready(Some(KademliaEvent::IncomingRecord { record })) => {
log::trace!(
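To make the new scheduling easier to follow outside the diff, here is a minimal standalone sketch of how `duration_to_next_find_query` evolves, using the constants introduced above. It only illustrates the schedule (assuming the 1-second starting value set in the constructor), not the PR's exact control flow.

```rust
use std::{cmp, time::Duration};

// Values taken from the constants added in discovery.rs above.
const KADEMLIA_QUERY_INTERVAL: Duration = Duration::from_secs(5);
const CONVERGENCE_QUERY_INTERVAL: Duration = Duration::from_secs(120);

/// How the delay until the next `FIND_NODE` query evolves: it doubles after
/// every scheduled query, saturating at `CONVERGENCE_QUERY_INTERVAL`, and
/// drops back to `KADEMLIA_QUERY_INTERVAL` when a query fails.
fn next_interval(current: Duration, last_query_failed: bool) -> Duration {
    if last_query_failed {
        KADEMLIA_QUERY_INTERVAL
    } else {
        cmp::min(current * 2, CONVERGENCE_QUERY_INTERVAL)
    }
}

fn main() {
    // `duration_to_next_find_query` starts at 1 second in the constructor.
    let mut delay = Duration::from_secs(1);
    for _ in 0..8 {
        delay = next_interval(delay, false);
        println!("next FIND_NODE in {delay:?}"); // 2s, 4s, ... capped at 120s
    }
    println!("after a failed query: {:?}", next_interval(delay, true)); // 5s
}
```

Independently of this backoff, the mandatory timer above still issues one random walk every `MANDATORY_QUERY_INTERVAL` (30 minutes), as long as fewer than `MAX_INFLIGHT_FIND_NODE_QUERIES` walks are in flight.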
35 changes: 21 additions & 14 deletions substrate/client/network/src/litep2p/mod.rs
@@ -166,8 +166,10 @@ pub struct Litep2pNetworkBackend {
/// Discovery.
discovery: Discovery,

/// Number of connected peers.
num_connected: Arc<AtomicUsize>,
/// Number of uniquely connected peers.
///
/// This is used to instruct the discovery about the number of connected peers.
num_uniquely_connected: Arc<AtomicUsize>,

/// Connected peers.
peers: HashMap<litep2p::PeerId, ConnectionContext>,
@@ -436,6 +438,9 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBac
let peer_store_handle = params.network_config.peer_store_handle();
let executor = Arc::new(Litep2pExecutor { executor: params.executor });

let limit_discovery_under =
params.network_config.network_config.default_peers_set.out_peers as usize + 15;
Contributor comment:

What happens if in_peers is lower than 15?

Do you think the discovery should be guided by the number of sync peers connected, and not by the number of known peers?

Also, I would make sure we do a random walk at least once per minute as it was before, even if we have enough peers connected (make discovery_only_if_under_num slow down queries, but not stop them completely).
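For context on the numbers being questioned here, a minimal sketch of the gate the diff applies before starting a non-mandatory random walk; the helper name and the `out_peers = 8` example are illustrative, not part of the PR:

```rust
/// Cap on concurrent random walks, as defined in discovery.rs above.
const MAX_INFLIGHT_FIND_NODE_QUERIES: usize = 16;

/// Restates the condition from the diff: `discovery_only_if_under_num` is
/// computed in mod.rs as `default_peers_set.out_peers + 15`.
fn should_start_random_walk(
    num_connected_peers: usize,
    discovery_only_if_under_num: usize,
    inflight_queries: usize,
) -> bool {
    num_connected_peers < 2 * discovery_only_if_under_num
        && inflight_queries < MAX_INFLIGHT_FIND_NODE_QUERIES
}

fn main() {
    // E.g. with `out_peers = 8`, random walks keep running below
    // 2 * (8 + 15) = 46 uniquely connected peers and pause above that.
    assert!(should_start_random_walk(30, 8 + 15, 0));
    assert!(!should_start_random_walk(60, 8 + 15, 0));
}
```

Even when this gate pauses discovery, the mandatory query path above still issues one random walk every 30 minutes.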


let FullNetworkConfiguration {
notification_protocols,
request_response_protocols,
@@ -449,6 +454,7 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBac
// to the protocol's `Peerset` together with the protocol name to allow other subsystems
// of Polkadot SDK to control connectivity of the notification protocol
let block_announce_protocol = params.block_announce_config.protocol_name().clone();
let num_sync_connected = params.block_announce_config.handle.connected_peers.clone();
let mut notif_protocols = HashMap::from_iter([(
params.block_announce_config.protocol_name().clone(),
params.block_announce_config.handle,
@@ -538,6 +544,8 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBac

// enable ipfs ping, identify and kademlia, and potentially mdns if user enabled it
let listen_addresses = Arc::new(Default::default());
let num_uniquely_connected = Arc::new(Default::default());

let (discovery, ping_config, identify_config, kademlia_config, maybe_mdns_config) =
Discovery::new(
&network_config,
Expand All @@ -546,6 +554,8 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBac
&params.protocol_id,
known_addresses.clone(),
Arc::clone(&listen_addresses),
Arc::clone(&num_uniquely_connected),
limit_discovery_under,
Arc::clone(&peer_store_handle),
);

Expand Down Expand Up @@ -599,20 +609,19 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBac
));

// register rest of the metrics now that `Litep2p` has been created
let num_connected = Arc::new(Default::default());
let bandwidth: Arc<dyn BandwidthSink> =
Arc::new(Litep2pBandwidthSink { sink: litep2p.bandwidth_sink() });

if let Some(registry) = &params.metrics_registry {
MetricSources::register(registry, bandwidth, Arc::clone(&num_connected))?;
MetricSources::register(registry, bandwidth, Arc::clone(&num_sync_connected))?;
}

Ok(Self {
network_service,
cmd_rx,
metrics,
peerset_handles: notif_protocols,
num_connected,
num_uniquely_connected,
discovery,
pending_put_values: HashMap::new(),
pending_get_values: HashMap::new(),
@@ -691,11 +700,7 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBac
log::debug!(target: LOG_TARGET, "starting litep2p network backend");

loop {
let num_connected_peers = self
.peerset_handles
.get(&self.block_announce_protocol)
.map_or(0usize, |handle| handle.connected_peers.load(Ordering::Relaxed));
self.num_connected.store(num_connected_peers, Ordering::Relaxed);
self.num_uniquely_connected.store(self.peers.len(), Ordering::Relaxed);

tokio::select! {
command = self.cmd_rx.next() => match command {
@@ -722,11 +727,13 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBac
self.event_streams.push(tx);
}
NetworkServiceCommand::Status { tx } => {
let num_connected_peers = self
.peerset_handles
.get(&self.block_announce_protocol)
.map_or(0usize, |handle| handle.connected_peers.load(Ordering::Relaxed));

let _ = tx.send(NetworkStatus {
num_connected_peers: self
.peerset_handles
.get(&self.block_announce_protocol)
.map_or(0usize, |handle| handle.connected_peers.load(Ordering::Relaxed)),
num_connected_peers,
total_bytes_inbound: self.litep2p.bandwidth_sink().inbound() as u64,
total_bytes_outbound: self.litep2p.bandwidth_sink().outbound() as u64,
});