Skip to content

Commit

Permalink
Add a mean to access the list of peers
Browse files Browse the repository at this point in the history
  • Loading branch information
lu-zero committed Aug 18, 2023
1 parent 0deec23 commit aa23e2f
Showing 1 changed file with 51 additions and 9 deletions.
60 changes: 51 additions & 9 deletions dht-cache/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,23 @@ impl Builder {
pub struct Cache {
peer_id: String,
local: LocalCache,
peers: Arc<RwLock<PeersState>>,
cmd: UnboundedSender<Command>,
}

/// Information regarding the known peers
#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord)]
pub struct PeerInfo {
/// libp2p Identifier
pub peer_id: String,
/// Hash of its cache of the DHT
pub hash: u64,
/// Last time the peer updated its state
///
/// TODO: use a better type
pub last_seen: u128,
}

impl Cache {
/// Send a volatile message
///
Expand Down Expand Up @@ -142,6 +156,20 @@ impl Cache {
pub fn query(&self, topic: &str) -> Query {
self.local.query(topic)
}

/// Get a list of the current peers
pub async fn peers(&self) -> Vec<PeerInfo> {
let peers = self.peers.read().await;
peers
.list
.values()
.map(|p| PeerInfo {
peer_id: p.peer_id.to_owned(),
hash: p.cache_hash,
last_seen: p.publication_timestamp,
})
.collect()
}
}

#[derive(Default, Debug, Clone)]
Expand Down Expand Up @@ -202,18 +230,19 @@ pub fn cache_channel(

let (cmd, r, _j) = dht_channel(swarm);

let peers_state = Arc::new(RwLock::new(PeersState::with_interval(
resend_interval as u128,
)));

let cache = Cache {
peers: peers_state.clone(),
local: local.clone(),
cmd: cmd.clone(),
peer_id: local_peer_id.clone(),
};

let stream = UnboundedReceiverStream::new(r);

let peers_state = Arc::new(RwLock::new(PeersState::with_interval(
resend_interval as u128,
)));

let local_read = local.clone();
let cmd_update = cmd.clone();
let peer_id = local_peer_id.clone();
Expand Down Expand Up @@ -359,11 +388,18 @@ mod test {
.map(|uuid| format!("uuid-{uuid}"))
.collect();

tokio::task::spawn(async move {
let (a_c, a_ev) = cache_channel(a_local_cache, a, 100);
let (_b_c, b_ev) = cache_channel(b_local_cache, b, 100);
let (_c_c, c_ev) = cache_channel(c_local_cache, c, 100);
let (a_c, a_ev) = cache_channel(a_local_cache, a, 100);
let (b_c, b_ev) = cache_channel(b_local_cache, b, 100);
let (c_c, c_ev) = cache_channel(c_local_cache, c, 100);

let mut expected_peers = [
a_c.peer_id.clone(),
b_c.peer_id.clone(),
c_c.peer_id.clone(),
];
expected_peers.sort();

tokio::task::spawn(async move {
let mut a_ev = pin!(a_ev);
let mut b_ev = pin!(b_ev);
let mut c_ev = pin!(c_ev);
Expand Down Expand Up @@ -397,7 +433,7 @@ mod test {

log::info!("Adding D");

let (_d_c, d_ev) = cache_channel(d_local_cache, d, 1000);
let (d_c, d_ev) = cache_channel(d_local_cache, d, 1000);

let mut d_ev = pin!(d_ev);
while !expected.is_empty() {
Expand All @@ -412,5 +448,11 @@ mod test {
}
}
}

let mut peers: Vec<_> = d_c.peers().await.into_iter().map(|p| p.peer_id).collect();

peers.sort();

assert_eq!(peers, expected_peers);
}
}

0 comments on commit aa23e2f

Please sign in to comment.