diff --git a/src/future/base_cache.rs b/src/future/base_cache.rs index fc733003..f1c16d1e 100644 --- a/src/future/base_cache.rs +++ b/src/future/base_cache.rs @@ -27,7 +27,7 @@ use crate::{ }, future::CancelGuard, notification::{AsyncEvictionListener, RemovalCause}, - policy::ExpirationPolicy, + policy::{EvictionPolicy, ExpirationPolicy}, sync_base::iter::ScanningGet, Entry, Expiry, Policy, PredicateError, }; @@ -167,6 +167,7 @@ where initial_capacity: Option, build_hasher: S, weigher: Option>, + eviction_policy: EvictionPolicy, eviction_listener: Option>, expiration_policy: ExpirationPolicy, invalidator_enabled: bool, @@ -187,6 +188,7 @@ where initial_capacity, build_hasher, weigher, + eviction_policy, eviction_listener, r_rcv, w_rcv, @@ -1041,6 +1043,7 @@ pub(crate) struct Inner { read_op_ch: Receiver>, write_op_ch: Receiver>, maintenance_task_lock: RwLock<()>, + eviction_policy: EvictionPolicy, expiration_policy: ExpirationPolicy, valid_after: AtomicInstant, weigher: Option>, @@ -1191,6 +1194,7 @@ where initial_capacity: Option, build_hasher: S, weigher: Option>, + eviction_policy: EvictionPolicy, eviction_listener: Option>, read_op_ch: Receiver>, write_op_ch: Receiver>, @@ -1247,6 +1251,7 @@ where read_op_ch, write_op_ch, maintenance_task_lock: RwLock::default(), + eviction_policy, expiration_policy, valid_after: AtomicInstant::default(), weigher, @@ -1451,7 +1456,9 @@ where .await; } - if self.should_enable_frequency_sketch(&eviction_state.counters) { + if self.eviction_policy == EvictionPolicy::TinyLfu + && self.should_enable_frequency_sketch(&eviction_state.counters) + { self.enable_frequency_sketch(&eviction_state.counters).await; } @@ -1742,15 +1749,19 @@ where } } - let mut candidate = EntrySizeAndFrequency::new(new_weight); - candidate.add_frequency(freq, kh.hash); + // TODO: Refactoring the policy implementations. + // https://github.com/moka-rs/moka/issues/389 // Try to admit the candidate. - // - // NOTE: We need to call `admit` here, instead of a part of the `match` - // expression. Otherwise the future returned from this `handle_upsert` method - // will not be `Send`. - let admission_result = Self::admit(&candidate, &self.cache, deqs, freq); + let admission_result = match &self.eviction_policy { + EvictionPolicy::TinyLfu => { + let mut candidate = EntrySizeAndFrequency::new(new_weight); + candidate.add_frequency(freq, kh.hash); + Self::admit(&candidate, &self.cache, deqs, freq) + } + EvictionPolicy::Lru => AdmissionResult::Admitted { victim_keys: SmallVec::default() }, + }; + match admission_result { AdmissionResult::Admitted { victim_keys } => { // Try to remove the victims from the hash map. @@ -2760,7 +2771,7 @@ fn is_expired_by_ttl( #[cfg(test)] mod tests { - use crate::policy::ExpirationPolicy; + use crate::policy::{EvictionPolicy, ExpirationPolicy}; use super::BaseCache; @@ -2779,8 +2790,9 @@ mod tests { None, RandomState::default(), None, + EvictionPolicy::default(), None, - Default::default(), + ExpirationPolicy::default(), false, ); cache.inner.enable_frequency_sketch_for_testing().await; @@ -3127,6 +3139,7 @@ mod tests { None, RandomState::default(), None, + EvictionPolicy::default(), None, ExpirationPolicy::new( Some(Duration::from_secs(TTL)), diff --git a/src/future/builder.rs b/src/future/builder.rs index ca7f52ad..b71095bf 100644 --- a/src/future/builder.rs +++ b/src/future/builder.rs @@ -2,7 +2,7 @@ use super::{Cache, FutureExt}; use crate::{ common::{builder_utils, concurrent::Weigher}, notification::{AsyncEvictionListener, ListenerFuture, RemovalCause}, - policy::ExpirationPolicy, + policy::{EvictionPolicy, ExpirationPolicy}, Expiry, }; @@ -60,6 +60,7 @@ pub struct CacheBuilder { max_capacity: Option, initial_capacity: Option, weigher: Option>, + eviction_policy: EvictionPolicy, eviction_listener: Option>, expiration_policy: ExpirationPolicy, invalidator_enabled: bool, @@ -77,6 +78,7 @@ where max_capacity: None, initial_capacity: None, weigher: None, + eviction_policy: EvictionPolicy::default(), eviction_listener: None, expiration_policy: ExpirationPolicy::default(), invalidator_enabled: false, @@ -116,6 +118,7 @@ where self.initial_capacity, build_hasher, self.weigher, + self.eviction_policy, self.eviction_listener, self.expiration_policy, self.invalidator_enabled, @@ -212,6 +215,7 @@ where self.initial_capacity, hasher, self.weigher, + self.eviction_policy, self.eviction_listener, self.expiration_policy, self.invalidator_enabled, @@ -245,6 +249,13 @@ impl CacheBuilder { } } + pub fn eviction_policy(self, policy: EvictionPolicy) -> Self { + Self { + eviction_policy: policy, + ..self + } + } + /// Sets the weigher closure to the cache. /// /// The closure should take `&K` and `&V` as the arguments and returns a `u32` diff --git a/src/future/cache.rs b/src/future/cache.rs index 523f49cd..d9a8a394 100644 --- a/src/future/cache.rs +++ b/src/future/cache.rs @@ -8,7 +8,7 @@ use crate::{ common::concurrent::Weigher, notification::AsyncEvictionListener, ops::compute::{self, CompResult}, - policy::ExpirationPolicy, + policy::{EvictionPolicy, ExpirationPolicy}, Entry, Policy, PredicateError, }; @@ -787,6 +787,7 @@ where None, build_hasher, None, + EvictionPolicy::default(), None, ExpirationPolicy::default(), false, @@ -816,6 +817,7 @@ where initial_capacity: Option, build_hasher: S, weigher: Option>, + eviction_policy: EvictionPolicy, eviction_listener: Option>, expiration_policy: ExpirationPolicy, invalidator_enabled: bool, @@ -827,6 +829,7 @@ where initial_capacity, build_hasher.clone(), weigher, + eviction_policy, eviction_listener, expiration_policy, invalidator_enabled, @@ -2115,7 +2118,7 @@ mod tests { future::FutureExt, notification::{ListenerFuture, RemovalCause}, ops::compute, - policy::test_utils::ExpiryCallCounters, + policy::{test_utils::ExpiryCallCounters, EvictionPolicy}, Expiry, }; @@ -2313,6 +2316,107 @@ mod tests { assert!(cache.key_locks_map_is_empty()); } + #[tokio::test] + async fn basic_lru_single_thread() { + // The following `Vec`s will hold actual and expected notifications. + let actual = Arc::new(Mutex::new(Vec::new())); + let mut expected = Vec::new(); + + // Create an eviction listener. + let a1 = Arc::clone(&actual); + let listener = move |k, v, cause| -> ListenerFuture { + let a2 = Arc::clone(&a1); + async move { + a2.lock().await.push((k, v, cause)); + } + .boxed() + }; + + // Create a cache with the eviction listener. + let mut cache = Cache::builder() + .max_capacity(3) + .eviction_policy(EvictionPolicy::Lru) + .async_eviction_listener(listener) + .build(); + cache.reconfigure_for_testing().await; + + // Make the cache exterior immutable. + let cache = cache; + + cache.insert("a", "alice").await; + cache.insert("b", "bob").await; + assert_eq!(cache.get(&"a").await, Some("alice")); + assert!(cache.contains_key(&"a")); + assert!(cache.contains_key(&"b")); + assert_eq!(cache.get(&"b").await, Some("bob")); + cache.run_pending_tasks().await; + // a -> b + + cache.insert("c", "cindy").await; + assert_eq!(cache.get(&"c").await, Some("cindy")); + assert!(cache.contains_key(&"c")); + cache.run_pending_tasks().await; + // a -> b -> c + + assert!(cache.contains_key(&"a")); + assert_eq!(cache.get(&"a").await, Some("alice")); + assert_eq!(cache.get(&"b").await, Some("bob")); + assert!(cache.contains_key(&"b")); + cache.run_pending_tasks().await; + // c -> a -> b + + // "d" should be admitted because the cache uses the LRU strategy. + cache.insert("d", "david").await; + // "c" is the LRU and should have be evicted. + expected.push((Arc::new("c"), "cindy", RemovalCause::Size)); + cache.run_pending_tasks().await; + + assert_eq!(cache.get(&"a").await, Some("alice")); + assert_eq!(cache.get(&"b").await, Some("bob")); + assert_eq!(cache.get(&"c").await, None); + assert_eq!(cache.get(&"d").await, Some("david")); + assert!(cache.contains_key(&"a")); + assert!(cache.contains_key(&"b")); + assert!(!cache.contains_key(&"c")); + assert!(cache.contains_key(&"d")); + cache.run_pending_tasks().await; + // a -> b -> d + + cache.invalidate(&"b").await; + expected.push((Arc::new("b"), "bob", RemovalCause::Explicit)); + cache.run_pending_tasks().await; + // a -> d + assert_eq!(cache.get(&"b").await, None); + assert!(!cache.contains_key(&"b")); + + assert!(cache.remove(&"b").await.is_none()); + assert_eq!(cache.remove(&"d").await, Some("david")); + expected.push((Arc::new("d"), "david", RemovalCause::Explicit)); + cache.run_pending_tasks().await; + // a + assert_eq!(cache.get(&"d").await, None); + assert!(!cache.contains_key(&"d")); + + cache.insert("e", "emily").await; + cache.insert("f", "frank").await; + // "a" should be evicted because it is the LRU. + cache.insert("g", "gina").await; + expected.push((Arc::new("a"), "alice", RemovalCause::Size)); + cache.run_pending_tasks().await; + // e -> f -> g + assert_eq!(cache.get(&"a").await, None); + assert_eq!(cache.get(&"e").await, Some("emily")); + assert_eq!(cache.get(&"f").await, Some("frank")); + assert_eq!(cache.get(&"g").await, Some("gina")); + assert!(!cache.contains_key(&"a")); + assert!(cache.contains_key(&"e")); + assert!(cache.contains_key(&"f")); + assert!(cache.contains_key(&"g")); + + verify_notification_vec(&cache, actual, &expected).await; + assert!(cache.key_locks_map_is_empty()); + } + #[tokio::test] async fn size_aware_eviction() { let weigher = |_k: &&str, v: &(&str, u32)| v.1; diff --git a/src/lib.rs b/src/lib.rs index c56f4c69..4bd4a800 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -114,7 +114,7 @@ pub(crate) mod common; pub mod ops; #[cfg(any(feature = "sync", feature = "future"))] -pub(crate) mod policy; +pub mod policy; #[cfg(any(feature = "sync", feature = "future"))] pub(crate) mod sync_base; diff --git a/src/policy.rs b/src/policy.rs index 2856df22..e85cb3a0 100644 --- a/src/policy.rs +++ b/src/policy.rs @@ -58,6 +58,18 @@ impl Policy { } } +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum EvictionPolicy { + TinyLfu, + Lru, +} + +impl Default for EvictionPolicy { + fn default() -> Self { + Self::TinyLfu + } +} + /// Calculates when cache entries expire. A single expiration time is retained on /// each entry so that the lifetime of an entry may be extended or reduced by /// subsequent evaluations. diff --git a/src/sync/builder.rs b/src/sync/builder.rs index 0309e8a0..932573d0 100644 --- a/src/sync/builder.rs +++ b/src/sync/builder.rs @@ -2,7 +2,7 @@ use super::{Cache, SegmentedCache}; use crate::{ common::{builder_utils, concurrent::Weigher}, notification::{EvictionListener, RemovalCause}, - policy::ExpirationPolicy, + policy::{EvictionPolicy, ExpirationPolicy}, Expiry, }; @@ -53,6 +53,7 @@ pub struct CacheBuilder { initial_capacity: Option, num_segments: Option, weigher: Option>, + eviction_policy: EvictionPolicy, eviction_listener: Option>, expiration_policy: ExpirationPolicy, invalidator_enabled: bool, @@ -72,6 +73,7 @@ where num_segments: None, weigher: None, eviction_listener: None, + eviction_policy: EvictionPolicy::default(), expiration_policy: ExpirationPolicy::default(), invalidator_enabled: false, cache_type: PhantomData, @@ -110,6 +112,7 @@ where initial_capacity: self.initial_capacity, num_segments: Some(num_segments), weigher: self.weigher, + eviction_policy: self.eviction_policy, eviction_listener: self.eviction_listener, expiration_policy: self.expiration_policy, invalidator_enabled: self.invalidator_enabled, @@ -137,6 +140,7 @@ where self.initial_capacity, build_hasher, self.weigher, + self.eviction_policy, self.eviction_listener, self.expiration_policy, self.invalidator_enabled, @@ -223,6 +227,7 @@ where self.initial_capacity, hasher, self.weigher, + self.eviction_policy, self.eviction_listener, self.expiration_policy, self.invalidator_enabled, @@ -256,6 +261,7 @@ where self.num_segments.unwrap(), build_hasher, self.weigher, + self.eviction_policy, self.eviction_listener, self.expiration_policy, self.invalidator_enabled, @@ -344,6 +350,7 @@ where self.num_segments.unwrap(), hasher, self.weigher, + self.eviction_policy, self.eviction_listener, self.expiration_policy, self.invalidator_enabled, @@ -377,6 +384,13 @@ impl CacheBuilder { } } + pub fn eviction_policy(self, policy: EvictionPolicy) -> Self { + Self { + eviction_policy: policy, + ..self + } + } + /// Sets the weigher closure to the cache. /// /// The closure should take `&K` and `&V` as the arguments and returns a `u32` diff --git a/src/sync/cache.rs b/src/sync/cache.rs index 58d67164..210b0425 100644 --- a/src/sync/cache.rs +++ b/src/sync/cache.rs @@ -11,7 +11,7 @@ use crate::{ }, notification::EvictionListener, ops::compute::{self, CompResult}, - policy::ExpirationPolicy, + policy::{EvictionPolicy, ExpirationPolicy}, sync::{Iter, PredicateId}, sync_base::{ base_cache::{BaseCache, HouseKeeperArc}, @@ -707,6 +707,7 @@ where None, build_hasher, None, + EvictionPolicy::default(), None, ExpirationPolicy::default(), false, @@ -736,6 +737,7 @@ where initial_capacity: Option, build_hasher: S, weigher: Option>, + eviction_policy: EvictionPolicy, eviction_listener: Option>, expiration_policy: ExpirationPolicy, invalidator_enabled: bool, @@ -747,6 +749,7 @@ where initial_capacity, build_hasher.clone(), weigher, + eviction_policy, eviction_listener, expiration_policy, invalidator_enabled, @@ -1904,7 +1907,9 @@ where mod tests { use super::Cache; use crate::{ - common::time::Clock, notification::RemovalCause, policy::test_utils::ExpiryCallCounters, + common::time::Clock, + notification::RemovalCause, + policy::{test_utils::ExpiryCallCounters, EvictionPolicy}, Expiry, }; @@ -2019,6 +2024,101 @@ mod tests { assert!(cache.key_locks_map_is_empty()); } + #[test] + fn basic_lru_single_thread() { + // The following `Vec`s will hold actual and expected notifications. + let actual = Arc::new(Mutex::new(Vec::new())); + let mut expected = Vec::new(); + + // Create an eviction listener. + let a1 = Arc::clone(&actual); + let listener = move |k, v, cause| a1.lock().push((k, v, cause)); + + // Create a cache with the eviction listener. + let mut cache = Cache::builder() + .max_capacity(3) + .eviction_policy(EvictionPolicy::Lru) + .eviction_listener(listener) + .build(); + cache.reconfigure_for_testing(); + + // Make the cache exterior immutable. + let cache = cache; + + cache.insert("a", "alice"); + cache.insert("b", "bob"); + assert_eq!(cache.get(&"a"), Some("alice")); + assert!(cache.contains_key(&"a")); + assert!(cache.contains_key(&"b")); + assert_eq!(cache.get(&"b"), Some("bob")); + cache.run_pending_tasks(); + // a -> b + + cache.insert("c", "cindy"); + assert_eq!(cache.get(&"c"), Some("cindy")); + assert!(cache.contains_key(&"c")); + cache.run_pending_tasks(); + // a -> b -> c + + assert!(cache.contains_key(&"a")); + assert_eq!(cache.get(&"a"), Some("alice")); + assert_eq!(cache.get(&"b"), Some("bob")); + assert!(cache.contains_key(&"b")); + cache.run_pending_tasks(); + // c -> a -> b + + // "d" should be admitted because the cache uses the LRU strategy. + cache.insert("d", "david"); + // "c" is the LRU and should have be evicted. + expected.push((Arc::new("c"), "cindy", RemovalCause::Size)); + cache.run_pending_tasks(); + + assert_eq!(cache.get(&"a"), Some("alice")); + assert_eq!(cache.get(&"b"), Some("bob")); + assert_eq!(cache.get(&"c"), None); + assert_eq!(cache.get(&"d"), Some("david")); + assert!(cache.contains_key(&"a")); + assert!(cache.contains_key(&"b")); + assert!(!cache.contains_key(&"c")); + assert!(cache.contains_key(&"d")); + cache.run_pending_tasks(); + // a -> b -> d + + cache.invalidate(&"b"); + expected.push((Arc::new("b"), "bob", RemovalCause::Explicit)); + cache.run_pending_tasks(); + // a -> d + assert_eq!(cache.get(&"b"), None); + assert!(!cache.contains_key(&"b")); + + assert!(cache.remove(&"b").is_none()); + assert_eq!(cache.remove(&"d"), Some("david")); + expected.push((Arc::new("d"), "david", RemovalCause::Explicit)); + cache.run_pending_tasks(); + // a + assert_eq!(cache.get(&"d"), None); + assert!(!cache.contains_key(&"d")); + + cache.insert("e", "emily"); + cache.insert("f", "frank"); + // "a" should be evicted because it is the LRU. + cache.insert("g", "gina"); + expected.push((Arc::new("a"), "alice", RemovalCause::Size)); + cache.run_pending_tasks(); + // e -> f -> g + assert_eq!(cache.get(&"a"), None); + assert_eq!(cache.get(&"e"), Some("emily")); + assert_eq!(cache.get(&"f"), Some("frank")); + assert_eq!(cache.get(&"g"), Some("gina")); + assert!(!cache.contains_key(&"a")); + assert!(cache.contains_key(&"e")); + assert!(cache.contains_key(&"f")); + assert!(cache.contains_key(&"g")); + + verify_notification_vec(&cache, actual, &expected); + assert!(cache.key_locks_map_is_empty()); + } + #[test] fn size_aware_eviction() { let weigher = |_k: &&str, v: &(&str, u32)| v.1; diff --git a/src/sync/segment.rs b/src/sync/segment.rs index 53251ae9..fb3a9cf0 100644 --- a/src/sync/segment.rs +++ b/src/sync/segment.rs @@ -2,7 +2,7 @@ use super::{cache::Cache, CacheBuilder, OwnedKeyEntrySelector, RefKeyEntrySelect use crate::{ common::concurrent::Weigher, notification::EvictionListener, - policy::ExpirationPolicy, + policy::{EvictionPolicy, ExpirationPolicy}, sync_base::iter::{Iter, ScanningGet}, Entry, Policy, PredicateError, }; @@ -102,6 +102,7 @@ where num_segments, build_hasher, None, + EvictionPolicy::default(), None, ExpirationPolicy::default(), false, @@ -207,6 +208,7 @@ where num_segments: usize, build_hasher: S, weigher: Option>, + eviction_policy: EvictionPolicy, eviction_listener: Option>, expiration_policy: ExpirationPolicy, invalidator_enabled: bool, @@ -219,6 +221,7 @@ where num_segments, build_hasher, weigher, + eviction_policy, eviction_listener, expiration_policy, invalidator_enabled, @@ -729,6 +732,7 @@ where num_segments: usize, build_hasher: S, weigher: Option>, + eviction_policy: EvictionPolicy, eviction_listener: Option>, expiration_policy: ExpirationPolicy, invalidator_enabled: bool, @@ -751,6 +755,7 @@ where seg_init_capacity, build_hasher.clone(), weigher.as_ref().map(Arc::clone), + eviction_policy.clone(), eviction_listener.as_ref().map(Arc::clone), expiration_policy.clone(), invalidator_enabled, diff --git a/src/sync_base/base_cache.rs b/src/sync_base/base_cache.rs index 304083dc..75b73cab 100644 --- a/src/sync_base/base_cache.rs +++ b/src/sync_base/base_cache.rs @@ -26,7 +26,7 @@ use crate::{ CacheRegion, }, notification::{notifier::RemovalNotifier, EvictionListener, RemovalCause}, - policy::ExpirationPolicy, + policy::{EvictionPolicy, ExpirationPolicy}, Entry, Expiry, Policy, PredicateError, }; @@ -142,6 +142,7 @@ where initial_capacity: Option, build_hasher: S, weigher: Option>, + eviction_policy: EvictionPolicy, eviction_listener: Option>, expiration_policy: ExpirationPolicy, invalidator_enabled: bool, @@ -161,6 +162,7 @@ where initial_capacity, build_hasher, weigher, + eviction_policy, eviction_listener, r_rcv, w_rcv, @@ -901,6 +903,7 @@ pub(crate) struct Inner { frequency_sketch_enabled: AtomicBool, read_op_ch: Receiver>, write_op_ch: Receiver>, + eviction_policy: EvictionPolicy, expiration_policy: ExpirationPolicy, valid_after: AtomicInstant, weigher: Option>, @@ -1038,6 +1041,7 @@ where initial_capacity: Option, build_hasher: S, weigher: Option>, + eviction_policy: EvictionPolicy, eviction_listener: Option>, read_op_ch: Receiver>, write_op_ch: Receiver>, @@ -1094,6 +1098,7 @@ where frequency_sketch_enabled: AtomicBool::default(), read_op_ch, write_op_ch, + eviction_policy, expiration_policy, valid_after: AtomicInstant::default(), weigher, @@ -1285,7 +1290,9 @@ where self.apply_writes(&mut deqs, &mut timer_wheel, w_len, &mut eviction_state); } - if self.should_enable_frequency_sketch(&eviction_state.counters) { + if self.eviction_policy == EvictionPolicy::TinyLfu + && self.should_enable_frequency_sketch(&eviction_state.counters) + { self.enable_frequency_sketch(&eviction_state.counters); } @@ -1554,11 +1561,20 @@ where } } - let mut candidate = EntrySizeAndFrequency::new(new_weight); - candidate.add_frequency(freq, kh.hash); + // TODO: Refactoring the policy implementations. + // https://github.com/moka-rs/moka/issues/389 // Try to admit the candidate. - match Self::admit(&candidate, &self.cache, deqs, freq) { + let admission_result = match &self.eviction_policy { + EvictionPolicy::TinyLfu => { + let mut candidate = EntrySizeAndFrequency::new(new_weight); + candidate.add_frequency(freq, kh.hash); + Self::admit(&candidate, &self.cache, deqs, freq) + } + EvictionPolicy::Lru => AdmissionResult::Admitted { victim_keys: SmallVec::default() }, + }; + + match admission_result { AdmissionResult::Admitted { victim_keys } => { // Try to remove the victims from the hash map. for (vic_kh, vic_la) in victim_keys { @@ -2497,7 +2513,7 @@ fn is_expired_by_ttl( #[cfg(test)] mod tests { - use crate::policy::ExpirationPolicy; + use crate::policy::{EvictionPolicy, ExpirationPolicy}; use super::BaseCache; @@ -2516,8 +2532,9 @@ mod tests { None, RandomState::default(), None, + EvictionPolicy::default(), None, - Default::default(), + ExpirationPolicy::default(), false, ); cache.inner.enable_frequency_sketch_for_testing(); @@ -2861,6 +2878,7 @@ mod tests { None, RandomState::default(), None, + EvictionPolicy::default(), None, ExpirationPolicy::new( Some(Duration::from_secs(TTL)),