From bd0c9bfbb101785963163d92da1101fca381d24a Mon Sep 17 00:00:00 2001 From: "Roman S. Borschel" Date: Mon, 17 Aug 2020 14:25:24 +0200 Subject: [PATCH 1/4] Store addresses of provider records. So far, provider records are stored without their addresses and the addresses of provider records are obtained from the routing table on demand. This has two shortcomings: 1. We can only return provider records whose provider peers happen to currently be in the local routing table. 2. The local node never returns itself as a provider for a key, even if it is indeed a provider. These issues are addressed here by storing the addresses together with the provider records, falling back to addresses from the routing table only for backward-compatibility with existing implementations of `RecordStore` using persistent storage. Resolves https://github.com/libp2p/rust-libp2p/issues/1526. --- protocols/kad/src/behaviour.rs | 69 ++++++++++++++++++++++-- protocols/kad/src/record.rs | 12 +++-- protocols/kad/src/record/store/memory.rs | 10 ++-- 3 files changed, 79 insertions(+), 12 deletions(-) diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index 3ad98d44f74..8b260b96e89 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -92,6 +92,9 @@ pub struct Kademlia { /// Queued events to return when the behaviour is being polled. queued_events: VecDeque, KademliaEvent>>, + /// The currently known addresses of the local node. + local_addrs: HashSet, + /// The record storage. store: TStore, } @@ -358,6 +361,7 @@ where record_ttl: config.record_ttl, provider_record_ttl: config.provider_record_ttl, connection_idle_timeout: config.connection_idle_timeout, + local_addrs: HashSet::with_capacity(6) } } @@ -708,7 +712,14 @@ where /// The results of the (repeated) provider announcements sent by this node are /// reported via [`KademliaEvent::QueryResult{QueryResult::StartProviding}`]. pub fn start_providing(&mut self, key: record::Key) -> Result { - let record = ProviderRecord::new(key.clone(), self.kbuckets.local_key().preimage().clone()); + // Note: We store our own provider records locally without local addresses + // to avoid redundant storage and outdated addresses. Instead these are + // acquired on demand when returning a `ProviderRecord` for the local node. + let local_addrs = Vec::new(); + let record = ProviderRecord::new( + key.clone(), + self.kbuckets.local_key().preimage().clone(), + local_addrs); self.store.add_provider(record)?; let target = kbucket::Key::new(key.clone()); let peers = self.kbuckets.closest_keys(&target); @@ -784,12 +795,42 @@ where /// Collects all peers who are known to be providers of the value for a given `Multihash`. fn provider_peers(&mut self, key: &record::Key, source: &PeerId) -> Vec { let kbuckets = &mut self.kbuckets; + let connected = &mut self.connected_peers; + let local_addrs = &self.local_addrs; self.store.providers(key) .into_iter() .filter_map(move |p| if &p.provider != source { - let key = kbucket::Key::new(p.provider.clone()); - kbuckets.entry(&key).view().map(|e| KadPeer::from(e.to_owned())) + let node_id = p.provider; + let multiaddrs = p.addresses; + let connection_ty = if connected.contains(&node_id) { + KadConnectionType::Connected + } else { + KadConnectionType::NotConnected + }; + if multiaddrs.is_empty() { + // The provider is either the local node and we fill in + // the local addresses on demand, or it is a legacy + // provider record without addresses, in which case we + // try to find addresses in the routing table, as was + // done before provider records were stored along with + // their addresses. + if &node_id == kbuckets.local_key().preimage() { + Some(local_addrs.iter().cloned().collect::>()) + } else { + let key = kbucket::Key::new(node_id.clone()); + kbuckets.entry(&key).view().map(|e| e.node.value.clone().into_vec()) + } + } else { + Some(multiaddrs) + } + .map(|multiaddrs| { + KadPeer { + node_id, + multiaddrs, + connection_ty, + } + }) } else { None }) @@ -1367,7 +1408,8 @@ where let record = ProviderRecord { key, provider: provider.node_id, - expires: self.provider_record_ttl.map(|ttl| Instant::now() + ttl) + expires: self.provider_record_ttl.map(|ttl| Instant::now() + ttl), + addresses: provider.multiaddrs, }; if let Err(e) = self.store.add_provider(record) { info!("Provider record not stored: {:?}", e); @@ -1746,6 +1788,20 @@ where }; } + fn inject_new_listen_addr(&mut self, addr: &Multiaddr) { + self.local_addrs.insert(addr.clone()); + } + + fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) { + self.local_addrs.remove(addr); + } + + fn inject_new_external_addr(&mut self, addr: &Multiaddr) { + if self.local_addrs.len() < MAX_LOCAL_EXTERNAL_ADDRS { + self.local_addrs.insert(addr.clone()); + } + } + fn poll(&mut self, cx: &mut Context<'_>, parameters: &mut impl PollParameters) -> Poll< NetworkBehaviourAction< as ProtocolsHandler>::InEvent, @@ -2497,3 +2553,8 @@ pub enum RoutingUpdate { /// peer ID). Failed, } + +/// The maximum number of local addresses as reported by +/// other peers that the behaviour tracks. The behaviour +/// always tracks all its listen addresses. +const MAX_LOCAL_EXTERNAL_ADDRS: usize = 20; diff --git a/protocols/kad/src/record.rs b/protocols/kad/src/record.rs index 2e9c0248e2c..d00e27d9326 100644 --- a/protocols/kad/src/record.rs +++ b/protocols/kad/src/record.rs @@ -23,7 +23,7 @@ pub mod store; use bytes::Bytes; -use libp2p_core::PeerId; +use libp2p_core::{PeerId, Multiaddr}; use multihash::Multihash; use std::borrow::Borrow; use std::hash::{Hash, Hasher}; @@ -112,6 +112,8 @@ pub struct ProviderRecord { pub provider: PeerId, /// The expiration time as measured by a local, monotonic clock. pub expires: Option, + /// The known addresses that the provider may be listening on. + pub addresses: Vec } impl Hash for ProviderRecord { @@ -123,12 +125,15 @@ impl Hash for ProviderRecord { impl ProviderRecord { /// Creates a new provider record for insertion into a `RecordStore`. - pub fn new(key: K, provider: PeerId) -> Self + pub fn new(key: K, provider: PeerId, addresses: Vec) -> Self where K: Into { ProviderRecord { - key: key.into(), provider, expires: None + key: key.into(), + provider, + expires: None, + addresses, } } @@ -178,6 +183,7 @@ mod tests { } else { None }, + addresses: vec![], } } } diff --git a/protocols/kad/src/record/store/memory.rs b/protocols/kad/src/record/store/memory.rs index e0a2e1d25b2..d4fdad7967c 100644 --- a/protocols/kad/src/record/store/memory.rs +++ b/protocols/kad/src/record/store/memory.rs @@ -259,7 +259,7 @@ mod tests { let key = Key::from(random_multihash()); let mut records = providers.into_iter().map(|p| { - ProviderRecord::new(key.clone(), p.into_preimage()) + ProviderRecord::new(key.clone(), p.into_preimage(), Vec::new()) }).collect::>(); for r in &records { @@ -280,7 +280,7 @@ mod tests { let id = PeerId::random(); let mut store = MemoryStore::new(id.clone()); let key = random_multihash(); - let rec = ProviderRecord::new(key, id.clone()); + let rec = ProviderRecord::new(key, id.clone(), Vec::new()); assert!(store.add_provider(rec.clone()).is_ok()); assert_eq!(vec![Cow::Borrowed(&rec)], store.provided().collect::>()); store.remove_provider(&rec.key, &id); @@ -292,7 +292,7 @@ mod tests { let mut store = MemoryStore::new(PeerId::random()); let key = random_multihash(); let prv = PeerId::random(); - let mut rec = ProviderRecord::new(key, prv); + let mut rec = ProviderRecord::new(key, prv, Vec::new()); assert!(store.add_provider(rec.clone()).is_ok()); assert_eq!(vec![rec.clone()], store.providers(&rec.key).to_vec()); rec.expires = Some(Instant::now()); @@ -306,12 +306,12 @@ mod tests { for _ in 0 .. store.config.max_provided_keys { let key = random_multihash(); let prv = PeerId::random(); - let rec = ProviderRecord::new(key, prv); + let rec = ProviderRecord::new(key, prv, Vec::new()); let _ = store.add_provider(rec); } let key = random_multihash(); let prv = PeerId::random(); - let rec = ProviderRecord::new(key, prv); + let rec = ProviderRecord::new(key, prv, Vec::new()); match store.add_provider(rec) { Err(Error::MaxProvidedKeys) => {} _ => panic!("Unexpected result"), From d92fd500cb9ba6edd507f5bd5483ff410a2aabc3 Mon Sep 17 00:00:00 2001 From: Roman Borschel Date: Tue, 18 Aug 2020 09:01:24 +0200 Subject: [PATCH 2/4] Update protocols/kad/src/behaviour.rs Co-authored-by: Max Inden --- protocols/kad/src/behaviour.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index 8b260b96e89..e9bb5589155 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -2554,7 +2554,7 @@ pub enum RoutingUpdate { Failed, } -/// The maximum number of local addresses as reported by -/// other peers that the behaviour tracks. The behaviour -/// always tracks all its listen addresses. +/// The maximum number of local external addresses. When reached any +/// further externally reported addresses are ignored. The behaviour always +/// tracks all its listen addresses. const MAX_LOCAL_EXTERNAL_ADDRS: usize = 20; From f40e8a68fd71eb9c38ec017ac33af77199127f4e Mon Sep 17 00:00:00 2001 From: "Roman S. Borschel" Date: Tue, 18 Aug 2020 11:17:58 +0200 Subject: [PATCH 3/4] Remove negligible use of with_capacity. --- protocols/kad/src/behaviour.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index e9bb5589155..2dc3ddb0e84 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -361,7 +361,7 @@ where record_ttl: config.record_ttl, provider_record_ttl: config.provider_record_ttl, connection_idle_timeout: config.connection_idle_timeout, - local_addrs: HashSet::with_capacity(6) + local_addrs: HashSet::new() } } From 88ba56c1e704a38197cc46e1eaef5076933771d8 Mon Sep 17 00:00:00 2001 From: "Roman S. Borschel" Date: Tue, 18 Aug 2020 11:19:22 +0200 Subject: [PATCH 4/4] Update changelog. --- protocols/kad/CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/protocols/kad/CHANGELOG.md b/protocols/kad/CHANGELOG.md index 0fb4f980121..ace3fa40855 100644 --- a/protocols/kad/CHANGELOG.md +++ b/protocols/kad/CHANGELOG.md @@ -1,5 +1,8 @@ # 0.22.0 [unreleased] +- Store addresses in provider records. + See [PR 1708](https://github.com/libp2p/rust-libp2p/pull/1708). + - Update `libp2p-core` and `libp2p-swarm` dependencies. - Add `KBucketRef::range` exposing the minimum inclusive and maximum inclusive