Skip to content

Commit

Permalink
Refresh providers: dry-run without network queries
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitry-markin committed Aug 28, 2024
1 parent d02acee commit 5432c9b
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 17 deletions.
2 changes: 2 additions & 0 deletions src/protocol/libp2p/kademlia/futures_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ use std::{
};

/// Wrapper around [`FuturesUnordered`] that wakes a task up automatically.
/// The [`Stream`] implemented by [`FuturesStream`] never terminates and can be
/// polled when contains no futures.
#[derive(Default)]
pub struct FuturesStream<F> {
futures: FuturesUnordered<F>,
Expand Down
29 changes: 27 additions & 2 deletions src/protocol/libp2p/kademlia/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use crate::{
query::{QueryAction, QueryEngine},
record::ProviderRecord,
routing_table::RoutingTable,
store::{MemoryStore, MemoryStoreConfig},
store::{MemoryStore, MemoryStoreAction, MemoryStoreConfig},
types::{ConnectionType, KademliaPeer, Key},
},
Direction, TransportEvent, TransportService,
Expand Down Expand Up @@ -935,7 +935,7 @@ impl Kademlia {
self.disconnect_peer(peer, query_id).await;
}
}
}
},
command = self.cmd_rx.recv() => {
match command {
Some(KademliaCommand::FindNode { peer, query_id }) => {
Expand Down Expand Up @@ -1107,6 +1107,31 @@ impl Kademlia {
None => return Err(Error::EssentialTaskClosed),
}
},
action = self.store.next_action() => match action {
Some(MemoryStoreAction::RefreshProvider { mut provider }) => {
tracing::trace!(
target: LOG_TARGET,
key = ?provider.key,
"republishing local provider",
);

// Make sure to roll expiration time.
provider.expires = Instant::now() + self.provider_ttl;

self.store.put_provider(provider.clone());


todo!("obtain a query ID and start query");
// self.engine.start_add_provider(
// query_id,
// provider,
// self.routing_table
// .closest(Key::new(provider.key), self.replication_factor)
// .into(),
// );
}
None => {}
}
}
}
}
Expand Down
65 changes: 50 additions & 15 deletions src/protocol/libp2p/kademlia/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,13 @@
use crate::{
protocol::libp2p::kademlia::{
config::DEFAULT_PROVIDER_REFRESH_INTERVAL,
futures_stream::FuturesStream,
record::{Key, ProviderRecord, Record},
},
PeerId,
};

use futures::{future::BoxFuture, stream::FuturesUnordered};
use futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt};
use std::{
collections::{hash_map::Entry, HashMap},
num::NonZeroUsize,
Expand All @@ -40,7 +41,9 @@ use std::{
const LOG_TARGET: &str = "litep2p::ipfs::kademlia::store";

/// Memory store events.
pub enum MemoryStoreEvent {}
pub enum MemoryStoreAction {
RefreshProvider { provider: ProviderRecord },
}

/// Memory store.
pub struct MemoryStore {
Expand All @@ -55,7 +58,7 @@ pub struct MemoryStore {
/// Local providers.
local_providers: HashMap<Key, ProviderRecord>,
/// Futures to signal it's time to republish a local provider.
pending_provider_republish: FuturesUnordered<BoxFuture<'static, Key>>,
pending_provider_refresh: FuturesStream<BoxFuture<'static, Key>>,
}

impl MemoryStore {
Expand All @@ -67,7 +70,7 @@ impl MemoryStore {
records: HashMap::new(),
provider_keys: HashMap::new(),
local_providers: HashMap::new(),
pending_provider_republish: FuturesUnordered::new(),
pending_provider_refresh: FuturesStream::new(),
}
}

Expand All @@ -79,7 +82,7 @@ impl MemoryStore {
records: HashMap::new(),
provider_keys: HashMap::new(),
local_providers: HashMap::new(),
pending_provider_republish: FuturesUnordered::new(),
pending_provider_refresh: FuturesStream::new(),
}
}

Expand Down Expand Up @@ -204,9 +207,7 @@ impl MemoryStore {
match provider_position {
Ok(i) => {
// Update the provider in place.
providers[i] = provider_record;

true
providers[i] = provider_record.clone();
}
Err(i) => {
// `Err(i)` contains the insertion point.
Expand All @@ -220,25 +221,59 @@ impl MemoryStore {
existing `max_providers_per_key`",
);

false
return false;
} else {
if providers.len() == usize::from(self.config.max_providers_per_key) {
providers.pop();
}

providers.insert(i, provider_record);

true
providers.insert(i, provider_record.clone());
}
}
}

if provider_record.provider == self.local_peer_id {
// We must make sure to refresh the local provider.
let key = provider_record.key.clone();
let refresh_interval = self.config.provider_refresh_interval;
self.local_providers.insert(key.clone(), provider_record);
self.pending_provider_refresh.push(Box::pin(async move {
tokio::time::sleep(refresh_interval).await;
key
}));
}

true
}
}
}

/// Poll next event from the store.
async fn next_event() -> Option<MemoryStoreEvent> {
None
/// Poll next action from the store.
pub async fn next_action(&mut self) -> Option<MemoryStoreAction> {
// [`FuturesStream`] never terminates, so `map()` below is always triggered.
self.pending_provider_refresh
.next()
.await
.map(|key| {
if let Some(provider) = self.local_providers.get(&key).cloned() {
tracing::trace!(
target: LOG_TARGET,
?key,
"refresh provider"
);

Some(MemoryStoreAction::RefreshProvider { provider })
} else {
tracing::trace!(
target: LOG_TARGET,
?key,
"it's time to refresh a provider, but we do not provide this key anymore",
);

None
}
})
.flatten()
}
}

Expand Down

0 comments on commit 5432c9b

Please sign in to comment.