diff --git a/.github/workflows/libp2p-rust-dht.yml b/.github/workflows/libp2p-rust-dht.yml index 5214038..55c891c 100644 --- a/.github/workflows/libp2p-rust-dht.yml +++ b/.github/workflows/libp2p-rust-dht.yml @@ -144,6 +144,19 @@ jobs: runs-on: ubuntu-latest + services: + postgres: + image: postgres + env: + POSTGRES_PASSWORD: mysecretpassword + options: >- + --health-cmd pg_isready + --health-interval 10s + --health-timeout 5s + --health-retries 5 + ports: + - 5432:5432 + steps: - uses: actions/checkout@v3 @@ -179,7 +192,7 @@ jobs: RUSTFLAGS: "-Cinstrument-coverage" LLVM_PROFILE_FILE: "libp2p-rust-dht-%p-%m.profraw" run: | - cargo test --verbose + cargo test --verbose --all - name: Get coverage data for codecov run: | @@ -220,34 +233,6 @@ jobs: exit 1 fi - weighted-code-coverage: - - needs: [build, docs] - - runs-on: ubuntu-latest - - steps: - - uses: actions/checkout@v3 - - - name: Install protobuf compiler - run: | - sudo apt-get update - sudo apt-get install protobuf-compiler - - - name: Install Rust stable - uses: dtolnay/rust-toolchain@stable - with: - toolchain: stable - - - name: Install grcov - env: - GRCOV_LINK: https://github.com/mozilla/grcov/releases/download - GRCOV_VERSION: v0.8.13 - GRCOV_BINARY: grcov-x86_64-unknown-linux-musl.tar.bz2 - run: | - curl -L "$GRCOV_LINK/$GRCOV_VERSION/$GRCOV_BINARY" | - tar xj -C $HOME/.cargo/bin - - name: Install weighted-code-coverage env: WCC_LINK: https://github.com/SoftengPoliTo/weighted-code-coverage/releases/download @@ -257,23 +242,7 @@ jobs: curl -L "$WCC_LINK/$WCC_VERSION/$WCC_BINARY" | tar xz -C $HOME/.cargo/bin - - name: Install llvm-tools-preview - run: | - rustup component add llvm-tools-preview - - # Not necessary on a newly created image, but strictly advised - - name: Run cargo clean - run: | - cargo clean - - - name: Run tests - env: - RUSTFLAGS: "-Cinstrument-coverage" - LLVM_PROFILE_FILE: "libp2p-rust-dht-%p-%m.profraw" - run: | - cargo test --verbose - - - name: Run grcov + - name: Run grcov to produce a coveralls json run: | grcov . --binary-path ./target/debug/ -t coveralls -s . 
--token YOUR_COVERALLS_TOKEN > coveralls.json @@ -292,7 +261,7 @@ jobs: audit: - needs: [code-coverage, weighted-code-coverage] + needs: [code-coverage] runs-on: ubuntu-latest @@ -322,7 +291,7 @@ jobs: deny: - needs: [code-coverage, weighted-code-coverage] + needs: [code-coverage] runs-on: ubuntu-latest @@ -368,7 +337,7 @@ jobs: udeps: - needs: [code-coverage, weighted-code-coverage] + needs: [code-coverage] runs-on: ubuntu-latest @@ -446,39 +415,6 @@ jobs: ################################## UNSAFE CHECKS LEVEL ######################### - valgrind: - - needs: cache-level - - runs-on: ubuntu-latest - - steps: - - uses: actions/checkout@v3 - - - name: Install protobuf compiler - run: | - sudo apt-get update - sudo apt-get install protobuf-compiler - - - name: Install valgrind - run: | - sudo apt-get install valgrind - - - name: Install cargo-valgrind - env: - VALGRIND_LINK: https://github.com/jfrimmel/cargo-valgrind/releases/download - VALGRIND_VERSION: 2.1.0 - run: | - curl -L "$VALGRIND_LINK/v$VALGRIND_VERSION/cargo-valgrind-$VALGRIND_VERSION-x86_64-unknown-linux-musl.tar.gz" | - tar xz -C $HOME/.cargo/bin - - # Usage of the `help` command as base command, please replace it - # with the effective command that valgrind has to analyze - - name: Run cargo-valgrind - run: | - cargo valgrind run -- --help - # cargo valgrind test - careful: needs: cache-level diff --git a/dht-cache/Cargo.toml b/dht-cache/Cargo.toml index 1098d70..0b61385 100644 --- a/dht-cache/Cargo.toml +++ b/dht-cache/Cargo.toml @@ -24,11 +24,19 @@ time = "0.3.17" tokio = { version = "1.19.0", features = ["full"] } url = "2.2.2" rsa = "0.9" -pem-rfc7468 = { version = "0.7", features = ["alloc"] } +pem-rfc7468 = { version = "0.7", features = ["std"] } sifis-config = { path = "../dht-config" } openssl-sys = "*" libsqlite3-sys = "*" +thiserror = "1.0.43" +anyhow = "1.0.72" +tokio-stream = "0.1.14" +futures-concurrency = "7.4.1" +[dev-dependencies] +env_logger = "0.10.0" +libp2p-swarm-test = "0.2.0" +libp2p = { version = "0.52.0", features = ["plaintext"] } [package.metadata.cargo-udeps.ignore] normal = ["openssl-sys", "libsqlite3-sys", "libc"] diff --git a/dht-cache/src/cache.rs b/dht-cache/src/cache.rs new file mode 100644 index 0000000..43706eb --- /dev/null +++ b/dht-cache/src/cache.rs @@ -0,0 +1,495 @@ +//! Cached access to the DHT + +mod local; + +use std::sync::Arc; +use std::{collections::BTreeMap, time::Duration}; + +use futures_util::{Stream, StreamExt}; +use libp2p::Swarm; +use serde_json::Value; +use tokio::sync::mpsc::UnboundedSender; +use tokio::sync::RwLock; +use tokio::time; +use tokio_stream::wrappers::UnboundedReceiverStream; + +use crate::domolibp2p::{self, generate_rsa_key}; +use crate::{ + cache::local::DomoCacheStateMessage, + dht::{dht_channel, Command, Event as DhtEvent}, + domolibp2p::DomoBehaviour, + utils, Error, +}; + +use self::local::DomoCacheElement; +pub use self::local::{LocalCache, Query, QueryGet, QueryGetIter}; + +/// DHT state change +#[derive(Debug)] +pub enum Event { + /// Persistent, structured data + /// + /// The information is persisted across nodes. + /// Newly joining nodes will receive it from other participants and + /// the local cache can be queried for it. 
+    PersistentData(DomoCacheElement),
+    /// Volatile, unstructured data
+    ///
+    /// The information is transmitted across all the nodes participating
+    VolatileData(Value),
+    /// Notify the peer availability
+    ReadyPeers(Vec<String>),
+}
+
+/// Builder for a Cached DHT Node
+// TODO: make it Clone
+pub struct Builder {
+    cfg: crate::Config,
+}
+
+impl Builder {
+    /// Create a new Builder from a [crate::Config]
+    pub fn from_config(cfg: crate::Config) -> Builder {
+        Builder { cfg }
+    }
+
+    /// Instantiate a new DHT node and return it together with its event stream
+    pub async fn make_channel(self) -> Result<(Cache, impl Stream<Item = Event>), crate::Error> {
+        let loopback_only = self.cfg.loopback;
+        let shared_key = domolibp2p::parse_hex_key(&self.cfg.shared_key)?;
+        let private_key_file = self.cfg.private_key.as_ref();
+
+        // Create a random local key.
+        let mut pkcs8_der = if let Some(pk_path) = private_key_file {
+            match std::fs::read(pk_path) {
+                Ok(pem) => {
+                    let der = pem_rfc7468::decode_vec(&pem)?;
+                    der.1
+                }
+                Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
+                    // Generate a new key and put it into the file at the given path
+                    let (pem, der) = generate_rsa_key();
+                    std::fs::write(pk_path, pem)?;
+                    der
+                }
+                Err(e) => return Err(e.into()),
+            }
+        } else {
+            generate_rsa_key().1
+        };
+
+        let local_key_pair = crate::Keypair::rsa_from_pkcs8(&mut pkcs8_der)?;
+        let swarm = domolibp2p::start(shared_key, local_key_pair, loopback_only).await?;
+
+        let local = LocalCache::with_config(&self.cfg).await;
+        // TODO: add a configuration item for the resend interval
+        Ok(cache_channel(local, swarm, 1000))
+    }
+}
+
+/// Cached DHT
+///
+/// It keeps a local cache of the dht state and allows querying the persistent topics
+#[derive(Clone)]
+pub struct Cache {
+    peer_id: String,
+    local: LocalCache,
+    peers: Arc<RwLock<PeersState>>,
+    cmd: UnboundedSender<Command>,
+}
+
+/// Information regarding the known peers
+#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord)]
+pub struct PeerInfo {
+    /// libp2p Identifier
+    pub peer_id: String,
+    /// Hash of its cache of the DHT
+    pub hash: u64,
+    /// Last time the peer updated its state
+    ///
+    /// TODO: use a better type
+    pub last_seen: u128,
+}
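// Usage sketch (illustrative, assuming a filled-in `crate::Config` with a
// valid 32-byte hex `shared_key`):
//
//     let (cache, events) = Builder::from_config(cfg).make_channel().await?;
//     // volatile: broadcast-only, never stored
//     cache.send(serde_json::json!({"ping": true}))?;
//     // persistent: stored under (topic, uuid) and synced to late joiners
//     cache.put("Domo::Light", "luce-1", serde_json::json!({"on": true})).await?;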
+impl Cache {
+    /// Send a volatile message
+    ///
+    /// Volatile messages are unstructured and do not persist in the DHT.
+    pub fn send(&self, value: Value) -> Result<(), Error> {
+        self.cmd
+            .send(Command::Broadcast(value.to_owned()))
+            .map_err(|_| Error::Channel)?;
+
+        Ok(())
+    }
+
+    /// Persist a value within the DHT
+    ///
+    /// It is identified by the topic and uuid value
+    pub async fn put(
+        &self,
+        topic: impl Into<String>,
+        uuid: impl Into<String>,
+        value: Value,
+    ) -> Result<(), Error> {
+        let elem = DomoCacheElement {
+            topic_name: topic.into(),
+            topic_uuid: uuid.into(),
+            value,
+            publication_timestamp: utils::get_epoch_ms(),
+            publisher_peer_id: self.peer_id.clone(),
+            ..Default::default()
+        };
+
+        self.cmd
+            .send(Command::Publish(serde_json::to_value(&elem)?))
+            .map_err(|_| Error::Channel)?;
+
+        self.local.put(elem).await;
+
+        Ok(())
+    }
+
+    /// Delete a value within the DHT
+    ///
+    /// It inserts a deletion entry: the entry value is marked as deleted and removed
+    /// from the stored cache.
+    pub async fn del(
+        &self,
+        topic: impl Into<String>,
+        uuid: impl Into<String>,
+    ) -> Result<(), Error> {
+        let elem = DomoCacheElement {
+            topic_name: topic.into(),
+            topic_uuid: uuid.into(),
+            publication_timestamp: utils::get_epoch_ms(),
+            publisher_peer_id: self.peer_id.clone(),
+            deleted: true,
+            ..Default::default()
+        };
+
+        self.cmd
+            .send(Command::Publish(serde_json::to_value(&elem)?))
+            .map_err(|_| Error::Channel)?;
+
+        self.local.put(elem).await;
+
+        Ok(())
+    }
+
+    /// Query the local cache
+    #[must_use]
+    pub fn query<'a>(&'a self, topic: &'a str) -> Query<'a> {
+        self.local.query(topic)
+    }
+
+    /// Get a list of the current peers
+    pub async fn peers(&self) -> Vec<PeerInfo> {
+        let peers = self.peers.read().await;
+        peers
+            .list
+            .values()
+            .map(|p| PeerInfo {
+                peer_id: p.peer_id.to_owned(),
+                hash: p.cache_hash,
+                last_seen: p.publication_timestamp,
+            })
+            .collect()
+    }
+
+    /// Return the current cache hash
+    pub async fn get_hash(&self) -> u64 {
+        self.local.get_hash().await
+    }
+}
+
+#[derive(Default, Debug, Clone)]
+pub(crate) struct PeersState {
+    list: BTreeMap<String, DomoCacheStateMessage>,
+    last_repub_timestamp: u128,
+    repub_interval: u128,
+}
+
+#[derive(Debug)]
+enum CacheState {
+    Synced,
+    Desynced { is_leader: bool },
+}
+
+impl PeersState {
+    fn with_interval(repub_interval: u128) -> Self {
+        Self {
+            repub_interval,
+            ..Default::default()
+        }
+    }
+
+    fn insert(&mut self, state: DomoCacheStateMessage) {
+        self.list.insert(state.peer_id.to_string(), state);
+    }
+
+    fn is_synchronized(&self, peer_id: &str, hash: u64) -> CacheState {
+        let cur_ts = utils::get_epoch_ms() - self.repub_interval;
+        let desync = self
+            .list
+            .values()
+            .any(|data| data.cache_hash != hash && data.publication_timestamp > cur_ts);
+
+        if desync {
+            CacheState::Desynced {
+                is_leader: !self.list.values().any(|data| {
+                    data.cache_hash == hash
+                        && data.peer_id.as_str() < peer_id
+                        && data.publication_timestamp > cur_ts
+                }),
+            }
+        } else {
+            CacheState::Synced
+        }
+    }
+}
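// Leader election in a nutshell: on a hash mismatch, the peer with the
// lexicographically smallest id among the peers sharing our hash republishes.
// Illustrative check (a sketch; `PeersState` is crate-private):
//
//     let mut st = PeersState::with_interval(1000);
//     st.insert(DomoCacheStateMessage {
//         peer_id: "B".into(),
//         cache_hash: 1,
//         publication_timestamp: utils::get_epoch_ms(),
//     });
//     // seen from peer "A", whose own hash is 2: desynced, and leader,
//     // since no peer holding hash 2 sorts before "A"
//     assert!(matches!(
//         st.is_synchronized("A", 2),
//         CacheState::Desynced { is_leader: true }
//     ));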
+/// Join the dht and keep a local cache up to date
+///
+/// The resend interval is expressed in milliseconds
+pub fn cache_channel(
+    local: LocalCache,
+    swarm: Swarm<DomoBehaviour>,
+    resend_interval: u64,
+) -> (Cache, impl Stream<Item = Event>) {
+    let local_peer_id = swarm.local_peer_id().to_string();
+
+    let (cmd, r, _j) = dht_channel(swarm);
+
+    let peers_state = Arc::new(RwLock::new(PeersState::with_interval(
+        resend_interval as u128,
+    )));
+
+    let cache = Cache {
+        peers: peers_state.clone(),
+        local: local.clone(),
+        cmd: cmd.clone(),
+        peer_id: local_peer_id.clone(),
+    };
+
+    let stream = UnboundedReceiverStream::new(r);
+
+    let local_read = local.clone();
+    let cmd_update = cmd.clone();
+    let peer_id = local_peer_id.clone();
+
+    tokio::task::spawn(async move {
+        let mut interval = time::interval(Duration::from_millis(resend_interval.max(100)));
+        while !cmd_update.is_closed() {
+            interval.tick().await;
+            let hash = local_read.get_hash().await;
+            let m = DomoCacheStateMessage {
+                peer_id: peer_id.clone(),
+                cache_hash: hash,
+                publication_timestamp: utils::get_epoch_ms(),
+            };
+
+            if cmd_update
+                .send(Command::Config(serde_json::to_value(&m).unwrap()))
+                .is_err()
+            {
+                break;
+            }
+        }
+    });
+
+    // TODO: refactor once async closures are stable
+    let events = stream.filter_map(move |ev| {
+        let local_write = local.clone();
+        let peers_state = peers_state.clone();
+        let peer_id = local_peer_id.clone();
+        let cmd = cmd.clone();
+        async move {
+            match ev {
+                DhtEvent::Config(cfg) => {
+                    let m: DomoCacheStateMessage = serde_json::from_str(&cfg).unwrap();
+
+                    let hash = local_write.get_hash().await;
+
+                    let republish = {
+                        let mut peers_state = peers_state.write().await;
+
+                        // update the peers_caches_state
+                        peers_state.insert(m);
+
+                        // check for desync
+                        let sync_info = peers_state.is_synchronized(&peer_id, hash);
+
+                        log::debug!("local {peer_id:?} {sync_info:?} -> {peers_state:#?}");
+
+                        if let CacheState::Desynced { is_leader } = sync_info {
+                            is_leader
+                                && utils::get_epoch_ms() - peers_state.last_repub_timestamp
+                                    >= peers_state.repub_interval
+                        } else {
+                            false
+                        }
+                    };
+
+                    // republish the local cache if needed
+                    if republish {
+                        local_write
+                            .read_owned()
+                            .await
+                            .mem
+                            .values()
+                            .flat_map(|topic| topic.values())
+                            .for_each(|elem| {
+                                let mut elem = elem.to_owned();
+                                log::debug!("resending {}", elem.topic_uuid);
+                                elem.republication_timestamp = utils::get_epoch_ms();
+
+                                // This cannot fail because `cmd` is the sender part of the
+                                // `stream` we are currently reading. In practice, we are
+                                // queueing the commands in order to read them later.
+                                cmd.send(Command::Publish(serde_json::to_value(&elem).unwrap()))
+                                    .unwrap();
+                            });
+                        peers_state.write().await.last_repub_timestamp = utils::get_epoch_ms();
+                    }
+
+                    None
+                }
+                DhtEvent::Discovered(_who) => None, /* Some(DomoEvent::NewPeers(
+                    who.into_iter().map(|w| w.to_string()).collect(),
+                ))*/
+                DhtEvent::VolatileData(data) => {
+                    // TODO: we swallow errors quietly here
+                    serde_json::from_str(&data).ok().map(Event::VolatileData)
+                }
+                DhtEvent::PersistentData(data) => {
+                    if let Ok(mut elem) = serde_json::from_str::<DomoCacheElement>(&data) {
+                        if elem.republication_timestamp != 0 {
+                            log::debug!("Retransmission");
+                        }
+                        // TODO: do something with this value instead
+                        elem.republication_timestamp = 0;
+                        local_write
+                            .try_put(elem.clone())
+                            .await
+                            .then_some(Event::PersistentData(elem))
+                    } else {
+                        None
+                    }
+                }
+                DhtEvent::Ready(peers) => {
+                    if !peers.is_empty() {
+                        Some(Event::ReadyPeers(
+                            peers.into_iter().map(|p| p.to_string()).collect(),
+                        ))
+                    } else {
+                        None
+                    }
+                }
+            }
+        }
+    });
+
+    (cache, events)
+}
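// Consumption sketch: the returned stream must be polled to keep the node in
// sync. Assuming `local`, `swarm` and a 1s resend interval:
//
//     let (cache, events) = cache_channel(local, swarm, 1000);
//     let mut events = std::pin::pin!(events);
//     while let Some(ev) = events.next().await {
//         match ev {
//             Event::ReadyPeers(peers) => log::info!("ready: {peers:?}"),
//             Event::PersistentData(elem) => log::info!("stored {}", elem.topic_uuid),
//             Event::VolatileData(v) => log::info!("volatile {v}"),
//         }
//     }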
+#[cfg(test)]
+mod test {
+    use super::*;
+    use crate::dht::test::*;
+    use futures_concurrency::prelude::*;
+    use std::{collections::HashSet, pin::pin};
+
+    #[tokio::test(flavor = "multi_thread", worker_threads = 3)]
+    async fn builder() {
+        let cfg = crate::Config {
+            shared_key: "d061545647652562b4648f52e8373b3a417fc0df56c332154460da1801b341e9"
+                .to_owned(),
+            ..Default::default()
+        };
+
+        let (_cache, _events) = Builder::from_config(cfg).make_channel().await.unwrap();
+    }
+
+    #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
+    async fn synchronization() {
+        let [mut a, mut b, mut c] = make_peers(2).await;
+        let mut d = make_peer(2).await;
+
+        connect_peer(&mut a, &mut d).await;
+        connect_peer(&mut b, &mut d).await;
+        connect_peer(&mut c, &mut d).await;
+
+        let a_local_cache = LocalCache::new();
+        let b_local_cache = LocalCache::new();
+        let c_local_cache = LocalCache::new();
+
+        let mut expected: HashSet<_> = (0..10)
+            .into_iter()
+            .map(|uuid| format!("uuid-{uuid}"))
+            .collect();
+
+        let (a_c, a_ev) = cache_channel(a_local_cache, a, 5000);
+        let (b_c, b_ev) = cache_channel(b_local_cache, b, 5000);
+        let (c_c, c_ev) = cache_channel(c_local_cache, c, 5000);
+
+        let mut expected_peers = HashSet::new();
+        expected_peers.insert(a_c.peer_id.clone());
+        expected_peers.insert(b_c.peer_id.clone());
+        expected_peers.insert(c_c.peer_id.clone());
+
+        let mut a_ev = pin!(a_ev);
+        let b_ev = pin!(b_ev);
+        let c_ev = pin!(c_ev);
+
+        while let Some(ev) = a_ev.next().await {
+            match ev {
+                Event::ReadyPeers(peers) => {
+                    log::info!("Ready peers {peers:?}");
+                    break;
+                }
+                _ => log::debug!("waiting for ready {ev:?}"),
+            }
+        }
+
+        for uuid in 0..10 {
+            let _ = a_c
+                .put(
+                    "Topic",
+                    &format!("uuid-{uuid}"),
+                    serde_json::json!({"key": uuid}),
+                )
+                .await;
+        }
+        let mut s = (
+            a_ev.map(|ev| ("a", ev)),
+            b_ev.map(|ev| ("b", ev)),
+            c_ev.map(|ev| ("c", ev)),
+        )
+            .merge();
+
+        while !expected.is_empty() {
+            let (node, ev) = s.next().await.unwrap();
+            match ev {
+                Event::PersistentData(data) => {
+                    log::debug!("{node}: Got data {data:?}");
+                    if node == "c" {
+                        assert!(expected.remove(&data.topic_uuid));
+                    }
+                }
+                _ => {
+                    log::debug!("{node}: Other {ev:?}");
+                }
+            }
+        }
+
+        // c_c must have seen at least one of the expected peers
+        let peers: HashSet<_> = c_c.peers().await.into_iter().map(|p| p.peer_id).collect();
+
+        log::info!("peers {peers:?}");
+
+        assert!(peers.is_subset(&expected_peers));
+    }
+}
diff --git a/dht-cache/src/cache/local.rs b/dht-cache/src/cache/local.rs
new file mode 100644
index 0000000..73195eb
--- /dev/null
+++ b/dht-cache/src/cache/local.rs
@@ -0,0 +1,427 @@
+//! Local in-memory cache
+
+pub use crate::data::*;
+use crate::domopersistentstorage::{DomoPersistentStorage, SqlxStorage};
+use serde_json::Value;
+use std::collections::hash_map::DefaultHasher;
+use std::collections::{btree_map, BTreeMap};
+use std::hash::{Hash, Hasher};
+use std::sync::Arc;
+use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
+use tokio::sync::{OwnedRwLockReadGuard, RwLock, RwLockReadGuard};
+
+enum SqlxCommand {
+    Write(DomoCacheElement),
+}
+
+#[derive(Debug, Default)]
+pub(crate) struct InnerCache {
+    pub mem: BTreeMap<String, BTreeMap<String, DomoCacheElement>>,
+    store: Option<UnboundedSender<SqlxCommand>>,
+}
+
+impl InnerCache {
+    pub fn put(&mut self, elem: DomoCacheElement) {
+        let topic_name = &elem.topic_name;
+        let topic_uuid = elem.topic_uuid.to_owned();
+
+        if let Some(topic) = self.mem.get_mut(topic_name) {
+            topic.insert(topic_uuid, elem);
+        } else {
+            self.mem.insert(
+                topic_name.to_owned(),
+                [(topic_uuid.to_owned(), elem)].into(),
+            );
+        }
+    }
+}
+
+/// Local cache
+#[derive(Default, Clone)]
+pub struct LocalCache(Arc<RwLock<InnerCache>>);
+
+impl LocalCache {
+    /// Instantiate a local cache from the configuration provided
+    ///
+    /// If the url is empty do not try to bootstrap the cache from the db
+    ///
+    /// TODO: propagate errors
+    pub async fn with_config(db_config: &sifis_config::Cache) -> Self {
+        let mut inner = InnerCache::default();
+
+        if !db_config.url.is_empty() {
+            let mut store = SqlxStorage::new(db_config).await;
+
+            for a in store.get_all_elements().await {
+                inner.put(a);
+            }
+
+            if db_config.persistent {
+                let (s, mut r) = unbounded_channel();
+
+                tokio::task::spawn(async move {
+                    while let Some(SqlxCommand::Write(elem)) = r.recv().await {
+                        store.store(&elem).await
+                    }
+                    panic!("I'm out!");
+                });
+                inner.store = Some(s);
+            }
+        }
+
+        LocalCache(Arc::new(RwLock::new(inner)))
+    }
+
+    pub fn new() -> Self {
+        Default::default()
+    }
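+    // Bootstrap sketch (illustrative; the in-memory sqlite url mirrors
+    // `SqlxStorage::new_in_memory`, any sqlx-supported url works the same):
+    //
+    //     let cfg = crate::Config {
+    //         url: "sqlite::memory:".into(),
+    //         persistent: false,
+    //         ..Default::default()
+    //     };
+    //     let local = LocalCache::with_config(&cfg).await;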
+    /// Feed the cache content into the given [`Hasher`]
+    pub async fn hash<H: Hasher>(&self, state: &mut H) {
+        let cache = &self.0.read().await.mem;
+        for (topic_name, map_topic_name) in cache.iter() {
+            topic_name.hash(state);
+
+            for (topic_uuid, value) in map_topic_name.iter() {
+                topic_uuid.hash(state);
+                value.to_string().hash(state);
+            }
+        }
+    }
+
+    /// Put the element in the cache
+    ///
+    /// If it is already present overwrite it
+    pub async fn put(&self, elem: DomoCacheElement) {
+        let mut cache = self.0.write().await;
+
+        if let Some(s) = cache.store.as_mut() {
+            let _ = s.send(SqlxCommand::Write(elem.to_owned()));
+        }
+
+        cache.put(elem);
+    }
+
+    /// Try to insert the element in the cache
+    ///
+    /// Return false if the element to insert is older than the one in the cache
+    pub async fn try_put(&self, elem: DomoCacheElement) -> bool {
+        let mut cache = self.0.write().await;
+        let topic_name = elem.topic_name.clone();
+        let topic_uuid = &elem.topic_uuid;
+
+        let topic = cache.mem.entry(topic_name).or_default();
+
+        let e = if topic
+            .get(topic_uuid)
+            .is_some_and(|cur| elem.publication_timestamp <= cur.publication_timestamp)
+        {
+            false
+        } else {
+            topic.insert(topic_uuid.to_owned(), elem.clone());
+            true
+        };
+
+        if e {
+            if let Some(s) = cache.store.as_mut() {
+                let _ = s.send(SqlxCommand::Write(elem));
+            }
+        }
+
+        e
+    }
+
+    /// Retrieve an element by its uuid and topic
+    pub async fn get(&self, topic_name: &str, topic_uuid: &str) -> Option<DomoCacheElement> {
+        let cache = self.0.read().await;
+
+        cache
+            .mem
+            .get(topic_name)
+            .and_then(|topic| topic.get(topic_uuid))
+            .cloned()
+    }
+
+    /// Instantiate a query over the local cache
+    pub fn query<'a>(&self, topic: &'a str) -> Query<'a> {
+        Query::new(topic, self.clone())
+    }
+
+    /// Compute the current hash value
+    pub async fn get_hash(&self) -> u64 {
+        let mut s = DefaultHasher::new();
+        self.hash(&mut s).await;
+        s.finish()
+    }
+
+    pub(crate) async fn read_owned(&self) -> OwnedRwLockReadGuard<InnerCache> {
+        self.0.clone().read_owned().await
+    }
+}
+
+/// Query the local DHT cache
+#[derive(Clone)]
+pub struct Query<'a> {
+    cache: LocalCache,
+    topic: &'a str,
+}
+
+impl<'a> Query<'a> {
+    /// Create a new query over a local cache
+    pub fn new(topic: &'a str, cache: LocalCache) -> Self {
+        Self { topic, cache }
+    }
+
+    /// Gets a value on a topic given a specific UUID.
+    ///
+    /// Keep in mind that the returned type holds a lock guard to the underlying data, be careful
+    /// to use it across yield points.
+    pub async fn get_by_uuid<'b>(&'b self, uuid: &'b str) -> Option<RwLockReadGuard<'b, Value>> {
+        RwLockReadGuard::try_map(self.cache.0.read().await, |cache| {
+            cache
+                .mem
+                .get(self.topic)
+                .and_then(|tree| tree.get(uuid))
+                .map(|cache_element| &cache_element.value)
+        })
+        .ok()
+    }
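+    // Guard-scoping sketch: the returned `RwLockReadGuard` keeps the cache
+    // read-locked, so clone what you need out of it and let it drop before
+    // the next `.await`:
+    //
+    //     let on = query
+    //         .get_by_uuid("luce-1")
+    //         .await
+    //         .and_then(|guard| guard.get("on").cloned());
+    //     // guard already dropped here; holding it across yield points
+    //     // would stall writers such as `LocalCache::put`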
+    /// Gets the data stored for the topic.
+    ///
+    /// It returns an _iterable type_ that can be used to obtain pairs of UUID and values.
+    ///
+    /// Keep in mind that the returned type holds a lock guard to the underlying data, be careful
+    /// to use it across yield points.
+    ///
+    /// # Example
+    ///
+    /// ```
+    /// # use sifis_dht::cache::Query;
+    /// # async fn handle_query(query: Query<'_>) {
+    /// let get = query.get().await;
+    /// for pair in &get {
+    ///     let (uuid, value): (&str, &serde_json::Value) = pair;
+    ///     println!("{uuid}, {value}");
+    /// }
+    /// # }
+    /// ```
+    #[inline]
+    pub async fn get(&self) -> QueryGet<'_> {
+        let lock =
+            RwLockReadGuard::try_map(self.cache.0.read().await, |cache| cache.mem.get(self.topic))
+                .ok();
+
+        QueryGet(lock)
+    }
+}
+
+#[derive(Debug)]
+pub struct QueryGet<'a>(Option<RwLockReadGuard<'a, BTreeMap<String, DomoCacheElement>>>);
+
+impl<'a> QueryGet<'a> {
+    /// Iterate over queried pairs of UUIDs and values.
+    #[inline]
+    #[must_use]
+    pub fn iter(&'a self) -> QueryGetIter<'a> {
+        IntoIterator::into_iter(self)
+    }
+}
+
+impl<'a> IntoIterator for &'a QueryGet<'a> {
+    type Item = (&'a str, &'a Value);
+    type IntoIter = QueryGetIter<'a>;
+
+    #[inline]
+    fn into_iter(self) -> Self::IntoIter {
+        let values = self
+            .0
+            .as_deref()
+            .map_or_else(Default::default, BTreeMap::iter);
+
+        QueryGetIter(values)
+    }
+}
+
+#[derive(Debug)]
+pub struct QueryGetIter<'a>(btree_map::Iter<'a, String, DomoCacheElement>);
+
+impl<'a> Iterator for QueryGetIter<'a> {
+    type Item = (&'a str, &'a Value);
+
+    #[inline]
+    fn next(&mut self) -> Option<Self::Item> {
+        self.0
+            .next()
+            .map(|(uuid, cache_element)| (&**uuid, &cache_element.value))
+    }
+
+    #[inline]
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        self.0.size_hint()
+    }
+}
+
+impl DoubleEndedIterator for QueryGetIter<'_> {
+    #[inline]
+    fn next_back(&mut self) -> Option<Self::Item> {
+        self.0
+            .next_back()
+            .map(|(uuid, cache_element)| (&**uuid, &cache_element.value))
+    }
+}
+
+impl ExactSizeIterator for QueryGetIter<'_> {
+    #[inline]
+    fn len(&self) -> usize {
+        self.0.len()
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use crate::data::DomoCacheElement;
+    use serde_json::*;
+
+    fn make_test_element(topic_name: &str, topic_uuid: &str, value: &Value) -> DomoCacheElement {
+        DomoCacheElement {
+            topic_name: topic_name.to_owned(),
+            topic_uuid: topic_uuid.to_owned(),
+            value: value.to_owned(),
+            ..Default::default()
+        }
+    }
+
+    #[tokio::test]
+    async fn hash() {
+        let cache = LocalCache::new();
+
+        let hash = cache.get_hash().await;
+        println!("{hash}");
+
+        let elem = make_test_element("Domo::Light", "luce-1", &json!({ "connected": true}));
+        cache.put(elem).await;
+
+        let hash2 = cache.get_hash().await;
+        println!("{hash2}");
+
+        assert_ne!(hash, hash2);
+
+        let elem = make_test_element("Domo::Light", "luce-1", &json!({ "connected": false}));
+        cache.put(elem).await;
+
+        let hash3 = cache.get_hash().await;
+        println!("{hash3}");
+
+        assert_ne!(hash2, hash3);
+
+        let elem = make_test_element("Domo::Light", "luce-1", &json!({ "connected": true}));
+        cache.put(elem).await;
+
+        let hash4 = cache.get_hash().await;
+        println!("{hash4}");
+
+        assert_eq!(hash2, hash4);
+    }
+
+    #[tokio::test]
+    async fn put() {
+        let cache = LocalCache::new();
+
+        let elem = make_test_element("Domo::Light", "luce-1", &json!({ "connected": true}));
+
+        cache.put(elem.clone()).await;
+
+        let out = cache.get("Domo::Light", "luce-1").await.expect("element");
+
+        assert_eq!(out, elem);
+
+        let elem2 = make_test_element("Domo::Light", "luce-1", &json!({ "connected": false}));
+
+        cache.put(elem2.clone()).await;
+
+        let out = cache.get("Domo::Light", "luce-1").await.expect("element");
+
+        assert_eq!(out, elem2);
+    }
+
+    #[tokio::test]
+    async fn try_put() {
+        let cache = LocalCache::new();
+
+        let mut elem = make_test_element("Domo::Light", "luce-1", &json!({ "connected": true}));
+
+        assert!(cache.try_put(elem.clone()).await);
+
+        let out = cache.get("Domo::Light", "luce-1").await.expect("element");
+
+        assert_eq!(out, elem);
+
+        elem.publication_timestamp = 1;
+
+        assert!(cache.try_put(elem.clone()).await);
+
+        let out: DomoCacheElement = cache.get("Domo::Light", "luce-1").await.expect("element");
+
+        assert_eq!(out, elem);
+
+        elem.publication_timestamp = 0;
+
+        assert!(!cache.try_put(elem).await);
+
+        let out: DomoCacheElement = cache.get("Domo::Light", "luce-1").await.expect("element");
+
+        assert_eq!(out.publication_timestamp, 1);
+    }
+
+    #[tokio::test]
+    async fn query() {
+        let cache = LocalCache::new();
+
+        for item in 0..10 {
+            let elem = make_test_element(
+                "Domo::Light",
+                &format!("luce-{item}"),
+                &json!({ "connected": true, "count": item}),
+            );
+
+            cache.put(elem).await;
+        }
+
+        let q = cache.query("Domo::Light");
+
+        assert_eq!(q.get().await.iter().len(), 10);
+
+        assert!(q.get_by_uuid("not-existent").await.is_none());
+
+        assert_eq!(
+            q.clone()
+                .get_by_uuid("luce-1")
+                .await
+                .unwrap()
+                .get("count")
+                .unwrap(),
+            1
+        );
+    }
+
+    #[tokio::test]
+    async fn persistence() {
+        let cfg = crate::Config {
+            ..Default::default()
+        };
+
+        let cache = LocalCache::with_config(&cfg).await;
+
+        for item in 0..10 {
+            let elem = make_test_element(
+                "Domo::Light",
+                &format!("luce-{item}"),
+                &json!({ "connected": true, "count": item}),
+            );
+
+            cache.put(elem).await;
+        }
+    }
+}
diff --git a/dht-cache/src/data.rs b/dht-cache/src/data.rs
new file mode 100644
index 0000000..b38141a
--- /dev/null
+++ b/dht-cache/src/data.rs
@@ -0,0 +1,61 @@
+//! Data types for interacting with the DHT
+//!
+//! The DHT may persist Elements indexed by a topic and a uuid or broadcast free-form messages.
+//!
+use serde::{Deserialize, Serialize};
+use serde_json::Value;
+use std::fmt::{Display, Formatter};
+
+/// Events of interest
+#[derive(Debug)]
+pub enum DomoEvent {
+    None,
+    VolatileData(serde_json::Value),
+    PersistentData(DomoCacheElement),
+    NewPeers(Vec<String>),
+}
+
+/// Full Cache Element
+#[derive(Default, Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
+pub struct DomoCacheElement {
+    /// Free-form topic name
+    pub topic_name: String,
+    /// Unique identifier of the element
+    pub topic_uuid: String,
+    /// JSON-serializable Value
+    pub value: Value,
+    /// If true the element could be expunged from the local cache
+    pub deleted: bool,
+    /// Time of the first publication
+    pub publication_timestamp: u128,
+    /// First peer publishing it
+    pub publisher_peer_id: String,
+    /// If non-zero the element is republished as part of a cache sync
+    pub republication_timestamp: u128,
+}
+
+/// Summary of the current state of the DHT according to a peer
+#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
+pub(crate) struct DomoCacheStateMessage {
+    pub peer_id: String,
+    pub cache_hash: u64,
+    pub publication_timestamp: u128,
+}
+
+impl Display for DomoCacheElement {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(
+            f,
+            "(topic_name: {}, topic_uuid:{}, \
+            value: {}, deleted: {}, publication_timestamp: {}, \
+            peer_id: {})",
+            self.topic_name,
+            self.topic_uuid,
+            self.value,
+            self.deleted,
+            self.publication_timestamp,
+            self.publisher_peer_id
+        )
+    }
+}
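// Wire-format sketch: a `DomoCacheElement` as serialized on the
// "domo-persistent-data" gossipsub topic (field names per the serde derive
// above; the values are illustrative):
//
//     {
//       "topic_name": "Domo::Light",
//       "topic_uuid": "luce-1",
//       "value": { "connected": true },
//       "deleted": false,
//       "publication_timestamp": 1690000000000,
//       "publisher_peer_id": "12D3KooW...",
//       "republication_timestamp": 0
//     }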
diff --git a/dht-cache/src/dht.rs b/dht-cache/src/dht.rs
new file mode 100644
index 0000000..a221033
--- /dev/null
+++ b/dht-cache/src/dht.rs
@@ -0,0 +1,393 @@
+//! DHT Abstraction
+//!
+use std::collections::HashSet;
+use std::time::Duration;
+
+use crate::domolibp2p::{DomoBehaviour, OutEvent};
+use futures::prelude::*;
+use libp2p::{gossipsub::IdentTopic as Topic, swarm::SwarmEvent, Swarm};
+use libp2p::{mdns, PeerId};
+use serde_json::Value;
+use time::OffsetDateTime;
+use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
+use tokio::task::JoinHandle;
+
+/// Network commands
+#[derive(Debug)]
+pub enum Command {
+    Config(Value),
+    Broadcast(Value),
+    Publish(Value),
+    Stop,
+}
+
+/// Network Events
+#[derive(Debug)]
+pub enum Event {
+    PersistentData(String),
+    VolatileData(String),
+    Config(String),
+    Discovered(Vec<PeerId>),
+    Ready(HashSet<PeerId>),
+}
+
+fn handle_command(swarm: &mut Swarm<DomoBehaviour>, cmd: Command) -> bool {
+    use Command::*;
+    match cmd {
+        Broadcast(val) => {
+            let topic = Topic::new("domo-volatile-data");
+            let m = serde_json::to_string(&val).unwrap();
+
+            if let Err(e) = swarm.behaviour_mut().gossipsub.publish(topic, m.as_bytes()) {
+                log::info!("Broadcast error: {e:?}");
+            }
+            true
+        }
+        Publish(val) => {
+            let topic = Topic::new("domo-persistent-data");
+            let m2 = serde_json::to_string(&val).unwrap();
+
+            if let Err(e) = swarm
+                .behaviour_mut()
+                .gossipsub
+                .publish(topic, m2.as_bytes())
+            {
+                log::info!("Publish error: {e:?}");
+            }
+            true
+        }
+        Config(val) => {
+            let topic = Topic::new("domo-config");
+            let m = serde_json::to_string(&val).unwrap();
+            if let Err(e) = swarm.behaviour_mut().gossipsub.publish(topic, m.as_bytes()) {
+                log::info!("Config error: {e:?}");
+            }
+            true
+        }
+        Stop => false,
+    }
+}
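// Command-side sketch: `dht_channel` (below) wires these helpers to a spawned
// task; assuming a started `Swarm<DomoBehaviour>`:
//
//     let (cmd, mut events, _handle) = dht_channel(swarm);
//     // broadcast on "domo-volatile-data"
//     cmd.send(Command::Broadcast(serde_json::json!({"ping": 1}))).unwrap();
//     // stop the task; the swarm is handed back through the JoinHandle
//     cmd.send(Command::Stop).unwrap();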
+
+fn handle_swarm_event<E>(
+    swarm: &mut Swarm<DomoBehaviour>,
+    event: SwarmEvent<OutEvent, E>,
+    ev_send: &UnboundedSender<Event>,
+) -> Result<(), ()> {
+    use Event::*;
+
+    match event {
+        SwarmEvent::ExpiredListenAddr { address, .. } => {
+            log::info!("Address {address:?} expired");
+        }
+        SwarmEvent::ConnectionEstablished { .. } => {
+            log::info!("Connection established ...");
+        }
+        SwarmEvent::ConnectionClosed { .. } => {
+            log::info!("Connection closed");
+        }
+        SwarmEvent::ListenerError { .. } => {
+            log::info!("Listener Error");
+        }
+        SwarmEvent::OutgoingConnectionError { .. } => {
+            log::info!("Outgoing connection error");
+        }
+        SwarmEvent::ListenerClosed { .. } => {
+            log::info!("Listener Closed");
+        }
+        SwarmEvent::NewListenAddr { address, .. } => {
+            println!("Listening on {address:?}");
+        }
+        SwarmEvent::Behaviour(crate::domolibp2p::OutEvent::Gossipsub(
+            libp2p::gossipsub::Event::Message {
+                propagation_source: _peer_id,
+                message_id: _id,
+                message,
+            },
+        )) => {
+            if let Ok(data) = String::from_utf8(message.data) {
+                match message.topic.as_str() {
+                    "domo-persistent-data" => {
+                        ev_send.send(PersistentData(data)).map_err(|_| ())?;
+                    }
+                    "domo-config" => {
+                        ev_send.send(Config(data)).map_err(|_| ())?;
+                    }
+                    "domo-volatile-data" => {
+                        ev_send.send(VolatileData(data)).map_err(|_| ())?;
+                    }
+                    _ => {
+                        log::info!("Not able to recognize message");
+                    }
+                }
+            } else {
+                log::warn!("The message does not contain utf8 data");
+            }
+        }
+        SwarmEvent::Behaviour(crate::domolibp2p::OutEvent::Gossipsub(
+            libp2p::gossipsub::Event::Subscribed { peer_id, topic },
+        )) => {
+            log::debug!("Peer {peer_id} subscribed to {}", topic.as_str());
+        }
+        SwarmEvent::Behaviour(crate::domolibp2p::OutEvent::Mdns(mdns::Event::Expired(list))) => {
+            let local = OffsetDateTime::now_utc();
+
+            for (peer, _) in list {
+                log::info!("MDNS for peer {peer} expired {local:?}");
+            }
+        }
+        SwarmEvent::Behaviour(crate::domolibp2p::OutEvent::Mdns(mdns::Event::Discovered(list))) => {
+            let local = OffsetDateTime::now_utc();
+            let peers = list
+                .into_iter()
+                .map(|(peer, _)| {
+                    swarm.behaviour_mut().gossipsub.add_explicit_peer(&peer);
+                    log::info!("Discovered peer {peer} {local:?}");
+                    peer
+                })
+                .collect();
+            ev_send.send(Discovered(peers)).map_err(|_| ())?;
+        }
+        _ => {}
+    }
+
+    Ok(())
+}
+
+/// Spawn a new task polling constantly for new swarm Events
+pub fn dht_channel(
+    mut swarm: Swarm<DomoBehaviour>,
+) -> (
+    UnboundedSender<Command>,
+    UnboundedReceiver<Event>,
+    JoinHandle<Swarm<DomoBehaviour>>,
+) {
+    let (cmd_send, mut cmd_recv) = mpsc::unbounded_channel();
+    let (ev_send, ev_recv) = mpsc::unbounded_channel();
+
+    let handle = tokio::task::spawn(async move {
+        let mut interval = tokio::time::interval(Duration::from_secs(1));
+        let volatile = Topic::new("domo-volatile-data").hash();
+        let persistent = Topic::new("domo-persistent-data").hash();
+        let config = Topic::new("domo-config").hash();
+        let mut ready_peers = HashSet::new();
+        // Only peers that subscribed to all the topics are usable
+        let check_peers = |swarm: &mut Swarm<DomoBehaviour>| {
+            swarm
+                .behaviour_mut()
+                .gossipsub
+                .all_peers()
+                .filter_map(|(p, topics)| {
+                    log::debug!("{p}, {topics:?}");
+                    (topics.contains(&&volatile)
+                        && topics.contains(&&persistent)
+                        && topics.contains(&&config))
+                    .then(|| p.to_owned())
+                })
+                .collect()
+        };
+
+        loop {
+            log::trace!("Looping {}", swarm.local_peer_id());
+            tokio::select! {
+                // the mdns event is not enough to ensure we can send messages
+                _ = interval.tick() => {
+                    log::debug!("{} Checking for peers", swarm.local_peer_id());
+                    let peers: HashSet<_> = check_peers(&mut swarm);
+                    if !peers.is_empty() && ready_peers != peers {
+                        ready_peers = peers.clone();
+                        if ev_send.send(Event::Ready(peers)).is_err() {
+                            return swarm;
+                        }
+                    }
+                }
+                cmd = cmd_recv.recv() => {
+                    log::trace!("command {cmd:?}");
+                    if !cmd.is_some_and(|cmd| handle_command(&mut swarm, cmd)) {
+                        log::debug!("Exiting cmd");
+                        return swarm
+                    }
+                }
+                ev = swarm.select_next_some() => {
+                    log::trace!("event {ev:?}");
+                    if handle_swarm_event(&mut swarm, ev, &ev_send).is_err() {
+                        log::debug!("Exiting ev");
+                        return swarm
+                    }
+                }
+            }
+            tokio::task::yield_now().await;
+        }
+    });
+
+    (cmd_send, ev_recv, handle)
+}
+
+#[cfg(test)]
+pub(crate) mod test {
+    use std::collections::HashSet;
+    use std::time::Duration;
+
+    use super::*;
+    use crate::Keypair;
+    use libp2p::core::transport::MemoryTransport;
+    use libp2p::core::upgrade::Version;
+    use libp2p::plaintext::PlainText2Config;
+    use libp2p::pnet::{PnetConfig, PreSharedKey};
+    use libp2p::swarm::SwarmBuilder;
+    use libp2p::yamux;
+    use libp2p::Transport;
+    use libp2p_swarm_test::SwarmExt;
+    use serde_json::json;
+
+    // like Swarm::new_ephemeral but with pnet variant
+    fn new_ephemeral(
+        behaviour_fn: impl FnOnce(Keypair) -> DomoBehaviour,
+        variant: u8,
+    ) -> Swarm<DomoBehaviour> {
+        let identity = Keypair::generate_ed25519();
+        let peer_id = PeerId::from(identity.public());
+        let psk = PreSharedKey::new([variant; 32]);
+
+        let transport = MemoryTransport::default()
+            .or_transport(libp2p::tcp::async_io::Transport::default())
+            .and_then(move |socket, _| PnetConfig::new(psk).handshake(socket))
+            .upgrade(Version::V1)
+            .authenticate(PlainText2Config {
+                local_public_key: identity.public(),
+            })
+            .multiplex(yamux::Config::default())
+            .timeout(Duration::from_secs(20))
+            .boxed();
+
+        SwarmBuilder::without_executor(transport, behaviour_fn(identity), peer_id).build()
+    }
+
+    pub async fn make_peer(variant: u8) -> Swarm<DomoBehaviour> {
+        let mut swarm = new_ephemeral(|identity| DomoBehaviour::new(&identity).unwrap(), variant);
+        swarm.listen().await;
+        swarm
+    }
+
+    pub async fn connect_peer(a: &mut Swarm<DomoBehaviour>, b: &mut Swarm<DomoBehaviour>) {
+        a.connect(b).await;
+
+        let peers: Vec<_> = a.connected_peers().cloned().collect();
+
+        for peer in peers {
+            a.behaviour_mut().gossipsub.add_explicit_peer(&peer);
+        }
+    }
+
+    pub async fn make_peers(variant: u8) -> [Swarm<DomoBehaviour>; 3] {
+        let _ = env_logger::builder().is_test(true).try_init();
+
+        let mut a = new_ephemeral(|identity| DomoBehaviour::new(&identity).unwrap(), variant);
+        let mut b = new_ephemeral(|identity| DomoBehaviour::new(&identity).unwrap(), variant);
+        let mut c = new_ephemeral(|identity| DomoBehaviour::new(&identity).unwrap(), variant);
+        /*
+        let mut a = Swarm::new_ephemeral(|identity| DomoBehaviour::new(&identity).unwrap());
+        let mut b = Swarm::new_ephemeral(|identity| DomoBehaviour::new(&identity).unwrap());
+        let mut c = Swarm::new_ephemeral(|identity| DomoBehaviour::new(&identity).unwrap());
+        */
+        for a in a.external_addresses() {
+            log::info!("{a:?}");
+        }
+
+        a.listen().await;
+        b.listen().await;
+        c.listen().await;
+
+        a.connect(&mut b).await;
+        b.connect(&mut c).await;
+        c.connect(&mut a).await;
+
+        println!("a {}", a.local_peer_id());
+        println!("b {}", b.local_peer_id());
+        println!("c {}", c.local_peer_id());
+
+        let peers: Vec<_> = a.connected_peers().cloned().collect();
+
+        log::info!("Peers {peers:#?}");
+
+        for peer in peers {
a.behaviour_mut().gossipsub.add_explicit_peer(&peer); + } + + let peers: Vec<_> = b.connected_peers().cloned().collect(); + + for peer in peers { + b.behaviour_mut().gossipsub.add_explicit_peer(&peer); + } + + let peers: Vec<_> = c.connected_peers().cloned().collect(); + + for peer in peers { + c.behaviour_mut().gossipsub.add_explicit_peer(&peer); + } + + [a, b, c] + } + + #[tokio::test] + async fn multiple_peers() { + let [a, b, c] = make_peers(1).await; + + let mut expected_peers: HashSet<_> = + [b.local_peer_id().to_owned(), c.local_peer_id().to_owned()].into(); + + let (a_s, mut ar, _) = dht_channel(a); + let (_b_s, br, _) = dht_channel(b); + let (_c_s, cr, _) = dht_channel(c); + + log::info!("Waiting for peers"); + + // Wait until peers are discovered + while let Some(ev) = ar.recv().await { + match ev { + Event::VolatileData(data) => log::info!("volatile {data}"), + Event::PersistentData(data) => log::info!("persistent {data}"), + Event::Config(cfg) => log::info!("config {cfg}"), + Event::Discovered(peers) => { + log::info!("found peers: {peers:?}"); + } + Event::Ready(peers) => { + log::info!("ready peers: {peers:?}"); + for peer in peers { + expected_peers.remove(&peer); + } + + if expected_peers.is_empty() { + break; + } + } + } + } + + let msg = json!({"a": "value"}); + + a_s.send(Command::Broadcast(msg.clone())).unwrap(); + + log::info!("Sent volatile"); + for r in [br, cr].iter_mut() { + while let Some(ev) = r.recv().await { + match ev { + Event::VolatileData(data) => { + log::info!("volatile {data}"); + let val: Value = serde_json::from_str(&data).unwrap(); + assert_eq!(val, msg); + break; + } + Event::PersistentData(data) => log::info!("persistent {data}"), + Event::Config(cfg) => log::info!("config {cfg}"), + Event::Discovered(peers) => { + log::info!("found peers: {peers:?}"); + } + Event::Ready(peers) => { + log::info!("peers ready: {peers:?}"); + } + } + } + } + } +} diff --git a/dht-cache/src/domocache.rs b/dht-cache/src/domocache.rs index 0b4f012..0e282b9 100644 --- a/dht-cache/src/domocache.rs +++ b/dht-cache/src/domocache.rs @@ -1,18 +1,17 @@ +//! 
Cached access to the DHT +pub use crate::data::*; +use crate::domolibp2p::{generate_rsa_key, parse_hex_key}; use crate::domopersistentstorage::{DomoPersistentStorage, SqlxStorage}; use crate::utils; +use crate::Error; use futures::prelude::*; use libp2p::gossipsub::IdentTopic as Topic; use libp2p::identity::Keypair; use libp2p::mdns; use libp2p::swarm::SwarmEvent; -use rsa::pkcs8::EncodePrivateKey; -use rsa::RsaPrivateKey; -use serde::{Deserialize, Serialize}; use serde_json::Value; use std::collections::hash_map::DefaultHasher; use std::collections::BTreeMap; -use std::error::Error; -use std::fmt::{Display, Formatter}; use std::hash::{Hash, Hasher}; use std::io::ErrorKind; use std::time::Duration; @@ -20,72 +19,19 @@ use time::OffsetDateTime; use tokio::sync::mpsc; use tokio::sync::mpsc::{Receiver, Sender}; -fn generate_rsa_key() -> (Vec, Vec) { - let mut rng = rand::thread_rng(); - let bits = 2048; - let private_key = RsaPrivateKey::new(&mut rng, bits).expect("failed to generate a key"); - let pem = private_key - .to_pkcs8_pem(Default::default()) - .unwrap() - .as_bytes() - .to_vec(); - let der = private_key.to_pkcs8_der().unwrap().as_bytes().to_vec(); - (pem, der) -} - -// possible events returned by cache_loop_event() -#[derive(Debug)] -pub enum DomoEvent { - None, - VolatileData(serde_json::Value), - PersistentData(DomoCacheElement), -} - // period at which we send messages containing our cache hash -const SEND_CACHE_HASH_PERIOD: u8 = 5; - -#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)] -pub struct DomoCacheElement { - pub topic_name: String, - pub topic_uuid: String, - pub value: Value, - pub deleted: bool, - pub publication_timestamp: u128, - pub publisher_peer_id: String, - pub republication_timestamp: u128, -} - -#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)] -pub struct DomoCacheStateMessage { - pub peer_id: String, - pub cache_hash: u64, - pub publication_timestamp: u128, -} - -impl Display for DomoCacheElement { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!( - f, - "(topic_name: {}, topic_uuid:{}, \ - value: {}, deleted: {}, publication_timestamp: {}, \ - peer_id: {})", - self.topic_name, - self.topic_uuid, - self.value, - self.deleted, - self.publication_timestamp, - self.publisher_peer_id - ) - } -} +const SEND_CACHE_HASH_PERIOD: u8 = 120; +/// Cached access to the DHT +/// +/// It keeps an in-memory (or persistent) cache of the whole DHT. pub struct DomoCache { storage: SqlxStorage, - pub cache: BTreeMap>, - pub peers_caches_state: BTreeMap, - pub publish_cache_counter: u8, - pub last_cache_repub_timestamp: u128, - pub swarm: libp2p::Swarm, + pub(crate) cache: BTreeMap>, + peers_caches_state: BTreeMap, + pub(crate) publish_cache_counter: u8, + pub(crate) last_cache_repub_timestamp: u128, + pub(crate) swarm: libp2p::Swarm, pub is_persistent_cache: bool, pub local_peer_id: String, client_tx_channel: Sender, @@ -116,7 +62,7 @@ impl Hash for DomoCache { } impl DomoCache { - #[allow(unused)] + /// Apply a jsonpath expression over the elements of a topic. 
    pub fn filter_with_topic_name(
        &self,
        topic_name: &str,
@@ -150,10 +96,7 @@ impl DomoCache {
         }
     }
 
-    fn handle_volatile_data(
-        &self,
-        message: &str,
-    ) -> std::result::Result<DomoEvent, Box<dyn Error>> {
+    fn handle_volatile_data(&self, message: &str) -> std::result::Result<DomoEvent, Error> {
         let m: serde_json::Value = serde_json::from_str(message)?;
         Ok(DomoEvent::VolatileData(m))
     }
@@ -161,7 +104,7 @@ impl DomoCache {
     async fn handle_persistent_message_data(
         &mut self,
         message: &str,
-    ) -> std::result::Result<DomoEvent, Box<dyn Error>> {
+    ) -> std::result::Result<DomoEvent, Error> {
         let mut m: DomoCacheElement = serde_json::from_str(message)?;
 
         // reset the republication timestamp to 0, otherwise the hash changes
@@ -232,6 +175,7 @@ impl DomoCache {
         (true, true)
     }
 
+    /// Get a value identified by its uuid within a topic.
     pub fn get_topic_uuid(&self, topic_name: &str, topic_uuid: &str) -> Result {
         let ret = self.read_cache_element(topic_name, topic_uuid);
         match ret {
@@ -244,6 +188,7 @@ impl DomoCache {
         }
     }
 
+    /// Get all the values within a topic.
     pub fn get_topic_name(&self, topic_name: &str) -> Result {
         let s = r#"[]"#;
         let mut ret: Value = serde_json::from_str(s).unwrap();
@@ -266,6 +211,7 @@ impl DomoCache {
         }
     }
 
+    /// Return the whole cache as a JSON Value
     pub fn get_all(&self) -> Value {
         let s = r#"[]"#;
         let mut ret: Value = serde_json::from_str(s).unwrap();
@@ -363,6 +309,7 @@ impl DomoCache {
         }
     }
 
+    /// Print the status of the cache across the known peers.
     pub fn print_peers_cache(&self) {
         for (peer_id, peer_data) in self.peers_caches_state.iter() {
             println!(
@@ -372,6 +319,15 @@ impl DomoCache {
         }
     }
 
+    /// Get the currently seen peers
+    ///
+    /// Together with their last known hash and timestamp
+    pub fn get_peers_stats(&self) -> impl Iterator<Item = (&str, u64, u128)> {
+        self.peers_caches_state
+            .values()
+            .map(|v| (v.peer_id.as_str(), v.cache_hash, v.publication_timestamp))
+    }
+
     async fn inner_select(&mut self) -> Event {
         use Event::*;
         tokio::select!(
@@ -460,7 +416,11 @@ impl DomoCache {
         );
         Continue
     }
-    pub async fn cache_event_loop(&mut self) -> std::result::Result<DomoEvent, Box<dyn Error>> {
+
+    /// Cache event loop
+    ///
+    /// To be called as often as needed to keep the cache in-sync and receive new data.
+    pub async fn cache_event_loop(&mut self) -> Result<DomoEvent, Error> {
         use Event::*;
         loop {
             match self.inner_select().await {
@@ -486,14 +446,17 @@ impl DomoCache {
         }
     }
 
-    pub async fn new(conf: sifis_config::Cache) -> Result<Self, Box<dyn Error>> {
+    /// Instantiate a new cache
+    ///
+    /// See [sifis_config::Cache] for the available parameters.
+    pub async fn new(conf: crate::Config) -> Result<Self, Error> {
         if conf.url.is_empty() {
             panic!("db_url needed");
         }
 
         let is_persistent_cache = conf.persistent;
         let loopback_only = conf.loopback;
-        let shared_key = conf.shared_key.clone();
+        let shared_key = parse_hex_key(&conf.shared_key)?;
         let private_key_file = conf.private_key.clone();
 
         let storage = SqlxStorage::new(&conf).await;
@@ -502,24 +465,22 @@ impl DomoCache {
         let mut pkcs8_der = if let Some(pk_path) = private_key_file {
             match std::fs::read(&pk_path) {
                 Ok(pem) => {
-                    let der = pem_rfc7468::decode_vec(&pem)
-                        .map_err(|e| format!("Couldn't decode pem: {e:?}"))?;
+                    let der = pem_rfc7468::decode_vec(&pem)?;
                     der.1
                 }
                 Err(e) if e.kind() == ErrorKind::NotFound => {
                     // Generate a new key and put it into the file at the given path
                     let (pem, der) = generate_rsa_key();
-                    std::fs::write(pk_path, pem).expect("Couldn't save ");
+                    std::fs::write(pk_path, pem)?;
                     der
                 }
-                Err(e) => Err(format!("Couldn't load key file: {e:?}"))?,
+                Err(e) => Err(e)?,
             }
         } else {
             generate_rsa_key().1
         };
 
-        let local_key_pair = Keypair::rsa_from_pkcs8(&mut pkcs8_der)
-            .map_err(|e| format!("Couldn't load key: {e:?}"))?;
+        let local_key_pair = Keypair::rsa_from_pkcs8(&mut pkcs8_der)?;
 
         let swarm = crate::domolibp2p::start(shared_key, local_key_pair, loopback_only)
             .await
@@ -556,6 +517,10 @@ impl DomoCache {
         Ok(c)
     }
 
+    /// Publish a volatile value on the DHT
+    ///
+    /// All the reachable peers will receive it.
+    /// Peers joining later will not receive it.
     pub async fn pub_value(&mut self, value: Value) {
         let topic = Topic::new("domo-volatile-data");
 
@@ -575,7 +540,7 @@ impl DomoCache {
         self.client_tx_channel.send(ev).await.unwrap();
     }
 
-    pub async fn gossip_pub(&mut self, mut m: DomoCacheElement, republished: bool) {
+    async fn gossip_pub(&mut self, mut m: DomoCacheElement, republished: bool) {
         let topic = Topic::new("domo-persistent-data");
 
         if republished {
@@ -601,6 +566,7 @@ impl DomoCache {
         }
     }
 
+    /// Print the current DHT state
     pub fn print(&self) {
         for (topic_name, topic_name_map) in self.cache.iter() {
             let mut first = true;
@@ -617,16 +583,22 @@ impl DomoCache {
         }
     }
 
+    /// Print the DHT current hash
     pub fn print_cache_hash(&self) {
        println!("Hash {}", self.get_cache_hash())
    }
 
+    /// Compute the hash of the current DHT state
     pub fn get_cache_hash(&self) -> u64 {
         let mut s = DefaultHasher::new();
         self.hash(&mut s);
         s.finish()
     }
 
+    /// Mark a persistent value as deleted
+    ///
+    /// The deleted value is no longer propagated and is expunged from the initial
+    /// DHT state fed to newly joining peers.
     pub async fn delete_value(&mut self, topic_name: &str, topic_uuid: &str) {
         let mut value_to_set = serde_json::Value::Null;
@@ -649,7 +621,9 @@ impl DomoCache {
             .await;
     }
 
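    // Call-pattern sketch for the persistent API (illustrative, assuming a
    // running `DomoCache` instance named `cache`):
    //
    //     cache
    //         .write_value("Domo::Light", "luce-1", serde_json::json!({"on": true}))
    //         .await;
    //     cache.delete_value("Domo::Light", "luce-1").await;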
-    // method called by the application: cache the value and publish it
+    /// Write/Update a persistent value
+    ///
+    /// The value will be part of the initial DHT state a peer joining will receive.
     pub async fn write_value(&mut self, topic_name: &str, topic_uuid: &str, value: Value) {
         let timest = utils::get_epoch_ms();
         let elem = DomoCacheElement {
diff --git a/dht-cache/src/domolibp2p.rs b/dht-cache/src/domolibp2p.rs
index 6196ec7..08b3c3d 100644
--- a/dht-cache/src/domolibp2p.rs
+++ b/dht-cache/src/domolibp2p.rs
@@ -8,6 +8,9 @@ use libp2p::gossipsub::{
 };
 use libp2p::{gossipsub, tcp};
 
+use rsa::pkcs8::EncodePrivateKey;
+use rsa::RsaPrivateKey;
+
 use std::collections::hash_map::DefaultHasher;
 use std::hash::{Hash, Hasher};
 //
@@ -23,12 +26,13 @@ use libp2p::Transport;
 use libp2p::{identity, mdns, swarm::NetworkBehaviour, PeerId, Swarm};
 
 use libp2p::swarm::SwarmBuilder;
-use std::error::Error;
 use std::time::Duration;
 
+use crate::Error;
+
 const KEY_SIZE: usize = 32;
 
-fn parse_hex_key(s: &str) -> Result<[u8; KEY_SIZE], String> {
+pub fn parse_hex_key(s: &str) -> Result<PreSharedKey, Error> {
     if s.len() == KEY_SIZE * 2 {
         let mut r = [0u8; KEY_SIZE];
         for i in 0..KEY_SIZE {
@@ -37,16 +41,18 @@ fn parse_hex_key(s: &str) -> Result<[u8; KEY_SIZE], String> {
                 Ok(res) => {
                     r[i] = res;
                 }
-                Err(_e) => return Err(String::from("Error while parsing")),
+                Err(_e) => return Err(Error::Hex("Error while parsing".into())),
             }
         }
-        Ok(r)
+        let psk = PreSharedKey::new(r);
+
+        Ok(psk)
     } else {
-        Err(format!(
+        Err(Error::Hex(format!(
             "Len Error: expected {} but got {}",
             KEY_SIZE * 2,
             s.len()
-        ))
+        )))
     }
 }
 
@@ -68,25 +74,61 @@ pub fn build_transport(
     .boxed()
 }
 
+pub fn generate_rsa_key() -> (Vec<u8>, Vec<u8>) {
+    let mut rng = rand::thread_rng();
+    let bits = 2048;
+    let private_key = RsaPrivateKey::new(&mut rng, bits).expect("failed to generate a key");
+    let pem = private_key
+        .to_pkcs8_pem(Default::default())
+        .unwrap()
+        .as_bytes()
+        .to_vec();
+    let der = private_key.to_pkcs8_der().unwrap().as_bytes().to_vec();
+    (pem, der)
+}
+
 pub async fn start(
-    shared_key: String,
+    shared_key: PreSharedKey,
     local_key_pair: identity::Keypair,
     loopback_only: bool,
-) -> Result<Swarm<DomoBehaviour>, Box<dyn Error>> {
+) -> Result<Swarm<DomoBehaviour>, Error> {
     let local_peer_id = PeerId::from(local_key_pair.public());
 
-    // Create a Gossipsub topic
-    let topic_persistent_data = Topic::new("domo-persistent-data");
-    let topic_volatile_data = Topic::new("domo-volatile-data");
-    let topic_config = Topic::new("domo-config");
-
-    let arr = parse_hex_key(&shared_key)?;
-    let psk = PreSharedKey::new(arr);
-
-    let transport = build_transport(local_key_pair.clone(), psk);
+    let transport = build_transport(local_key_pair.clone(), shared_key);
 
     // Create a swarm to manage peers and events.
     let mut swarm = {
+        let behaviour = DomoBehaviour::new(&local_key_pair)?;
+
+        SwarmBuilder::with_tokio_executor(transport, behaviour, local_peer_id).build()
+    };
+
+    if !loopback_only {
+        // Listen on all interfaces and whatever port the OS assigns.
+        swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;
+    } else {
+        // Listen only on the loopback interface
+        swarm.listen_on("/ip4/127.0.0.1/tcp/0".parse()?)?;
+    }
+
+    Ok(swarm)
+}
+
+// We create a custom network behaviour that combines mDNS and gossipsub.
+#[derive(NetworkBehaviour)] +#[behaviour(to_swarm = "OutEvent")] +pub struct DomoBehaviour { + pub mdns: libp2p::mdns::tokio::Behaviour, + pub gossipsub: gossipsub::Behaviour, +} + +impl DomoBehaviour { + pub fn new(local_key_pair: &crate::Keypair) -> Result { + let local_peer_id = PeerId::from(local_key_pair.public()); + let topic_persistent_data = Topic::new("domo-persistent-data"); + let topic_volatile_data = Topic::new("domo-volatile-data"); + let topic_config = Topic::new("domo-config"); + let mdnsconf = mdns::Config { ttl: Duration::from_secs(600), query_interval: Duration::from_secs(30), @@ -103,6 +145,7 @@ pub async fn start( }; // Set a custom gossipsub + // SAFETY: hard-coded configuration let gossipsub_config = gossipsub::ConfigBuilder::default() .heartbeat_interval(Duration::from_secs(3)) // This is set to aid debugging by not cluttering the log space .check_explicit_peers_ticks(10) @@ -114,7 +157,7 @@ pub async fn start( // build a gossipsub network behaviour let mut gossipsub = gossipsub::Behaviour::new( - MessageAuthenticity::Signed(local_key_pair), + MessageAuthenticity::Signed(local_key_pair.to_owned()), gossipsub_config, ) .expect("Correct configuration"); @@ -129,28 +172,9 @@ pub async fn start( gossipsub.subscribe(&topic_config).unwrap(); let behaviour = DomoBehaviour { mdns, gossipsub }; - //Swarm::new(transport, behaviour, local_peer_id) - SwarmBuilder::with_tokio_executor(transport, behaviour, local_peer_id).build() - }; - - if !loopback_only { - // Listen on all interfaces and whatever port the OS assigns. - swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?; - } else { - // Listen only on loopack interface - swarm.listen_on("/ip4/127.0.0.1/tcp/0".parse()?)?; + Ok(behaviour) } - - Ok(swarm) -} - -// We create a custom network behaviour that combines mDNS and gossipsub. 
-#[derive(NetworkBehaviour)] -#[behaviour(to_swarm = "OutEvent")] -pub struct DomoBehaviour { - pub mdns: libp2p::mdns::tokio::Behaviour, - pub gossipsub: gossipsub::Behaviour, } #[allow(clippy::large_enum_variant)] diff --git a/dht-cache/src/domopersistentstorage.rs b/dht-cache/src/domopersistentstorage.rs index a50a7f6..1f4ef77 100644 --- a/dht-cache/src/domopersistentstorage.rs +++ b/dht-cache/src/domopersistentstorage.rs @@ -8,7 +8,7 @@ use sqlx::{ any::{AnyConnectOptions, AnyKind, AnyRow}, postgres::PgConnectOptions, sqlite::SqliteConnectOptions, - AnyConnection, ConnectOptions, Connection, Executor, Row, SqliteConnection, + AnyConnection, ConnectOptions, Executor, Row, }; use std::str::FromStr; @@ -76,7 +76,9 @@ impl SqlxStorage { } } + #[cfg(test)] pub async fn new_in_memory(db_table: &str) -> Self { + use sqlx::{Connection, SqliteConnection}; let conn = SqliteConnection::connect("sqlite::memory:").await.unwrap(); Self::with_connection(conn.into(), db_table, true).await @@ -223,10 +225,16 @@ mod tests { let _s = super::SqlxStorage::new(&db_config).await; } + fn get_pg_db() -> String { + std::env::var("DOMO_DHT_TEST_DB").unwrap_or_else(|_| { + "postgres://postgres:mysecretpassword@localhost/postgres".to_string() + }) + } + #[tokio::test] async fn test_pgsql_db_connection() { let db_config = sifis_config::Cache { - url: "postgres://postgres:mysecretpassword@localhost/postgres".to_string(), + url: get_pg_db(), table: "domo_test_pgsql_connection".to_string(), persistent: true, ..Default::default() @@ -244,7 +252,7 @@ mod tests { assert_eq!(v.len(), 0); let db_config = sifis_config::Cache { - url: "postgres://postgres:mysecretpassword@localhost/postgres".to_string(), + url: get_pg_db(), table: "test_initial_get_all_elements".to_string(), persistent: true, ..Default::default() @@ -278,7 +286,7 @@ mod tests { assert_eq!(v[0], m); let db_config = sifis_config::Cache { - url: "postgres://postgres:mysecretpassword@localhost/postgres".to_string(), + url: get_pg_db(), table: "test_store".to_string(), persistent: true, ..Default::default() diff --git a/dht-cache/src/lib.rs b/dht-cache/src/lib.rs index ab62c28..b5b3efa 100644 --- a/dht-cache/src/lib.rs +++ b/dht-cache/src/lib.rs @@ -1,8 +1,43 @@ -extern crate core; - +//! Simple DHT/messaging system based on libp2p +//! 
+pub mod cache; +pub mod data; +pub mod dht; pub mod domocache; -pub mod domolibp2p; -pub mod domopersistentstorage; +mod domolibp2p; +mod domopersistentstorage; pub mod utils; pub use libp2p::identity::Keypair; + +#[doc(inline)] +pub use domocache::DomoCache as Cache; + +/// Cache configuration +pub use sifis_config::Cache as Config; + +/// Error type +#[derive(thiserror::Error, Debug)] +#[non_exhaustive] +pub enum Error { + #[error("I/O error")] + Io(#[from] std::io::Error), + #[error("JSON serialization-deserialization error")] + Json(#[from] serde_json::Error), + #[error("Cannot decode the identity key")] + Identity(#[from] libp2p::identity::DecodingError), + #[error("Cannot decode the PEM information")] + Pem(#[from] pem_rfc7468::Error), + #[error("Cannot parse the hex string: {0}")] + Hex(String), + #[error("Connection setup error")] + Transport(#[from] libp2p::TransportError), + #[error("Cannot parse the multiaddr")] + MultiAddr(#[from] libp2p::multiaddr::Error), + #[error("Internal channel dropped")] + Channel, + #[error("Missing configuration")] + MissingConfig, + #[error("Invalid JSONPath expression")] + Jsonpath, +} diff --git a/dht-cache/src/utils.rs b/dht-cache/src/utils.rs index 76d6f69..88cce77 100644 --- a/dht-cache/src/utils.rs +++ b/dht-cache/src/utils.rs @@ -1,4 +1,7 @@ +//! Miscellaneous utilities use std::time::{SystemTime, UNIX_EPOCH}; + +/// Compute the dht timestamps pub fn get_epoch_ms() -> u128 { SystemTime::now() .duration_since(UNIX_EPOCH) diff --git a/dht-config/src/lib.rs b/dht-config/src/lib.rs index 21c60d5..04d1c51 100644 --- a/dht-config/src/lib.rs +++ b/dht-config/src/lib.rs @@ -92,7 +92,7 @@ where } /// Set a custom config file path /// - /// By default uses the result of [Command::get_name] with the extension `.toml`. + /// By default uses the result of [clap::Command::get_name] with the extension `.toml`. pub fn with_config_path>(mut self, path: P) -> Self { self.default_path = Some(path.as_ref().to_owned()); self @@ -100,7 +100,7 @@ where /// Set a custom prefix for the env variable lookup /// - /// By default uses the result of [Command::get_name] with `_`. + /// By default uses the result of [clap::Command::get_name] with `_`. 
pub fn with_env_prefix(mut self, prefix: S) -> Self { self.prefix = Some(prefix.to_string()); self diff --git a/src/domobroker.rs b/src/domobroker.rs index d54369a..f676e4a 100644 --- a/src/domobroker.rs +++ b/src/domobroker.rs @@ -18,7 +18,7 @@ pub struct DomoBroker { enum Event { WebSocket(SyncWebSocketDomoMessage), Rest(RestMessage), - Cache(Result>), + Cache(DomoEvent), Continue, } @@ -50,7 +50,7 @@ impl DomoBroker { }, m = self.domo_cache.cache_event_loop() => { - return Cache(m); + return Cache(m.unwrap_or(DomoEvent::None)); } } @@ -242,10 +242,10 @@ impl DomoBroker { } } - fn handle_cache_event_loop(&mut self, m: Result>) -> DomoEvent { + fn handle_cache_event_loop(&mut self, m: DomoEvent) -> DomoEvent { match m { - Ok(DomoEvent::None) => DomoEvent::None, - Ok(DomoEvent::PersistentData(m)) => { + DomoEvent::None => DomoEvent::None, + DomoEvent::PersistentData(m) => { println!( "Persistent message received {} {}", m.topic_name, m.topic_uuid @@ -263,7 +263,7 @@ impl DomoBroker { println!("SENT DATA ON WS {}", get_epoch_ms()); DomoEvent::PersistentData(m2) } - Ok(DomoEvent::VolatileData(m)) => { + DomoEvent::VolatileData(m) => { println!("Volatile message {m}"); let m2 = m.clone(); @@ -274,7 +274,7 @@ impl DomoBroker { DomoEvent::VolatileData(m2) } - _ => DomoEvent::None, + DomoEvent::NewPeers(_) => DomoEvent::None, } } } diff --git a/src/main.rs b/src/main.rs index 7fc3265..429c664 100644 --- a/src/main.rs +++ b/src/main.rs @@ -65,6 +65,9 @@ fn report_event(m: &DomoEvent) { DomoEvent::PersistentData(_v) => { println!("Persistent"); } + DomoEvent::NewPeers(peers) => { + println!("New peers {:#?}", peers); + } } }
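// End-to-end sketch tying the broker-side pieces together, assuming the
// re-exports in dht-cache/src/lib.rs (`sifis_dht::Cache`, `sifis_dht::Config`)
// and a 64-char hex shared key:
//
//     let cfg = sifis_dht::Config {
//         shared_key: /* 64 hex chars */ String::new(),
//         url: "sqlite::memory:".into(),
//         ..Default::default()
//     };
//     let mut cache = sifis_dht::Cache::new(cfg).await?;
//     loop {
//         let ev = cache.cache_event_loop().await?;
//         report_event(&ev); // matches on the DomoEvent variants, NewPeers included
//     }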