Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[kad] Store addresses of provider records. #1708

Merged
merged 5 commits into from
Aug 18, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 65 additions & 4 deletions protocols/kad/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ pub struct Kademlia<TStore> {
/// Queued events to return when the behaviour is being polled.
queued_events: VecDeque<NetworkBehaviourAction<KademliaHandlerIn<QueryId>, KademliaEvent>>,

/// The currently known addresses of the local node.
local_addrs: HashSet<Multiaddr>,

/// The record storage.
store: TStore,
}
Expand Down Expand Up @@ -358,6 +361,7 @@ where
record_ttl: config.record_ttl,
provider_record_ttl: config.provider_record_ttl,
connection_idle_timeout: config.connection_idle_timeout,
local_addrs: HashSet::with_capacity(6)
romanb marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down Expand Up @@ -708,7 +712,14 @@ where
/// The results of the (repeated) provider announcements sent by this node are
/// reported via [`KademliaEvent::QueryResult{QueryResult::StartProviding}`].
pub fn start_providing(&mut self, key: record::Key) -> Result<QueryId, store::Error> {
let record = ProviderRecord::new(key.clone(), self.kbuckets.local_key().preimage().clone());
// Note: We store our own provider records locally without local addresses
// to avoid redundant storage and outdated addresses. Instead these are
// acquired on demand when returning a `ProviderRecord` for the local node.
let local_addrs = Vec::new();
let record = ProviderRecord::new(
key.clone(),
self.kbuckets.local_key().preimage().clone(),
local_addrs);
self.store.add_provider(record)?;
let target = kbucket::Key::new(key.clone());
let peers = self.kbuckets.closest_keys(&target);
Expand Down Expand Up @@ -784,12 +795,42 @@ where
/// Collects all peers who are known to be providers of the value for a given `Multihash`.
fn provider_peers(&mut self, key: &record::Key, source: &PeerId) -> Vec<KadPeer> {
let kbuckets = &mut self.kbuckets;
let connected = &mut self.connected_peers;
let local_addrs = &self.local_addrs;
self.store.providers(key)
.into_iter()
.filter_map(move |p|
if &p.provider != source {
let key = kbucket::Key::new(p.provider.clone());
kbuckets.entry(&key).view().map(|e| KadPeer::from(e.to_owned()))
let node_id = p.provider;
let multiaddrs = p.addresses;
let connection_ty = if connected.contains(&node_id) {
KadConnectionType::Connected
} else {
KadConnectionType::NotConnected
};
if multiaddrs.is_empty() {
// The provider is either the local node and we fill in
// the local addresses on demand, or it is a legacy
// provider record without addresses, in which case we
// try to find addresses in the routing table, as was
// done before provider records were stored along with
// their addresses.
mxinden marked this conversation as resolved.
Show resolved Hide resolved
if &node_id == kbuckets.local_key().preimage() {
Some(local_addrs.iter().cloned().collect::<Vec<_>>())
} else {
let key = kbucket::Key::new(node_id.clone());
kbuckets.entry(&key).view().map(|e| e.node.value.clone().into_vec())
}
} else {
Some(multiaddrs)
}
.map(|multiaddrs| {
KadPeer {
node_id,
multiaddrs,
connection_ty,
}
})
} else {
None
})
Expand Down Expand Up @@ -1367,7 +1408,8 @@ where
let record = ProviderRecord {
key,
provider: provider.node_id,
expires: self.provider_record_ttl.map(|ttl| Instant::now() + ttl)
expires: self.provider_record_ttl.map(|ttl| Instant::now() + ttl),
addresses: provider.multiaddrs,
};
if let Err(e) = self.store.add_provider(record) {
info!("Provider record not stored: {:?}", e);
Expand Down Expand Up @@ -1746,6 +1788,20 @@ where
};
}

fn inject_new_listen_addr(&mut self, addr: &Multiaddr) {
self.local_addrs.insert(addr.clone());
}

fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) {
self.local_addrs.remove(addr);
}

fn inject_new_external_addr(&mut self, addr: &Multiaddr) {
if self.local_addrs.len() < MAX_LOCAL_EXTERNAL_ADDRS {
self.local_addrs.insert(addr.clone());
}
}

fn poll(&mut self, cx: &mut Context<'_>, parameters: &mut impl PollParameters) -> Poll<
NetworkBehaviourAction<
<KademliaHandler<QueryId> as ProtocolsHandler>::InEvent,
Expand Down Expand Up @@ -2497,3 +2553,8 @@ pub enum RoutingUpdate {
/// peer ID).
Failed,
}

/// The maximum number of local addresses as reported by
/// other peers that the behaviour tracks. The behaviour
/// always tracks all its listen addresses.
romanb marked this conversation as resolved.
Show resolved Hide resolved
const MAX_LOCAL_EXTERNAL_ADDRS: usize = 20;
12 changes: 9 additions & 3 deletions protocols/kad/src/record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
pub mod store;

use bytes::Bytes;
use libp2p_core::PeerId;
use libp2p_core::{PeerId, Multiaddr};
use multihash::Multihash;
use std::borrow::Borrow;
use std::hash::{Hash, Hasher};
Expand Down Expand Up @@ -112,6 +112,8 @@ pub struct ProviderRecord {
pub provider: PeerId,
/// The expiration time as measured by a local, monotonic clock.
pub expires: Option<Instant>,
/// The known addresses that the provider may be listening on.
pub addresses: Vec<Multiaddr>
}

impl Hash for ProviderRecord {
Expand All @@ -123,12 +125,15 @@ impl Hash for ProviderRecord {

impl ProviderRecord {
/// Creates a new provider record for insertion into a `RecordStore`.
pub fn new<K>(key: K, provider: PeerId) -> Self
pub fn new<K>(key: K, provider: PeerId, addresses: Vec<Multiaddr>) -> Self
where
K: Into<Key>
{
ProviderRecord {
key: key.into(), provider, expires: None
key: key.into(),
provider,
expires: None,
addresses,
}
}

Expand Down Expand Up @@ -178,6 +183,7 @@ mod tests {
} else {
None
},
addresses: vec![],
}
}
}
Expand Down
10 changes: 5 additions & 5 deletions protocols/kad/src/record/store/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ mod tests {
let key = Key::from(random_multihash());

let mut records = providers.into_iter().map(|p| {
ProviderRecord::new(key.clone(), p.into_preimage())
ProviderRecord::new(key.clone(), p.into_preimage(), Vec::new())
}).collect::<Vec<_>>();

for r in &records {
Expand All @@ -280,7 +280,7 @@ mod tests {
let id = PeerId::random();
let mut store = MemoryStore::new(id.clone());
let key = random_multihash();
let rec = ProviderRecord::new(key, id.clone());
let rec = ProviderRecord::new(key, id.clone(), Vec::new());
assert!(store.add_provider(rec.clone()).is_ok());
assert_eq!(vec![Cow::Borrowed(&rec)], store.provided().collect::<Vec<_>>());
store.remove_provider(&rec.key, &id);
Expand All @@ -292,7 +292,7 @@ mod tests {
let mut store = MemoryStore::new(PeerId::random());
let key = random_multihash();
let prv = PeerId::random();
let mut rec = ProviderRecord::new(key, prv);
let mut rec = ProviderRecord::new(key, prv, Vec::new());
assert!(store.add_provider(rec.clone()).is_ok());
assert_eq!(vec![rec.clone()], store.providers(&rec.key).to_vec());
rec.expires = Some(Instant::now());
Expand All @@ -306,12 +306,12 @@ mod tests {
for _ in 0 .. store.config.max_provided_keys {
let key = random_multihash();
let prv = PeerId::random();
let rec = ProviderRecord::new(key, prv);
let rec = ProviderRecord::new(key, prv, Vec::new());
let _ = store.add_provider(rec);
}
let key = random_multihash();
let prv = PeerId::random();
let rec = ProviderRecord::new(key, prv);
let rec = ProviderRecord::new(key, prv, Vec::new());
match store.add_provider(rec) {
Err(Error::MaxProvidedKeys) => {}
_ => panic!("Unexpected result"),
Expand Down