Skip to content

Commit

Permalink
feat!: overhaul dht_cache::cache::Query API
Browse files Browse the repository at this point in the history
  • Loading branch information
dodomorandi committed Sep 29, 2023
1 parent 4fd2dbe commit abff470
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 40 deletions.
5 changes: 3 additions & 2 deletions dht-cache/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::{
};

use self::local::DomoCacheElement;
pub use self::local::{LocalCache, Query};
pub use self::local::{LocalCache, Query, QueryGet, QueryGetIter};

/// DHT state change
#[derive(Debug)]
Expand Down Expand Up @@ -177,7 +177,8 @@ impl Cache {
}

/// Query the local cache
pub fn query(&self, topic: &str) -> Query {
#[must_use]
pub fn query<'a>(&'a self, topic: &'a str) -> Query<'a> {
self.local.query(topic)
}

Expand Down
155 changes: 117 additions & 38 deletions dht-cache/src/cache/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,17 @@ pub use crate::data::*;
use crate::domopersistentstorage::{DomoPersistentStorage, SqlxStorage};
use serde_json::Value;
use std::collections::hash_map::DefaultHasher;
use std::collections::BTreeMap;
use std::collections::{btree_map, BTreeMap};
use std::hash::{Hash, Hasher};
use std::sync::Arc;
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
use tokio::sync::{OwnedRwLockReadGuard, RwLock};
use tokio::sync::{OwnedRwLockReadGuard, RwLock, RwLockReadGuard};

enum SqlxCommand {
Write(DomoCacheElement),
}

#[derive(Default)]
#[derive(Debug, Default)]
pub(crate) struct InnerCache {
pub mem: BTreeMap<String, BTreeMap<String, DomoCacheElement>>,
store: Option<UnboundedSender<SqlxCommand>>,
Expand Down Expand Up @@ -143,7 +143,7 @@ impl LocalCache {
}

/// Instantiate a query over the local cache
pub fn query(&self, topic: &str) -> Query {
pub fn query<'a>(&self, topic: &'a str) -> Query<'a> {
Query::new(topic, self.clone())
}

Expand All @@ -161,44 +161,120 @@ impl LocalCache {

/// Query the local DHT cache
#[derive(Clone)]
pub struct Query {
pub struct Query<'a> {
cache: LocalCache,
topic: String,
uuid: Option<String>,
topic: &'a str,
}

impl Query {
impl<'a> Query<'a> {
/// Create a new query over a local cache
pub fn new(topic: &str, cache: LocalCache) -> Self {
Self {
topic: topic.to_owned(),
cache,
uuid: None,
}
pub fn new(topic: &'a str, cache: LocalCache) -> Self {
Self { topic, cache }
}
/// Look up for a specific uuid
pub fn with_uuid(mut self, uuid: &str) -> Self {
self.uuid = Some(uuid.to_owned());
self

/// Gets a value on a topic given a specific UUID.
///
/// Keep in mind that the returned type holds a lock guard to the underlying data, be careful
/// to use it across yield points.
pub async fn get_by_uuid<'b>(&'b self, uuid: &'b str) -> Option<RwLockReadGuard<'b, Value>> {
RwLockReadGuard::try_map(self.cache.0.read().await, |cache| {
cache
.mem
.get(self.topic)
.and_then(|tree| tree.get(uuid))
.map(|cache_element| &cache_element.value)
})
.ok()
}

/// Execute the query and return a Value if found
pub async fn get(&self) -> Vec<Value> {
let cache = self.cache.0.read().await;

if let Some(topics) = cache.mem.get(&self.topic) {
if let Some(ref uuid) = self.uuid {
topics
.get(uuid)
.into_iter()
.map(|elem| elem.value.clone())
.collect()
} else {
topics.values().map(|elem| elem.value.clone()).collect()
}
} else {
Vec::new()
}
/// Gets the data stored for the topic.
///
/// It returns an _iterable type_ that can be used to obtain pairs of UUID and values.
///
/// Keep in mind that the returned type holds a lock guard to the underlying data, be careful
/// to use it across yield points.
///
/// # Example
///
/// ```
/// # use sifis_dht::cache::Query;
/// # async fn handle_query(query: Query<'_>) {
/// let get = query.get().await;
/// for pair in &get {
/// let (uuid, value): (&str, &serde_json::Value) = pair;
/// println!("{uuid}, {value}");
/// }
/// # }
/// ```
#[inline]
pub async fn get(&self) -> QueryGet<'_> {
let lock =
RwLockReadGuard::try_map(self.cache.0.read().await, |cache| cache.mem.get(self.topic))
.ok();

QueryGet(lock)
}
}

#[derive(Debug)]
pub struct QueryGet<'a>(Option<RwLockReadGuard<'a, BTreeMap<String, DomoCacheElement>>>);

impl<'a> QueryGet<'a> {
/// Iterate over queried pairs of UUIDs and values.
#[inline]
#[must_use]
pub fn iter(&'a self) -> QueryGetIter<'a> {
IntoIterator::into_iter(self)
}
}

impl<'a> IntoIterator for &'a QueryGet<'a> {
type Item = (&'a str, &'a Value);
type IntoIter = QueryGetIter<'a>;

#[inline]
fn into_iter(self) -> Self::IntoIter {
let values = self
.0
.as_deref()
.map_or_else(Default::default, BTreeMap::iter);

QueryGetIter(values)
}
}

#[derive(Debug)]
pub struct QueryGetIter<'a>(btree_map::Iter<'a, String, DomoCacheElement>);

impl<'a> Iterator for QueryGetIter<'a> {
type Item = (&'a str, &'a Value);

#[inline]
fn next(&mut self) -> Option<Self::Item> {
self.0
.next()
.map(|(uuid, cache_element)| (&**uuid, &cache_element.value))
}

#[inline]
fn size_hint(&self) -> (usize, Option<usize>) {
self.0.size_hint()
}
}

impl DoubleEndedIterator for QueryGetIter<'_> {
#[inline]
fn next_back(&mut self) -> Option<Self::Item> {
self.0
.next_back()
.map(|(uuid, cache_element)| (&**uuid, &cache_element.value))
}
}

impl ExactSizeIterator for QueryGetIter<'_> {
#[inline]
fn len(&self) -> usize {
self.0.len()
}
}

Expand Down Expand Up @@ -315,12 +391,15 @@ mod test {

let q = cache.query("Domo::Light");

assert_eq!(q.get().await.len(), 10);
assert_eq!(q.get().await.iter().len(), 10);

assert_eq!(q.clone().with_uuid("not-existent").get().await.len(), 0);
assert!(q.get_by_uuid("not-existent").await.is_none());

assert_eq!(
q.clone().with_uuid("luce-1").get().await[0]
q.clone()
.get_by_uuid("luce-1")
.await
.unwrap()
.get("count")
.unwrap(),
1
Expand Down

0 comments on commit abff470

Please sign in to comment.