Add new api optionally_get_with #187

Merged 2 commits on Oct 24, 2022
3 changes: 3 additions & 0 deletions src/future.rs
@@ -22,6 +22,9 @@ pub use {
/// [invalidate-if]: ./struct.Cache.html#method.invalidate_entries_if
pub type PredicateId = String;

// Empty struct used in `InitResult::InitErr` to represent `Option::None`.
struct OptionallyNone;

pub struct Iter<'i, K, V>(crate::sync_base::iter::Iter<'i, K, V>);

impl<'i, K, V> Iter<'i, K, V> {
268 changes: 268 additions & 0 deletions src/future/cache.rs
@@ -31,6 +31,8 @@ use std::{
time::Duration,
};

use super::OptionallyNone;

/// A thread-safe, futures-aware concurrent in-memory cache.
///
/// `Cache` supports full concurrency of retrievals and a high expected concurrency
@@ -975,6 +977,106 @@ where
.await
}

/// Tries to ensure that the value for the key exists by inserting the `Some`
/// output of the `init` future if the key is not present, and returns a
/// _clone_ of the value. Returns `None` when the future resolves to `None`,
/// in which case nothing is inserted into the cache.
///
/// This method prevents the `init` future from being resolved multiple times
/// for the same key, even if the method is called concurrently by many async
/// tasks; only one of the calls resolves its future (as long as that future
/// returns a `Some` value), and the other calls wait for it to complete.
///
/// # Example
///
/// ```rust
/// // Cargo.toml
/// //
/// // [dependencies]
/// // moka = { version = "0.9", features = ["future"] }
/// // futures-util = "0.3"
/// // reqwest = "0.11"
/// // tokio = { version = "1", features = ["rt-multi-thread", "macros" ] }
/// use moka::future::Cache;
///
/// // This async function tries to get HTML from the given URI.
/// async fn get_html(task_id: u8, uri: &str) -> Option<String> {
/// println!("get_html() called by task {}.", task_id);
/// reqwest::get(uri).await.ok()?.text().await.ok()
/// }
///
/// #[tokio::main]
/// async fn main() {
/// let cache = Cache::new(100);
///
/// // Spawn four async tasks.
/// let tasks: Vec<_> = (0..4_u8)
/// .map(|task_id| {
/// let my_cache = cache.clone();
/// tokio::spawn(async move {
/// println!("Task {} started.", task_id);
///
/// // Try to insert and get the value for key1. Although
/// // all four async tasks will call `optionally_get_with`
/// // at the same time, get_html() must be called only once.
/// let value = my_cache
/// .optionally_get_with(
/// "key1",
/// get_html(task_id, "https://www.rust-lang.org"),
/// ).await;
///
/// // Ensure the value exists now.
/// assert!(value.is_some());
/// assert!(my_cache.get(&"key1").is_some());
///
/// println!(
/// "Task {} got the value. (len: {})",
/// task_id,
/// value.unwrap().len()
/// );
/// })
/// })
/// .collect();
///
/// // Run all tasks concurrently and wait for them to complete.
/// futures_util::future::join_all(tasks).await;
/// }
/// ```
///
/// **A Sample Result**
///
/// - `get_html()` was called exactly once by task 2.
/// - Other tasks were blocked until task 2 inserted the value.
///
/// ```console
/// Task 1 started.
/// Task 0 started.
/// Task 2 started.
/// Task 3 started.
/// get_html() called by task 2.
/// Task 2 got the value. (len: 19419)
/// Task 1 got the value. (len: 19419)
/// Task 0 got the value. (len: 19419)
/// Task 3 got the value. (len: 19419)
/// ```
///
/// # Panics
///
/// This method panics if the `init` future panics. When that happens, only
/// the caller whose `init` future panicked will get the panic (e.g. only task 2
/// in the sample above). If there are other calls in progress (e.g. tasks 0, 1
/// and 3 above), this method will restart and resolve one of the remaining
/// `init` futures.
///
pub async fn optionally_get_with<F>(&self, key: K, init: F) -> Option<V>
where
F: Future<Output = Option<V>>,
{
let hash = self.base.hash(&key);
let key = Arc::new(key);
self.optionally_insert_with_hash_and_fun(key, hash, init)
.await
}
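
For context, here is a minimal sketch (not part of this diff) contrasting the new method with the existing `get_with` and `try_get_with` getters. It assumes moka 0.9 with the `future` feature, a Tokio runtime, and the existing getter signatures as documented; the keys, values, and error type are chosen only for illustration.

```rust
use moka::future::Cache;

#[tokio::main]
async fn main() {
    let cache: Cache<u32, String> = Cache::new(100);

    // `get_with`: the init future always yields a value, which is cached.
    let v1 = cache.get_with(1, async { "always".to_string() }).await;
    assert_eq!(v1, "always");

    // `try_get_with`: the init future returns a `Result`; an `Err` is not cached.
    let v2 = cache
        .try_get_with(2, async { Ok::<_, std::io::Error>("ok".to_string()) })
        .await;
    assert!(v2.is_ok());

    // `optionally_get_with` (added by this PR): the init future returns an
    // `Option`; a `None` is not cached and is returned to the caller as-is.
    let v3 = cache.optionally_get_with(3, async { None }).await;
    assert!(v3.is_none());
}
```

The common theme is that only successful outcomes (`V`, `Ok(V)`, `Some(V)`) populate the cache; a failed or empty initialization is handed back to the caller without inserting anything.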

/// Inserts a key-value pair into the cache.
///
/// If the cache has this key present, the value is updated.
@@ -1273,6 +1375,40 @@ where
}
}

async fn optionally_insert_with_hash_and_fun<F>(
&self,
key: Arc<K>,
hash: u64,
init: F,
) -> Option<V>
where
F: Future<Output = Option<V>>,
{
let res = self.base.get_with_hash(&key, hash);

if res.is_some() {
return res;
}

match self
.value_initializer
.optionally_init_or_read(Arc::clone(&key), init)
.await
{
InitResult::Initialized(v) => {
let hash = self.base.hash(&key);
self.insert_with_hash(Arc::clone(&key), hash, v.clone())
.await;
self.value_initializer
.remove_waiter(&key, TypeId::of::<OptionallyNone>());
crossbeam_epoch::pin().flush();
Some(v)
}
InitResult::ReadExisting(v) => Some(v),
InitResult::InitErr(_) => None,
}
}
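
One consequence of the `InitResult::InitErr(_) => None` arm above is that a `None` outcome leaves the cache unchanged, so a later call for the same key evaluates its own `init` future again. A small sketch of that caller-visible behavior, assuming the API added in this PR:

```rust
use moka::future::Cache;
use std::sync::atomic::{AtomicUsize, Ordering};

#[tokio::main]
async fn main() {
    let cache: Cache<u32, String> = Cache::new(100);
    let calls = AtomicUsize::new(0);

    // First call: the init future runs and returns `None`, so nothing is cached.
    let v = cache
        .optionally_get_with(0, async {
            calls.fetch_add(1, Ordering::SeqCst);
            None
        })
        .await;
    assert!(v.is_none());

    // Second call: because no value was inserted, the init future runs again.
    let v = cache
        .optionally_get_with(0, async {
            calls.fetch_add(1, Ordering::SeqCst);
            Some("now present".to_string())
        })
        .await;
    assert_eq!(v.as_deref(), Some("now present"));
    assert_eq!(calls.load(Ordering::SeqCst), 2);
}
```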

async fn insert_with_hash(&self, key: Arc<K>, hash: u64, value: V) {
let (op, now) = self.base.do_insert_with_hash(key, hash, value);
let hk = self.base.housekeeper.as_ref();
@@ -2459,6 +2595,138 @@ mod tests {
futures_util::join!(task1, task2, task3, task4, task5, task6, task7, task8);
}

#[tokio::test]
async fn optionally_get_with() {
let cache = Cache::new(100);
const KEY: u32 = 0;

// This test will run eight async tasks:
//
// Task1 will be the first task to call `optionally_get_with` for a key,
// so its async block will be evaluated and then a `None` will be
// returned. Nothing will be inserted into the cache.
let task1 = {
let cache1 = cache.clone();
async move {
// Call `optionally_get_with` immediately.
let v = cache1
.optionally_get_with(KEY, async {
// Wait for 300 ms and return a `None`.
Timer::after(Duration::from_millis(300)).await;
None
})
.await;
assert!(v.is_none());
}
};

// Task2 will be the second task to call `optionally_get_with` for the same
// key, so its async block will not be evaluated. Once task1's async block
// finishes, it will get the same `None` value produced by task1's async block.
let task2 = {
let cache2 = cache.clone();
async move {
// Wait for 100 ms before calling `optionally_get_with`.
Timer::after(Duration::from_millis(100)).await;
let v = cache2
.optionally_get_with(KEY, async { unreachable!() })
.await;
assert!(v.is_none());
}
};

// Task3 will be the third task to call `optionally_get_with` for the
// same key. By the time it calls, task1's async block should have
// finished already, but the key still does not exist in the cache. So
// its async block will be evaluated and then a `Some(&str)` value will be
// returned. That value will be inserted into the cache.
let task3 = {
let cache3 = cache.clone();
async move {
// Wait for 400 ms before calling `optionally_get_with`.
Timer::after(Duration::from_millis(400)).await;
let v = cache3
.optionally_get_with(KEY, async {
// Wait for 300 ms and return a `Some(&str)` value.
Timer::after(Duration::from_millis(300)).await;
Some("task3")
})
.await;
assert_eq!(v.unwrap(), "task3");
}
};

// Task4 will be the fourth task to call `optionally_get_with` for the
// same key. So its async block will not be evaluated. Once task3's
// async block finishes, it will get the same `Some(&str)` value.
let task4 = {
let cache4 = cache.clone();
async move {
// Wait for 500 ms before calling `optionally_get_with`.
Timer::after(Duration::from_millis(500)).await;
let v = cache4
.optionally_get_with(KEY, async { unreachable!() })
.await;
assert_eq!(v.unwrap(), "task3");
}
};

// Task5 will be the fifth task to call `optionally_get_with` for the
// same key. By the time it calls, task3's async block should have finished
// already, so its async block will not be evaluated and it will get the
// value inserted by task3's async block immediately.
let task5 = {
let cache5 = cache.clone();
async move {
// Wait for 800 ms before calling `optionally_get_with`.
Timer::after(Duration::from_millis(800)).await;
let v = cache5
.optionally_get_with(KEY, async { unreachable!() })
.await;
assert_eq!(v.unwrap(), "task3");
}
};

// Task6 will call `get` for the same key. It will call while task1's async
// block is still running, so it will get `None` for the key.
let task6 = {
let cache6 = cache.clone();
async move {
// Wait for 200 ms before calling `get`.
Timer::after(Duration::from_millis(200)).await;
let maybe_v = cache6.get(&KEY);
assert!(maybe_v.is_none());
}
};

// Task7 will call `get` for the same key. It will call after task1's async
// block finished and returned `None`, so it will still get `None` for the key.
let task7 = {
let cache7 = cache.clone();
async move {
// Wait for 400 ms before calling `get`.
Timer::after(Duration::from_millis(400)).await;
let maybe_v = cache7.get(&KEY);
assert!(maybe_v.is_none());
}
};

// Task8 will call `get` for the same key. It will call after task3's async
// block finished, so it will get the value inserted by task3's async block.
let task8 = {
let cache8 = cache.clone();
async move {
// Wait for 800 ms before calling `get`.
Timer::after(Duration::from_millis(800)).await;
let maybe_v = cache8.get(&KEY);
assert_eq!(maybe_v, Some("task3"));
}
};

futures_util::join!(task1, task2, task3, task4, task5, task6, task7, task8);
}

#[tokio::test]
// https://github.com/moka-rs/moka/issues/43
async fn handle_panic_in_get_with() {
36 changes: 36 additions & 0 deletions src/future/value_initializer.rs
@@ -7,6 +7,8 @@ use std::{
};
use triomphe::Arc as TrioArc;

use super::OptionallyNone;

const WAITER_MAP_NUM_SEGMENTS: usize = 64;

type ErrorObject = Arc<dyn Any + Send + Sync + 'static>;
@@ -156,6 +158,40 @@ where
self.do_try_init(&key, type_id, init, post_init).await
}

/// # Panics
/// Panics if the `init` future has panicked.
pub(super) async fn optionally_init_or_read<F>(
&self,
key: Arc<K>,
init: F,
) -> InitResult<V, OptionallyNone>
where
F: Future<Output = Option<V>>,
{
let type_id = TypeId::of::<OptionallyNone>();

// This closure will be called after the init future has resolved to a value.
// It converts that value into an InitResult.
let post_init = |key, value: Option<V>, mut guard: WaiterGuard<'_, K, V, S>| match value {
Some(value) => {
guard.set_waiter_value(WaiterValue::Ready(Ok(value.clone())));
InitResult::Initialized(value)
}
None => {
// `value` can be either `Some` or `None`. For the `None` case, without
// changing the existing API too much, we need to convert `None` into an
// Arc<E> here. `Infallible` cannot be instantiated, so an empty struct
// (`OptionallyNone`) is used to indicate the "no value" error type.
let err: ErrorObject = Arc::new(OptionallyNone);
guard.set_waiter_value(WaiterValue::Ready(Err(Arc::clone(&err))));
self.remove_waiter(key, type_id);
InitResult::InitErr(err.downcast().unwrap())
}
};

self.do_try_init(&key, type_id, init, post_init).await
}
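
The `None` branch above reuses the existing `ErrorObject` plumbing (`Arc<dyn Any + Send + Sync>`) by treating the empty `OptionallyNone` struct as a stand-in error type. A standalone sketch, independent of moka's internals, of the type-erasure and downcast mechanism it relies on:

```rust
use std::any::{Any, TypeId};
use std::sync::Arc;

// An empty marker struct standing in for "the init future returned None".
struct OptionallyNone;

fn main() {
    // Erase the concrete type, as `ErrorObject` does in value_initializer.rs.
    let err: Arc<dyn Any + Send + Sync> = Arc::new(OptionallyNone);

    // The TypeId lets the waiter map tell this marker apart from real error
    // types produced by `try_get_with`.
    assert_eq!((*err).type_id(), TypeId::of::<OptionallyNone>());

    // Downcasting recovers the typed Arc, mirroring
    // `InitResult::InitErr(err.downcast().unwrap())` above.
    let marker: Arc<OptionallyNone> = err.downcast().unwrap();
    let _ = marker;
}
```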

/// # Panics
/// Panics if the `init` future has panicked.
async fn do_try_init<'a, F, O, C, E>(
4 changes: 4 additions & 0 deletions src/sync.rs
@@ -13,3 +13,7 @@ pub trait ConcurrentCacheExt<K, V> {
/// Performs any pending maintenance operations needed by the cache.
fn sync(&self);
}

// Empty internal struct used in `optionally_get_with` to represent a `None`
// result.
struct OptionallyNone;