litep2p/kad: Configure periodic network bootstrap for better throughput #4942

Status: Open. Wants to merge 21 commits into base: master. Changes from 12 commits are shown below.

Commits:
a126147  net/litep2p: Update connected peers from network backend sooner (lexnv, Jul 1, 2024)
d9b57ee  net/litep2p: Propagate connected peers to the discovery mechanism (lexnv, Jul 1, 2024)
3b2c5ac  net/litep2p: Start random walks if under connected peers limit (lexnv, Jul 1, 2024)
b61e973  net/kad: Ensure timer moves forward (lexnv, Jul 3, 2024)
837f196  net/kad: Keep record of inflight kad queries (lexnv, Jul 3, 2024)
5610b95  net/kad: Reset kad query timer on failure (lexnv, Jul 3, 2024)
f5aee82  net/kad: Downgrade logs to trace and debug (lexnv, Jul 3, 2024)
6ac1bff  net/kad: Bound maximum number of queries (lexnv, Jul 3, 2024)
4983b89  litep2p: Rename num connected peers to num sync peers (lexnv, Jul 8, 2024)
32c9257  litep2p: Rename function to illustrate sync peers (lexnv, Jul 8, 2024)
2a6d231  litep2p: Improve comment wrt num_sync_connected (lexnv, Jul 8, 2024)
c71fa9e  litep2p: Extract number of distinct connected peers (lexnv, Jul 8, 2024)
24bafe3  litep2p: Remove fetch sync peers method (lexnv, Jul 11, 2024)
b6c9901  litep2p: Periodically perform kademlia queries instead if under number (lexnv, Jul 16, 2024)
7bf91eb  Revert "litep2p: Periodically perform kademlia queries instead if und… (lexnv, Sep 17, 2024)
5d09d4a  litep2p: Change heuristic to 3 times the required discovery number (lexnv, Sep 13, 2024)
f676446  litep2p/discovery: Fallback to 2x factor for discovery (lexnv, Sep 16, 2024)
0124309  Merge remote-tracking branch 'origin/master' into lexnv/litep2p-aggr-… (lexnv, Sep 17, 2024)
a8e1cae  discovery: Introduce mandatory query for discovery every 16 minutes (lexnv, Sep 16, 2024)
fa3ce93  litep2p/discovery: Better logs for mandatory queries (lexnv, Sep 16, 2024)
2ab14ee  litep2p/discovery: Periodic kad queries every 30 minutes (lexnv, Sep 17, 2024)
95 changes: 67 additions & 28 deletions substrate/client/network/src/litep2p/discovery.rs
@@ -51,7 +51,7 @@ use std::{
cmp,
collections::{HashMap, HashSet, VecDeque},
pin::Pin,
sync::Arc,
sync::{atomic::AtomicUsize, Arc},
task::{Context, Poll},
time::{Duration, Instant},
};
@@ -62,12 +62,21 @@ const LOG_TARGET: &str = "sub-libp2p::discovery";
/// Kademlia query interval.
const KADEMLIA_QUERY_INTERVAL: Duration = Duration::from_secs(5);

/// The convergence time between two `FIND_NODE` queries.
///
/// The time is exponentially increased after each query until it reaches 120 seconds.
/// The time is reset to `KADEMLIA_QUERY_INTERVAL` after a failed query.
const CONVERGENCE_QUERY_INTERVAL: Duration = Duration::from_secs(120);

/// mDNS query interval.
const MDNS_QUERY_INTERVAL: Duration = Duration::from_secs(30);

/// Minimum number of confirmations received before an address is verified.
const MIN_ADDRESS_CONFIRMATIONS: usize = 5;

/// Maximum number of in-flight `FIND_NODE` queries.
const MAX_INFLIGHT_FIND_NODE_QUERIES: usize = 16;

/// Discovery events.
#[derive(Debug)]
pub enum DiscoveryEvent {
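
Editor's note: a minimal, self-contained sketch of the schedule these constants define (plain Rust, constants copied from above; not part of the PR). Successful queries back off exponentially until they reach the convergence interval, and a failed query resets the schedule to the base interval.

use std::{cmp, time::Duration};

const KADEMLIA_QUERY_INTERVAL: Duration = Duration::from_secs(5);
const CONVERGENCE_QUERY_INTERVAL: Duration = Duration::from_secs(120);

fn main() {
    let mut delay = KADEMLIA_QUERY_INTERVAL;
    for _ in 0..7 {
        println!("next FIND_NODE in {delay:?}");
        // Double the delay, capped at the convergence interval:
        // 5s, 10s, 20s, 40s, 80s, 120s, 120s, ...
        delay = cmp::min(delay * 2, CONVERGENCE_QUERY_INTERVAL);
    }
    // A failed query resets the timer to the base interval.
    delay = KADEMLIA_QUERY_INTERVAL;
    println!("after a failure, next FIND_NODE in {delay:?}");
}
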
@@ -172,8 +181,8 @@ pub struct Discovery {
/// If `None`, there is currently a query pending.
next_kad_query: Option<Delay>,

/// Active `FIND_NODE` query if it exists.
find_node_query_id: Option<QueryId>,
/// Active `FIND_NODE` queries.
find_node_queries: HashMap<QueryId, std::time::Instant>,

/// Pending events.
pending_events: VecDeque<DiscoveryEvent>,
@@ -195,6 +204,12 @@ pub struct Discovery {

/// Delay to next `FIND_NODE` query.
duration_to_next_find_query: Duration,

/// Number of connected peers as reported by the block announce protocol.
num_connected_peers: Arc<AtomicUsize>,

/// Threshold of active connections at or above which the discovery process is paused.
discovery_only_if_under_num: usize,
}

/// Legacy (fallback) Kademlia protocol name based on `protocol_id`.
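
Editor's note: the two new `Arc<AtomicUsize>` fields are written by the network backend and only read here. A minimal sketch of the sharing pattern, using nothing beyond the standard library:

use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};

fn main() {
    // The backend owns the counter and stores into it once per event-loop
    // tick; the discovery side holds a clone and only ever loads. Relaxed
    // ordering suffices because the value is a heuristic, not a
    // synchronization point.
    let num_connected = Arc::new(AtomicUsize::new(0));
    let discovery_view = Arc::clone(&num_connected);

    num_connected.store(12, Ordering::Relaxed);
    assert_eq!(discovery_view.load(Ordering::Relaxed), 12);
}
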
@@ -229,6 +244,8 @@ impl Discovery {
protocol_id: &ProtocolId,
known_peers: HashMap<PeerId, Vec<Multiaddr>>,
listen_addresses: Arc<RwLock<HashSet<Multiaddr>>>,
num_connected_peers: Arc<AtomicUsize>,
discovery_only_if_under_num: usize,
_peerstore_handle: Arc<dyn PeerStoreProvider>,
) -> (Self, PingConfig, IdentifyConfig, KademliaConfig, Option<MdnsConfig>) {
let (ping_config, ping_event_stream) = PingConfig::default();
@@ -271,7 +288,7 @@ impl Discovery {
kademlia_handle,
_peerstore_handle,
listen_addresses,
find_node_query_id: None,
find_node_queries: HashMap::new(),
pending_events: VecDeque::new(),
duration_to_next_find_query: Duration::from_secs(1),
address_confirmations: LruMap::new(ByLength::new(8)),
@@ -282,6 +299,8 @@
genesis_hash,
fork_id,
)]),
num_connected_peers,
discovery_only_if_under_num,
},
ping_config,
identify_config,
@@ -290,6 +309,11 @@
)
}

/// Get number of connected peers.
fn num_connected_peers(&self) -> usize {
self.num_connected_peers.load(std::sync::atomic::Ordering::Relaxed)
}

/// Add known peer to `Kademlia`.
#[allow(unused)]
pub async fn add_known_peer(&mut self, peer: PeerId, addresses: Vec<Multiaddr>) {
@@ -448,42 +472,58 @@ impl Stream for Discovery {

if let Some(mut delay) = this.next_kad_query.take() {
match delay.poll_unpin(cx) {
Poll::Pending => {
this.next_kad_query = Some(delay);
},
Poll::Ready(()) => {
let peer = PeerId::random();
let num_peers = this.num_connected_peers();
if num_peers < this.discovery_only_if_under_num &&
this.find_node_queries.len() < MAX_INFLIGHT_FIND_NODE_QUERIES
{
let peer = PeerId::random();
log::debug!(target: LOG_TARGET, "start next kademlia query for {peer:?}");

log::trace!(target: LOG_TARGET, "start next kademlia query for {peer:?}");
if let Ok(query_id) = this.kademlia_handle.try_find_node(peer) {
this.find_node_queries.insert(query_id, std::time::Instant::now());

match this.kademlia_handle.try_find_node(peer) {
Ok(query_id) => {
this.find_node_query_id = Some(query_id);
return Poll::Ready(Some(DiscoveryEvent::RandomKademliaStarted))
},
Err(()) => {
this.duration_to_next_find_query = cmp::min(
this.duration_to_next_find_query * 2,
Duration::from_secs(60),
CONVERGENCE_QUERY_INTERVAL,
);
this.next_kad_query =
Some(Delay::new(this.duration_to_next_find_query));
},

return Poll::Ready(Some(DiscoveryEvent::RandomKademliaStarted))
}
} else {
log::debug!(
target: LOG_TARGET,
"discovery is paused: {num_peers}/{} connected peers and in flight queries: {}/{MAX_INFLIGHT_FIND_NODE_QUERIES}",
this.discovery_only_if_under_num,
this.find_node_queries.len(),
);
}

this.duration_to_next_find_query =
cmp::min(this.duration_to_next_find_query * 2, CONVERGENCE_QUERY_INTERVAL);
this.next_kad_query = Some(Delay::new(this.duration_to_next_find_query));
},
Poll::Pending => {
this.next_kad_query = Some(delay);
},
}
}

match Pin::new(&mut this.kademlia_handle).poll_next(cx) {
Poll::Pending => {},
Poll::Ready(None) => return Poll::Ready(None),
Poll::Ready(Some(KademliaEvent::FindNodeSuccess { peers, .. })) => {
Poll::Ready(Some(KademliaEvent::FindNodeSuccess { peers, query_id, .. })) => {
// the addresses are already inserted into the DHT and in `TransportManager` so
// there is no need to add them again. The found peers must be registered to
// `Peerstore` so other protocols are aware of them through `Peerset`.
log::trace!(target: LOG_TARGET, "dht random walk yielded {} peers", peers.len());

this.next_kad_query = Some(Delay::new(KADEMLIA_QUERY_INTERVAL));
if let Some(instant) = this.find_node_queries.remove(&query_id) {
log::trace!(target: LOG_TARGET, "dht random walk yielded {} peers for {query_id:?} in {:?}", peers.len(), instant.elapsed());
} else {
log::trace!(target: LOG_TARGET, "dht random walk yielded {} peers for {query_id:?}", peers.len());
}

return Poll::Ready(Some(DiscoveryEvent::RoutingTableUpdate {
peers: peers.into_iter().map(|(peer, _)| peer).collect(),
@@ -507,15 +547,14 @@
Poll::Ready(Some(KademliaEvent::PutRecordSucess { query_id, key: _ })) =>
return Poll::Ready(Some(DiscoveryEvent::PutRecordSuccess { query_id })),
Poll::Ready(Some(KademliaEvent::QueryFailed { query_id })) => {
match this.find_node_query_id == Some(query_id) {
true => {
this.find_node_query_id = None;
this.duration_to_next_find_query =
cmp::min(this.duration_to_next_find_query * 2, Duration::from_secs(60));
this.next_kad_query = Some(Delay::new(this.duration_to_next_find_query));
},
false => return Poll::Ready(Some(DiscoveryEvent::QueryFailed { query_id })),
if let Some(instant) = this.find_node_queries.remove(&query_id) {
this.duration_to_next_find_query = KADEMLIA_QUERY_INTERVAL;
this.next_kad_query = Some(Delay::new(this.duration_to_next_find_query));

log::debug!(target: LOG_TARGET, "dht random walk failed for {query_id:?} in {:?}", instant.elapsed());
}

return Poll::Ready(Some(DiscoveryEvent::QueryFailed { query_id }));
},
Poll::Ready(Some(KademliaEvent::IncomingRecord { record })) => {
log::trace!(
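Editor's note: putting the pieces of this file together, random walks run only while both the peer count and the in-flight query count are under their limits. A self-contained sketch of that gate follows; the out_peers value of 8 is an illustrative assumption, the real threshold is computed from `default_peers_set.out_peers` in mod.rs below.

/// Hypothetical restatement of the gating condition; not the PR's code.
fn should_start_random_walk(num_connected: usize, inflight_queries: usize) -> bool {
    const MAX_INFLIGHT_FIND_NODE_QUERIES: usize = 16;
    let discovery_only_if_under_num = 8 + 15; // assumed out_peers (8) + 15

    num_connected < discovery_only_if_under_num
        && inflight_queries < MAX_INFLIGHT_FIND_NODE_QUERIES
}

fn main() {
    assert!(should_start_random_walk(10, 0)); // under-connected: keep walking
    assert!(!should_start_random_walk(23, 0)); // 23 >= 8 + 15: discovery pauses
    assert!(!should_start_random_walk(10, 16)); // query budget exhausted
}
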
50 changes: 36 additions & 14 deletions substrate/client/network/src/litep2p/mod.rs
@@ -166,8 +166,15 @@ pub struct Litep2pNetworkBackend {
/// Discovery.
discovery: Discovery,

/// Number of connected peers.
num_connected: Arc<AtomicUsize>,
/// Number of connected peers over the block announce protocol.
///
/// This is used to update metrics and network status.
num_sync_connected: Arc<AtomicUsize>,
Review comment (Member): Looks like a metric that should be moved to the sync crate.

Reply (Contributor Author): Yep that sounds good, have created an issue for this, since this was a bit involved for the peerstore metrics :D

/// Number of uniquely connected peers.
///
/// This is used to inform the discovery mechanism of the number of connected peers.
num_uniquely_connected: Arc<AtomicUsize>,

/// Connected peers.
peers: HashMap<litep2p::PeerId, ConnectionContext>,
@@ -259,6 +266,17 @@ impl Litep2pNetworkBackend {
Ok((local_identity, local_peer_id))
}

/// Fetch the number of connected peers from the block announce peerset handle
/// and update the shared atomic `num_sync_connected`.
fn fetch_sync_connected_peers(&self) -> usize {
let num_sync_connected = self
.peerset_handles
.get(&self.block_announce_protocol)
.map_or(0usize, |handle| handle.connected_peers.load(Ordering::Relaxed));
self.num_sync_connected.store(num_sync_connected, Ordering::Relaxed);
num_sync_connected
}

/// Configure transport protocols for `Litep2pNetworkBackend`.
fn configure_transport<B: BlockT + 'static, H: ExHashT>(
config: &FullNetworkConfiguration<B, H, Self>,
@@ -439,6 +457,9 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBackend
let peer_store_handle = params.network_config.peer_store_handle();
let executor = Arc::new(Litep2pExecutor { executor: params.executor });

let limit_discovery_under =
params.network_config.network_config.default_peers_set.out_peers as usize + 15;
Review comment (Contributor): What happens if in_peers is lower than 15?

Do you think the discovery should be guided by the number of sync peers connected, and not by the number of known peers?

Also, I would make sure we do a random walk at least once per minute as it was before, even if we have enough peers connected (make discovery_only_if_under_num slow down queries, but not stop them completely).
let FullNetworkConfiguration {
notification_protocols,
request_response_protocols,
@@ -541,6 +562,8 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBackend

// enable ipfs ping, identify and kademlia, and potentially mdns if user enabled it
let listen_addresses = Arc::new(Default::default());
let num_uniquely_connected = Arc::new(Default::default());

let (discovery, ping_config, identify_config, kademlia_config, maybe_mdns_config) =
Discovery::new(
&network_config,
Expand All @@ -549,6 +572,8 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBac
&params.protocol_id,
known_addresses.clone(),
Arc::clone(&listen_addresses),
Arc::clone(&num_uniquely_connected),
limit_discovery_under,
Arc::clone(&peer_store_handle),
);

@@ -592,20 +617,21 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBackend
));

// register rest of the metrics now that `Litep2p` has been created
let num_connected = Arc::new(Default::default());
let bandwidth: Arc<dyn BandwidthSink> =
Arc::new(Litep2pBandwidthSink { sink: litep2p.bandwidth_sink() });

let num_sync_connected = Arc::new(Default::default());
if let Some(registry) = &params.metrics_registry {
MetricSources::register(registry, bandwidth, Arc::clone(&num_connected))?;
MetricSources::register(registry, bandwidth, Arc::clone(&num_sync_connected))?;
}

Ok(Self {
network_service,
cmd_rx,
metrics,
peerset_handles: notif_protocols,
num_connected,
num_sync_connected,
num_uniquely_connected,
discovery,
pending_put_values: HashMap::new(),
pending_get_values: HashMap::new(),
@@ -682,11 +708,8 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBackend
log::debug!(target: LOG_TARGET, "starting litep2p network backend");

loop {
let num_connected_peers = self
.peerset_handles
.get(&self.block_announce_protocol)
.map_or(0usize, |handle| handle.connected_peers.load(Ordering::Relaxed));
self.num_connected.store(num_connected_peers, Ordering::Relaxed);
self.fetch_sync_connected_peers();
self.num_uniquely_connected.store(self.peers.len(), Ordering::Relaxed);

tokio::select! {
command = self.cmd_rx.next() => match command {
Expand All @@ -707,11 +730,10 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBac
self.event_streams.push(tx);
}
NetworkServiceCommand::Status { tx } => {
let num_connected_peers = self.fetch_sync_connected_peers();

let _ = tx.send(NetworkStatus {
num_connected_peers: self
.peerset_handles
.get(&self.block_announce_protocol)
.map_or(0usize, |handle| handle.connected_peers.load(Ordering::Relaxed)),
num_connected_peers,
total_bytes_inbound: self.litep2p.bandwidth_sink().inbound() as u64,
total_bytes_outbound: self.litep2p.bandwidth_sink().outbound() as u64,
});
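
Editor's note: the loop now maintains two counters with different consumers: `num_sync_connected` feeds metrics and `NetworkStatus`, while `num_uniquely_connected` feeds the discovery gate. A self-contained sketch of one loop tick, with stand-in types for the parts the backend owns:

use std::collections::HashMap;
use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};

// `peers` stands in for the backend's PeerId -> ConnectionContext map and
// `sync_peers` for the value read from the block announce peerset handle;
// both names are illustrative, not the PR's exact types.
fn tick(
    peers: &HashMap<u64, ()>,
    sync_peers: usize,
    num_sync_connected: &Arc<AtomicUsize>,
    num_uniquely_connected: &Arc<AtomicUsize>,
) {
    // Read by metrics and the NetworkStatus command.
    num_sync_connected.store(sync_peers, Ordering::Relaxed);
    // Read by discovery: all distinct connected peers, not only the
    // ones on the block announce protocol.
    num_uniquely_connected.store(peers.len(), Ordering::Relaxed);
}

fn main() {
    let sync = Arc::new(AtomicUsize::new(0));
    let unique = Arc::new(AtomicUsize::new(0));
    let peers: HashMap<u64, ()> = (0..40).map(|i| (i, ())).collect();

    tick(&peers, 23, &sync, &unique);
    assert_eq!(sync.load(Ordering::Relaxed), 23);
    assert_eq!(unique.load(Ordering::Relaxed), 40);
}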