Skip to content

Commit

Permalink
Signal when the nodes are available, not when they are discovered
Browse files Browse the repository at this point in the history
  • Loading branch information
lu-zero committed Aug 21, 2023
1 parent 5f79a70 commit c4000eb
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 65 deletions.
129 changes: 72 additions & 57 deletions dht-cache/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,30 @@ use tokio_stream::wrappers::UnboundedReceiverStream;
use crate::domolibp2p::{self, generate_rsa_key};
use crate::{
cache::local::DomoCacheStateMessage,
data::DomoEvent,
dht::{dht_channel, Command, Event},
dht::{dht_channel, Command, Event as DhtEvent},
domolibp2p::DomoBehaviour,
utils, Error,
};

use self::local::{DomoCacheElement, LocalCache, Query};

/// DHT state change
#[derive(Debug)]

Check warning on line 27 in dht-cache/src/cache.rs

View check run for this annotation

Codecov / codecov/patch

dht-cache/src/cache.rs#L27

Added line #L27 was not covered by tests
pub enum Event {
/// Persistent, structured data
///
/// The information is persisted across nodes.
/// Newly joining nodes will receive it from other participants and
/// the local cache can be queried for it.
PersistentData(DomoCacheElement),
/// Volatile, unstructured data
///
/// The information is transmitted across all the nodes participating
VolatileData(Value),
/// Notify the peer availability
ReadyPeers(Vec<String>),
}

/// Builder for a Cached DHT Node
// TODO: make it Clone
pub struct Builder {
Expand All @@ -37,9 +53,7 @@ impl Builder {
}

/// Instantiate a new DHT node and return it
pub async fn make_channel(
self,
) -> Result<(Cache, impl Stream<Item = DomoEvent>), crate::Error> {
pub async fn make_channel(self) -> Result<(Cache, impl Stream<Item = Event>), crate::Error> {
let loopback_only = self.cfg.loopback;
let shared_key = domolibp2p::parse_hex_key(&self.cfg.shared_key)?;
let private_key_file = self.cfg.private_key.as_ref();
Expand Down Expand Up @@ -239,7 +253,7 @@ pub fn cache_channel(
local: LocalCache,
swarm: Swarm<DomoBehaviour>,
resend_interval: u64,
) -> (Cache, impl Stream<Item = DomoEvent>) {
) -> (Cache, impl Stream<Item = Event>) {
let local_peer_id = swarm.local_peer_id().to_string();

let (cmd, r, _j) = dht_channel(swarm);
Expand Down Expand Up @@ -289,7 +303,7 @@ pub fn cache_channel(
let cmd = cmd.clone();
async move {
match ev {
Event::Config(cfg) => {
DhtEvent::Config(cfg) => {
let m: DomoCacheStateMessage = serde_json::from_str(&cfg).unwrap();

let hash = local_write.get_hash().await;
Expand Down Expand Up @@ -338,16 +352,16 @@ pub fn cache_channel(

None
}
Event::Discovered(who) => Some(DomoEvent::NewPeers(
DhtEvent::Discovered(_who) => None /* Some(DomoEvent::NewPeers(
who.into_iter().map(|w| w.to_string()).collect(),
)),
Event::VolatileData(data) => {
))*/,
DhtEvent::VolatileData(data) => {
// TODO we swallow errors quietly here
serde_json::from_str(&data)
.ok()
.map(DomoEvent::VolatileData)
.map(Event::VolatileData)

Check warning on line 362 in dht-cache/src/cache.rs

View check run for this annotation

Codecov / codecov/patch

dht-cache/src/cache.rs#L358-L362

Added lines #L358 - L362 were not covered by tests
}
Event::PersistentData(data) => {
DhtEvent::PersistentData(data) => {
if let Ok(mut elem) = serde_json::from_str::<DomoCacheElement>(&data) {
if elem.republication_timestamp != 0 {
log::debug!("Retransmission");

Check warning on line 367 in dht-cache/src/cache.rs

View check run for this annotation

Codecov / codecov/patch

dht-cache/src/cache.rs#L367

Added line #L367 was not covered by tests
Expand All @@ -358,7 +372,15 @@ pub fn cache_channel(
.try_put(&elem)
.await

Check warning on line 373 in dht-cache/src/cache.rs

View check run for this annotation

Codecov / codecov/patch

dht-cache/src/cache.rs#L373

Added line #L373 was not covered by tests
.ok()
.map(|_| DomoEvent::PersistentData(elem))
.map(|_| Event::PersistentData(elem))
} else {
None

Check warning on line 377 in dht-cache/src/cache.rs

View check run for this annotation

Codecov / codecov/patch

dht-cache/src/cache.rs#L377

Added line #L377 was not covered by tests
}
}
DhtEvent::Ready(peers) => {
if !peers.is_empty() {
Some(Event::ReadyPeers(
peers.into_iter().map(|p| p.to_string()).collect()))
} else {
None

Check warning on line 385 in dht-cache/src/cache.rs

View check run for this annotation

Codecov / codecov/patch

dht-cache/src/cache.rs#L385

Added line #L385 was not covered by tests
}
Expand Down Expand Up @@ -400,75 +422,68 @@ mod test {
let a_local_cache = LocalCache::new();
let b_local_cache = LocalCache::new();
let c_local_cache = LocalCache::new();
let d_local_cache = LocalCache::new();

let mut expected: HashSet<_> = (0..10)
.into_iter()
.map(|uuid| format!("uuid-{uuid}"))
.collect();

let (a_c, a_ev) = cache_channel(a_local_cache, a, 100);
let (b_c, b_ev) = cache_channel(b_local_cache, b, 100);
let (c_c, c_ev) = cache_channel(c_local_cache, c, 100);
let (a_c, a_ev) = cache_channel(a_local_cache, a, 5000);
let (b_c, b_ev) = cache_channel(b_local_cache, b, 5000);
let (c_c, c_ev) = cache_channel(c_local_cache, c, 5000);

let mut expected_peers = HashSet::new();
expected_peers.insert(a_c.peer_id.clone());
expected_peers.insert(b_c.peer_id.clone());
expected_peers.insert(c_c.peer_id.clone());

tokio::task::spawn(async move {
let a_ev = pin!(a_ev);
let b_ev = pin!(b_ev);
let c_ev = pin!(c_ev);
for uuid in 0..10 {
let _ = a_c
.put(
"Topic",
&format!("uuid-{uuid}"),
serde_json::json!({"key": uuid}),
)
.await;
}
let mut a_ev = pin!(a_ev);
let b_ev = pin!(b_ev);
let c_ev = pin!(c_ev);

let mut s = (
a_ev.map(|ev| ("a", ev)),
b_ev.map(|ev| ("b", ev)),
c_ev.map(|ev| ("c", ev)),
)
.merge();

while let Some((node, ev)) = s.next().await {
match ev {
DomoEvent::PersistentData(data) => {
log::debug!("{node}: Got data {data:?}");
}
_ => {
log::debug!("{node}: Other {ev:?}");
}
while let Some(ev) = a_ev.next().await {
match ev {
Event::ReadyPeers(peers) => {
log::info!("Ready peers {peers:?}");
break;
}
_ => log::debug!("waiting for ready {ev:?}"),

Check warning on line 450 in dht-cache/src/cache.rs

View check run for this annotation

Codecov / codecov/patch

dht-cache/src/cache.rs#L450

Added line #L450 was not covered by tests
}
});

log::info!("Adding D");
}

let (d_c, d_ev) = cache_channel(d_local_cache, d, 100);
for uuid in 0..10 {
let _ = a_c
.put(
"Topic",
&format!("uuid-{uuid}"),
serde_json::json!({"key": uuid}),
)
.await;

Check warning on line 461 in dht-cache/src/cache.rs

View check run for this annotation

Codecov / codecov/patch

dht-cache/src/cache.rs#L461

Added line #L461 was not covered by tests
}
let mut s = (
a_ev.map(|ev| ("a", ev)),
b_ev.map(|ev| ("b", ev)),
c_ev.map(|ev| ("c", ev)),
)
.merge();

let mut d_ev = pin!(d_ev);
while !expected.is_empty() {
let ev = d_ev.next().await.unwrap();
let (node, ev) = s.next().await.unwrap();
match ev {
DomoEvent::PersistentData(data) => {
assert!(expected.remove(&data.topic_uuid));
log::warn!("d: Got data {data:?}");
Event::PersistentData(data) => {
log::debug!("{node}: Got data {data:?}");
if node == "c" {
assert!(expected.remove(&data.topic_uuid));
}
}
_ => {
log::warn!("d: Other {ev:?}");
log::debug!("{node}: Other {ev:?}");
}
}
}

// d_c must had seen at least one of the expected peers
let peers: HashSet<_> = d_c.peers().await.into_iter().map(|p| p.peer_id).collect();
// c_c must have seen at least one of the expected peers
let peers: HashSet<_> = c_c.peers().await.into_iter().map(|p| p.peer_id).collect();

log::info!("peers {peers:?}");

Expand Down
64 changes: 56 additions & 8 deletions dht-cache/src/dht.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
//! DHT Abstraction
//!

use std::time::Duration;

use crate::domolibp2p::{DomoBehaviour, OutEvent};
use futures::prelude::*;
use libp2p::{gossipsub::IdentTopic as Topic, swarm::SwarmEvent, Swarm};
Expand All @@ -26,6 +28,7 @@ pub enum Event {
VolatileData(String),
Config(String),
Discovered(Vec<PeerId>),
Ready(Vec<PeerId>),
}

fn handle_command(swarm: &mut Swarm<DomoBehaviour>, cmd: Command) -> bool {
Expand All @@ -36,7 +39,7 @@ fn handle_command(swarm: &mut Swarm<DomoBehaviour>, cmd: Command) -> bool {
let m = serde_json::to_string(&val).unwrap();

if let Err(e) = swarm.behaviour_mut().gossipsub.publish(topic, m.as_bytes()) {
log::info!("Publish error: {e:?}");
log::info!("Broadcast error: {e:?}");

Check warning on line 42 in dht-cache/src/dht.rs

View check run for this annotation

Codecov / codecov/patch

dht-cache/src/dht.rs#L42

Added line #L42 was not covered by tests
}
true
}
Expand All @@ -57,7 +60,7 @@ fn handle_command(swarm: &mut Swarm<DomoBehaviour>, cmd: Command) -> bool {
let topic = Topic::new("domo-config");
let m = serde_json::to_string(&val).unwrap();
if let Err(e) = swarm.behaviour_mut().gossipsub.publish(topic, m.as_bytes()) {
log::info!("Publish error: {e:?}");
log::info!("Config error: {e:?}");
}
true
}
Expand Down Expand Up @@ -117,6 +120,11 @@ fn handle_swarm_event<E>(
}
}
}
SwarmEvent::Behaviour(crate::domolibp2p::OutEvent::Gossipsub(
libp2p::gossipsub::Event::Subscribed { peer_id, topic },
)) => {
log::debug!("Peer {peer_id} subscribed to {}", topic.as_str());
}
SwarmEvent::Behaviour(crate::domolibp2p::OutEvent::Mdns(mdns::Event::Expired(list))) => {
let local = OffsetDateTime::now_utc();

Check warning on line 129 in dht-cache/src/dht.rs

View check run for this annotation

Codecov / codecov/patch

dht-cache/src/dht.rs#L128-L129

Added lines #L128 - L129 were not covered by tests

Expand Down Expand Up @@ -154,16 +162,37 @@ pub fn dht_channel(
let (ev_send, ev_recv) = mpsc::unbounded_channel();

let handle = tokio::task::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(1));
let volatile = Topic::new("domo-volatile-data").hash();
let persistent = Topic::new("domo-persistent-data").hash();
let config = Topic::new("domo-config").hash();
loop {
log::trace!("Looping {}", swarm.local_peer_id());
tokio::select! {
// the mdns event is not enough to ensure we can send messages
_ = interval.tick() => {
log::debug!("{} Checking for peers", swarm.local_peer_id());
let peers: Vec<_> = swarm.behaviour_mut().gossipsub.all_peers().filter_map(|(p, topics)| {
log::info!("{p}, {topics:?}");
(topics.contains(&&volatile) &&
topics.contains(&&persistent) &&
topics.contains(&&config)).then(
||p.to_owned())
}).collect();
if !peers.is_empty() &&
ev_send.send(Event::Ready(peers)).is_err() {
return swarm;
}
}
cmd = cmd_recv.recv() => {
log::debug!("command {cmd:?}");
log::trace!("command {cmd:?}");
if !cmd.is_some_and(|cmd| handle_command(&mut swarm, cmd)) {
log::debug!("Exiting cmd");
return swarm
}
}
ev = swarm.select_next_some() => {
log::trace!("event {ev:?}");
if handle_swarm_event(&mut swarm, ev, &ev_send).is_err() {
log::debug!("Exiting ev");
return swarm
Expand Down Expand Up @@ -217,10 +246,9 @@ pub(crate) mod test {
}

pub async fn make_peer(variant: u8) -> Swarm<DomoBehaviour> {
let mut a = new_ephemeral(|identity| DomoBehaviour::new(&identity).unwrap(), variant);
a.listen().await;

a
let mut swarm = new_ephemeral(|identity| DomoBehaviour::new(&identity).unwrap(), variant);
swarm.listen().await;
swarm
}

pub async fn connect_peer(a: &mut Swarm<DomoBehaviour>, b: &mut Swarm<DomoBehaviour>) {
Expand All @@ -235,10 +263,15 @@ pub(crate) mod test {

pub async fn make_peers(variant: u8) -> [Swarm<DomoBehaviour>; 3] {
let _ = env_logger::builder().is_test(true).try_init();

let mut a = new_ephemeral(|identity| DomoBehaviour::new(&identity).unwrap(), variant);
let mut b = new_ephemeral(|identity| DomoBehaviour::new(&identity).unwrap(), variant);
let mut c = new_ephemeral(|identity| DomoBehaviour::new(&identity).unwrap(), variant);

/*
let mut a = Swarm::new_ephemeral(|identity| DomoBehaviour::new(&identity).unwrap());
let mut b = Swarm::new_ephemeral(|identity| DomoBehaviour::new(&identity).unwrap());
let mut c = Swarm::new_ephemeral(|identity| DomoBehaviour::new(&identity).unwrap());
*/
for a in a.external_addresses() {
log::info!("{a:?}");

Check warning on line 276 in dht-cache/src/dht.rs

View check run for this annotation

Codecov / codecov/patch

dht-cache/src/dht.rs#L276

Added line #L276 was not covered by tests
}
Expand All @@ -251,8 +284,14 @@ pub(crate) mod test {
b.connect(&mut c).await;
c.connect(&mut a).await;

println!("a {}", a.local_peer_id());
println!("b {}", b.local_peer_id());
println!("c {}", c.local_peer_id());

let peers: Vec<_> = a.connected_peers().cloned().collect();

log::info!("Peers {peers:#?}");

Check warning on line 293 in dht-cache/src/dht.rs

View check run for this annotation

Codecov / codecov/patch

dht-cache/src/dht.rs#L293

Added line #L293 was not covered by tests

for peer in peers {
a.behaviour_mut().gossipsub.add_explicit_peer(&peer);
}
Expand Down Expand Up @@ -280,6 +319,8 @@ pub(crate) mod test {
let (b_s, br, _) = dht_channel(b);
let (c_s, cr, _) = dht_channel(c);

log::info!("Waiting for peers");

// Wait until peers are discovered
while let Some(ev) = ar.recv().await {
match ev {
Expand All @@ -288,6 +329,9 @@ pub(crate) mod test {
Event::Config(cfg) => log::info!("config {cfg}"),

Check warning on line 329 in dht-cache/src/dht.rs

View check run for this annotation

Codecov / codecov/patch

dht-cache/src/dht.rs#L327-L329

Added lines #L327 - L329 were not covered by tests
Event::Discovered(peers) => {
log::info!("found peers: {peers:?}");
}
Event::Ready(peers) => {
log::info!("ready peers: {peers:?}");
break;
}
}
Expand All @@ -297,6 +341,7 @@ pub(crate) mod test {

a_s.send(Command::Broadcast(msg.clone())).unwrap();

log::info!("Sent volatile");
for r in [br, cr].iter_mut() {
while let Some(ev) = r.recv().await {
match ev {
Expand All @@ -311,6 +356,9 @@ pub(crate) mod test {
Event::Discovered(peers) => {
log::info!("found peers: {peers:?}");
}
Event::Ready(peers) => {
log::info!("peers ready: {peers:?}");
}
}
}
}
Expand Down

0 comments on commit c4000eb

Please sign in to comment.