diff --git a/src/protocol/libp2p/kademlia/config.rs b/src/protocol/libp2p/kademlia/config.rs index 8a02a3e3..f4fb5f29 100644 --- a/src/protocol/libp2p/kademlia/config.rs +++ b/src/protocol/libp2p/kademlia/config.rs @@ -31,7 +31,11 @@ use crate::{ use multiaddr::Multiaddr; use tokio::sync::mpsc::{channel, Receiver, Sender}; -use std::{collections::HashMap, time::Duration}; +use std::{ + collections::HashMap, + sync::{atomic::AtomicUsize, Arc}, + time::Duration, +}; /// Default TTL for the records. const DEFAULT_TTL: Duration = Duration::from_secs(36 * 60 * 60); @@ -39,6 +43,9 @@ const DEFAULT_TTL: Duration = Duration::from_secs(36 * 60 * 60); /// Default provider record TTL. const DEFAULT_PROVIDER_TTL: Duration = Duration::from_secs(48 * 60 * 60); +/// Default provider republish interval. +pub(super) const DEFAULT_PROVIDER_REFRESH_INTERVAL: Duration = Duration::from_secs(22 * 60 * 60); + /// Protocol name. const PROTOCOL_NAME: &str = "/ipfs/kad/1.0.0"; @@ -74,11 +81,17 @@ pub struct Config { /// Provider record TTL. pub(super) provider_ttl: Duration, + /// Provider republish interval. + pub(super) provider_refresh_interval: Duration, + /// TX channel for sending events to `KademliaHandle`. pub(super) event_tx: Sender, /// RX channel for receiving commands from `KademliaHandle`. pub(super) cmd_rx: Receiver, + + /// Next query ID counter shared with the handle. + pub(super) next_query_id: Arc, } impl Config { @@ -90,9 +103,11 @@ impl Config { validation_mode: IncomingRecordValidationMode, record_ttl: Duration, provider_ttl: Duration, + provider_refresh_interval: Duration, ) -> (Self, KademliaHandle) { let (cmd_tx, cmd_rx) = channel(DEFAULT_CHANNEL_SIZE); let (event_tx, event_rx) = channel(DEFAULT_CHANNEL_SIZE); + let next_query_id = Arc::new(AtomicUsize::new(0usize)); // if no protocol names were provided, use the default protocol if protocol_names.is_empty() { @@ -106,13 +121,15 @@ impl Config { validation_mode, record_ttl, provider_ttl, + provider_refresh_interval, codec: ProtocolCodec::UnsignedVarint(None), replication_factor, known_peers, cmd_rx, event_tx, + next_query_id: next_query_id.clone(), }, - KademliaHandle::new(cmd_tx, event_rx), + KademliaHandle::new(cmd_tx, event_rx, next_query_id), ) } @@ -126,6 +143,7 @@ impl Config { IncomingRecordValidationMode::Automatic, DEFAULT_TTL, DEFAULT_PROVIDER_TTL, + DEFAULT_PROVIDER_REFRESH_INTERVAL, ) } } @@ -151,8 +169,11 @@ pub struct ConfigBuilder { /// Default TTL for the records. pub(super) record_ttl: Duration, - /// Default TTL for the provider records. + /// TTL for the provider records. pub(super) provider_ttl: Duration, + + /// Republish interval for the provider records. + pub(super) provider_refresh_interval: Duration, } impl Default for ConfigBuilder { @@ -172,6 +193,7 @@ impl ConfigBuilder { validation_mode: IncomingRecordValidationMode::Automatic, record_ttl: DEFAULT_TTL, provider_ttl: DEFAULT_PROVIDER_TTL, + provider_refresh_interval: DEFAULT_PROVIDER_REFRESH_INTERVAL, } } @@ -224,7 +246,7 @@ impl ConfigBuilder { self } - /// Set default TTL for the provider records. Recommended value is 2 * (refresh interval) + 20%. + /// Set TTL for the provider records. Recommended value is 2 * (refresh interval) + 10%. /// /// If unspecified, the default TTL is 48 hours. pub fn with_provider_record_ttl(mut self, provider_record_ttl: Duration) -> Self { @@ -232,6 +254,14 @@ impl ConfigBuilder { self } + /// Set the refresh (republish) interval for provider records. + /// + /// If unspecified, the default interval is 22 hours. 
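For reference, a minimal builder sketch (not part of the patch) showing how the new refresh-interval setter composes with the existing provider TTL setter, keeping the recommended "2 * refresh interval + 10%" relationship. The variable names are placeholders; `ConfigBuilder`, `with_provider_refresh_interval()` and `with_provider_record_ttl()` are the items touched by this patch.

```rust
use std::time::Duration;

// Hypothetical values for illustration only.
let refresh_interval = Duration::from_secs(22 * 60 * 60);
// Follow the guidance above: TTL ~= 2 * refresh interval + 10%.
let provider_ttl = refresh_interval * 2 + (refresh_interval * 2) / 10;

let (config, handle) = ConfigBuilder::new()
    .with_provider_refresh_interval(refresh_interval)
    .with_provider_record_ttl(provider_ttl)
    .build();
```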
+ pub fn with_provider_refresh_interval(mut self, provider_refresh_interval: Duration) -> Self { + self.provider_refresh_interval = provider_refresh_interval; + self + } + /// Build Kademlia [`Config`]. pub fn build(self) -> (Config, KademliaHandle) { Config::new( @@ -242,6 +272,7 @@ impl ConfigBuilder { self.validation_mode, self.record_ttl, self.provider_ttl, + self.provider_refresh_interval, ) } } diff --git a/src/protocol/libp2p/kademlia/executor.rs b/src/protocol/libp2p/kademlia/executor.rs index c2a04bf9..eb7e01ba 100644 --- a/src/protocol/libp2p/kademlia/executor.rs +++ b/src/protocol/libp2p/kademlia/executor.rs @@ -18,15 +18,18 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::{protocol::libp2p::kademlia::query::QueryId, substream::Substream, PeerId}; +use crate::{ + protocol::libp2p::kademlia::{futures_stream::FuturesStream, query::QueryId}, + substream::Substream, + PeerId, +}; use bytes::{Bytes, BytesMut}; -use futures::{future::BoxFuture, stream::FuturesUnordered, Stream, StreamExt}; +use futures::{future::BoxFuture, Stream, StreamExt}; use std::{ - future::Future, pin::Pin, - task::{Context, Poll, Waker}, + task::{Context, Poll}, time::Duration, }; @@ -71,53 +74,6 @@ pub struct QueryContext { pub result: QueryResult, } -/// Wrapper around [`FuturesUnordered`] that wakes a task up automatically. -#[derive(Default)] -pub struct FuturesStream { - futures: FuturesUnordered, - waker: Option, -} - -impl FuturesStream { - /// Create new [`FuturesStream`]. - pub fn new() -> Self { - Self { - futures: FuturesUnordered::new(), - waker: None, - } - } - - /// Push a future for processing. - pub fn push(&mut self, future: F) { - self.futures.push(future); - - if let Some(waker) = self.waker.take() { - waker.wake(); - } - } -} - -impl Stream for FuturesStream { - type Item = ::Output; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let Poll::Ready(Some(result)) = self.futures.poll_next_unpin(cx) else { - // We must save the current waker to wake up the task when new futures are inserted. - // - // Otherwise, simply returning `Poll::Pending` here would cause the task to never be - // woken up again. - // - // We were previously relying on some other task from the `loop tokio::select!` to - // finish. - self.waker = Some(cx.waker().clone()); - - return Poll::Pending; - }; - - Poll::Ready(Some(result)) - } -} - /// Query executor. pub struct QueryExecutor { /// Pending futures. diff --git a/src/protocol/libp2p/kademlia/futures_stream.rs b/src/protocol/libp2p/kademlia/futures_stream.rs new file mode 100644 index 00000000..9c7d8039 --- /dev/null +++ b/src/protocol/libp2p/kademlia/futures_stream.rs @@ -0,0 +1,76 @@ +// Copyright 2024 litep2p developers +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. 
+// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use futures::{stream::FuturesUnordered, Stream, StreamExt}; + +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll, Waker}, +}; + +/// Wrapper around [`FuturesUnordered`] that wakes a task up automatically. +/// The [`Stream`] implemented by [`FuturesStream`] never terminates and can be +/// polled when contains no futures. +#[derive(Default)] +pub struct FuturesStream { + futures: FuturesUnordered, + waker: Option, +} + +impl FuturesStream { + /// Create new [`FuturesStream`]. + pub fn new() -> Self { + Self { + futures: FuturesUnordered::new(), + waker: None, + } + } + + /// Push a future for processing. + pub fn push(&mut self, future: F) { + self.futures.push(future); + + if let Some(waker) = self.waker.take() { + waker.wake(); + } + } +} + +impl Stream for FuturesStream { + type Item = ::Output; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let Poll::Ready(Some(result)) = self.futures.poll_next_unpin(cx) else { + // We must save the current waker to wake up the task when new futures are inserted. + // + // Otherwise, simply returning `Poll::Pending` here would cause the task to never be + // woken up again. + // + // We were previously relying on some other task from the `loop tokio::select!` to + // finish. + self.waker = Some(cx.waker().clone()); + + return Poll::Pending; + }; + + Poll::Ready(Some(result)) + } +} diff --git a/src/protocol/libp2p/kademlia/handle.rs b/src/protocol/libp2p/kademlia/handle.rs index f1b4c218..bf6ee78c 100644 --- a/src/protocol/libp2p/kademlia/handle.rs +++ b/src/protocol/libp2p/kademlia/handle.rs @@ -30,6 +30,10 @@ use tokio::sync::mpsc::{Receiver, Sender}; use std::{ num::NonZeroUsize, pin::Pin, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, task::{Context, Poll}, }; @@ -234,23 +238,26 @@ pub struct KademliaHandle { event_rx: Receiver, /// Next query ID. - next_query_id: usize, + next_query_id: Arc, } impl KademliaHandle { /// Create new [`KademliaHandle`]. - pub(super) fn new(cmd_tx: Sender, event_rx: Receiver) -> Self { + pub(super) fn new( + cmd_tx: Sender, + event_rx: Receiver, + next_query_id: Arc, + ) -> Self { Self { cmd_tx, event_rx, - next_query_id: 0usize, + next_query_id, } } /// Allocate next query ID. 
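As a usage sketch (not part of the patch), this is the pattern the extracted `FuturesStream` is built for: a single task both pushes futures and polls the stream from a `tokio::select!` loop, and the saved waker guarantees that a `push()` after an empty poll re-wakes the loop instead of stalling it. `drive`, the command channel, and the delays below are hypothetical.

```rust
use futures::{future::BoxFuture, StreamExt};
use std::time::Duration;
use tokio::sync::mpsc::Receiver;

// `FuturesStream` refers to the crate-private type introduced above.
async fn drive(mut commands: Receiver<u64>) {
    let mut pending: FuturesStream<BoxFuture<'static, u64>> = FuturesStream::new();

    loop {
        tokio::select! {
            command = commands.recv() => match command {
                Some(delay_secs) => {
                    // `push()` also wakes this loop if the previous poll of
                    // `pending` returned `Poll::Pending` on an empty stream.
                    pending.push(Box::pin(async move {
                        tokio::time::sleep(Duration::from_secs(delay_secs)).await;
                        delay_secs
                    }));
                }
                None => break,
            },
            Some(completed) = pending.next() => {
                println!("background future completed with {completed}");
            }
        }
    }
}
```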
fn next_query_id(&mut self) -> QueryId { - let query_id = self.next_query_id; - self.next_query_id += 1; + let query_id = self.next_query_id.fetch_add(1, Ordering::Relaxed); QueryId(query_id) } diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index 0b3da797..18c1b17d 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -31,7 +31,7 @@ use crate::{ query::{QueryAction, QueryEngine}, record::ProviderRecord, routing_table::RoutingTable, - store::MemoryStore, + store::{MemoryStore, MemoryStoreAction, MemoryStoreConfig}, types::{ConnectionType, KademliaPeer, Key}, }, Direction, TransportEvent, TransportService, @@ -48,6 +48,10 @@ use tokio::sync::mpsc::{Receiver, Sender}; use std::{ collections::{hash_map::Entry, HashMap}, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, time::{Duration, Instant}, }; @@ -68,6 +72,7 @@ const PARALLELISM_FACTOR: usize = 3; mod bucket; mod config; mod executor; +mod futures_stream; mod handle; mod message; mod query; @@ -133,6 +138,9 @@ pub(crate) struct Kademlia { /// RX channel for receiving commands from `KademliaHandle`. cmd_rx: Receiver, + /// Next query ID. + next_query_id: Arc, + /// Routing table. routing_table: RoutingTable, @@ -181,12 +189,21 @@ impl Kademlia { service.add_known_address(&peer, addresses.into_iter()); } + let store = MemoryStore::with_config( + local_peer_id, + MemoryStoreConfig { + provider_refresh_interval: config.provider_refresh_interval, + ..Default::default() + }, + ); + Self { service, routing_table, peers: HashMap::new(), cmd_rx: config.cmd_rx, - store: MemoryStore::new(), + next_query_id: config.next_query_id, + store, event_tx: config.event_tx, local_key, pending_dials: HashMap::new(), @@ -201,6 +218,13 @@ impl Kademlia { } } + /// Allocate next query ID. + fn next_query_id(&mut self) -> QueryId { + let query_id = self.next_query_id.fetch_add(1, Ordering::Relaxed); + + QueryId(query_id) + } + /// Connection established to remote peer. fn on_connection_established(&mut self, peer: PeerId) -> crate::Result<()> { tracing::trace!(target: LOG_TARGET, ?peer, "connection established"); @@ -912,7 +936,7 @@ impl Kademlia { self.disconnect_peer(peer, query_id).await; } } - } + }, command = self.cmd_rx.recv() => { match command { Some(KademliaCommand::FindNode { peer, query_id }) => { @@ -1084,6 +1108,31 @@ impl Kademlia { None => return Err(Error::EssentialTaskClosed), } }, + action = self.store.next_action() => match action { + Some(MemoryStoreAction::RefreshProvider { mut provider }) => { + tracing::trace!( + target: LOG_TARGET, + key = ?provider.key, + "republishing local provider", + ); + + // Make sure to roll expiration time. 
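A minimal sketch (not part of the patch) of why a shared `Arc<AtomicUsize>` is sufficient for allocating query IDs from both the handle and the protocol task: `fetch_add` returns the previous value atomically, so interleaved callers can never observe the same ID, and `Relaxed` ordering is enough because the counter does not synchronize any other memory.

```rust
use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};

fn main() {
    let counter = Arc::new(AtomicUsize::new(0));
    // Stand-ins for the handle side and the protocol side.
    let handle_side = counter.clone();
    let protocol_side = counter.clone();

    // Each `fetch_add` hands out a fresh value regardless of interleaving.
    let id_from_handle = handle_side.fetch_add(1, Ordering::Relaxed);
    let id_from_protocol = protocol_side.fetch_add(1, Ordering::Relaxed);
    assert_ne!(id_from_handle, id_from_protocol);
}
```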
+ provider.expires = Instant::now() + self.provider_ttl; + + self.store.put_provider(provider.clone()); + + let key = provider.key.clone(); + let query_id = self.next_query_id(); + self.engine.start_add_provider( + query_id, + provider, + self.routing_table + .closest(Key::new(key), self.replication_factor) + .into(), + ); + } + None => {} + } } } } @@ -1132,6 +1181,7 @@ mod tests { ); let (event_tx, event_rx) = channel(64); let (_cmd_tx, cmd_rx) = channel(64); + let next_query_id = Arc::new(AtomicUsize::new(0usize)); let config = Config { protocol_names: vec![ProtocolName::from("/kad/1")], @@ -1142,8 +1192,10 @@ mod tests { validation_mode: IncomingRecordValidationMode::Automatic, record_ttl: Duration::from_secs(36 * 60 * 60), provider_ttl: Duration::from_secs(48 * 60 * 60), + provider_refresh_interval: Duration::from_secs(22 * 60 * 60), event_tx, cmd_rx, + next_query_id, }; ( diff --git a/src/protocol/libp2p/kademlia/store.rs b/src/protocol/libp2p/kademlia/store.rs index 6222a6fd..8ea30ecb 100644 --- a/src/protocol/libp2p/kademlia/store.rs +++ b/src/protocol/libp2p/kademlia/store.rs @@ -21,45 +21,68 @@ //! Memory store implementation for Kademlia. #![allow(unused)] -use crate::protocol::libp2p::kademlia::record::{Key, ProviderRecord, Record}; +use crate::{ + protocol::libp2p::kademlia::{ + config::DEFAULT_PROVIDER_REFRESH_INTERVAL, + futures_stream::FuturesStream, + record::{Key, ProviderRecord, Record}, + }, + PeerId, +}; +use futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt}; use std::{ collections::{hash_map::Entry, HashMap}, num::NonZeroUsize, + time::Duration, }; /// Logging target for the file. const LOG_TARGET: &str = "litep2p::ipfs::kademlia::store"; /// Memory store events. -pub enum MemoryStoreEvent {} +pub enum MemoryStoreAction { + RefreshProvider { provider: ProviderRecord }, +} /// Memory store. pub struct MemoryStore { + /// Local peer ID. Used to track local providers. + local_peer_id: PeerId, + /// Configuration. + config: MemoryStoreConfig, /// Records. records: HashMap, /// Provider records. provider_keys: HashMap>, - /// Configuration. - config: MemoryStoreConfig, + /// Local providers. + local_providers: HashMap, + /// Futures to signal it's time to republish a local provider. + pending_provider_refresh: FuturesStream>, } impl MemoryStore { /// Create new [`MemoryStore`]. - pub fn new() -> Self { + pub fn new(local_peer_id: PeerId) -> Self { Self { + local_peer_id, + config: MemoryStoreConfig::default(), records: HashMap::new(), provider_keys: HashMap::new(), - config: MemoryStoreConfig::default(), + local_providers: HashMap::new(), + pending_provider_refresh: FuturesStream::new(), } } /// Create new [`MemoryStore`] with the provided configuration. - pub fn with_config(config: MemoryStoreConfig) -> Self { + pub fn with_config(local_peer_id: PeerId, config: MemoryStoreConfig) -> Self { Self { + local_peer_id, + config, records: HashMap::new(), provider_keys: HashMap::new(), - config, + local_providers: HashMap::new(), + pending_provider_refresh: FuturesStream::new(), } } @@ -148,6 +171,17 @@ impl MemoryStore { /// /// Returns `true` if the provider was added, `false` otherwise. pub fn put_provider(&mut self, provider_record: ProviderRecord) -> bool { + // Helper to schedule local provider refresh. 
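To make the store-side API concrete, a hedged sketch (not in the patch) of registering a local provider with a custom refresh interval; `my_peer_id` and the key bytes are placeholders. The refresh future is only scheduled because the record's `provider` equals the peer ID the store was created with.

```rust
use std::time::{Duration, Instant};

let my_peer_id = PeerId::random();
let mut store = MemoryStore::with_config(
    my_peer_id,
    MemoryStoreConfig {
        provider_refresh_interval: Duration::from_secs(22 * 60 * 60),
        ..Default::default()
    },
);

// Because `provider` matches the store's local peer ID, `put_provider()` also
// schedules a refresh future that resolves after `provider_refresh_interval`.
let record = ProviderRecord {
    key: Key::from(vec![1, 2, 3]),
    provider: my_peer_id,
    addresses: vec![],
    expires: Instant::now() + Duration::from_secs(48 * 60 * 60),
};
assert!(store.put_provider(record));
```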
+ let mut schedule_local_provider_refresh = |provider_record: ProviderRecord| { + let key = provider_record.key.clone(); + let refresh_interval = self.config.provider_refresh_interval; + self.local_providers.insert(key.clone(), provider_record); + self.pending_provider_refresh.push(Box::pin(async move { + tokio::time::sleep(refresh_interval).await; + key + })); + }; + // Make sure we have no more than `max_provider_addresses`. let provider_record = { let mut record = provider_record; @@ -160,6 +194,10 @@ impl MemoryStore { match self.provider_keys.entry(provider_record.key.clone()) { Entry::Vacant(entry) => if can_insert_new_key { + if provider_record.provider == self.local_peer_id { + schedule_local_provider_refresh(provider_record.clone()); + } + entry.insert(vec![provider_record]); true @@ -183,8 +221,11 @@ impl MemoryStore { match provider_position { Ok(i) => { + if provider_record.provider == self.local_peer_id { + schedule_local_provider_refresh(provider_record.clone()); + } // Update the provider in place. - providers[i] = provider_record; + providers[i] = provider_record.clone(); true } @@ -206,7 +247,11 @@ impl MemoryStore { providers.pop(); } - providers.insert(i, provider_record); + if provider_record.provider == self.local_peer_id { + schedule_local_provider_refresh(provider_record.clone()); + } + + providers.insert(i, provider_record.clone()); true } @@ -216,9 +261,32 @@ impl MemoryStore { } } - /// Poll next event from the store. - async fn next_event() -> Option { - None + /// Poll next action from the store. + pub async fn next_action(&mut self) -> Option { + // [`FuturesStream`] never terminates, so `map()` below is always triggered. + self.pending_provider_refresh + .next() + .await + .map(|key| { + if let Some(provider) = self.local_providers.get(&key).cloned() { + tracing::trace!( + target: LOG_TARGET, + ?key, + "refresh provider" + ); + + Some(MemoryStoreAction::RefreshProvider { provider }) + } else { + tracing::trace!( + target: LOG_TARGET, + ?key, + "it's time to refresh a provider, but we do not provide this key anymore", + ); + + None + } + }) + .flatten() } } @@ -238,6 +306,9 @@ pub struct MemoryStoreConfig { /// Maximum number of providers per key. Only providers with peer IDs closest to the key are /// kept. pub max_providers_per_key: usize, + + /// Local providers republish interval. 
+ pub provider_refresh_interval: Duration, } impl Default for MemoryStoreConfig { @@ -248,6 +319,7 @@ impl Default for MemoryStoreConfig { max_provider_keys: 1024, max_provider_addresses: 30, max_providers_per_key: 20, + provider_refresh_interval: DEFAULT_PROVIDER_REFRESH_INTERVAL, } } } @@ -263,7 +335,7 @@ mod tests { #[test] fn put_get_record() { - let mut store = MemoryStore::new(); + let mut store = MemoryStore::new(PeerId::random()); let key = Key::from(vec![1, 2, 3]); let record = Record::new(key.clone(), vec![4, 5, 6]); @@ -273,11 +345,14 @@ mod tests { #[test] fn max_records() { - let mut store = MemoryStore::with_config(MemoryStoreConfig { - max_records: 1, - max_record_size_bytes: 1024, - ..Default::default() - }); + let mut store = MemoryStore::with_config( + PeerId::random(), + MemoryStoreConfig { + max_records: 1, + max_record_size_bytes: 1024, + ..Default::default() + }, + ); let key1 = Key::from(vec![1, 2, 3]); let key2 = Key::from(vec![4, 5, 6]); @@ -293,7 +368,7 @@ mod tests { #[test] fn expired_record_removed() { - let mut store = MemoryStore::new(); + let mut store = MemoryStore::new(PeerId::random()); let key = Key::from(vec![1, 2, 3]); let record = Record { key: key.clone(), @@ -310,7 +385,7 @@ mod tests { #[test] fn new_record_overwrites() { - let mut store = MemoryStore::new(); + let mut store = MemoryStore::new(PeerId::random()); let key = Key::from(vec![1, 2, 3]); let record1 = Record { key: key.clone(), @@ -334,11 +409,14 @@ mod tests { #[test] fn max_record_size() { - let mut store = MemoryStore::with_config(MemoryStoreConfig { - max_records: 1024, - max_record_size_bytes: 2, - ..Default::default() - }); + let mut store = MemoryStore::with_config( + PeerId::random(), + MemoryStoreConfig { + max_records: 1024, + max_record_size_bytes: 2, + ..Default::default() + }, + ); let key = Key::from(vec![1, 2, 3]); let record = Record::new(key.clone(), vec![4, 5]); @@ -352,7 +430,7 @@ mod tests { #[test] fn put_get_provider() { - let mut store = MemoryStore::new(); + let mut store = MemoryStore::new(PeerId::random()); let provider = ProviderRecord { key: Key::from(vec![1, 2, 3]), provider: PeerId::random(), @@ -366,7 +444,7 @@ mod tests { #[test] fn multiple_providers_per_key() { - let mut store = MemoryStore::new(); + let mut store = MemoryStore::new(PeerId::random()); let key = Key::from(vec![1, 2, 3]); let provider1 = ProviderRecord { key: key.clone(), @@ -392,7 +470,7 @@ mod tests { #[test] fn providers_sorted_by_distance() { - let mut store = MemoryStore::new(); + let mut store = MemoryStore::new(PeerId::random()); let key = Key::from(vec![1, 2, 3]); let providers = (0..10) .map(|_| ProviderRecord { @@ -418,10 +496,13 @@ mod tests { #[test] fn max_providers_per_key() { - let mut store = MemoryStore::with_config(MemoryStoreConfig { - max_providers_per_key: 10, - ..Default::default() - }); + let mut store = MemoryStore::with_config( + PeerId::random(), + MemoryStoreConfig { + max_providers_per_key: 10, + ..Default::default() + }, + ); let key = Key::from(vec![1, 2, 3]); let providers = (0..20) .map(|_| ProviderRecord { @@ -440,10 +521,13 @@ mod tests { #[test] fn closest_providers_kept() { - let mut store = MemoryStore::with_config(MemoryStoreConfig { - max_providers_per_key: 10, - ..Default::default() - }); + let mut store = MemoryStore::with_config( + PeerId::random(), + MemoryStoreConfig { + max_providers_per_key: 10, + ..Default::default() + }, + ); let key = Key::from(vec![1, 2, 3]); let providers = (0..20) .map(|_| ProviderRecord { @@ -470,10 +554,13 @@ mod tests { 
#[test] fn furthest_provider_discarded() { - let mut store = MemoryStore::with_config(MemoryStoreConfig { - max_providers_per_key: 10, - ..Default::default() - }); + let mut store = MemoryStore::with_config( + PeerId::random(), + MemoryStoreConfig { + max_providers_per_key: 10, + ..Default::default() + }, + ); let key = Key::from(vec![1, 2, 3]); let providers = (0..11) .map(|_| ProviderRecord { @@ -503,10 +590,13 @@ mod tests { #[test] fn update_provider_in_place() { - let mut store = MemoryStore::with_config(MemoryStoreConfig { - max_providers_per_key: 10, - ..Default::default() - }); + let mut store = MemoryStore::with_config( + PeerId::random(), + MemoryStoreConfig { + max_providers_per_key: 10, + ..Default::default() + }, + ); let key = Key::from(vec![1, 2, 3]); let peer_ids = (0..10).map(|_| PeerId::random()).collect::>(); let peer_id0 = peer_ids[0]; @@ -558,7 +648,7 @@ mod tests { #[test] fn provider_record_expires() { - let mut store = MemoryStore::new(); + let mut store = MemoryStore::new(PeerId::random()); let provider = ProviderRecord { key: Key::from(vec![1, 2, 3]), provider: PeerId::random(), @@ -575,7 +665,7 @@ mod tests { #[test] fn individual_provider_record_expires() { - let mut store = MemoryStore::new(); + let mut store = MemoryStore::new(PeerId::random()); let key = Key::from(vec![1, 2, 3]); let provider1 = ProviderRecord { key: key.clone(), @@ -600,10 +690,13 @@ mod tests { #[test] fn max_addresses_per_provider() { - let mut store = MemoryStore::with_config(MemoryStoreConfig { - max_provider_addresses: 2, - ..Default::default() - }); + let mut store = MemoryStore::with_config( + PeerId::random(), + MemoryStoreConfig { + max_provider_addresses: 2, + ..Default::default() + }, + ); let key = Key::from(vec![1, 2, 3]); let provider = ProviderRecord { key: Key::from(vec![1, 2, 3]), @@ -628,10 +721,13 @@ mod tests { #[test] fn max_provider_keys() { - let mut store = MemoryStore::with_config(MemoryStoreConfig { - max_provider_keys: 2, - ..Default::default() - }); + let mut store = MemoryStore::with_config( + PeerId::random(), + MemoryStoreConfig { + max_provider_keys: 2, + ..Default::default() + }, + ); let provider1 = ProviderRecord { key: Key::from(vec![1, 2, 3]), @@ -660,4 +756,6 @@ mod tests { assert_eq!(store.get_providers(&provider2.key), vec![provider2]); assert_eq!(store.get_providers(&provider3.key), vec![]); } + + // TODO: test local providers. }
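One possible shape for the missing local-provider test noted in the TODO, assuming tokio's paused test clock (`test-util` feature) so the refresh interval elapses instantly; the names, key bytes, and timings are illustrative only.

```rust
#[tokio::test(start_paused = true)]
async fn local_provider_refresh_scheduled() {
    let local_peer_id = PeerId::random();
    let mut store = MemoryStore::with_config(
        local_peer_id,
        MemoryStoreConfig {
            provider_refresh_interval: Duration::from_secs(10),
            ..Default::default()
        },
    );

    let provider = ProviderRecord {
        key: Key::from(vec![1, 2, 3]),
        provider: local_peer_id,
        addresses: vec![],
        expires: std::time::Instant::now() + std::time::Duration::from_secs(30),
    };
    assert!(store.put_provider(provider.clone()));

    // With the paused clock, the sleep inside the scheduled refresh future
    // auto-advances, so `next_action()` should yield a refresh immediately.
    match store.next_action().await {
        Some(MemoryStoreAction::RefreshProvider { provider: refreshed }) =>
            assert_eq!(refreshed.key, provider.key),
        _ => panic!("expected a provider refresh action"),
    }
}
```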