diff --git a/protocols/kad/CHANGELOG.md b/protocols/kad/CHANGELOG.md index a5b404ae6bd..cbb5a4decf2 100644 --- a/protocols/kad/CHANGELOG.md +++ b/protocols/kad/CHANGELOG.md @@ -3,9 +3,9 @@ - Included multiaddresses of found peers alongside peer IDs in `GetClosestPeers` query results. See [PR 5475](https://github.com/libp2p/rust-libp2p/pull/5475) - Changed `FIND_NODE` response: now includes a list of closest peers when querying the recipient peer ID. Previously, this request yielded an empty response. - See [PR 5270](https://github.com/libp2p/rust-libp2p/pull/5270) -- Update to DHT republish interval and expiration time defaults to 22h and 48h respectively, rationale in [libp2p/specs#451](https://github.com/libp2p/specs/pull/451) - See [PR 3230](https://github.com/libp2p/rust-libp2p/pull/3230) + See [PR 5270](https://github.com/libp2p/rust-libp2p/pull/5270). +- Update to DHT republish interval and expiration time defaults to 22h and 48h respectively, rationale in [libp2p/specs#451](https://github.com/libp2p/specs/pull/451). + See [PR 3230](https://github.com/libp2p/rust-libp2p/pull/3230). - Use default dial conditions more consistently. See [PR 4957](https://github.com/libp2p/rust-libp2p/pull/4957) - QueryClose progress whenever closer in range, instead of having to be the closest. @@ -19,6 +19,8 @@ See [PR 5148](https://github.com/libp2p/rust-libp2p/pull/5148). - Derive `Copy` for `kbucket::key::Key`. See [PR 5317](https://github.com/libp2p/rust-libp2p/pull/5317). +⁻ `KBucket` size can now be modified without changing the `K_VALUE`. + See [PR 5414](https://github.com/libp2p/rust-libp2p/pull/5414). - Use `web-time` instead of `instant`. See [PR 5347](https://github.com/libp2p/rust-libp2p/pull/5347). diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index 8d6f86d7e0f..fc3d8a1adaa 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -24,7 +24,7 @@ mod test; use crate::addresses::Addresses; use crate::handler::{Handler, HandlerEvent, HandlerIn, RequestId}; -use crate::kbucket::{self, Distance, KBucketsTable, NodeStatus}; +use crate::kbucket::{self, Distance, KBucketConfig, KBucketsTable, NodeStatus}; use crate::protocol::{ConnectionType, KadPeer, ProtocolConfig}; use crate::query::{Query, QueryConfig, QueryId, QueryPool, QueryPoolState}; use crate::record::{ @@ -172,7 +172,7 @@ pub enum StoreInserts { /// The configuration is consumed by [`Behaviour::new`]. #[derive(Debug, Clone)] pub struct Config { - kbucket_pending_timeout: Duration, + kbucket_config: KBucketConfig, query_config: QueryConfig, protocol_config: ProtocolConfig, record_ttl: Option, @@ -215,7 +215,7 @@ impl Config { /// Builds a new `Config` with the given protocol name. pub fn new(protocol_name: StreamProtocol) -> Self { Config { - kbucket_pending_timeout: Duration::from_secs(60), + kbucket_config: KBucketConfig::default(), query_config: QueryConfig::default(), protocol_config: ProtocolConfig::new(protocol_name), record_ttl: Some(Duration::from_secs(48 * 60 * 60)), @@ -424,6 +424,24 @@ impl Config { self } + /// Sets the configuration for the k-buckets. + /// + /// * Default to K_VALUE. + pub fn set_kbucket_size(&mut self, size: NonZeroUsize) -> &mut Self { + self.kbucket_config.set_bucket_size(size); + self + } + + /// Sets the timeout duration after creation of a pending entry after which + /// it becomes eligible for insertion into a full bucket, replacing the + /// least-recently (dis)connected node. + /// + /// * Default to `60` s. + pub fn set_kbucket_pending_timeout(&mut self, timeout: Duration) -> &mut Self { + self.kbucket_config.set_pending_timeout(timeout); + self + } + /// Sets the time to wait before calling [`Behaviour::bootstrap`] after a new peer is inserted in the routing table. /// This prevent cascading bootstrap requests when multiple peers are inserted into the routing table "at the same time". /// This also allows to wait a little bit for other potential peers to be inserted into the routing table before @@ -481,7 +499,7 @@ where Behaviour { store, caching: config.caching, - kbuckets: KBucketsTable::new(local_key, config.kbucket_pending_timeout), + kbuckets: KBucketsTable::new(local_key, config.kbucket_config), kbucket_inserts: config.kbucket_inserts, protocol_config: config.protocol_config, record_filtering: config.record_filtering, diff --git a/protocols/kad/src/kbucket.rs b/protocols/kad/src/kbucket.rs index 7ed10f7f853..28d7df03917 100644 --- a/protocols/kad/src/kbucket.rs +++ b/protocols/kad/src/kbucket.rs @@ -75,15 +75,49 @@ mod key; pub use bucket::NodeStatus; pub use entry::*; -use arrayvec::ArrayVec; use bucket::KBucket; use std::collections::VecDeque; +use std::num::NonZeroUsize; use std::time::Duration; use web_time::Instant; /// Maximum number of k-buckets. const NUM_BUCKETS: usize = 256; +/// The configuration for `KBucketsTable`. +#[derive(Debug, Clone, Copy)] +pub(crate) struct KBucketConfig { + /// Maximal number of nodes that a bucket can contain. + bucket_size: usize, + /// Specifies the duration after creation of a [`PendingEntry`] after which + /// it becomes eligible for insertion into a full bucket, replacing the + /// least-recently (dis)connected node. + pending_timeout: Duration, +} + +impl Default for KBucketConfig { + fn default() -> Self { + KBucketConfig { + bucket_size: K_VALUE.get(), + pending_timeout: Duration::from_secs(60), + } + } +} + +impl KBucketConfig { + /// Modifies the maximal number of nodes that a bucket can contain. + pub(crate) fn set_bucket_size(&mut self, bucket_size: NonZeroUsize) { + self.bucket_size = bucket_size.get(); + } + + /// Modifies the duration after creation of a [`PendingEntry`] after which + /// it becomes eligible for insertion into a full bucket, replacing the + /// least-recently (dis)connected node. + pub(crate) fn set_pending_timeout(&mut self, pending_timeout: Duration) { + self.pending_timeout = pending_timeout; + } +} + /// A `KBucketsTable` represents a Kademlia routing table. #[derive(Debug, Clone)] pub(crate) struct KBucketsTable { @@ -91,6 +125,8 @@ pub(crate) struct KBucketsTable { local_key: TKey, /// The buckets comprising the routing table. buckets: Vec>, + /// The maximal number of nodes that a bucket can contain. + bucket_size: usize, /// The list of evicted entries that have been replaced with pending /// entries since the last call to [`KBucketsTable::take_applied_pending`]. applied_pending: VecDeque>, @@ -151,17 +187,12 @@ where TVal: Clone, { /// Creates a new, empty Kademlia routing table with entries partitioned - /// into buckets as per the Kademlia protocol. - /// - /// The given `pending_timeout` specifies the duration after creation of - /// a [`PendingEntry`] after which it becomes eligible for insertion into - /// a full bucket, replacing the least-recently (dis)connected node. - pub(crate) fn new(local_key: TKey, pending_timeout: Duration) -> Self { + /// into buckets as per the Kademlia protocol using the provided config. + pub(crate) fn new(local_key: TKey, config: KBucketConfig) -> Self { KBucketsTable { local_key, - buckets: (0..NUM_BUCKETS) - .map(|_| KBucket::new(pending_timeout)) - .collect(), + buckets: (0..NUM_BUCKETS).map(|_| KBucket::new(config)).collect(), + bucket_size: config.bucket_size, applied_pending: VecDeque::new(), } } @@ -247,13 +278,16 @@ where T: AsRef, { let distance = self.local_key.as_ref().distance(target); + let bucket_size = self.bucket_size; ClosestIter { target, iter: None, table: self, buckets_iter: ClosestBucketsIter::new(distance), - fmap: |b: &KBucket| -> ArrayVec<_, { K_VALUE.get() }> { - b.iter().map(|(n, _)| n.key.clone()).collect() + fmap: move |b: &KBucket| -> Vec<_> { + let mut vec = Vec::with_capacity(bucket_size); + vec.extend(b.iter().map(|(n, _)| n.key.clone())); + vec }, } } @@ -269,13 +303,15 @@ where TVal: Clone, { let distance = self.local_key.as_ref().distance(target); + let bucket_size = self.bucket_size; ClosestIter { target, iter: None, table: self, buckets_iter: ClosestBucketsIter::new(distance), - fmap: |b: &KBucket<_, TVal>| -> ArrayVec<_, { K_VALUE.get() }> { + fmap: move |b: &KBucket<_, TVal>| -> Vec<_> { b.iter() + .take(bucket_size) .map(|(n, status)| EntryView { node: n.clone(), status, @@ -324,7 +360,7 @@ struct ClosestIter<'a, TTarget, TKey, TVal, TMap, TOut> { /// distance of the local key to the target. buckets_iter: ClosestBucketsIter, /// The iterator over the entries in the currently traversed bucket. - iter: Option>, + iter: Option>, /// The projection function / mapping applied on each bucket as /// it is encountered, producing the next `iter`ator. fmap: TMap, @@ -429,7 +465,7 @@ where TTarget: AsRef, TKey: Clone + AsRef, TVal: Clone, - TMap: Fn(&KBucket) -> ArrayVec, + TMap: Fn(&KBucket) -> Vec, TOut: AsRef, { type Item = TOut; @@ -535,11 +571,14 @@ mod tests { fn arbitrary(g: &mut Gen) -> TestTable { let local_key = Key::from(PeerId::random()); let timeout = Duration::from_secs(g.gen_range(1..360)); - let mut table = TestTable::new(local_key.into(), timeout); + let mut config = KBucketConfig::default(); + config.set_pending_timeout(timeout); + let bucket_size = config.bucket_size; + let mut table = TestTable::new(local_key.into(), config); let mut num_total = g.gen_range(0..100); for (i, b) in &mut table.buckets.iter_mut().enumerate().rev() { let ix = BucketIndex(i); - let num = g.gen_range(0..usize::min(K_VALUE.get(), num_total) + 1); + let num = g.gen_range(0..usize::min(bucket_size, num_total) + 1); num_total -= num; for _ in 0..num { let distance = ix.rand_distance(&mut rand::thread_rng()); @@ -560,7 +599,9 @@ mod tests { fn buckets_are_non_overlapping_and_exhaustive() { let local_key = Key::from(PeerId::random()); let timeout = Duration::from_secs(0); - let mut table = KBucketsTable::::new(local_key.into(), timeout); + let mut config = KBucketConfig::default(); + config.set_pending_timeout(timeout); + let mut table = KBucketsTable::::new(local_key.into(), config); let mut prev_max = U256::from(0); @@ -577,7 +618,9 @@ mod tests { fn bucket_contains_range() { fn prop(ix: u8) { let index = BucketIndex(ix as usize); - let mut bucket = KBucket::, ()>::new(Duration::from_secs(0)); + let mut config = KBucketConfig::default(); + config.set_pending_timeout(Duration::from_secs(0)); + let mut bucket = KBucket::, ()>::new(config); let bucket_ref = KBucketRef { index, bucket: &mut bucket, @@ -623,7 +666,7 @@ mod tests { let local_key = Key::from(PeerId::random()); let other_id = Key::from(PeerId::random()); - let mut table = KBucketsTable::<_, ()>::new(local_key, Duration::from_secs(5)); + let mut table = KBucketsTable::<_, ()>::new(local_key, KBucketConfig::default()); if let Some(Entry::Absent(entry)) = table.entry(&other_id) { match entry.insert((), NodeStatus::Connected) { InsertResult::Inserted => (), @@ -641,7 +684,7 @@ mod tests { #[test] fn entry_self() { let local_key = Key::from(PeerId::random()); - let mut table = KBucketsTable::<_, ()>::new(local_key, Duration::from_secs(5)); + let mut table = KBucketsTable::<_, ()>::new(local_key, KBucketConfig::default()); assert!(table.entry(&local_key).is_none()) } @@ -649,7 +692,7 @@ mod tests { #[test] fn closest() { let local_key = Key::from(PeerId::random()); - let mut table = KBucketsTable::<_, ()>::new(local_key, Duration::from_secs(5)); + let mut table = KBucketsTable::<_, ()>::new(local_key, KBucketConfig::default()); let mut count = 0; loop { if count == 100 { @@ -684,7 +727,9 @@ mod tests { #[test] fn applied_pending() { let local_key = Key::from(PeerId::random()); - let mut table = KBucketsTable::<_, ()>::new(local_key, Duration::from_millis(1)); + let mut config = KBucketConfig::default(); + config.set_pending_timeout(Duration::from_millis(1)); + let mut table = KBucketsTable::<_, ()>::new(local_key, config); let expected_applied; let full_bucket_index; loop { diff --git a/protocols/kad/src/kbucket/bucket.rs b/protocols/kad/src/kbucket/bucket.rs index 1bd4389eb3d..1426017aa7a 100644 --- a/protocols/kad/src/kbucket/bucket.rs +++ b/protocols/kad/src/kbucket/bucket.rs @@ -88,15 +88,17 @@ pub struct Node { } /// The position of a node in a `KBucket`, i.e. a non-negative integer -/// in the range `[0, K_VALUE)`. +/// in the range `[0, bucket_size)`. #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] pub(crate) struct Position(usize); -/// A `KBucket` is a list of up to `K_VALUE` keys and associated values, +/// A `KBucket` is a list of up to `capacity` keys and associated values, /// ordered from least-recently connected to most-recently connected. #[derive(Debug, Clone)] pub(crate) struct KBucket { /// The nodes contained in the bucket. - nodes: ArrayVec, { K_VALUE.get() }>, + nodes: Vec>, + /// The maximal number of nodes that a bucket can contain. + capacity: usize, /// The position (index) in `nodes` that marks the first connected node. /// @@ -104,7 +106,7 @@ pub(crate) struct KBucket { /// most-recently connected, all entries above this index are also considered /// connected, i.e. the range `[0, first_connected_pos)` marks the sub-list of entries /// that are considered disconnected and the range - /// `[first_connected_pos, K_VALUE)` marks sub-list of entries that are + /// `[first_connected_pos, capacity)` marks sub-list of entries that are /// considered connected. /// /// `None` indicates that there are no connected entries in the bucket, i.e. @@ -156,18 +158,31 @@ pub(crate) struct AppliedPending { pub(crate) evicted: Option>, } +impl Default for KBucket { + fn default() -> Self { + KBucket { + nodes: Vec::with_capacity(K_VALUE.get()), + capacity: K_VALUE.get(), + first_connected_pos: None, + pending: None, + pending_timeout: Duration::from_secs(60), + } + } +} + impl KBucket where TKey: Clone + AsRef, TVal: Clone, { - /// Creates a new `KBucket` with the given timeout for pending entries. - pub(crate) fn new(pending_timeout: Duration) -> Self { + /// Creates a new `KBucket` with the given configuration. + pub(crate) fn new(config: KBucketConfig) -> Self { KBucket { - nodes: ArrayVec::new(), + nodes: Vec::with_capacity(config.bucket_size), + capacity: config.bucket_size, first_connected_pos: None, pending: None, - pending_timeout, + pending_timeout: config.pending_timeout, } } @@ -205,7 +220,7 @@ where pub(crate) fn apply_pending(&mut self) -> Option> { if let Some(pending) = self.pending.take() { if pending.replace <= Instant::now() { - if self.nodes.is_full() { + if self.nodes.len() >= self.capacity { if self.status(Position(0)) == NodeStatus::Connected { // The bucket is full with connected nodes. Drop the pending node. return None; @@ -316,7 +331,7 @@ where ) -> InsertResult { match status { NodeStatus::Connected => { - if self.nodes.is_full() { + if self.nodes.len() >= self.capacity { if self.first_connected_pos == Some(0) || self.pending.is_some() { return InsertResult::Full; } else { @@ -336,7 +351,7 @@ where InsertResult::Inserted } NodeStatus::Disconnected => { - if self.nodes.is_full() { + if self.nodes.len() >= self.capacity { return InsertResult::Full; } if let Some(ref mut p) = self.first_connected_pos { @@ -435,8 +450,10 @@ mod tests { impl Arbitrary for KBucket, ()> { fn arbitrary(g: &mut Gen) -> KBucket, ()> { let timeout = Duration::from_secs(g.gen_range(1..g.size()) as u64); - let mut bucket = KBucket::, ()>::new(timeout); - let num_nodes = g.gen_range(1..K_VALUE.get() + 1); + let mut config = KBucketConfig::default(); + config.set_pending_timeout(timeout); + let mut bucket = KBucket::, ()>::new(config); + let num_nodes = g.gen_range(1..bucket.capacity + 1); for _ in 0..num_nodes { let key = Key::from(PeerId::random()); let node = Node { key, value: () }; @@ -469,7 +486,7 @@ mod tests { // Fill a bucket with random nodes with the given status. fn fill_bucket(bucket: &mut KBucket, ()>, status: NodeStatus) { let num_entries_start = bucket.num_entries(); - for i in 0..K_VALUE.get() - num_entries_start { + for i in 0..bucket.capacity - num_entries_start { let key = Key::from(PeerId::random()); let node = Node { key, value: () }; assert_eq!(InsertResult::Inserted, bucket.insert(node, status)); @@ -480,7 +497,7 @@ mod tests { #[test] fn ordering() { fn prop(status: Vec) -> bool { - let mut bucket = KBucket::, ()>::new(Duration::from_secs(1)); + let mut bucket = KBucket::, ()>::default(); // The expected lists of connected and disconnected nodes. let mut connected = VecDeque::new(); @@ -490,7 +507,7 @@ mod tests { for status in status { let key = Key::from(PeerId::random()); let node = Node { key, value: () }; - let full = bucket.num_entries() == K_VALUE.get(); + let full = bucket.num_entries() == bucket.capacity; if let InsertResult::Inserted = bucket.insert(node, status) { let vec = match status { NodeStatus::Connected => &mut connected, @@ -522,7 +539,7 @@ mod tests { #[test] fn full_bucket() { - let mut bucket = KBucket::, ()>::new(Duration::from_secs(1)); + let mut bucket = KBucket::, ()>::default(); // Fill the bucket with disconnected nodes. fill_bucket(&mut bucket, NodeStatus::Disconnected); @@ -536,7 +553,7 @@ mod tests { } // One-by-one fill the bucket with connected nodes, replacing the disconnected ones. - for i in 0..K_VALUE.get() { + for i in 0..bucket.capacity { let (first, first_status) = bucket.iter().next().unwrap(); let first_disconnected = first.clone(); assert_eq!(first_status, NodeStatus::Disconnected); @@ -573,11 +590,11 @@ mod tests { ); assert_eq!(Some((&node, NodeStatus::Connected)), bucket.iter().last()); assert!(bucket.pending().is_none()); - assert_eq!(Some(K_VALUE.get() - (i + 1)), bucket.first_connected_pos); + assert_eq!(Some(bucket.capacity - (i + 1)), bucket.first_connected_pos); } assert!(bucket.pending().is_none()); - assert_eq!(K_VALUE.get(), bucket.num_entries()); + assert_eq!(bucket.capacity, bucket.num_entries()); // Trying to insert another connected node fails. let key = Key::from(PeerId::random()); @@ -590,7 +607,7 @@ mod tests { #[test] fn full_bucket_discard_pending() { - let mut bucket = KBucket::, ()>::new(Duration::from_secs(1)); + let mut bucket = KBucket::, ()>::default(); fill_bucket(&mut bucket, NodeStatus::Disconnected); let (first, _) = bucket.iter().next().unwrap(); let first_disconnected = first.clone(); @@ -622,7 +639,7 @@ mod tests { bucket.first_connected_pos ); assert_eq!(1, bucket.num_connected()); - assert_eq!(K_VALUE.get() - 1, bucket.num_disconnected()); + assert_eq!(bucket.capacity - 1, bucket.num_disconnected()); } #[test] @@ -654,4 +671,21 @@ mod tests { quickcheck(prop as fn(_, _, _) -> _); } + + #[test] + fn test_custom_bucket_size() { + let bucket_sizes: [NonZeroUsize; 4] = [ + NonZeroUsize::new(2).unwrap(), + NonZeroUsize::new(20).unwrap(), + NonZeroUsize::new(200).unwrap(), + NonZeroUsize::new(2000).unwrap(), + ]; + for &size in &bucket_sizes { + let mut config = KBucketConfig::default(); + config.set_bucket_size(size); + let mut bucket = KBucket::, ()>::new(config); + fill_bucket(&mut bucket, NodeStatus::Disconnected); + assert_eq!(size.get(), bucket.num_entries()); + } + } }