diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index 3ad98d44f74..8b260b96e89 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -92,6 +92,9 @@ pub struct Kademlia { /// Queued events to return when the behaviour is being polled. queued_events: VecDeque, KademliaEvent>>, + /// The currently known addresses of the local node. + local_addrs: HashSet, + /// The record storage. store: TStore, } @@ -358,6 +361,7 @@ where record_ttl: config.record_ttl, provider_record_ttl: config.provider_record_ttl, connection_idle_timeout: config.connection_idle_timeout, + local_addrs: HashSet::with_capacity(6) } } @@ -708,7 +712,14 @@ where /// The results of the (repeated) provider announcements sent by this node are /// reported via [`KademliaEvent::QueryResult{QueryResult::StartProviding}`]. pub fn start_providing(&mut self, key: record::Key) -> Result { - let record = ProviderRecord::new(key.clone(), self.kbuckets.local_key().preimage().clone()); + // Note: We store our own provider records locally without local addresses + // to avoid redundant storage and outdated addresses. Instead these are + // acquired on demand when returning a `ProviderRecord` for the local node. + let local_addrs = Vec::new(); + let record = ProviderRecord::new( + key.clone(), + self.kbuckets.local_key().preimage().clone(), + local_addrs); self.store.add_provider(record)?; let target = kbucket::Key::new(key.clone()); let peers = self.kbuckets.closest_keys(&target); @@ -784,12 +795,42 @@ where /// Collects all peers who are known to be providers of the value for a given `Multihash`. fn provider_peers(&mut self, key: &record::Key, source: &PeerId) -> Vec { let kbuckets = &mut self.kbuckets; + let connected = &mut self.connected_peers; + let local_addrs = &self.local_addrs; self.store.providers(key) .into_iter() .filter_map(move |p| if &p.provider != source { - let key = kbucket::Key::new(p.provider.clone()); - kbuckets.entry(&key).view().map(|e| KadPeer::from(e.to_owned())) + let node_id = p.provider; + let multiaddrs = p.addresses; + let connection_ty = if connected.contains(&node_id) { + KadConnectionType::Connected + } else { + KadConnectionType::NotConnected + }; + if multiaddrs.is_empty() { + // The provider is either the local node and we fill in + // the local addresses on demand, or it is a legacy + // provider record without addresses, in which case we + // try to find addresses in the routing table, as was + // done before provider records were stored along with + // their addresses. + if &node_id == kbuckets.local_key().preimage() { + Some(local_addrs.iter().cloned().collect::>()) + } else { + let key = kbucket::Key::new(node_id.clone()); + kbuckets.entry(&key).view().map(|e| e.node.value.clone().into_vec()) + } + } else { + Some(multiaddrs) + } + .map(|multiaddrs| { + KadPeer { + node_id, + multiaddrs, + connection_ty, + } + }) } else { None }) @@ -1367,7 +1408,8 @@ where let record = ProviderRecord { key, provider: provider.node_id, - expires: self.provider_record_ttl.map(|ttl| Instant::now() + ttl) + expires: self.provider_record_ttl.map(|ttl| Instant::now() + ttl), + addresses: provider.multiaddrs, }; if let Err(e) = self.store.add_provider(record) { info!("Provider record not stored: {:?}", e); @@ -1746,6 +1788,20 @@ where }; } + fn inject_new_listen_addr(&mut self, addr: &Multiaddr) { + self.local_addrs.insert(addr.clone()); + } + + fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) { + self.local_addrs.remove(addr); + } + + fn inject_new_external_addr(&mut self, addr: &Multiaddr) { + if self.local_addrs.len() < MAX_LOCAL_EXTERNAL_ADDRS { + self.local_addrs.insert(addr.clone()); + } + } + fn poll(&mut self, cx: &mut Context<'_>, parameters: &mut impl PollParameters) -> Poll< NetworkBehaviourAction< as ProtocolsHandler>::InEvent, @@ -2497,3 +2553,8 @@ pub enum RoutingUpdate { /// peer ID). Failed, } + +/// The maximum number of local addresses as reported by +/// other peers that the behaviour tracks. The behaviour +/// always tracks all its listen addresses. +const MAX_LOCAL_EXTERNAL_ADDRS: usize = 20; diff --git a/protocols/kad/src/record.rs b/protocols/kad/src/record.rs index 2e9c0248e2c..d00e27d9326 100644 --- a/protocols/kad/src/record.rs +++ b/protocols/kad/src/record.rs @@ -23,7 +23,7 @@ pub mod store; use bytes::Bytes; -use libp2p_core::PeerId; +use libp2p_core::{PeerId, Multiaddr}; use multihash::Multihash; use std::borrow::Borrow; use std::hash::{Hash, Hasher}; @@ -112,6 +112,8 @@ pub struct ProviderRecord { pub provider: PeerId, /// The expiration time as measured by a local, monotonic clock. pub expires: Option, + /// The known addresses that the provider may be listening on. + pub addresses: Vec } impl Hash for ProviderRecord { @@ -123,12 +125,15 @@ impl Hash for ProviderRecord { impl ProviderRecord { /// Creates a new provider record for insertion into a `RecordStore`. - pub fn new(key: K, provider: PeerId) -> Self + pub fn new(key: K, provider: PeerId, addresses: Vec) -> Self where K: Into { ProviderRecord { - key: key.into(), provider, expires: None + key: key.into(), + provider, + expires: None, + addresses, } } @@ -178,6 +183,7 @@ mod tests { } else { None }, + addresses: vec![], } } } diff --git a/protocols/kad/src/record/store/memory.rs b/protocols/kad/src/record/store/memory.rs index e0a2e1d25b2..d4fdad7967c 100644 --- a/protocols/kad/src/record/store/memory.rs +++ b/protocols/kad/src/record/store/memory.rs @@ -259,7 +259,7 @@ mod tests { let key = Key::from(random_multihash()); let mut records = providers.into_iter().map(|p| { - ProviderRecord::new(key.clone(), p.into_preimage()) + ProviderRecord::new(key.clone(), p.into_preimage(), Vec::new()) }).collect::>(); for r in &records { @@ -280,7 +280,7 @@ mod tests { let id = PeerId::random(); let mut store = MemoryStore::new(id.clone()); let key = random_multihash(); - let rec = ProviderRecord::new(key, id.clone()); + let rec = ProviderRecord::new(key, id.clone(), Vec::new()); assert!(store.add_provider(rec.clone()).is_ok()); assert_eq!(vec![Cow::Borrowed(&rec)], store.provided().collect::>()); store.remove_provider(&rec.key, &id); @@ -292,7 +292,7 @@ mod tests { let mut store = MemoryStore::new(PeerId::random()); let key = random_multihash(); let prv = PeerId::random(); - let mut rec = ProviderRecord::new(key, prv); + let mut rec = ProviderRecord::new(key, prv, Vec::new()); assert!(store.add_provider(rec.clone()).is_ok()); assert_eq!(vec![rec.clone()], store.providers(&rec.key).to_vec()); rec.expires = Some(Instant::now()); @@ -306,12 +306,12 @@ mod tests { for _ in 0 .. store.config.max_provided_keys { let key = random_multihash(); let prv = PeerId::random(); - let rec = ProviderRecord::new(key, prv); + let rec = ProviderRecord::new(key, prv, Vec::new()); let _ = store.add_provider(rec); } let key = random_multihash(); let prv = PeerId::random(); - let rec = ProviderRecord::new(key, prv); + let rec = ProviderRecord::new(key, prv, Vec::new()); match store.add_provider(rec) { Err(Error::MaxProvidedKeys) => {} _ => panic!("Unexpected result"),