From aa23e2f0acd732bbe1545db35c2c775400c40986 Mon Sep 17 00:00:00 2001 From: Luca Barbato Date: Fri, 18 Aug 2023 14:39:37 +0200 Subject: [PATCH] Add a mean to access the list of peers --- dht-cache/src/cache.rs | 60 +++++++++++++++++++++++++++++++++++------- 1 file changed, 51 insertions(+), 9 deletions(-) diff --git a/dht-cache/src/cache.rs b/dht-cache/src/cache.rs index a1f53d0..01bfddc 100644 --- a/dht-cache/src/cache.rs +++ b/dht-cache/src/cache.rs @@ -78,9 +78,23 @@ impl Builder { pub struct Cache { peer_id: String, local: LocalCache, + peers: Arc>, cmd: UnboundedSender, } +/// Information regarding the known peers +#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord)] +pub struct PeerInfo { + /// libp2p Identifier + pub peer_id: String, + /// Hash of its cache of the DHT + pub hash: u64, + /// Last time the peer updated its state + /// + /// TODO: use a better type + pub last_seen: u128, +} + impl Cache { /// Send a volatile message /// @@ -142,6 +156,20 @@ impl Cache { pub fn query(&self, topic: &str) -> Query { self.local.query(topic) } + + /// Get a list of the current peers + pub async fn peers(&self) -> Vec { + let peers = self.peers.read().await; + peers + .list + .values() + .map(|p| PeerInfo { + peer_id: p.peer_id.to_owned(), + hash: p.cache_hash, + last_seen: p.publication_timestamp, + }) + .collect() + } } #[derive(Default, Debug, Clone)] @@ -202,7 +230,12 @@ pub fn cache_channel( let (cmd, r, _j) = dht_channel(swarm); + let peers_state = Arc::new(RwLock::new(PeersState::with_interval( + resend_interval as u128, + ))); + let cache = Cache { + peers: peers_state.clone(), local: local.clone(), cmd: cmd.clone(), peer_id: local_peer_id.clone(), @@ -210,10 +243,6 @@ pub fn cache_channel( let stream = UnboundedReceiverStream::new(r); - let peers_state = Arc::new(RwLock::new(PeersState::with_interval( - resend_interval as u128, - ))); - let local_read = local.clone(); let cmd_update = cmd.clone(); let peer_id = local_peer_id.clone(); @@ -359,11 +388,18 @@ mod test { .map(|uuid| format!("uuid-{uuid}")) .collect(); - tokio::task::spawn(async move { - let (a_c, a_ev) = cache_channel(a_local_cache, a, 100); - let (_b_c, b_ev) = cache_channel(b_local_cache, b, 100); - let (_c_c, c_ev) = cache_channel(c_local_cache, c, 100); + let (a_c, a_ev) = cache_channel(a_local_cache, a, 100); + let (b_c, b_ev) = cache_channel(b_local_cache, b, 100); + let (c_c, c_ev) = cache_channel(c_local_cache, c, 100); + let mut expected_peers = [ + a_c.peer_id.clone(), + b_c.peer_id.clone(), + c_c.peer_id.clone(), + ]; + expected_peers.sort(); + + tokio::task::spawn(async move { let mut a_ev = pin!(a_ev); let mut b_ev = pin!(b_ev); let mut c_ev = pin!(c_ev); @@ -397,7 +433,7 @@ mod test { log::info!("Adding D"); - let (_d_c, d_ev) = cache_channel(d_local_cache, d, 1000); + let (d_c, d_ev) = cache_channel(d_local_cache, d, 1000); let mut d_ev = pin!(d_ev); while !expected.is_empty() { @@ -412,5 +448,11 @@ mod test { } } } + + let mut peers: Vec<_> = d_c.peers().await.into_iter().map(|p| p.peer_id).collect(); + + peers.sort(); + + assert_eq!(peers, expected_peers); } }