diff --git a/dht-cache/src/cache.rs b/dht-cache/src/cache.rs index 2557a53..ff16f62 100644 --- a/dht-cache/src/cache.rs +++ b/dht-cache/src/cache.rs @@ -16,14 +16,30 @@ use tokio_stream::wrappers::UnboundedReceiverStream; use crate::domolibp2p::{self, generate_rsa_key}; use crate::{ cache::local::DomoCacheStateMessage, - data::DomoEvent, - dht::{dht_channel, Command, Event}, + dht::{dht_channel, Command, Event as DhtEvent}, domolibp2p::DomoBehaviour, utils, Error, }; use self::local::{DomoCacheElement, LocalCache, Query}; +/// DHT state change +#[derive(Debug)] +pub enum Event { + /// Persistent, structured data + /// + /// The information is persisted across nodes. + /// Newly joining nodes will receive it from other participants and + /// the local cache can be queried for it. + PersistentData(DomoCacheElement), + /// Volatile, unstructured data + /// + /// The information is transmitted across all the nodes participating + VolatileData(Value), + /// Notify the peer availability + ReadyPeers(Vec), +} + /// Builder for a Cached DHT Node // TODO: make it Clone pub struct Builder { @@ -37,9 +53,7 @@ impl Builder { } /// Instantiate a new DHT node a return - pub async fn make_channel( - self, - ) -> Result<(Cache, impl Stream), crate::Error> { + pub async fn make_channel(self) -> Result<(Cache, impl Stream), crate::Error> { let loopback_only = self.cfg.loopback; let shared_key = domolibp2p::parse_hex_key(&self.cfg.shared_key)?; let private_key_file = self.cfg.private_key.as_ref(); @@ -239,7 +253,7 @@ pub fn cache_channel( local: LocalCache, swarm: Swarm, resend_interval: u64, -) -> (Cache, impl Stream) { +) -> (Cache, impl Stream) { let local_peer_id = swarm.local_peer_id().to_string(); let (cmd, r, _j) = dht_channel(swarm); @@ -289,7 +303,7 @@ pub fn cache_channel( let cmd = cmd.clone(); async move { match ev { - Event::Config(cfg) => { + DhtEvent::Config(cfg) => { let m: DomoCacheStateMessage = serde_json::from_str(&cfg).unwrap(); let hash = local_write.get_hash().await; @@ -338,16 +352,16 @@ pub fn cache_channel( None } - Event::Discovered(who) => Some(DomoEvent::NewPeers( + DhtEvent::Discovered(_who) => None /* Some(DomoEvent::NewPeers( who.into_iter().map(|w| w.to_string()).collect(), - )), - Event::VolatileData(data) => { + ))*/, + DhtEvent::VolatileData(data) => { // TODO we swallow errors quietly here serde_json::from_str(&data) .ok() - .map(DomoEvent::VolatileData) + .map(Event::VolatileData) } - Event::PersistentData(data) => { + DhtEvent::PersistentData(data) => { if let Ok(mut elem) = serde_json::from_str::(&data) { if elem.republication_timestamp != 0 { log::debug!("Retransmission"); @@ -358,7 +372,15 @@ pub fn cache_channel( .try_put(&elem) .await .ok() - .map(|_| DomoEvent::PersistentData(elem)) + .map(|_| Event::PersistentData(elem)) + } else { + None + } + } + DhtEvent::Ready(peers) => { + if !peers.is_empty() { + Some(Event::ReadyPeers( + peers.into_iter().map(|p| p.to_string()).collect())) } else { None } @@ -400,75 +422,68 @@ mod test { let a_local_cache = LocalCache::new(); let b_local_cache = LocalCache::new(); let c_local_cache = LocalCache::new(); - let d_local_cache = LocalCache::new(); let mut expected: HashSet<_> = (0..10) .into_iter() .map(|uuid| format!("uuid-{uuid}")) .collect(); - let (a_c, a_ev) = cache_channel(a_local_cache, a, 100); - let (b_c, b_ev) = cache_channel(b_local_cache, b, 100); - let (c_c, c_ev) = cache_channel(c_local_cache, c, 100); + let (a_c, a_ev) = cache_channel(a_local_cache, a, 5000); + let (b_c, b_ev) = cache_channel(b_local_cache, b, 5000); + let (c_c, c_ev) = cache_channel(c_local_cache, c, 5000); let mut expected_peers = HashSet::new(); expected_peers.insert(a_c.peer_id.clone()); expected_peers.insert(b_c.peer_id.clone()); expected_peers.insert(c_c.peer_id.clone()); - tokio::task::spawn(async move { - let a_ev = pin!(a_ev); - let b_ev = pin!(b_ev); - let c_ev = pin!(c_ev); - for uuid in 0..10 { - let _ = a_c - .put( - "Topic", - &format!("uuid-{uuid}"), - serde_json::json!({"key": uuid}), - ) - .await; - } + let mut a_ev = pin!(a_ev); + let b_ev = pin!(b_ev); + let c_ev = pin!(c_ev); - let mut s = ( - a_ev.map(|ev| ("a", ev)), - b_ev.map(|ev| ("b", ev)), - c_ev.map(|ev| ("c", ev)), - ) - .merge(); - - while let Some((node, ev)) = s.next().await { - match ev { - DomoEvent::PersistentData(data) => { - log::debug!("{node}: Got data {data:?}"); - } - _ => { - log::debug!("{node}: Other {ev:?}"); - } + while let Some(ev) = a_ev.next().await { + match ev { + Event::ReadyPeers(peers) => { + log::info!("Ready peers {peers:?}"); + break; } + _ => log::debug!("waiting for ready {ev:?}"), } - }); - - log::info!("Adding D"); + } - let (d_c, d_ev) = cache_channel(d_local_cache, d, 100); + for uuid in 0..10 { + let _ = a_c + .put( + "Topic", + &format!("uuid-{uuid}"), + serde_json::json!({"key": uuid}), + ) + .await; + } + let mut s = ( + a_ev.map(|ev| ("a", ev)), + b_ev.map(|ev| ("b", ev)), + c_ev.map(|ev| ("c", ev)), + ) + .merge(); - let mut d_ev = pin!(d_ev); while !expected.is_empty() { - let ev = d_ev.next().await.unwrap(); + let (node, ev) = s.next().await.unwrap(); match ev { - DomoEvent::PersistentData(data) => { - assert!(expected.remove(&data.topic_uuid)); - log::warn!("d: Got data {data:?}"); + Event::PersistentData(data) => { + log::debug!("{node}: Got data {data:?}"); + if node == "c" { + assert!(expected.remove(&data.topic_uuid)); + } } _ => { - log::warn!("d: Other {ev:?}"); + log::debug!("{node}: Other {ev:?}"); } } } - // d_c must had seen at least one of the expected peers - let peers: HashSet<_> = d_c.peers().await.into_iter().map(|p| p.peer_id).collect(); + // c_c must had seen at least one of the expected peers + let peers: HashSet<_> = c_c.peers().await.into_iter().map(|p| p.peer_id).collect(); log::info!("peers {peers:?}"); diff --git a/dht-cache/src/dht.rs b/dht-cache/src/dht.rs index 6d29c6c..5416722 100644 --- a/dht-cache/src/dht.rs +++ b/dht-cache/src/dht.rs @@ -1,6 +1,8 @@ //! DHT Abstraction //! +use std::time::Duration; + use crate::domolibp2p::{DomoBehaviour, OutEvent}; use futures::prelude::*; use libp2p::{gossipsub::IdentTopic as Topic, swarm::SwarmEvent, Swarm}; @@ -26,6 +28,7 @@ pub enum Event { VolatileData(String), Config(String), Discovered(Vec), + Ready(Vec), } fn handle_command(swarm: &mut Swarm, cmd: Command) -> bool { @@ -36,7 +39,7 @@ fn handle_command(swarm: &mut Swarm, cmd: Command) -> bool { let m = serde_json::to_string(&val).unwrap(); if let Err(e) = swarm.behaviour_mut().gossipsub.publish(topic, m.as_bytes()) { - log::info!("Publish error: {e:?}"); + log::info!("Boradcast error: {e:?}"); } true } @@ -57,7 +60,7 @@ fn handle_command(swarm: &mut Swarm, cmd: Command) -> bool { let topic = Topic::new("domo-config"); let m = serde_json::to_string(&val).unwrap(); if let Err(e) = swarm.behaviour_mut().gossipsub.publish(topic, m.as_bytes()) { - log::info!("Publish error: {e:?}"); + log::info!("Config error: {e:?}"); } true } @@ -117,6 +120,11 @@ fn handle_swarm_event( } } } + SwarmEvent::Behaviour(crate::domolibp2p::OutEvent::Gossipsub( + libp2p::gossipsub::Event::Subscribed { peer_id, topic }, + )) => { + log::debug!("Peer {peer_id} subscribed to {}", topic.as_str()); + } SwarmEvent::Behaviour(crate::domolibp2p::OutEvent::Mdns(mdns::Event::Expired(list))) => { let local = OffsetDateTime::now_utc(); @@ -154,16 +162,37 @@ pub fn dht_channel( let (ev_send, ev_recv) = mpsc::unbounded_channel(); let handle = tokio::task::spawn(async move { + let mut interval = tokio::time::interval(Duration::from_secs(1)); + let volatile = Topic::new("domo-volatile-data").hash(); + let persistent = Topic::new("domo-persistent-data").hash(); + let config = Topic::new("domo-config").hash(); loop { + log::trace!("Looping {}", swarm.local_peer_id()); tokio::select! { + // the mdns event is not enough to ensure we can send messages + _ = interval.tick() => { + log::debug!("{} Checking for peers", swarm.local_peer_id()); + let peers: Vec<_> = swarm.behaviour_mut().gossipsub.all_peers().filter_map(|(p, topics)| { + log::info!("{p}, {topics:?}"); + (topics.contains(&&volatile) && + topics.contains(&&persistent) && + topics.contains(&&config)).then( + ||p.to_owned()) + }).collect(); + if !peers.is_empty() && + ev_send.send(Event::Ready(peers)).is_err() { + return swarm; + } + } cmd = cmd_recv.recv() => { - log::debug!("command {cmd:?}"); + log::trace!("command {cmd:?}"); if !cmd.is_some_and(|cmd| handle_command(&mut swarm, cmd)) { log::debug!("Exiting cmd"); return swarm } } ev = swarm.select_next_some() => { + log::trace!("event {ev:?}"); if handle_swarm_event(&mut swarm, ev, &ev_send).is_err() { log::debug!("Exiting ev"); return swarm @@ -217,10 +246,9 @@ pub(crate) mod test { } pub async fn make_peer(variant: u8) -> Swarm { - let mut a = new_ephemeral(|identity| DomoBehaviour::new(&identity).unwrap(), variant); - a.listen().await; - - a + let mut swarm = new_ephemeral(|identity| DomoBehaviour::new(&identity).unwrap(), variant); + swarm.listen().await; + swarm } pub async fn connect_peer(a: &mut Swarm, b: &mut Swarm) { @@ -235,10 +263,15 @@ pub(crate) mod test { pub async fn make_peers(variant: u8) -> [Swarm; 3] { let _ = env_logger::builder().is_test(true).try_init(); + let mut a = new_ephemeral(|identity| DomoBehaviour::new(&identity).unwrap(), variant); let mut b = new_ephemeral(|identity| DomoBehaviour::new(&identity).unwrap(), variant); let mut c = new_ephemeral(|identity| DomoBehaviour::new(&identity).unwrap(), variant); - + /* + let mut a = Swarm::new_ephemeral(|identity| DomoBehaviour::new(&identity).unwrap()); + let mut b = Swarm::new_ephemeral(|identity| DomoBehaviour::new(&identity).unwrap()); + let mut c = Swarm::new_ephemeral(|identity| DomoBehaviour::new(&identity).unwrap()); + */ for a in a.external_addresses() { log::info!("{a:?}"); } @@ -251,8 +284,14 @@ pub(crate) mod test { b.connect(&mut c).await; c.connect(&mut a).await; + println!("a {}", a.local_peer_id()); + println!("b {}", b.local_peer_id()); + println!("c {}", c.local_peer_id()); + let peers: Vec<_> = a.connected_peers().cloned().collect(); + log::info!("Peers {peers:#?}"); + for peer in peers { a.behaviour_mut().gossipsub.add_explicit_peer(&peer); } @@ -280,6 +319,8 @@ pub(crate) mod test { let (b_s, br, _) = dht_channel(b); let (c_s, cr, _) = dht_channel(c); + log::info!("Waiting for peers"); + // Wait until peers are discovered while let Some(ev) = ar.recv().await { match ev { @@ -288,6 +329,9 @@ pub(crate) mod test { Event::Config(cfg) => log::info!("config {cfg}"), Event::Discovered(peers) => { log::info!("found peers: {peers:?}"); + } + Event::Ready(peers) => { + log::info!("ready peers: {peers:?}"); break; } } @@ -297,6 +341,7 @@ pub(crate) mod test { a_s.send(Command::Broadcast(msg.clone())).unwrap(); + log::info!("Sent volatile"); for r in [br, cr].iter_mut() { while let Some(ev) = r.recv().await { match ev { @@ -311,6 +356,9 @@ pub(crate) mod test { Event::Discovered(peers) => { log::info!("found peers: {peers:?}"); } + Event::Ready(peers) => { + log::info!("peers ready: {peers:?}"); + } } } }