Skip to content

Commit

Permalink
Support the plain LRU policy
Browse files Browse the repository at this point in the history
  • Loading branch information
tatsuya6502 committed Jan 27, 2024
1 parent 2e904db commit 8ac62ed
Show file tree
Hide file tree
Showing 9 changed files with 303 additions and 26 deletions.
35 changes: 24 additions & 11 deletions src/future/base_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::{
},
future::CancelGuard,
notification::{AsyncEvictionListener, RemovalCause},
policy::ExpirationPolicy,
policy::{EvictionPolicy, ExpirationPolicy},
sync_base::iter::ScanningGet,
Entry, Expiry, Policy, PredicateError,
};
Expand Down Expand Up @@ -167,6 +167,7 @@ where
initial_capacity: Option<usize>,
build_hasher: S,
weigher: Option<Weigher<K, V>>,
eviction_policy: EvictionPolicy,
eviction_listener: Option<AsyncEvictionListener<K, V>>,
expiration_policy: ExpirationPolicy<K, V>,
invalidator_enabled: bool,
Expand All @@ -187,6 +188,7 @@ where
initial_capacity,
build_hasher,
weigher,
eviction_policy,
eviction_listener,
r_rcv,
w_rcv,
Expand Down Expand Up @@ -1041,6 +1043,7 @@ pub(crate) struct Inner<K, V, S> {
read_op_ch: Receiver<ReadOp<K, V>>,
write_op_ch: Receiver<WriteOp<K, V>>,
maintenance_task_lock: RwLock<()>,
eviction_policy: EvictionPolicy,
expiration_policy: ExpirationPolicy<K, V>,
valid_after: AtomicInstant,
weigher: Option<Weigher<K, V>>,
Expand Down Expand Up @@ -1191,6 +1194,7 @@ where
initial_capacity: Option<usize>,
build_hasher: S,
weigher: Option<Weigher<K, V>>,
eviction_policy: EvictionPolicy,
eviction_listener: Option<AsyncEvictionListener<K, V>>,
read_op_ch: Receiver<ReadOp<K, V>>,
write_op_ch: Receiver<WriteOp<K, V>>,
Expand Down Expand Up @@ -1247,6 +1251,7 @@ where
read_op_ch,
write_op_ch,
maintenance_task_lock: RwLock::default(),
eviction_policy,
expiration_policy,
valid_after: AtomicInstant::default(),
weigher,
Expand Down Expand Up @@ -1451,7 +1456,9 @@ where
.await;
}

if self.should_enable_frequency_sketch(&eviction_state.counters) {
if self.eviction_policy == EvictionPolicy::TinyLfu
&& self.should_enable_frequency_sketch(&eviction_state.counters)
{
self.enable_frequency_sketch(&eviction_state.counters).await;
}

Expand Down Expand Up @@ -1742,15 +1749,19 @@ where
}
}

let mut candidate = EntrySizeAndFrequency::new(new_weight);
candidate.add_frequency(freq, kh.hash);
// TODO: Refactoring the policy implementations.
// https://github.com/moka-rs/moka/issues/389

// Try to admit the candidate.
//
// NOTE: We need to call `admit` here, instead of a part of the `match`
// expression. Otherwise the future returned from this `handle_upsert` method
// will not be `Send`.
let admission_result = Self::admit(&candidate, &self.cache, deqs, freq);
let admission_result = match &self.eviction_policy {
EvictionPolicy::TinyLfu => {
let mut candidate = EntrySizeAndFrequency::new(new_weight);
candidate.add_frequency(freq, kh.hash);
Self::admit(&candidate, &self.cache, deqs, freq)
}
EvictionPolicy::Lru => AdmissionResult::Admitted { victim_keys: SmallVec::default() },
};

match admission_result {
AdmissionResult::Admitted { victim_keys } => {
// Try to remove the victims from the hash map.
Expand Down Expand Up @@ -2760,7 +2771,7 @@ fn is_expired_by_ttl(

#[cfg(test)]
mod tests {
use crate::policy::ExpirationPolicy;
use crate::policy::{EvictionPolicy, ExpirationPolicy};

use super::BaseCache;

Expand All @@ -2779,8 +2790,9 @@ mod tests {
None,
RandomState::default(),
None,
EvictionPolicy::default(),
None,
Default::default(),
ExpirationPolicy::default(),
false,
);
cache.inner.enable_frequency_sketch_for_testing().await;
Expand Down Expand Up @@ -3127,6 +3139,7 @@ mod tests {
None,
RandomState::default(),
None,
EvictionPolicy::default(),
None,
ExpirationPolicy::new(
Some(Duration::from_secs(TTL)),
Expand Down
13 changes: 12 additions & 1 deletion src/future/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use super::{Cache, FutureExt};
use crate::{
common::{builder_utils, concurrent::Weigher},
notification::{AsyncEvictionListener, ListenerFuture, RemovalCause},
policy::ExpirationPolicy,
policy::{EvictionPolicy, ExpirationPolicy},
Expiry,
};

Expand Down Expand Up @@ -60,6 +60,7 @@ pub struct CacheBuilder<K, V, C> {
max_capacity: Option<u64>,
initial_capacity: Option<usize>,
weigher: Option<Weigher<K, V>>,
eviction_policy: EvictionPolicy,
eviction_listener: Option<AsyncEvictionListener<K, V>>,
expiration_policy: ExpirationPolicy<K, V>,
invalidator_enabled: bool,
Expand All @@ -77,6 +78,7 @@ where
max_capacity: None,
initial_capacity: None,
weigher: None,
eviction_policy: EvictionPolicy::default(),
eviction_listener: None,
expiration_policy: ExpirationPolicy::default(),
invalidator_enabled: false,
Expand Down Expand Up @@ -116,6 +118,7 @@ where
self.initial_capacity,
build_hasher,
self.weigher,
self.eviction_policy,
self.eviction_listener,
self.expiration_policy,
self.invalidator_enabled,
Expand Down Expand Up @@ -212,6 +215,7 @@ where
self.initial_capacity,
hasher,
self.weigher,
self.eviction_policy,
self.eviction_listener,
self.expiration_policy,
self.invalidator_enabled,
Expand Down Expand Up @@ -245,6 +249,13 @@ impl<K, V, C> CacheBuilder<K, V, C> {
}
}

pub fn eviction_policy(self, policy: EvictionPolicy) -> Self {
Self {
eviction_policy: policy,
..self
}
}

/// Sets the weigher closure to the cache.
///
/// The closure should take `&K` and `&V` as the arguments and returns a `u32`
Expand Down
108 changes: 106 additions & 2 deletions src/future/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{
common::concurrent::Weigher,
notification::AsyncEvictionListener,
ops::compute::{self, CompResult},
policy::ExpirationPolicy,
policy::{EvictionPolicy, ExpirationPolicy},
Entry, Policy, PredicateError,
};

Expand Down Expand Up @@ -787,6 +787,7 @@ where
None,
build_hasher,
None,
EvictionPolicy::default(),
None,
ExpirationPolicy::default(),
false,
Expand Down Expand Up @@ -816,6 +817,7 @@ where
initial_capacity: Option<usize>,
build_hasher: S,
weigher: Option<Weigher<K, V>>,
eviction_policy: EvictionPolicy,
eviction_listener: Option<AsyncEvictionListener<K, V>>,
expiration_policy: ExpirationPolicy<K, V>,
invalidator_enabled: bool,
Expand All @@ -827,6 +829,7 @@ where
initial_capacity,
build_hasher.clone(),
weigher,
eviction_policy,
eviction_listener,
expiration_policy,
invalidator_enabled,
Expand Down Expand Up @@ -2115,7 +2118,7 @@ mod tests {
future::FutureExt,
notification::{ListenerFuture, RemovalCause},
ops::compute,
policy::test_utils::ExpiryCallCounters,
policy::{test_utils::ExpiryCallCounters, EvictionPolicy},
Expiry,
};

Expand Down Expand Up @@ -2313,6 +2316,107 @@ mod tests {
assert!(cache.key_locks_map_is_empty());
}

#[tokio::test]
async fn basic_lru_single_thread() {
// The following `Vec`s will hold actual and expected notifications.
let actual = Arc::new(Mutex::new(Vec::new()));
let mut expected = Vec::new();

// Create an eviction listener.
let a1 = Arc::clone(&actual);
let listener = move |k, v, cause| -> ListenerFuture {
let a2 = Arc::clone(&a1);
async move {
a2.lock().await.push((k, v, cause));
}
.boxed()
};

// Create a cache with the eviction listener.
let mut cache = Cache::builder()
.max_capacity(3)
.eviction_policy(EvictionPolicy::Lru)
.async_eviction_listener(listener)
.build();
cache.reconfigure_for_testing().await;

// Make the cache exterior immutable.
let cache = cache;

cache.insert("a", "alice").await;
cache.insert("b", "bob").await;
assert_eq!(cache.get(&"a").await, Some("alice"));
assert!(cache.contains_key(&"a"));
assert!(cache.contains_key(&"b"));
assert_eq!(cache.get(&"b").await, Some("bob"));
cache.run_pending_tasks().await;
// a -> b

cache.insert("c", "cindy").await;
assert_eq!(cache.get(&"c").await, Some("cindy"));
assert!(cache.contains_key(&"c"));
cache.run_pending_tasks().await;
// a -> b -> c

assert!(cache.contains_key(&"a"));
assert_eq!(cache.get(&"a").await, Some("alice"));
assert_eq!(cache.get(&"b").await, Some("bob"));
assert!(cache.contains_key(&"b"));
cache.run_pending_tasks().await;
// c -> a -> b

// "d" should be admitted because the cache uses the LRU strategy.
cache.insert("d", "david").await;
// "c" is the LRU and should have be evicted.
expected.push((Arc::new("c"), "cindy", RemovalCause::Size));
cache.run_pending_tasks().await;

assert_eq!(cache.get(&"a").await, Some("alice"));
assert_eq!(cache.get(&"b").await, Some("bob"));
assert_eq!(cache.get(&"c").await, None);
assert_eq!(cache.get(&"d").await, Some("david"));
assert!(cache.contains_key(&"a"));
assert!(cache.contains_key(&"b"));
assert!(!cache.contains_key(&"c"));
assert!(cache.contains_key(&"d"));
cache.run_pending_tasks().await;
// a -> b -> d

cache.invalidate(&"b").await;
expected.push((Arc::new("b"), "bob", RemovalCause::Explicit));
cache.run_pending_tasks().await;
// a -> d
assert_eq!(cache.get(&"b").await, None);
assert!(!cache.contains_key(&"b"));

assert!(cache.remove(&"b").await.is_none());
assert_eq!(cache.remove(&"d").await, Some("david"));
expected.push((Arc::new("d"), "david", RemovalCause::Explicit));
cache.run_pending_tasks().await;
// a
assert_eq!(cache.get(&"d").await, None);
assert!(!cache.contains_key(&"d"));

cache.insert("e", "emily").await;
cache.insert("f", "frank").await;
// "a" should be evicted because it is the LRU.
cache.insert("g", "gina").await;
expected.push((Arc::new("a"), "alice", RemovalCause::Size));
cache.run_pending_tasks().await;
// e -> f -> g
assert_eq!(cache.get(&"a").await, None);
assert_eq!(cache.get(&"e").await, Some("emily"));
assert_eq!(cache.get(&"f").await, Some("frank"));
assert_eq!(cache.get(&"g").await, Some("gina"));
assert!(!cache.contains_key(&"a"));
assert!(cache.contains_key(&"e"));
assert!(cache.contains_key(&"f"));
assert!(cache.contains_key(&"g"));

verify_notification_vec(&cache, actual, &expected).await;
assert!(cache.key_locks_map_is_empty());
}

#[tokio::test]
async fn size_aware_eviction() {
let weigher = |_k: &&str, v: &(&str, u32)| v.1;
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ pub(crate) mod common;
pub mod ops;

#[cfg(any(feature = "sync", feature = "future"))]
pub(crate) mod policy;
pub mod policy;

#[cfg(any(feature = "sync", feature = "future"))]
pub(crate) mod sync_base;
Expand Down
12 changes: 12 additions & 0 deletions src/policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,18 @@ impl Policy {
}
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub enum EvictionPolicy {
TinyLfu,
Lru,
}

impl Default for EvictionPolicy {
fn default() -> Self {
Self::TinyLfu
}
}

/// Calculates when cache entries expire. A single expiration time is retained on
/// each entry so that the lifetime of an entry may be extended or reduced by
/// subsequent evaluations.
Expand Down
Loading

0 comments on commit 8ac62ed

Please sign in to comment.