authority-discovery: Make changing of peer-id while active a bit more robust

When nodes don't persist their node-key, or want to generate a new one
while being in the active set, things go wrong because both the old
addresses and the new ones remain present in the DHT. Because of the
distributed nature of the DHT, both will survive in the network until
the old records expire, which takes 36 hours. In the meantime, nodes in
the network will randomly resolve the authority-id to either the old
address or the new one.

More details in: #3673

This PR proposes we mitigate this problem by:

1. Letting the query for a DHT key retrieve all the results; this is
   usually bounded by the replication factor, which is 20. Currently we
   interrupt the query on the first result.
2. Modifying the authority-discovery service to keep all discovered
   addresses around for 24h after they were last seen.
3. Plumbing this through the other subsystems where the assumption was
   that an authority-id resolves to only one PeerId. Currently,
   authority-discovery keeps just the last record it received from the
   DHT and queries the DHT every 10 minutes, so nodes could always
   receive only the old address, only the new address, or flip-flop
   between them depending on which node wins the race to provide the
   record.
4. Updating gossip-support to try to resolve authorities more often
   than once per session.

This gives the nodes in the network many more chances to discover not
only the old address of a node but also the new one, and it should
improve the time it takes for a node to become properly connected in
the network. The behaviour won't be deterministic, because there is no
guarantee that all nodes will see the new record at least once, since
they could query only nodes that have the old one.
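
For illustration, here is a minimal, self-contained sketch of the
expiry idea in point 2 (the `ExpiringAddresses` type and its methods
are hypothetical stand-ins, with `String` in place of `Multiaddr`; the
real logic lives in `AddrCache`, changed further below):

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Illustrative per-authority address cache: every discovered address is
/// kept until its own 24h deadline, so the old and the new PeerId of a
/// restarted validator coexist for a while.
struct ExpiringAddresses {
    addresses: HashMap<String, Instant>,
    ttl: Duration,
}

impl ExpiringAddresses {
    fn new(ttl: Duration) -> Self {
        Self { addresses: HashMap::new(), ttl }
    }

    /// Merge freshly discovered addresses with the cached ones: discovered
    /// addresses get a fresh deadline, older ones survive until they expire.
    fn insert(&mut self, discovered: Vec<String>) {
        let now = Instant::now();
        self.addresses.retain(|_, expires| *expires >= now);
        for addr in discovered {
            self.addresses.insert(addr, now + self.ttl);
        }
    }

    /// Addresses that are still considered live.
    fn live(&self) -> Vec<&String> {
        self.addresses.keys().collect()
    }
}

fn main() {
    let mut cache = ExpiringAddresses::new(Duration::from_secs(24 * 60 * 60));
    cache.insert(vec!["/dns/node/tcp/30333/p2p/OldPeerId".into()]);
    // The validator restarts with a new PeerId: both records are served
    // until the old one's deadline passes.
    cache.insert(vec!["/dns/node/tcp/30333/p2p/NewPeerId".into()]);
    assert_eq!(cache.live().len(), 2);
}
```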

Signed-off-by: Alexandru Gheorghe <alexandru.gheorghe@parity.io>
alexggh committed Mar 21, 2024
1 parent 8b3bf39 commit 0103c7f
Showing 4 changed files with 86 additions and 24 deletions.
30 changes: 25 additions & 5 deletions polkadot/node/network/gossip-support/src/lib.rs
@@ -69,6 +69,12 @@ const BACKOFF_DURATION: Duration = Duration::from_secs(5);
#[cfg(test)]
const BACKOFF_DURATION: Duration = Duration::from_millis(500);

// The authority_discovery queries run every ten minutes,
// so there is no point in trying more often than that; let's
// try re-resolving the authorities every 10 minutes and force
// the reconnection to the ones that changed their address.
const TRY_RECONNECT_AFTER: Duration = Duration::from_secs(60 * 10);

/// Duration after which we consider low connectivity a problem.
///
/// Especially at startup low connectivity is expected (authority discovery cache needs to be
@@ -91,6 +97,14 @@ pub struct GossipSupport<AD> {
// `None` otherwise.
last_failure: Option<Instant>,

// Validators can restart during a session, so if they change their
// PeerId we would, at best, connect to them again only after a
// session; hence we need to try resolving peers and reconnecting to
// them more often. The authority_discovery queries run every ten
// minutes, so there is no point in trying more often than that; let's
// try reconnecting every 10 minutes here as well.
last_connection_request: Option<Instant>,

/// First time we did not reach our connectivity threshold.
///
/// This is the time of the first failed attempt to connect to >2/3 of all validators in a
@@ -131,6 +145,7 @@ where
keystore,
last_session_index: None,
last_failure: None,
last_connection_request: None,
failure_start: None,
resolved_authorities: HashMap::new(),
connected_authorities: HashMap::new(),
@@ -196,7 +211,11 @@ where
for leaf in leaves {
let current_index = util::request_session_index_for_child(leaf, sender).await.await??;
let since_failure = self.last_failure.map(|i| i.elapsed()).unwrap_or_default();
let force_request = since_failure >= BACKOFF_DURATION;
let since_last_reconnect =
self.last_connection_request.map(|i| i.elapsed()).unwrap_or_default();

let force_request =
since_failure >= BACKOFF_DURATION || since_last_reconnect >= TRY_RECONNECT_AFTER;
let leaf_session = Some((current_index, leaf));
let maybe_new_session = match self.last_session_index {
Some(i) if current_index <= i => None,
@@ -248,7 +267,7 @@ where
// connections to a much broader set of validators.
{
let mut connections = authorities_past_present_future(sender, leaf).await?;

self.last_connection_request = Some(Instant::now());
// Remove all of our locally controlled validator indices so we don't connect to
// ourself.
let connections =
@@ -405,10 +424,11 @@ where
.await
.into_iter()
.flat_map(|list| list.into_iter())
.find_map(|addr| parse_addr(addr).ok().map(|(p, _)| p));
.flat_map(|addr| parse_addr(addr).ok().map(|(p, _)| p))
.collect::<Vec<_>>();

if let Some(p) = peer_id {
authority_ids.entry(p).or_default().insert(authority);
for p in peer_id {
authority_ids.entry(p).or_default().insert(authority.clone());
}
}

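To make the interplay between `BACKOFF_DURATION` and
`TRY_RECONNECT_AFTER` above easier to follow, here is a minimal,
self-contained sketch of the throttling logic (the `ReconnectState`
struct and `force_request` helper are illustrative stand-ins, not the
actual `GossipSupport` code):

```rust
use std::time::{Duration, Instant};

// Constants mirror the values used in gossip-support.
const BACKOFF_DURATION: Duration = Duration::from_secs(5);
const TRY_RECONNECT_AFTER: Duration = Duration::from_secs(60 * 10);

struct ReconnectState {
    last_failure: Option<Instant>,
    last_connection_request: Option<Instant>,
}

impl ReconnectState {
    /// Force a new connection request once the backoff after a failure has
    /// elapsed, or once `TRY_RECONNECT_AFTER` has passed since the last
    /// request, so validators that changed their PeerId are picked up
    /// without waiting for a new session.
    fn force_request(&self) -> bool {
        let since_failure = self.last_failure.map(|i| i.elapsed()).unwrap_or_default();
        let since_last_reconnect =
            self.last_connection_request.map(|i| i.elapsed()).unwrap_or_default();
        since_failure >= BACKOFF_DURATION || since_last_reconnect >= TRY_RECONNECT_AFTER
    }
}

fn main() {
    let state = ReconnectState {
        last_failure: None,
        last_connection_request: Some(Instant::now()),
    };
    // A request was just issued and there was no failure: nothing is forced.
    assert!(!state.force_request());
}
```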
27 changes: 19 additions & 8 deletions substrate/client/authority-discovery/src/worker.rs
@@ -26,7 +26,7 @@ use std::{
collections::{HashMap, HashSet},
marker::PhantomData,
sync::Arc,
time::Duration,
time::{Duration, Instant},
};

use futures::{channel::mpsc, future, stream::Fuse, FutureExt, Stream, StreamExt};
@@ -139,6 +139,11 @@ pub struct Worker<Client, Network, Block, DhtEventStream> {
/// Set of in-flight lookups.
in_flight_lookups: HashMap<KademliaKey, AuthorityId>,

/// Set of lookups for which we can still receive records.
/// These are the entries in `in_flight_lookups` for which
/// we got at least one successful result.
known_lookups: HashMap<KademliaKey, AuthorityId>,

addr_cache: addr_cache::AddrCache,

metrics: Option<Metrics>,
@@ -237,6 +242,7 @@
query_interval,
pending_lookups: Vec::new(),
in_flight_lookups: HashMap::new(),
known_lookups: HashMap::new(),
addr_cache,
role,
metrics,
@@ -292,9 +298,7 @@ where
fn process_message_from_service(&self, msg: ServicetoWorkerMsg) {
match msg {
ServicetoWorkerMsg::GetAddressesByAuthorityId(authority, sender) => {
let _ = sender.send(
self.addr_cache.get_addresses_by_authority_id(&authority).map(Clone::clone),
);
let _ = sender.send(self.addr_cache.get_addresses_by_authority_id(&authority));
},
ServicetoWorkerMsg::GetAuthorityIdsByPeerId(peer_id, sender) => {
let _ = sender
@@ -408,6 +412,7 @@ where
// Ignore all still in-flight lookups. Those that are still in-flight are likely stalled as
// query interval ticks are far enough apart for all lookups to succeed.
self.in_flight_lookups.clear();
self.known_lookups.clear();

if let Some(metrics) = &self.metrics {
metrics
@@ -500,10 +505,16 @@ where
.map_err(|_| Error::ReceivingDhtValueFoundEventWithDifferentKeys)?
.ok_or(Error::ReceivingDhtValueFoundEventWithNoRecords)?;

let authority_id: AuthorityId = self
.in_flight_lookups
.remove(&remote_key)
.ok_or(Error::ReceivingUnexpectedRecord)?;
let authority_id: AuthorityId =
if let Some(authority_id) = self.in_flight_lookups.remove(&remote_key) {
authority_id
} else if let Some(authority_id) = self.known_lookups.get(&remote_key) {
authority_id.clone()
} else {
return Err(Error::ReceivingUnexpectedRecord);
};

self.known_lookups.insert(remote_key.clone(), authority_id.clone());

let local_peer_id = self.network.local_peer_id();

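A minimal, self-contained sketch of the `in_flight_lookups` /
`known_lookups` interplay introduced above (`String` keys stand in for
`KademliaKey` and `AuthorityId`; this is not the actual `Worker` code):

```rust
use std::collections::HashMap;

/// The first DHT record moves a lookup from "in flight" to "known"; later
/// records for the same key are still accepted instead of being treated
/// as unexpected.
struct Lookups {
    in_flight: HashMap<String, String>,
    known: HashMap<String, String>,
}

impl Lookups {
    fn resolve(&mut self, key: &str) -> Option<String> {
        let authority = if let Some(authority) = self.in_flight.remove(key) {
            authority
        } else if let Some(authority) = self.known.get(key) {
            authority.clone()
        } else {
            // Neither in flight nor previously seen: an unexpected record.
            return None;
        };
        self.known.insert(key.to_owned(), authority.clone());
        Some(authority)
    }
}

fn main() {
    let mut lookups = Lookups {
        in_flight: HashMap::from([("key-1".to_owned(), "authority-1".to_owned())]),
        known: HashMap::new(),
    };
    // First record: accepted, and the lookup becomes "known".
    assert_eq!(lookups.resolve("key-1").as_deref(), Some("authority-1"));
    // A later record for the same key (e.g. from a node still serving the
    // old address) is also accepted now, instead of being rejected.
    assert_eq!(lookups.resolve("key-1").as_deref(), Some("authority-1"));
    // Records for keys we never asked about are still rejected.
    assert!(lookups.resolve("key-2").is_none());
}
```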
42 changes: 34 additions & 8 deletions substrate/client/authority-discovery/src/worker/addr_cache.rs
@@ -21,7 +21,10 @@ use libp2p::{
PeerId,
};
use sp_authority_discovery::AuthorityId;
use std::collections::{hash_map::Entry, HashMap, HashSet};
use std::{
collections::{hash_map::Entry, HashMap, HashSet},
time::Instant,
};

/// Cache for [`AuthorityId`] -> [`HashSet<Multiaddr>`] and [`PeerId`] -> [`HashSet<AuthorityId>`]
/// mappings.
@@ -33,7 +36,7 @@ pub(super) struct AddrCache {
/// Since we may store the mapping across several sessions, a single
/// `PeerId` might correspond to multiple `AuthorityId`s. However,
/// it's not expected that a single `AuthorityId` can have multiple `PeerId`s.
authority_id_to_addresses: HashMap<AuthorityId, HashSet<Multiaddr>>,
authority_id_to_addresses: HashMap<AuthorityId, HashMap<Multiaddr, Instant>>,
peer_id_to_authority_ids: HashMap<PeerId, HashSet<AuthorityId>>,
}

@@ -48,8 +51,12 @@ impl AddrCache {
/// Inserts the given [`AuthorityId`] and [`Vec<Multiaddr>`] pair for future lookups by
/// [`AuthorityId`] or [`PeerId`].
pub fn insert(&mut self, authority_id: AuthorityId, addresses: Vec<Multiaddr>) {
let addresses = addresses.into_iter().collect::<HashSet<_>>();
let peer_ids = addresses_to_peer_ids(&addresses);
let mut addresses = addresses
.into_iter()
.map(|addr| (addr, Instant::now() + std::time::Duration::from_secs(24 * 60 * 60)))
.collect::<HashMap<_, _>>();

let mut peer_ids = addresses_to_peer_ids(&addresses);

if peer_ids.is_empty() {
log::debug!(
@@ -74,9 +81,26 @@
"Found addresses for authority {authority_id:?}: {addresses:?}",
);

let old_addresses =
self.authority_id_to_addresses.get(&authority_id).cloned().unwrap_or_default();

let time_now = Instant::now();

let to_keep_addresses = old_addresses
.iter()
.filter(|(addr, expires)| **expires >= time_now && !addresses.contains_key(addr))
.map(|(addr, expires)| (addr.clone(), expires.clone()))
.collect::<HashMap<_, _>>();

addresses.extend(to_keep_addresses.clone());

let old_addresses = self.authority_id_to_addresses.insert(authority_id.clone(), addresses);

let old_peer_ids = addresses_to_peer_ids(&old_addresses.unwrap_or_default());

let to_keep_peer_ids = addresses_to_peer_ids(&to_keep_addresses);
peer_ids.extend(to_keep_peer_ids);

// Add the new peer ids
peer_ids.difference(&old_peer_ids).for_each(|new_peer_id| {
self.peer_id_to_authority_ids
@@ -118,8 +142,10 @@ impl AddrCache {
pub fn get_addresses_by_authority_id(
&self,
authority_id: &AuthorityId,
) -> Option<&HashSet<Multiaddr>> {
self.authority_id_to_addresses.get(authority_id)
) -> Option<HashSet<Multiaddr>> {
self.authority_id_to_addresses
.get(authority_id)
.map(|result| result.keys().map(|addr| addr.clone()).collect::<HashSet<_>>())
}

/// Returns the [`AuthorityId`]s for the given [`PeerId`].
@@ -170,8 +196,8 @@ fn peer_id_from_multiaddr(addr: &Multiaddr) -> Option<PeerId> {
})
}

fn addresses_to_peer_ids(addresses: &HashSet<Multiaddr>) -> HashSet<PeerId> {
addresses.iter().filter_map(peer_id_from_multiaddr).collect::<HashSet<_>>()
fn addresses_to_peer_ids(addresses: &HashMap<Multiaddr, Instant>) -> HashSet<PeerId> {
addresses.keys().filter_map(peer_id_from_multiaddr).collect::<HashSet<_>>()
}

#[cfg(test)]
11 changes: 8 additions & 3 deletions substrate/client/network/src/discovery.rs
@@ -72,7 +72,7 @@ use libp2p::{
},
PeerId,
};
use log::{debug, info, trace, warn};
use log::{debug, error, info, trace, warn};
use sp_core::hexdisplay::HexDisplay;
use std::{
cmp,
@@ -784,8 +784,13 @@ impl NetworkBehaviour for DiscoveryBehaviour {
// Let's directly finish the query, as we are only interested in a
// quorum of 1.
if let Some(kad) = self.kademlia.as_mut() {
if let Some(mut query) = kad.query_mut(&id) {
query.finish();
if let Some(query) = kad.query_mut(&id) {
// Let the query continue, to increase the chances that we
// discover all possible addresses, for the cases where more
// addresses might exist in the DHT, for example when the node
// changes its PeerId.

// query.finish();
}
}

Expand Down
