From 4bd31baa256bb8abb5b47bbebf2317e625d8aadb Mon Sep 17 00:00:00 2001 From: Tatsuya Kawano Date: Sat, 14 Sep 2024 12:04:19 +0800 Subject: [PATCH 1/4] Switch from `triomphe::Arc` to `MiniArc`, our own `Arc` implementation `MiniArc` is like a `std::sync::Arc` but with a few differences: - No `Weak` references. - Uses `AtomicU32` instead of `AtomicUsize` for reference counting. - Much smaller code size by having only the necessary methods for us. --- Cargo.toml | 6 +--- src/common/concurrent.rs | 50 ++++++++++++++--------------- src/common/concurrent/deques.rs | 21 +++++++------ src/common/timer_wheel.rs | 26 +++++++-------- src/future/base_cache.rs | 56 ++++++++++++++++----------------- src/future/invalidator.rs | 6 ++-- src/future/key_lock.rs | 14 ++++----- src/future/value_initializer.rs | 14 ++++----- src/sync/value_initializer.rs | 10 +++--- src/sync_base/base_cache.rs | 56 ++++++++++++++++----------------- src/sync_base/invalidator.rs | 6 ++-- src/sync_base/key_lock.rs | 14 ++++----- 12 files changed, 138 insertions(+), 141 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b363bc2a..fc21cdf9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,17 +44,13 @@ unstable-debug-counters = ["future", "once_cell"] crossbeam-channel = "0.5.5" crossbeam-epoch = "0.9.9" crossbeam-utils = "0.8" +moka-arc = { git = "https://gitlab.com/moka-labs/moka-gh440-remove-triomphe/moka-arc.git" } parking_lot = "0.12" smallvec = "1.8" tagptr = "0.2" thiserror = "1.0" uuid = { version = "1.1", features = ["v4"] } -# Opt-out serde and stable_deref_trait features -# https://github.com/Manishearth/triomphe/pull/5 -# 0.1.12 requires Rust 1.76 -triomphe = { version = ">=0.1.3, <0.1.12", default-features = false } - # Optional dependencies (enabled by default) quanta = { version = "0.12.2", optional = true } diff --git a/src/common/concurrent.rs b/src/common/concurrent.rs index 2f7e6d8e..95686327 100644 --- a/src/common/concurrent.rs +++ b/src/common/concurrent.rs @@ -3,7 +3,7 @@ use crate::common::{deque::DeqNode, time::Instant}; use parking_lot::Mutex; use std::{fmt, ptr::NonNull, sync::Arc}; use tagptr::TagNonNull; -use triomphe::Arc as TrioArc; +use moka_arc::MiniArc; pub(crate) mod constants; pub(crate) mod deques; @@ -64,13 +64,13 @@ impl Clone for KeyHash { } pub(crate) struct KeyHashDate { - entry_info: TrioArc>, + entry_info: MiniArc>, } impl KeyHashDate { - pub(crate) fn new(entry_info: &TrioArc>) -> Self { + pub(crate) fn new(entry_info: &MiniArc>) -> Self { Self { - entry_info: TrioArc::clone(entry_info), + entry_info: MiniArc::clone(entry_info), } } @@ -101,11 +101,11 @@ impl KeyHashDate { pub(crate) struct KvEntry { pub(crate) key: Arc, - pub(crate) entry: TrioArc>, + pub(crate) entry: MiniArc>, } impl KvEntry { - pub(crate) fn new(key: Arc, entry: TrioArc>) -> Self { + pub(crate) fn new(key: Arc, entry: MiniArc>) -> Self { Self { key, entry } } } @@ -114,7 +114,7 @@ impl Clone for KvEntry { fn clone(&self) -> Self { Self { key: Arc::clone(&self.key), - entry: TrioArc::clone(&self.entry), + entry: MiniArc::clone(&self.entry), } } } @@ -177,33 +177,33 @@ impl DeqNodes { pub(crate) struct ValueEntry { pub(crate) value: V, - info: TrioArc>, - nodes: TrioArc>>, + info: MiniArc>, + nodes: MiniArc>>, } impl ValueEntry { - pub(crate) fn new(value: V, entry_info: TrioArc>) -> Self { + pub(crate) fn new(value: V, entry_info: MiniArc>) -> Self { #[cfg(feature = "unstable-debug-counters")] self::debug_counters::InternalGlobalDebugCounters::value_entry_created(); Self { value, info: entry_info, - nodes: TrioArc::new(Mutex::new(DeqNodes::default())), + nodes: MiniArc::new(Mutex::new(DeqNodes::default())), } } - pub(crate) fn new_from(value: V, entry_info: TrioArc>, other: &Self) -> Self { + pub(crate) fn new_from(value: V, entry_info: MiniArc>, other: &Self) -> Self { #[cfg(feature = "unstable-debug-counters")] self::debug_counters::InternalGlobalDebugCounters::value_entry_created(); Self { value, info: entry_info, - nodes: TrioArc::clone(&other.nodes), + nodes: MiniArc::clone(&other.nodes), } } - pub(crate) fn entry_info(&self) -> &TrioArc> { + pub(crate) fn entry_info(&self) -> &MiniArc> { &self.info } @@ -224,7 +224,7 @@ impl ValueEntry { self.info.policy_weight() } - pub(crate) fn deq_nodes(&self) -> &TrioArc>> { + pub(crate) fn deq_nodes(&self) -> &MiniArc>> { &self.nodes } @@ -278,7 +278,7 @@ impl Drop for ValueEntry { } } -impl AccessTime for TrioArc> { +impl AccessTime for MiniArc> { #[inline] fn last_accessed(&self) -> Option { self.info.last_accessed() @@ -302,7 +302,7 @@ impl AccessTime for TrioArc> { pub(crate) enum ReadOp { Hit { - value_entry: TrioArc>, + value_entry: MiniArc>, is_expiry_modified: bool, }, // u64 is the hash of the key. @@ -312,7 +312,7 @@ pub(crate) enum ReadOp { pub(crate) enum WriteOp { Upsert { key_hash: KeyHash, - value_entry: TrioArc>, + value_entry: MiniArc>, /// Entry generation after the operation. entry_gen: u16, old_weight: u32, @@ -324,7 +324,7 @@ pub(crate) enum WriteOp { }, } -/// Cloning a `WriteOp` is safe and cheap because it uses `Arc` and `TrioArc` pointers to +/// Cloning a `WriteOp` is safe and cheap because it uses `Arc` and `MiniArc` pointers to /// the actual data. impl Clone for WriteOp { fn clone(&self) -> Self { @@ -337,7 +337,7 @@ impl Clone for WriteOp { new_weight, } => Self::Upsert { key_hash: key_hash.clone(), - value_entry: TrioArc::clone(value_entry), + value_entry: MiniArc::clone(value_entry), entry_gen: *entry_gen, old_weight: *old_weight, new_weight: *new_weight, @@ -366,13 +366,13 @@ impl WriteOp { pub(crate) fn new_upsert( key: &Arc, hash: u64, - value_entry: &TrioArc>, + value_entry: &MiniArc>, entry_generation: u16, old_weight: u32, new_weight: u32, ) -> Self { let key_hash = KeyHash::new(Arc::clone(key), hash); - let value_entry = TrioArc::clone(value_entry); + let value_entry = MiniArc::clone(value_entry); Self::Upsert { key_hash, value_entry, @@ -384,15 +384,15 @@ impl WriteOp { } pub(crate) struct OldEntryInfo { - pub(crate) entry: TrioArc>, + pub(crate) entry: MiniArc>, pub(crate) last_accessed: Option, pub(crate) last_modified: Option, } impl OldEntryInfo { - pub(crate) fn new(entry: &TrioArc>) -> Self { + pub(crate) fn new(entry: &MiniArc>) -> Self { Self { - entry: TrioArc::clone(entry), + entry: MiniArc::clone(entry), last_accessed: entry.last_accessed(), last_modified: entry.last_modified(), } diff --git a/src/common/concurrent/deques.rs b/src/common/concurrent/deques.rs index 424fc104..37245b1e 100644 --- a/src/common/concurrent/deques.rs +++ b/src/common/concurrent/deques.rs @@ -6,7 +6,8 @@ use crate::common::{ use std::ptr::NonNull; use tagptr::TagNonNull; -use triomphe::Arc as TrioArc; +use moka_arc::MiniArc; + pub(crate) struct Deques { pub(crate) window: Deque>, // Not used yet. pub(crate) probation: Deque>, @@ -50,7 +51,7 @@ impl Deques { &mut self, region: CacheRegion, khd: KeyHashDate, - entry: &TrioArc>, + entry: &MiniArc>, ) { let node = Box::new(DeqNode::new(khd)); let node = match region { @@ -66,14 +67,14 @@ impl Deques { pub(crate) fn push_back_wo( &mut self, kd: KeyHashDate, - entry: &TrioArc>, + entry: &MiniArc>, ) { let node = Box::new(DeqNode::new(kd)); let node = self.write_order.push_back(node); entry.set_write_order_q_node(Some(node)); } - pub(crate) fn move_to_back_ao(&mut self, entry: &TrioArc>) { + pub(crate) fn move_to_back_ao(&mut self, entry: &MiniArc>) { if let Some(tagged_node) = entry.access_order_q_node() { let (node, tag) = tagged_node.decompose(); let p = unsafe { node.as_ref() }; @@ -95,7 +96,7 @@ impl Deques { pub(crate) fn move_to_back_ao_in_deque( deq_name: &str, deq: &mut Deque>, - entry: &TrioArc>, + entry: &MiniArc>, ) { if let Some(tagged_node) = entry.access_order_q_node() { let (node, tag) = tagged_node.decompose(); @@ -111,7 +112,7 @@ impl Deques { } } - pub(crate) fn move_to_back_wo(&mut self, entry: &TrioArc>) { + pub(crate) fn move_to_back_wo(&mut self, entry: &MiniArc>) { if let Some(node) = entry.write_order_q_node() { let p = unsafe { node.as_ref() }; if self.write_order.contains(p) { @@ -122,7 +123,7 @@ impl Deques { pub(crate) fn move_to_back_wo_in_deque( deq: &mut Deque>, - entry: &TrioArc>, + entry: &MiniArc>, ) { if let Some(node) = entry.write_order_q_node() { let p = unsafe { node.as_ref() }; @@ -132,7 +133,7 @@ impl Deques { } } - pub(crate) fn unlink_ao(&mut self, entry: &TrioArc>) { + pub(crate) fn unlink_ao(&mut self, entry: &MiniArc>) { if let Some(node) = entry.take_access_order_q_node() { self.unlink_node_ao(node); } @@ -141,14 +142,14 @@ impl Deques { pub(crate) fn unlink_ao_from_deque( deq_name: &str, deq: &mut Deque>, - entry: &TrioArc>, + entry: &MiniArc>, ) { if let Some(node) = entry.take_access_order_q_node() { unsafe { Self::unlink_node_ao_from_deque(deq_name, deq, node) }; } } - pub(crate) fn unlink_wo(deq: &mut Deque>, entry: &TrioArc>) { + pub(crate) fn unlink_wo(deq: &mut Deque>, entry: &MiniArc>) { if let Some(node) = entry.take_write_order_q_node() { Self::unlink_node_wo(deq, node); } diff --git a/src/common/timer_wheel.rs b/src/common/timer_wheel.rs index aa78affa..73add563 100644 --- a/src/common/timer_wheel.rs +++ b/src/common/timer_wheel.rs @@ -20,7 +20,7 @@ use super::{ }; use parking_lot::Mutex; -use triomphe::Arc as TrioArc; +use moka_arc::MiniArc; const BUCKET_COUNTS: &[u64] = &[ 64, // roughly seconds @@ -69,16 +69,16 @@ pub(crate) enum TimerNode { /// The position (level and index) of the timer wheel bucket. pos: Option<(u8, u8)>, /// An Arc pointer to the `EntryInfo` of the cache entry (`ValueEntry`). - entry_info: TrioArc>, + entry_info: MiniArc>, /// An Arc pointer to the `DeqNodes` of the cache entry (`ValueEntry`). - deq_nodes: TrioArc>>, + deq_nodes: MiniArc>>, }, } impl TimerNode { fn new( - entry_info: TrioArc>, - deq_nodes: TrioArc>>, + entry_info: MiniArc>, + deq_nodes: MiniArc>>, level: usize, index: usize, ) -> Self { @@ -118,7 +118,7 @@ impl TimerNode { matches!(self, Self::Sentinel) } - pub(crate) fn entry_info(&self) -> &TrioArc> { + pub(crate) fn entry_info(&self) -> &MiniArc> { if let Self::Entry { entry_info, .. } = &self { entry_info } else { @@ -209,8 +209,8 @@ impl TimerWheel { /// Schedules a timer event for the node. pub(crate) fn schedule( &mut self, - entry_info: TrioArc>, - deq_nodes: TrioArc>>, + entry_info: MiniArc>, + deq_nodes: MiniArc>>, ) -> Option>>> { debug_assert!(self.is_enabled()); @@ -397,7 +397,7 @@ pub(crate) enum TimerEvent { // from one wheel to another in a lower level of the hierarchy. (This variant // is mainly used for testing) #[cfg(test)] - Rescheduled(TrioArc>), + Rescheduled(MiniArc>), #[cfg(not(test))] Rescheduled(()), /// This timer node (containing a cache entry) has been removed from the timer. @@ -517,7 +517,7 @@ impl<'iter, K> Iterator for TimerEventsIter<'iter, K> { // Get the entry info before rescheduling (mutating) the node to // avoid Stacked Borrows/Tree Borrows violations on `node_p`. let entry_info = - TrioArc::clone(unsafe { node_p.as_ref() }.element.entry_info()); + MiniArc::clone(unsafe { node_p.as_ref() }.element.entry_info()); match self.timer_wheel.schedule_existing_node(node_p) { ReschedulingResult::Rescheduled => { @@ -564,7 +564,7 @@ mod tests { time::{CheckedTimeOps, Clock, Instant, Mock}, }; - use triomphe::Arc as TrioArc; + use moka_arc::MiniArc; #[test] fn test_bucket_indices() { @@ -654,10 +654,10 @@ mod tests { let hash = key as u64; let key_hash = KeyHash::new(Arc::new(key), hash); let policy_weight = 0; - let entry_info = TrioArc::new(EntryInfo::new(key_hash, now, policy_weight)); + let entry_info = MiniArc::new(EntryInfo::new(key_hash, now, policy_weight)); entry_info.set_expiration_time(Some(now.checked_add(ttl).unwrap())); let deq_nodes = Default::default(); - let timer_node = timer.schedule(entry_info, TrioArc::clone(&deq_nodes)); + let timer_node = timer.schedule(entry_info, MiniArc::clone(&deq_nodes)); deq_nodes.lock().set_timer_node(timer_node); } diff --git a/src/future/base_cache.rs b/src/future/base_cache.rs index 0c9d0a5f..f302b66a 100644 --- a/src/future/base_cache.rs +++ b/src/future/base_cache.rs @@ -51,7 +51,7 @@ use std::{ }, time::{Duration, Instant as StdInstant}, }; -use triomphe::Arc as TrioArc; +use moka_arc::MiniArc; pub(crate) type HouseKeeperArc = Arc; @@ -127,7 +127,7 @@ impl BaseCache { pub(crate) fn notify_invalidate( &self, key: &Arc, - entry: &TrioArc>, + entry: &MiniArc>, ) -> BoxFuture<'static, ()> where K: Send + Sync + 'static, @@ -335,7 +335,7 @@ where let ent = Entry::new(maybe_key, entry.value.clone(), false, false); let maybe_op = if record_read { Some(ReadOp::Hit { - value_entry: TrioArc::clone(entry), + value_entry: MiniArc::clone(entry), is_expiry_modified, }) } else { @@ -758,11 +758,11 @@ impl BaseCache { value: V, timestamp: Instant, policy_weight: u32, - ) -> (TrioArc>, u16) { + ) -> (MiniArc>, u16) { let key_hash = KeyHash::new(Arc::clone(key), hash); - let info = TrioArc::new(EntryInfo::new(key_hash, timestamp, policy_weight)); + let info = MiniArc::new(EntryInfo::new(key_hash, timestamp, policy_weight)); let gen: u16 = info.entry_gen(); - (TrioArc::new(ValueEntry::new(value, info)), gen) + (MiniArc::new(ValueEntry::new(value, info)), gen) } #[inline] @@ -772,15 +772,15 @@ impl BaseCache { timestamp: Instant, policy_weight: u32, other: &ValueEntry, - ) -> (TrioArc>, u16) { - let info = TrioArc::clone(other.entry_info()); + ) -> (MiniArc>, u16) { + let info = MiniArc::clone(other.entry_info()); // To prevent this updated ValueEntry from being evicted by an expiration // policy, increment the entry generation. let gen = info.incr_entry_gen(); info.set_last_accessed(timestamp); info.set_last_modified(timestamp); info.set_policy_weight(policy_weight); - (TrioArc::new(ValueEntry::new_from(value, info, other)), gen) + (MiniArc::new(ValueEntry::new_from(value, info, other)), gen) } fn expire_after_create( @@ -901,7 +901,7 @@ impl<'a, K, V> EvictionState<'a, K, V> { async fn notify_entry_removal( &mut self, key: Arc, - entry: &TrioArc>, + entry: &MiniArc>, cause: RemovalCause, ) where K: Send + Sync + 'static, @@ -991,7 +991,7 @@ enum AdmissionResult { Rejected, } -type CacheStore = crate::cht::SegmentedHashMap, TrioArc>, S>; +type CacheStore = crate::cht::SegmentedHashMap, MiniArc>, S>; struct Clocks { // Lock for this Clocks instance. Used when the `expiration_clock` is set. @@ -1285,7 +1285,7 @@ where where K: Borrow, Q: Hash + Eq + ?Sized, - F: FnOnce(&Arc, &TrioArc>) -> T, + F: FnOnce(&Arc, &MiniArc>) -> T, { self.cache .get_key_value_and(hash, |k| (k as &K).borrow() == key, with_entry) @@ -1296,7 +1296,7 @@ where where K: Borrow, Q: Hash + Eq + ?Sized, - F: FnOnce(&Arc, &TrioArc>) -> Option, + F: FnOnce(&Arc, &MiniArc>) -> Option, { self.cache .get_key_value_and_then(hash, |k| (k as &K).borrow() == key, with_entry) @@ -1337,7 +1337,7 @@ where /// Returns `true` if the entry is invalidated by `invalidate_entries_if` method. #[inline] - fn is_invalidated_entry(&self, key: &Arc, entry: &TrioArc>) -> bool + fn is_invalidated_entry(&self, key: &Arc, entry: &MiniArc>) -> bool where V: Clone, { @@ -1667,7 +1667,7 @@ where async fn handle_upsert( &self, kh: KeyHash, - entry: TrioArc>, + entry: MiniArc>, gen: u16, old_weight: u32, new_weight: u32, @@ -1717,7 +1717,7 @@ where kh.hash, |k| k == &kh.key, |_, current_entry| { - TrioArc::ptr_eq(entry.entry_info(), current_entry.entry_info()) + MiniArc::ptr_eq(entry.entry_info(), current_entry.entry_info()) && current_entry.entry_info().entry_gen() == gen }, ); @@ -1822,7 +1822,7 @@ where kh.hash, |k| k == &key, |_, current_entry| { - TrioArc::ptr_eq(entry.entry_info(), current_entry.entry_info()) + MiniArc::ptr_eq(entry.entry_info(), current_entry.entry_info()) && current_entry.entry_info().entry_gen() == gen }, ); @@ -1926,7 +1926,7 @@ where fn handle_admit( &self, - entry: &TrioArc>, + entry: &MiniArc>, policy_weight: u32, deqs: &mut Deques, timer_wheel: &mut TimerWheel, @@ -1951,7 +1951,7 @@ where /// NOTE: This method may enable the timer wheel. fn update_timer_wheel( &self, - entry: &TrioArc>, + entry: &MiniArc>, timer_wheel: &mut TimerWheel, ) { // Enable the timer wheel if needed. @@ -1971,8 +1971,8 @@ where // expiration time and not registered to the timer wheel. (true, None) => { let timer = timer_wheel.schedule( - TrioArc::clone(entry.entry_info()), - TrioArc::clone(entry.deq_nodes()), + MiniArc::clone(entry.entry_info()), + MiniArc::clone(entry.deq_nodes()), ); entry.set_timer_node(timer); } @@ -2000,7 +2000,7 @@ where fn handle_remove( deqs: &mut Deques, timer_wheel: &mut TimerWheel, - entry: TrioArc>, + entry: MiniArc>, gen: Option, counters: &mut EvictionCounters, ) { @@ -2012,7 +2012,7 @@ where fn handle_remove_without_timer_wheel( deqs: &mut Deques, - entry: TrioArc>, + entry: MiniArc>, gen: Option, counters: &mut EvictionCounters, ) { @@ -2035,7 +2035,7 @@ where ao_deq: &mut Deque>, wo_deq: &mut Deque>, timer_wheel: &mut TimerWheel, - entry: TrioArc>, + entry: MiniArc>, counters: &mut EvictionCounters, ) { if let Some(timer) = entry.take_timer_node() { @@ -2576,7 +2576,7 @@ where pub(crate) async fn notify_single_removal( &self, key: Arc, - entry: &TrioArc>, + entry: &MiniArc>, cause: RemovalCause, ) { if let Some(notifier) = &self.removal_notifier { @@ -2588,7 +2588,7 @@ where fn notify_upsert( &self, key: Arc, - entry: &TrioArc>, + entry: &MiniArc>, last_accessed: Option, last_modified: Option, ) -> BoxFuture<'static, ()> { @@ -2629,7 +2629,7 @@ where fn notify_invalidate( &self, key: &Arc, - entry: &TrioArc>, + entry: &MiniArc>, ) -> BoxFuture<'static, ()> { use futures_util::future::FutureExt; @@ -2715,7 +2715,7 @@ where /// Returns `true` if this entry is expired by its per-entry TTL. #[inline] -fn is_expired_by_per_entry_ttl(entry_info: &TrioArc>, now: Instant) -> bool { +fn is_expired_by_per_entry_ttl(entry_info: &MiniArc>, now: Instant) -> bool { if let Some(ts) = entry_info.expiration_time() { ts <= now } else { diff --git a/src/future/invalidator.rs b/src/future/invalidator.rs index 4c159f6c..27e8f1e2 100644 --- a/src/future/invalidator.rs +++ b/src/future/invalidator.rs @@ -16,7 +16,7 @@ use std::{ Arc, }, }; -use triomphe::Arc as TrioArc; +use moka_arc::MiniArc; use uuid::Uuid; pub(crate) type PredicateFun = Arc bool + Send + Sync + 'static>; @@ -141,7 +141,7 @@ impl Invalidator { // This method will be called by the get method of Cache. #[inline] - pub(crate) fn apply_predicates(&self, key: &Arc, entry: &TrioArc>) -> bool + pub(crate) fn apply_predicates(&self, key: &Arc, entry: &MiniArc>) -> bool where K: Hash + Eq + Send + Sync + 'static, V: Clone + Send + Sync + 'static, @@ -296,7 +296,7 @@ where key: &Arc, hash: u64, ts: Instant, - ) -> Option>> + ) -> Option>> where K: Send + Sync + 'static, V: Clone + Send + Sync + 'static, diff --git a/src/future/key_lock.rs b/src/future/key_lock.rs index fc06ff2f..6449aa10 100644 --- a/src/future/key_lock.rs +++ b/src/future/key_lock.rs @@ -6,11 +6,11 @@ use std::{ use crate::cht::SegmentedHashMap; use async_lock::{Mutex, MutexGuard}; -use triomphe::Arc as TrioArc; +use moka_arc::MiniArc; const LOCK_MAP_NUM_SEGMENTS: usize = 64; -type LockMap = SegmentedHashMap, TrioArc>, S>; +type LockMap = SegmentedHashMap, MiniArc>, S>; // We need the `where` clause here because of the Drop impl. pub(crate) struct KeyLock<'a, K, S> @@ -21,7 +21,7 @@ where map: &'a LockMap, key: Arc, hash: u64, - lock: TrioArc>, + lock: MiniArc>, } impl<'a, K, S> Drop for KeyLock<'a, K, S> @@ -30,11 +30,11 @@ where S: BuildHasher, { fn drop(&mut self) { - if TrioArc::count(&self.lock) <= 2 { + if MiniArc::count(&self.lock) <= 2 { self.map.remove_if( self.hash, |k| k == &self.key, - |_k, v| TrioArc::count(v) <= 2, + |_k, v| MiniArc::count(v) <= 2, ); } } @@ -45,7 +45,7 @@ where K: Eq + Hash, S: BuildHasher, { - fn new(map: &'a LockMap, key: &Arc, hash: u64, lock: TrioArc>) -> Self { + fn new(map: &'a LockMap, key: &Arc, hash: u64, lock: MiniArc>) -> Self { Self { map, key: Arc::clone(key), @@ -76,7 +76,7 @@ where pub(crate) fn key_lock(&self, key: &Arc) -> KeyLock<'_, K, S> { let hash = self.locks.hash(key); - let kl = TrioArc::new(Mutex::new(())); + let kl = MiniArc::new(Mutex::new(())); match self .locks .insert_if_not_present(Arc::clone(key), hash, kl.clone()) diff --git a/src/future/value_initializer.rs b/src/future/value_initializer.rs index 63b7a341..53d40929 100644 --- a/src/future/value_initializer.rs +++ b/src/future/value_initializer.rs @@ -8,7 +8,7 @@ use std::{ pin::Pin, sync::Arc, }; -use triomphe::Arc as TrioArc; +use moka_arc::MiniArc; use crate::{ ops::compute::{CompResult, Op}, @@ -49,7 +49,7 @@ impl fmt::Debug for WaiterValue { } } -type Waiter = TrioArc>>; +type Waiter = MiniArc>>; type WaiterMap = crate::cht::SegmentedHashMap<(Arc, TypeId), Waiter, S>; struct WaiterGuard<'a, K, V, S> @@ -116,7 +116,7 @@ pub(crate) struct ValueInitializer { // try_get_with method. We use the type ID as a part of the key to ensure that we // can always downcast the trait object ErrorObject (in Waiter) into its // concrete type. - waiters: TrioArc>, + waiters: MiniArc>, } impl ValueInitializer @@ -127,7 +127,7 @@ where { pub(crate) fn with_hasher(hasher: S) -> Self { Self { - waiters: TrioArc::new(crate::cht::SegmentedHashMap::with_num_segments_and_hasher( + waiters: MiniArc::new(crate::cht::SegmentedHashMap::with_num_segments_and_hasher( WAITER_MAP_NUM_SEGMENTS, hasher, )), @@ -172,7 +172,7 @@ where let (w_key, w_hash) = waiter_key_hash(&self.waiters, c_key, type_id); - let waiter = TrioArc::new(RwLock::new(WaiterValue::Computing)); + let waiter = MiniArc::new(RwLock::new(WaiterValue::Computing)); // NOTE: We have to acquire a write lock before `try_insert_waiter`, // so that any concurrent attempt will get our lock and wait on it. let lock = waiter.write().await; @@ -281,7 +281,7 @@ where let type_id = TypeId::of::(); let (w_key, w_hash) = waiter_key_hash(&self.waiters, &c_key, type_id); - let waiter = TrioArc::new(RwLock::new(WaiterValue::Computing)); + let waiter = MiniArc::new(RwLock::new(WaiterValue::Computing)); // NOTE: We have to acquire a write lock before `try_insert_waiter`, // so that any concurrent attempt will get our lock and wait on it. let lock = waiter.write().await; @@ -482,7 +482,7 @@ where (Arc, TypeId): Eq + Hash, S: BuildHasher, { - let waiter = TrioArc::clone(waiter); + let waiter = MiniArc::clone(waiter); waiter_map.insert_if_not_present(w_key, w_hash, waiter) } diff --git a/src/sync/value_initializer.rs b/src/sync/value_initializer.rs index f66aa9ef..90528f46 100644 --- a/src/sync/value_initializer.rs +++ b/src/sync/value_initializer.rs @@ -5,7 +5,7 @@ use std::{ hash::{BuildHasher, Hash}, sync::Arc, }; -use triomphe::Arc as TrioArc; +use moka_arc::MiniArc; use crate::{ ops::compute::{CompResult, Op}, @@ -38,7 +38,7 @@ impl fmt::Debug for WaiterValue { } } -type Waiter = TrioArc>>; +type Waiter = MiniArc>>; pub(crate) enum InitResult { Initialized(V), @@ -96,7 +96,7 @@ where let (w_key, w_hash) = self.waiter_key_hash(key, type_id); - let waiter = TrioArc::new(RwLock::new(WaiterValue::Computing)); + let waiter = MiniArc::new(RwLock::new(WaiterValue::Computing)); let mut lock = waiter.write(); loop { @@ -194,7 +194,7 @@ where let type_id = TypeId::of::(); let (w_key, w_hash) = self.waiter_key_hash(&c_key, type_id); - let waiter = TrioArc::new(RwLock::new(WaiterValue::Computing)); + let waiter = MiniArc::new(RwLock::new(WaiterValue::Computing)); // NOTE: We have to acquire a write lock before `try_insert_waiter`, // so that any concurrent attempt will get our lock and wait on it. let mut lock = waiter.write(); @@ -370,7 +370,7 @@ where w_hash: u64, waiter: &Waiter, ) -> Option> { - let waiter = TrioArc::clone(waiter); + let waiter = MiniArc::clone(waiter); self.waiters.insert_if_not_present(w_key, w_hash, waiter) } diff --git a/src/sync_base/base_cache.rs b/src/sync_base/base_cache.rs index fe951d29..d262cd59 100644 --- a/src/sync_base/base_cache.rs +++ b/src/sync_base/base_cache.rs @@ -45,7 +45,7 @@ use std::{ }, time::{Duration, Instant as StdInstant}, }; -use triomphe::Arc as TrioArc; +use moka_arc::MiniArc; pub(crate) type HouseKeeperArc = Arc; @@ -109,7 +109,7 @@ impl BaseCache { self.inner.current_time_from_expiration_clock() } - pub(crate) fn notify_invalidate(&self, key: &Arc, entry: &TrioArc>) + pub(crate) fn notify_invalidate(&self, key: &Arc, entry: &MiniArc>) where K: Send + Sync + 'static, V: Clone + Send + Sync + 'static, @@ -306,7 +306,7 @@ where } else { // Valid entry. let maybe_key = if need_key { Some(Arc::clone(k)) } else { None }; - Some((maybe_key, TrioArc::clone(entry))) + Some((maybe_key, MiniArc::clone(entry))) } }); @@ -630,11 +630,11 @@ impl BaseCache { value: V, timestamp: Instant, policy_weight: u32, - ) -> (TrioArc>, u16) { + ) -> (MiniArc>, u16) { let key_hash = KeyHash::new(Arc::clone(key), hash); - let info = TrioArc::new(EntryInfo::new(key_hash, timestamp, policy_weight)); + let info = MiniArc::new(EntryInfo::new(key_hash, timestamp, policy_weight)); let gen: u16 = info.entry_gen(); - (TrioArc::new(ValueEntry::new(value, info)), gen) + (MiniArc::new(ValueEntry::new(value, info)), gen) } #[inline] @@ -644,15 +644,15 @@ impl BaseCache { timestamp: Instant, policy_weight: u32, other: &ValueEntry, - ) -> (TrioArc>, u16) { - let info = TrioArc::clone(other.entry_info()); + ) -> (MiniArc>, u16) { + let info = MiniArc::clone(other.entry_info()); // To prevent this updated ValueEntry from being evicted by an expiration // policy, increment the entry generation. let gen = info.incr_entry_gen(); info.set_last_accessed(timestamp); info.set_last_modified(timestamp); info.set_policy_weight(policy_weight); - (TrioArc::new(ValueEntry::new_from(value, info, other)), gen) + (MiniArc::new(ValueEntry::new_from(value, info, other)), gen) } fn expire_after_create( @@ -773,7 +773,7 @@ impl<'a, K, V> EvictionState<'a, K, V> { fn notify_entry_removal( &mut self, key: Arc, - entry: &TrioArc>, + entry: &MiniArc>, cause: RemovalCause, ) where K: Send + Sync + 'static, @@ -863,7 +863,7 @@ enum AdmissionResult { Rejected, } -type CacheStore = crate::cht::SegmentedHashMap, TrioArc>, S>; +type CacheStore = crate::cht::SegmentedHashMap, MiniArc>, S>; struct Clocks { has_expiration_clock: AtomicBool, @@ -1140,7 +1140,7 @@ where where K: Borrow, Q: Hash + Eq + ?Sized, - F: FnOnce(&Arc, &TrioArc>) -> T, + F: FnOnce(&Arc, &MiniArc>) -> T, { self.cache .get_key_value_and(hash, |k| (k as &K).borrow() == key, with_entry) @@ -1151,7 +1151,7 @@ where where K: Borrow, Q: Hash + Eq + ?Sized, - F: FnOnce(&Arc, &TrioArc>) -> Option, + F: FnOnce(&Arc, &MiniArc>) -> Option, { self.cache .get_key_value_and_then(hash, |k| (k as &K).borrow() == key, with_entry) @@ -1192,7 +1192,7 @@ where /// Returns `true` if the entry is invalidated by `invalidate_entries_if` method. #[inline] - fn is_invalidated_entry(&self, key: &Arc, entry: &TrioArc>) -> bool + fn is_invalidated_entry(&self, key: &Arc, entry: &MiniArc>) -> bool where V: Clone, { @@ -1515,7 +1515,7 @@ where fn handle_upsert( &self, kh: KeyHash, - entry: TrioArc>, + entry: MiniArc>, gen: u16, old_weight: u32, new_weight: u32, @@ -1561,7 +1561,7 @@ where kh.hash, |k| k == &kh.key, |_, current_entry| { - TrioArc::ptr_eq(entry.entry_info(), current_entry.entry_info()) + MiniArc::ptr_eq(entry.entry_info(), current_entry.entry_info()) && current_entry.entry_info().entry_gen() == gen }, ); @@ -1658,7 +1658,7 @@ where kh.hash, |k| k == &key, |_, current_entry| { - TrioArc::ptr_eq(entry.entry_info(), current_entry.entry_info()) + MiniArc::ptr_eq(entry.entry_info(), current_entry.entry_info()) && current_entry.entry_info().entry_gen() == gen }, ); @@ -1760,7 +1760,7 @@ where fn handle_admit( &self, - entry: &TrioArc>, + entry: &MiniArc>, policy_weight: u32, deqs: &mut Deques, timer_wheel: &mut TimerWheel, @@ -1785,7 +1785,7 @@ where /// NOTE: This method may enable the timer wheel. fn update_timer_wheel( &self, - entry: &TrioArc>, + entry: &MiniArc>, timer_wheel: &mut TimerWheel, ) { // Enable the timer wheel if needed. @@ -1805,8 +1805,8 @@ where // expiration time and not registered to the timer wheel. (true, None) => { let timer = timer_wheel.schedule( - TrioArc::clone(entry.entry_info()), - TrioArc::clone(entry.deq_nodes()), + MiniArc::clone(entry.entry_info()), + MiniArc::clone(entry.deq_nodes()), ); entry.set_timer_node(timer); } @@ -1834,7 +1834,7 @@ where fn handle_remove( deqs: &mut Deques, timer_wheel: &mut TimerWheel, - entry: TrioArc>, + entry: MiniArc>, gen: Option, counters: &mut EvictionCounters, ) { @@ -1846,7 +1846,7 @@ where fn handle_remove_without_timer_wheel( deqs: &mut Deques, - entry: TrioArc>, + entry: MiniArc>, gen: Option, counters: &mut EvictionCounters, ) { @@ -1869,7 +1869,7 @@ where ao_deq: &mut Deque>, wo_deq: &mut Deque>, timer_wheel: &mut TimerWheel, - entry: TrioArc>, + entry: MiniArc>, counters: &mut EvictionCounters, ) { if let Some(timer) = entry.take_timer_node() { @@ -2368,7 +2368,7 @@ where pub(crate) fn notify_single_removal( &self, key: Arc, - entry: &TrioArc>, + entry: &MiniArc>, cause: RemovalCause, ) { if let Some(notifier) = &self.removal_notifier { @@ -2380,7 +2380,7 @@ where fn notify_upsert( &self, key: Arc, - entry: &TrioArc>, + entry: &MiniArc>, last_accessed: Option, last_modified: Option, ) { @@ -2407,7 +2407,7 @@ where } #[inline] - fn notify_invalidate(&self, key: &Arc, entry: &TrioArc>) { + fn notify_invalidate(&self, key: &Arc, entry: &MiniArc>) { let now = self.current_time_from_expiration_clock(); let exp = &self.expiration_policy; @@ -2480,7 +2480,7 @@ where /// Returns `true` if this entry is expired by its per-entry TTL. #[inline] -fn is_expired_by_per_entry_ttl(entry_info: &TrioArc>, now: Instant) -> bool { +fn is_expired_by_per_entry_ttl(entry_info: &MiniArc>, now: Instant) -> bool { if let Some(ts) = entry_info.expiration_time() { ts <= now } else { diff --git a/src/sync_base/invalidator.rs b/src/sync_base/invalidator.rs index dfb62c42..f281f709 100644 --- a/src/sync_base/invalidator.rs +++ b/src/sync_base/invalidator.rs @@ -16,7 +16,7 @@ use std::{ Arc, }, }; -use triomphe::Arc as TrioArc; +use moka_arc::MiniArc; use uuid::Uuid; pub(crate) type PredicateFun = Arc bool + Send + Sync + 'static>; @@ -141,7 +141,7 @@ impl Invalidator { // This method will be called by the get method of Cache. #[inline] - pub(crate) fn apply_predicates(&self, key: &Arc, entry: &TrioArc>) -> bool + pub(crate) fn apply_predicates(&self, key: &Arc, entry: &MiniArc>) -> bool where K: Hash + Eq + Send + Sync + 'static, V: Clone + Send + Sync + 'static, @@ -296,7 +296,7 @@ where key: &Arc, hash: u64, ts: Instant, - ) -> Option>> + ) -> Option>> where K: Send + Sync + 'static, V: Clone + Send + Sync + 'static, diff --git a/src/sync_base/key_lock.rs b/src/sync_base/key_lock.rs index abc96873..fc5ad402 100644 --- a/src/sync_base/key_lock.rs +++ b/src/sync_base/key_lock.rs @@ -6,11 +6,11 @@ use std::{ use crate::cht::SegmentedHashMap; use parking_lot::{Mutex, MutexGuard}; -use triomphe::Arc as TrioArc; +use moka_arc::MiniArc; const LOCK_MAP_NUM_SEGMENTS: usize = 64; -type LockMap = SegmentedHashMap, TrioArc>, S>; +type LockMap = SegmentedHashMap, MiniArc>, S>; // We need the `where` clause here because of the Drop impl. pub(crate) struct KeyLock<'a, K, S> @@ -21,7 +21,7 @@ where map: &'a LockMap, key: Arc, hash: u64, - lock: TrioArc>, + lock: MiniArc>, } impl<'a, K, S> Drop for KeyLock<'a, K, S> @@ -30,11 +30,11 @@ where S: BuildHasher, { fn drop(&mut self) { - if TrioArc::count(&self.lock) <= 2 { + if MiniArc::count(&self.lock) <= 2 { self.map.remove_if( self.hash, |k| k == &self.key, - |_k, v| TrioArc::count(v) <= 2, + |_k, v| MiniArc::count(v) <= 2, ); } } @@ -45,7 +45,7 @@ where K: Eq + Hash, S: BuildHasher, { - fn new(map: &'a LockMap, key: &Arc, hash: u64, lock: TrioArc>) -> Self { + fn new(map: &'a LockMap, key: &Arc, hash: u64, lock: MiniArc>) -> Self { Self { map, key: Arc::clone(key), @@ -76,7 +76,7 @@ where pub(crate) fn key_lock(&self, key: &Arc) -> KeyLock<'_, K, S> { let hash = self.locks.hash(key); - let kl = TrioArc::new(Mutex::new(())); + let kl = MiniArc::new(Mutex::new(())); match self .locks .insert_if_not_present(Arc::clone(key), hash, kl.clone()) From 2e463d3d8f7d0dbee4abb96b322f56110974538a Mon Sep 17 00:00:00 2001 From: Tatsuya Kawano Date: Sat, 14 Sep 2024 12:15:30 +0800 Subject: [PATCH 2/4] Apply cargo fmt --- src/common/concurrent.rs | 2 +- src/common/concurrent/deques.rs | 2 +- src/common/timer_wheel.rs | 2 +- src/future/base_cache.rs | 2 +- src/future/invalidator.rs | 2 +- src/future/value_initializer.rs | 2 +- src/sync/value_initializer.rs | 2 +- src/sync_base/base_cache.rs | 2 +- src/sync_base/invalidator.rs | 2 +- src/sync_base/key_lock.rs | 2 +- 10 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/common/concurrent.rs b/src/common/concurrent.rs index 95686327..366edf50 100644 --- a/src/common/concurrent.rs +++ b/src/common/concurrent.rs @@ -1,9 +1,9 @@ use crate::common::{deque::DeqNode, time::Instant}; +use moka_arc::MiniArc; use parking_lot::Mutex; use std::{fmt, ptr::NonNull, sync::Arc}; use tagptr::TagNonNull; -use moka_arc::MiniArc; pub(crate) mod constants; pub(crate) mod deques; diff --git a/src/common/concurrent/deques.rs b/src/common/concurrent/deques.rs index 37245b1e..8a14c7e4 100644 --- a/src/common/concurrent/deques.rs +++ b/src/common/concurrent/deques.rs @@ -4,9 +4,9 @@ use crate::common::{ CacheRegion, }; +use moka_arc::MiniArc; use std::ptr::NonNull; use tagptr::TagNonNull; -use moka_arc::MiniArc; pub(crate) struct Deques { pub(crate) window: Deque>, // Not used yet. diff --git a/src/common/timer_wheel.rs b/src/common/timer_wheel.rs index 73add563..9598bbbf 100644 --- a/src/common/timer_wheel.rs +++ b/src/common/timer_wheel.rs @@ -19,8 +19,8 @@ use super::{ time::{CheckedTimeOps, Instant}, }; -use parking_lot::Mutex; use moka_arc::MiniArc; +use parking_lot::Mutex; const BUCKET_COUNTS: &[u64] = &[ 64, // roughly seconds diff --git a/src/future/base_cache.rs b/src/future/base_cache.rs index f302b66a..d6cc3ccc 100644 --- a/src/future/base_cache.rs +++ b/src/future/base_cache.rs @@ -39,6 +39,7 @@ use async_lock::{Mutex, MutexGuard, RwLock}; use crossbeam_channel::{Receiver, Sender, TrySendError}; use crossbeam_utils::atomic::AtomicCell; use futures_util::future::BoxFuture; +use moka_arc::MiniArc; use parking_lot::RwLock as SyncRwLock; use smallvec::SmallVec; use std::{ @@ -51,7 +52,6 @@ use std::{ }, time::{Duration, Instant as StdInstant}, }; -use moka_arc::MiniArc; pub(crate) type HouseKeeperArc = Arc; diff --git a/src/future/invalidator.rs b/src/future/invalidator.rs index 27e8f1e2..7bc4eb68 100644 --- a/src/future/invalidator.rs +++ b/src/future/invalidator.rs @@ -9,6 +9,7 @@ use crate::{ }; use async_lock::{Mutex, MutexGuard}; +use moka_arc::MiniArc; use std::{ hash::{BuildHasher, Hash}, sync::{ @@ -16,7 +17,6 @@ use std::{ Arc, }, }; -use moka_arc::MiniArc; use uuid::Uuid; pub(crate) type PredicateFun = Arc bool + Send + Sync + 'static>; diff --git a/src/future/value_initializer.rs b/src/future/value_initializer.rs index 53d40929..c56eeacc 100644 --- a/src/future/value_initializer.rs +++ b/src/future/value_initializer.rs @@ -1,5 +1,6 @@ use async_lock::{RwLock, RwLockWriteGuard}; use futures_util::FutureExt; +use moka_arc::MiniArc; use std::{ any::{Any, TypeId}, fmt, @@ -8,7 +9,6 @@ use std::{ pin::Pin, sync::Arc, }; -use moka_arc::MiniArc; use crate::{ ops::compute::{CompResult, Op}, diff --git a/src/sync/value_initializer.rs b/src/sync/value_initializer.rs index 90528f46..66034972 100644 --- a/src/sync/value_initializer.rs +++ b/src/sync/value_initializer.rs @@ -1,3 +1,4 @@ +use moka_arc::MiniArc; use parking_lot::RwLock; use std::{ any::{Any, TypeId}, @@ -5,7 +6,6 @@ use std::{ hash::{BuildHasher, Hash}, sync::Arc, }; -use moka_arc::MiniArc; use crate::{ ops::compute::{CompResult, Op}, diff --git a/src/sync_base/base_cache.rs b/src/sync_base/base_cache.rs index d262cd59..1d3a6281 100644 --- a/src/sync_base/base_cache.rs +++ b/src/sync_base/base_cache.rs @@ -32,6 +32,7 @@ use crate::{ use crossbeam_channel::{Receiver, Sender, TrySendError}; use crossbeam_utils::atomic::AtomicCell; +use moka_arc::MiniArc; use parking_lot::{Mutex, RwLock}; use smallvec::SmallVec; use std::{ @@ -45,7 +46,6 @@ use std::{ }, time::{Duration, Instant as StdInstant}, }; -use moka_arc::MiniArc; pub(crate) type HouseKeeperArc = Arc; diff --git a/src/sync_base/invalidator.rs b/src/sync_base/invalidator.rs index f281f709..f51435a6 100644 --- a/src/sync_base/invalidator.rs +++ b/src/sync_base/invalidator.rs @@ -8,6 +8,7 @@ use crate::{ PredicateError, }; +use moka_arc::MiniArc; use parking_lot::{Mutex, MutexGuard}; use std::{ hash::{BuildHasher, Hash}, @@ -16,7 +17,6 @@ use std::{ Arc, }, }; -use moka_arc::MiniArc; use uuid::Uuid; pub(crate) type PredicateFun = Arc bool + Send + Sync + 'static>; diff --git a/src/sync_base/key_lock.rs b/src/sync_base/key_lock.rs index fc5ad402..c58c6eed 100644 --- a/src/sync_base/key_lock.rs +++ b/src/sync_base/key_lock.rs @@ -5,8 +5,8 @@ use std::{ use crate::cht::SegmentedHashMap; -use parking_lot::{Mutex, MutexGuard}; use moka_arc::MiniArc; +use parking_lot::{Mutex, MutexGuard}; const LOCK_MAP_NUM_SEGMENTS: usize = 64; From 0ef051ce222f4b9c5cca1689bbe4ec6fef057112 Mon Sep 17 00:00:00 2001 From: Tatsuya Kawano Date: Mon, 16 Sep 2024 09:05:24 +0800 Subject: [PATCH 3/4] Bump the version to v0.12.9 --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index fc21cdf9..336c54f9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "moka" -version = "0.12.8" +version = "0.12.9" edition = "2021" # Rust 1.65 was released on Nov 3, 2022. rust-version = "1.65" From 14651ee287ee7b22a4ccba33eb70c5875e62b3b3 Mon Sep 17 00:00:00 2001 From: Tatsuya Kawano Date: Mon, 16 Sep 2024 10:12:29 +0800 Subject: [PATCH 4/4] Bring `MiniArc`, our own `Arc` implementation, from a separate crate into `moka` Also enable Miri and Loom tests on the CI (GitHub Actions). --- .github/workflows/Loom.yml | 50 +++++ .github/workflows/Miri.yml | 8 + .vscode/settings.json | 3 + Cargo.toml | 7 +- src/common/concurrent.rs | 4 +- src/common/concurrent/arc.rs | 330 ++++++++++++++++++++++++++++++++ src/common/concurrent/deques.rs | 3 +- src/common/timer_wheel.rs | 7 +- src/future/base_cache.rs | 2 +- src/future/invalidator.rs | 3 +- src/future/key_lock.rs | 3 +- src/future/value_initializer.rs | 2 +- src/sync/value_initializer.rs | 2 +- src/sync_base/base_cache.rs | 2 +- src/sync_base/invalidator.rs | 3 +- src/sync_base/key_lock.rs | 3 +- 16 files changed, 410 insertions(+), 22 deletions(-) create mode 100644 .github/workflows/Loom.yml create mode 100644 src/common/concurrent/arc.rs diff --git a/.github/workflows/Loom.yml b/.github/workflows/Loom.yml new file mode 100644 index 00000000..ea3152f5 --- /dev/null +++ b/.github/workflows/Loom.yml @@ -0,0 +1,50 @@ +name: CI + +on: + push: + paths-ignore: + - '.devcontainer/**' + - '.gitpod.yml' + - '.vscode/**' + - 'tests/**' + pull_request: + paths-ignore: + - '.devcontainer/**' + - '.gitpod.yml' + - '.vscode/**' + - 'tests/**' + schedule: + # Run against the last commit on the default branch on Friday at 8pm (UTC?) + - cron: '0 20 * * 5' + +jobs: + pre_job: + runs-on: ubuntu-latest + outputs: + should_skip: ${{ steps.skip_check.outputs.should_skip }} + steps: + - id: skip_check + # https://github.com/marketplace/actions/skip-duplicate-actions + uses: fkirc/skip-duplicate-actions@v5 + with: + concurrent_skipping: 'same_content' + do_not_skip: '["pull_request", "workflow_dispatch", "schedule"]' + + test: + needs: pre_job + if: needs.pre_job.outputs.should_skip != 'true' + runs-on: ubuntu-latest + + steps: + - name: Checkout Moka + uses: actions/checkout@v4 + + - name: Install Rust toolchain + uses: dtolnay/rust-toolchain@master + with: + toolchain: stable + + - name: Run tests in concurrent::arc module + run: cargo test --release --lib --features 'future, sync' common::concurrent::arc::loom_tests + env: + RUSTFLAGS: '--cfg moka_loom' diff --git a/.github/workflows/Miri.yml b/.github/workflows/Miri.yml index 3ca29c3c..4faa1fe0 100644 --- a/.github/workflows/Miri.yml +++ b/.github/workflows/Miri.yml @@ -47,6 +47,11 @@ jobs: - run: cargo miri setup + - name: Run Miri test on arc module using tree borrows + run: cargo miri test arc --features 'sync, future' + env: + MIRIFLAGS: '-Zmiri-tree-borrows' + - name: Run Miri test on deque module using tree borrows run: cargo miri test deque --features 'sync, future' env: @@ -57,6 +62,9 @@ jobs: env: MIRIFLAGS: '-Zmiri-tree-borrows' + - name: Run Miri test on arc module using stacked borrows + run: cargo miri test arc --features 'sync, future' + - name: Run Miri test on deque module using stacked borrows run: cargo miri test deque --features 'sync, future' diff --git a/.vscode/settings.json b/.vscode/settings.json index c9ae44b0..5093b96c 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -32,10 +32,13 @@ "ENHANCEME", "Eytan", "getrandom", + "Goregaokar", "hashbrown", "Hasher", "kani", "Kawano", + "Manish", + "Manishearth", "mapref", "Miri", "Moka", diff --git a/Cargo.toml b/Cargo.toml index 336c54f9..81107fa3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,7 +44,6 @@ unstable-debug-counters = ["future", "once_cell"] crossbeam-channel = "0.5.5" crossbeam-epoch = "0.9.9" crossbeam-utils = "0.8" -moka-arc = { git = "https://gitlab.com/moka-labs/moka-gh440-remove-triomphe/moka-arc.git" } parking_lot = "0.12" smallvec = "1.8" tagptr = "0.2" @@ -77,6 +76,11 @@ paste = "1.0.9" reqwest = { version = "0.11.11", default-features = false, features = ["rustls-tls"] } tokio = { version = "1.19", features = ["fs", "io-util", "macros", "rt-multi-thread", "sync", "time" ] } +# We cannot use `cfg(loom)` here because an indirect dependency `concurrent-queue` +# uses it. +[target.'cfg(moka_loom)'.dependencies] +loom = "0.7" + [target.'cfg(trybuild)'.dev-dependencies] trybuild = "1.0" @@ -88,6 +92,7 @@ unexpected_cfgs = { level = "warn", check-cfg = [ "cfg(armv5te)", "cfg(beta_clippy)", "cfg(kani)", + "cfg(moka_loom)", "cfg(mips)", "cfg(rustver)", "cfg(skip_large_mem_tests)", diff --git a/src/common/concurrent.rs b/src/common/concurrent.rs index 366edf50..f8ba44c8 100644 --- a/src/common/concurrent.rs +++ b/src/common/concurrent.rs @@ -1,10 +1,10 @@ -use crate::common::{deque::DeqNode, time::Instant}; +use crate::common::{concurrent::arc::MiniArc, deque::DeqNode, time::Instant}; -use moka_arc::MiniArc; use parking_lot::Mutex; use std::{fmt, ptr::NonNull, sync::Arc}; use tagptr::TagNonNull; +pub(crate) mod arc; pub(crate) mod constants; pub(crate) mod deques; pub(crate) mod entry_info; diff --git a/src/common/concurrent/arc.rs b/src/common/concurrent/arc.rs new file mode 100644 index 00000000..d0d8e7d9 --- /dev/null +++ b/src/common/concurrent/arc.rs @@ -0,0 +1,330 @@ +// This module's source code was written by us, the `moka` developers, referring to +// the following book and code: +// +// - Chapter 6. Building Our Own "Arc" of the Rust Atomics and Locks book. +// - Rust Atomics and Locks by Mara Bos (O’Reilly). Copyright 2023 Mara Bos, +// ISBN: 978-1-098-11944-7 +// - https://marabos.nl/atomics/ +// - The `triomphe` crate v0.1.13 and v0.1.11 by Manish Goregaokar (Manishearth) +// - MIT or Apache-2.0 License +// - https://github.com/Manishearth/triomphe +// - `std::sync::Arc` in the Rust Standard Library (1.81.0). +// - MIT or Apache-2.0 License + +use std::{ + fmt, + hash::{Hash, Hasher}, + ops::Deref, + ptr::NonNull, +}; + +#[cfg(not(moka_loom))] +use std::sync::atomic::{self, AtomicU32}; + +#[cfg(moka_loom)] +use loom::sync::atomic::{self, AtomicU32}; + +/// A thread-safe reference-counting pointer. `MiniArc` is similar to +/// `std::sync::Arc`, Atomically Reference Counted shared pointer, but with a few +/// differences: +/// +/// - Smaller memory overhead: +/// - `MiniArc` does not support weak references, so it does not need to store a +/// weak reference count. +/// - `MiniArc` uses `AtomicU32` for the reference count, while `std::sync::Arc` +/// uses `AtomicUsize`. On a 64-bit system, `AtomicU32` is half the size of +/// `AtomicUsize`. +/// - Note: Depending on the value type `T`, the Rust compiler may add +/// padding to the internal struct of `MiniArc`, so the actual memory +/// overhead may vary. +/// - Smaller code size: +/// - Only about 100 lines of code. +/// - This is because `MiniArc` provides only the methods needed for the +/// `moka` and `mini-moka` crates. +/// - Smaller code size means less chance of bugs. +pub(crate) struct MiniArc { + ptr: NonNull>, +} + +struct ArcData { + ref_count: AtomicU32, + data: T, +} + +/// A soft limit on the amount of references that may be made to an `MiniArc`. +/// +/// Going above this limit will abort your program (although not necessarily) +/// at _exactly_ `MAX_REFCOUNT + 1` references. +const MAX_REFCOUNT: u32 = (i32::MAX) as u32; + +unsafe impl Send for MiniArc {} +unsafe impl Sync for MiniArc {} + +impl MiniArc { + pub(crate) fn new(data: T) -> MiniArc { + MiniArc { + ptr: NonNull::from(Box::leak(Box::new(ArcData { + ref_count: AtomicU32::new(1), + data, + }))), + } + } +} + +impl MiniArc { + /// Gets the number of [`MiniArc`] pointers to this allocation + pub(crate) fn count(this: &Self) -> u32 { + use atomic::Ordering::Acquire; + + this.data().ref_count.load(Acquire) + } + + /// Returns `true` if the two `MiniArc`s point to the same allocation in a + /// vein similar to [`ptr::eq`]. + /// + /// # Safety + /// + /// This function is unreliable when `T` is a `dyn Trait`. Currently + /// coercing `MiniArc` to `MiniArc` is not possible, so + /// this is not a problem in practice. However, if this coercion becomes + /// possible in the future, this function may return incorrect results when + /// comparing `MiniArc` instances. + /// + /// To fix this, we must rise the minimum supported Rust version (MSRV) to + /// 1.76 and use `std::ptr::addr_eq` internally instead of `eq` (`==`). + /// `addr_eq` compares the _addresses_ of the pointers for equality, + /// ignoring any metadata in fat pointers. + /// + /// See the following `triomphe` issue for more information: + /// https://github.com/Manishearth/triomphe/pull/84 + /// + /// Note that `triomphe` has a feature called `unsize`, which enables the + /// coercion by using the `unsize` crate. `MiniArc` does not have such a + /// feature, so we are safe for now. + #[inline] + #[allow(ambiguous_wide_pointer_comparisons)] // Remove when MSRV is 1.76 or newer. + pub(crate) fn ptr_eq(this: &Self, other: &Self) -> bool { + // `addr_eq` requires Rust 1.76 or newer. + // ptr::addr_eq(this.ptr.as_ptr(), other.ptr.as_ptr()) + this.ptr.as_ptr() == other.ptr.as_ptr() + } + + #[inline] + fn data(&self) -> &ArcData { + unsafe { self.ptr.as_ref() } + } +} + +impl Deref for MiniArc { + type Target = T; + + fn deref(&self) -> &T { + &self.data().data + } +} + +impl Clone for MiniArc { + fn clone(&self) -> Self { + use atomic::Ordering::Relaxed; + + if self.data().ref_count.fetch_add(1, Relaxed) > MAX_REFCOUNT { + std::process::abort(); + } + + MiniArc { ptr: self.ptr } + } +} + +impl Drop for MiniArc { + fn drop(&mut self) { + use std::sync::atomic::Ordering::{Acquire, Release}; + + if self.data().ref_count.fetch_sub(1, Release) == 1 { + atomic::fence(Acquire); + unsafe { + drop(Box::from_raw(self.ptr.as_ptr())); + } + } + } +} + +impl Default for MiniArc { + /// Creates a new `MiniArc`, with the `Default` value for `T`. + fn default() -> MiniArc { + MiniArc::new(Default::default()) + } +} + +impl PartialEq for MiniArc { + fn eq(&self, other: &MiniArc) -> bool { + // TODO: pointer equality is incorrect if `T` is not `Eq`. + // See: https://github.com/Manishearth/triomphe/pull/88 + Self::ptr_eq(self, other) || *(*self) == *(*other) + } + + #[allow(clippy::partialeq_ne_impl)] + fn ne(&self, other: &MiniArc) -> bool { + !Self::ptr_eq(self, other) && *(*self) != *(*other) + } +} + +impl Eq for MiniArc {} + +impl fmt::Display for MiniArc { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Display::fmt(&**self, f) + } +} + +impl fmt::Debug for MiniArc { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Debug::fmt(&**self, f) + } +} + +impl fmt::Pointer for MiniArc { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Pointer::fmt(&self.ptr.as_ptr(), f) + } +} + +impl Hash for MiniArc { + fn hash(&self, state: &mut H) { + (**self).hash(state) + } +} + +#[cfg(all(test, not(moka_loom)))] +mod tests { + use std::sync::atomic::{AtomicUsize, Ordering::Relaxed}; + + use super::*; + + #[test] + fn test_drop() { + static NUM_DROPS: AtomicUsize = AtomicUsize::new(0); + + struct DetectDrop; + + impl Drop for DetectDrop { + fn drop(&mut self) { + NUM_DROPS.fetch_add(1, Relaxed); + } + } + + // Create two MiniArcs sharing an object containing a string + // and a DetectDrop, to detect when it is dropped. + let x = MiniArc::new(("hello", DetectDrop)); + let y = x.clone(); + + // Send x to another thread, and use it there. + let t = std::thread::spawn(move || { + assert_eq!(x.0, "hello"); + }); + + // In parallel, y should still be usable here. + assert_eq!(y.0, "hello"); + assert!(MiniArc::count(&y) >= 1); + + // Wait for the thread to finish. + t.join().unwrap(); + + // One MiniArc, x, should be dropped by now. + // We still have y, so the object should not have been dropped yet. + assert_eq!(NUM_DROPS.load(Relaxed), 0); + assert_eq!(MiniArc::count(&y), 1); + + // Drop the remaining `MiniArc`. + drop(y); + + // Now that `y` is dropped too, + // the object should have been dropped. + assert_eq!(NUM_DROPS.load(Relaxed), 1); + } + + #[test] + fn test_eq() { + let w = MiniArc::new(6502); + let x = w.clone(); + let y = MiniArc::new(6502); + let z = MiniArc::new(8086); + + assert_eq!(w, x); + assert_eq!(x, w); + assert_eq!(w, y); + assert_eq!(y, w); + assert_ne!(y, z); + assert_ne!(z, y); + } + + #[test] + fn test_partial_eq_bug() { + let float = f32::NAN; + assert_ne!(float, float); + let arc = MiniArc::new(f32::NAN); + // TODO: this is a bug. + // See: https://github.com/Manishearth/triomphe/pull/88 + assert_eq!(arc, arc); + } + + #[allow(dead_code)] + const fn is_partial_eq() {} + + #[allow(dead_code)] + const fn is_eq() {} + + // compile-time check that PartialEq/Eq is correctly derived + const _: () = is_partial_eq::>(); + const _: () = is_eq::>(); +} + +#[cfg(all(test, moka_loom))] +mod loom_tests { + use super::*; + + #[test] + fn test_drop() { + use loom::sync::atomic::{AtomicUsize, Ordering::Relaxed}; + + struct DetectDrop(loom::sync::Arc); + + impl Drop for DetectDrop { + fn drop(&mut self) { + self.0.fetch_add(1, Relaxed); + } + } + + loom::model(move || { + let num_drops = loom::sync::Arc::new(AtomicUsize::new(0)); + + // Create two MiniArcs sharing an object containing a string + // and a DetectDrop, to detect when it is dropped. + let x = MiniArc::new(("hello", DetectDrop(loom::sync::Arc::clone(&num_drops)))); + let y = x.clone(); + + // Send x to another thread, and use it there. + let t = loom::thread::spawn(move || { + assert_eq!(x.0, "hello"); + }); + + // In parallel, y should still be usable here. + assert_eq!(y.0, "hello"); + assert!(MiniArc::count(&y) >= 1); + + // Wait for the thread to finish. + t.join().unwrap(); + + // One MiniArc, x, should be dropped by now. + // We still have y, so the object should not have been dropped yet. + assert_eq!(num_drops.load(Relaxed), 0); + assert_eq!(MiniArc::count(&y), 1); + + // Drop the remaining `MiniArc`. + drop(y); + + // Now that `y` is dropped too, + // the object should have been dropped. + assert_eq!(num_drops.load(Relaxed), 1); + }); + } +} diff --git a/src/common/concurrent/deques.rs b/src/common/concurrent/deques.rs index 8a14c7e4..94d235bb 100644 --- a/src/common/concurrent/deques.rs +++ b/src/common/concurrent/deques.rs @@ -1,10 +1,9 @@ -use super::{KeyHashDate, ValueEntry}; +use super::{arc::MiniArc, KeyHashDate, ValueEntry}; use crate::common::{ deque::{DeqNode, Deque}, CacheRegion, }; -use moka_arc::MiniArc; use std::ptr::NonNull; use tagptr::TagNonNull; diff --git a/src/common/timer_wheel.rs b/src/common/timer_wheel.rs index 9598bbbf..0b74b9df 100644 --- a/src/common/timer_wheel.rs +++ b/src/common/timer_wheel.rs @@ -14,12 +14,11 @@ use std::{ptr::NonNull, time::Duration}; use super::{ - concurrent::{entry_info::EntryInfo, DeqNodes}, + concurrent::{arc::MiniArc, entry_info::EntryInfo, DeqNodes}, deque::{DeqNode, Deque}, time::{CheckedTimeOps, Instant}, }; -use moka_arc::MiniArc; use parking_lot::Mutex; const BUCKET_COUNTS: &[u64] = &[ @@ -560,12 +559,10 @@ mod tests { use super::{TimerEvent, TimerWheel, SPANS}; use crate::common::{ - concurrent::{entry_info::EntryInfo, KeyHash}, + concurrent::{arc::MiniArc, entry_info::EntryInfo, KeyHash}, time::{CheckedTimeOps, Clock, Instant, Mock}, }; - use moka_arc::MiniArc; - #[test] fn test_bucket_indices() { fn bi(timer: &TimerWheel<()>, now: Instant, dur: Duration) -> (usize, usize) { diff --git a/src/future/base_cache.rs b/src/future/base_cache.rs index d6cc3ccc..6815004e 100644 --- a/src/future/base_cache.rs +++ b/src/future/base_cache.rs @@ -10,6 +10,7 @@ use crate::{ common::{ self, concurrent::{ + arc::MiniArc, atomic_time::AtomicInstant, constants::{ READ_LOG_CH_SIZE, READ_LOG_FLUSH_POINT, WRITE_LOG_CH_SIZE, WRITE_LOG_FLUSH_POINT, @@ -39,7 +40,6 @@ use async_lock::{Mutex, MutexGuard, RwLock}; use crossbeam_channel::{Receiver, Sender, TrySendError}; use crossbeam_utils::atomic::AtomicCell; use futures_util::future::BoxFuture; -use moka_arc::MiniArc; use parking_lot::RwLock as SyncRwLock; use smallvec::SmallVec; use std::{ diff --git a/src/future/invalidator.rs b/src/future/invalidator.rs index 7bc4eb68..5c48cd12 100644 --- a/src/future/invalidator.rs +++ b/src/future/invalidator.rs @@ -1,7 +1,7 @@ use super::{base_cache::Inner, PredicateId, PredicateIdStr}; use crate::{ common::{ - concurrent::{AccessTime, KvEntry, ValueEntry}, + concurrent::{arc::MiniArc, AccessTime, KvEntry, ValueEntry}, time::Instant, }, notification::RemovalCause, @@ -9,7 +9,6 @@ use crate::{ }; use async_lock::{Mutex, MutexGuard}; -use moka_arc::MiniArc; use std::{ hash::{BuildHasher, Hash}, sync::{ diff --git a/src/future/key_lock.rs b/src/future/key_lock.rs index 6449aa10..08f73b8f 100644 --- a/src/future/key_lock.rs +++ b/src/future/key_lock.rs @@ -3,10 +3,9 @@ use std::{ sync::Arc, }; -use crate::cht::SegmentedHashMap; +use crate::{cht::SegmentedHashMap, common::concurrent::arc::MiniArc}; use async_lock::{Mutex, MutexGuard}; -use moka_arc::MiniArc; const LOCK_MAP_NUM_SEGMENTS: usize = 64; diff --git a/src/future/value_initializer.rs b/src/future/value_initializer.rs index c56eeacc..fb9e87be 100644 --- a/src/future/value_initializer.rs +++ b/src/future/value_initializer.rs @@ -1,6 +1,5 @@ use async_lock::{RwLock, RwLockWriteGuard}; use futures_util::FutureExt; -use moka_arc::MiniArc; use std::{ any::{Any, TypeId}, fmt, @@ -11,6 +10,7 @@ use std::{ }; use crate::{ + common::concurrent::arc::MiniArc, ops::compute::{CompResult, Op}, Entry, }; diff --git a/src/sync/value_initializer.rs b/src/sync/value_initializer.rs index 66034972..cf29b14a 100644 --- a/src/sync/value_initializer.rs +++ b/src/sync/value_initializer.rs @@ -1,4 +1,3 @@ -use moka_arc::MiniArc; use parking_lot::RwLock; use std::{ any::{Any, TypeId}, @@ -8,6 +7,7 @@ use std::{ }; use crate::{ + common::concurrent::arc::MiniArc, ops::compute::{CompResult, Op}, Entry, }; diff --git a/src/sync_base/base_cache.rs b/src/sync_base/base_cache.rs index 1d3a6281..4f8dee7a 100644 --- a/src/sync_base/base_cache.rs +++ b/src/sync_base/base_cache.rs @@ -9,6 +9,7 @@ use crate::{ common::{ self, concurrent::{ + arc::MiniArc, atomic_time::AtomicInstant, constants::{ READ_LOG_CH_SIZE, READ_LOG_FLUSH_POINT, WRITE_LOG_CH_SIZE, WRITE_LOG_FLUSH_POINT, @@ -32,7 +33,6 @@ use crate::{ use crossbeam_channel::{Receiver, Sender, TrySendError}; use crossbeam_utils::atomic::AtomicCell; -use moka_arc::MiniArc; use parking_lot::{Mutex, RwLock}; use smallvec::SmallVec; use std::{ diff --git a/src/sync_base/invalidator.rs b/src/sync_base/invalidator.rs index f51435a6..f5ff7dc4 100644 --- a/src/sync_base/invalidator.rs +++ b/src/sync_base/invalidator.rs @@ -1,14 +1,13 @@ use super::{base_cache::Inner, PredicateId, PredicateIdStr}; use crate::{ common::{ - concurrent::{AccessTime, KvEntry, ValueEntry}, + concurrent::{arc::MiniArc, AccessTime, KvEntry, ValueEntry}, time::Instant, }, notification::RemovalCause, PredicateError, }; -use moka_arc::MiniArc; use parking_lot::{Mutex, MutexGuard}; use std::{ hash::{BuildHasher, Hash}, diff --git a/src/sync_base/key_lock.rs b/src/sync_base/key_lock.rs index c58c6eed..ae470401 100644 --- a/src/sync_base/key_lock.rs +++ b/src/sync_base/key_lock.rs @@ -3,9 +3,8 @@ use std::{ sync::Arc, }; -use crate::cht::SegmentedHashMap; +use crate::{cht::SegmentedHashMap, common::concurrent::arc::MiniArc}; -use moka_arc::MiniArc; use parking_lot::{Mutex, MutexGuard}; const LOCK_MAP_NUM_SEGMENTS: usize = 64;