From 556764943e315224337420293437d9036a85e1f9 Mon Sep 17 00:00:00 2001 From: Tatsuya Kawano Date: Fri, 31 Mar 2023 08:39:51 +0800 Subject: [PATCH] Per-entry expiration Add `TimerWheel` to manage the per-entry expiration with amortized O(1) time. --- .vscode/settings.json | 2 + src/common.rs | 1 + src/common/time.rs | 10 ++ src/common/timer_wheel.rs | 225 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 238 insertions(+) create mode 100644 src/common/timer_wheel.rs diff --git a/.vscode/settings.json b/.vscode/settings.json index 8f962be7..30e7e7e8 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -19,10 +19,12 @@ "deqs", "Deque", "Deques", + "deschedule", "devcontainer", "docsrs", "Einziger", "else's", + "ENHANCEME", "Eytan", "getrandom", "hashbrown", diff --git a/src/common.rs b/src/common.rs index c056e523..5b718cad 100644 --- a/src/common.rs +++ b/src/common.rs @@ -13,6 +13,7 @@ pub(crate) mod builder_utils; pub(crate) mod deque; pub(crate) mod frequency_sketch; pub(crate) mod time; +pub(crate) mod timer_wheel; #[cfg(all(test, any(feature = "sync", feature = "future")))] pub(crate) mod test_utils; diff --git a/src/common/time.rs b/src/common/time.rs index 163955d2..eb971a8d 100644 --- a/src/common/time.rs +++ b/src/common/time.rs @@ -19,6 +19,10 @@ pub(crate) trait CheckedTimeOps { fn checked_add(&self, duration: Duration) -> Option where Self: Sized; + + fn checked_duration_since(&self, earlier: Self) -> Option + where + Self: Sized; } impl Instant { @@ -40,4 +44,10 @@ impl CheckedTimeOps for Instant { fn checked_add(&self, duration: Duration) -> Option { self.0.checked_add(duration).map(Instant) } + + fn checked_duration_since(&self, earlier: Self) -> Option + where + Self: Sized { + self.0.checked_duration_since(earlier.0) + } } diff --git a/src/common/timer_wheel.rs b/src/common/timer_wheel.rs new file mode 100644 index 00000000..d1ce2479 --- /dev/null +++ b/src/common/timer_wheel.rs @@ -0,0 +1,225 @@ +// License and Copyright Notice: +// +// Some of the code and doc comments in this module were ported or copied from +// a Java class `com.github.benmanes.caffeine.cache.TimerWheel` of Caffeine. +// https://github.com/ben-manes/caffeine/blob/master/caffeine/src/main/java/com/github/benmanes/caffeine/cache/TimerWheel.java +// +// The original code/comments from Caffeine are licensed under the Apache License, +// Version 2.0 +// +// Copyrights of the original code/comments are retained by their contributors. +// For full authorship information, see the version control history of +// https://github.com/ben-manes/caffeine/ + +#![allow(unused)] // TODO: Remove this. + +use std::{convert::TryInto, ptr::NonNull, time::Duration}; + +use super::{ + deque::{DeqNode, Deque}, + time::{CheckedTimeOps, Instant}, +}; + +const BUCKET_SIZES: &[u64] = &[ + 64, // roughly seconds + 64, // roughly minutes + 32, // roughly hours + 4, // roughly days + 1, // overflow (> ~6.5 days) +]; + +const OVERFLOW_QUEUE_INDEX: usize = BUCKET_SIZES.len() - 1; +const NUM_LEVELS: usize = OVERFLOW_QUEUE_INDEX - 1; + +const DAY: Duration = Duration::from_secs(60 * 60 * 24); + +const SPANS: &[u64] = &[ + aligned_duration(Duration::from_secs(1)), // 1.07s + aligned_duration(Duration::from_secs(60)), // 1.14m + aligned_duration(Duration::from_secs(60 * 60)), // 1.22h + aligned_duration(DAY), // 1.63d + BUCKET_SIZES[3] as u64 * aligned_duration(DAY), // 6.5d + BUCKET_SIZES[3] as u64 * aligned_duration(DAY), // 6.5d +]; + +const SHIFT: &[u64] = &[ + SPANS[0].trailing_zeros() as u64, + SPANS[1].trailing_zeros() as u64, + SPANS[2].trailing_zeros() as u64, + SPANS[3].trailing_zeros() as u64, + SPANS[4].trailing_zeros() as u64, +]; + +/// Returns the next power of two of the duration in nanoseconds. +const fn aligned_duration(duration: Duration) -> u64 { + // NOTE: as_nanos() returns u128, so convert it to u64 by using `as`. + // We cannot call TryInto::try_into() here because it is not a const fn. + (duration.as_nanos() as u64).next_power_of_two() +} + +/// A hierarchical timer wheel to add, remove, and fire expiration events in +/// amortized O(1) time. +/// +/// The expiration events are deferred until the timer is advanced, which is +/// performed as part of the cache's housekeeping cycle. +pub(crate) struct TimerWheel { + wheels: Box<[Box<[Deque]>]>, + /// The time when this timer wheel was created. + origin: Instant, + /// The time when this timer wheel was last advanced. + current: Instant, +} + +impl TimerWheel { + fn new(now: Instant) -> Self { + let wheels = BUCKET_SIZES + .iter() + .map(|b| { + (0..*b) + .map(|_| Deque::new(super::CacheRegion::Other)) + .collect::>() + .into_boxed_slice() + }) + .collect::>() + .into_boxed_slice(); + Self { + wheels, + origin: now, + current: now, + } + } + + /// Schedules a timer event for the node. + // pub(crate) fn schedule(&mut self, node: Box>) { + // if let Some(t) = node.element.expiration_time() { + // let (level, index) = self.bucket_indices(t); + // self.wheels[level][index].push_back(node); + // } + // } + + // /// Reschedules an active timer event for the node. + // pub(crate) fn reschedule(&mut self, node: NonNull>) {} + + /// Removes a timer event for this node if present. + // pub(crate) fn deschedule(&mut self, node: NonNull>) { + // if let Some(t) = node.element.expiration_time() { + // let (level, index) = self.bucket_indices(t); + // unsafe { self.wheels[level][index].unlink_and_drop(node) }; + // } + // } + + /// Returns the bucket indices to locate the bucket that the timer event + /// should be added to. + fn bucket_indices(&self, time: Instant) -> (usize, usize) { + let duration = time + .checked_duration_since(self.current) + // FIXME: unwrap will panic if the time is earlier than self.current. + .unwrap() + .as_nanos() as u64; + // ENHANCEME: Check overflow? (u128 -> u64) + // FIXME: unwrap will panic if the time is earlier than self.origin. + let time_nano = time.checked_duration_since(self.origin).unwrap().as_nanos() as u64; + for level in 0..=NUM_LEVELS { + if duration < SPANS[level + 1] { + let ticks = time_nano >> SHIFT[level]; + let index = ticks & (BUCKET_SIZES[level] - 1); + return (level, index as usize); + } + } + (OVERFLOW_QUEUE_INDEX, 0) + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use super::{TimerWheel, SPANS}; + use crate::common::time::{CheckedTimeOps, Clock, Instant}; + + #[test] + fn test_bucket_indices() { + fn dur(nanos: u64) -> Duration { + Duration::from_nanos(nanos) + } + + fn bi(timer: &TimerWheel<()>, now: Instant, dur: Duration) -> (usize, usize) { + let t = now.checked_add(dur).unwrap(); + timer.bucket_indices(t) + } + + let (clock, mock) = Clock::mock(); + let now = Instant::new(clock.now()); + + let mut timer = TimerWheel::<()>::new(now); + assert_eq!(timer.bucket_indices(now), (0, 0)); + + // Level 0: 1.07s + assert_eq!(bi(&timer, now, dur(SPANS[0] - 1)), (0, 0)); + assert_eq!(bi(&timer, now, dur(SPANS[0])), (0, 1)); + assert_eq!(bi(&timer, now, dur(SPANS[0] * 63)), (0, 63)); + + // Level 1: 1.14m + assert_eq!(bi(&timer, now, dur(SPANS[0] * 64)), (1, 1)); + assert_eq!(bi(&timer, now, dur(SPANS[1])), (1, 1)); + assert_eq!(bi(&timer, now, dur(SPANS[1] * 63 + SPANS[0] * 63)), (1, 63)); + + // Level 2: 1.22h + assert_eq!(bi(&timer, now, dur(SPANS[1] * 64)), (2, 1)); + assert_eq!(bi(&timer, now, dur(SPANS[2])), (2, 1)); + assert_eq!( + bi( + &timer, + now, + dur(SPANS[2] * 31 + SPANS[1] * 63 + SPANS[0] * 63) + ), + (2, 31) + ); + + // Level 3: 1.63dh + assert_eq!(bi(&timer, now, dur(SPANS[2] * 32)), (3, 1)); + assert_eq!(bi(&timer, now, dur(SPANS[3])), (3, 1)); + assert_eq!(bi(&timer, now, dur(SPANS[3] * 3)), (3, 3)); + + // Overflow + assert_eq!(bi(&timer, now, dur(SPANS[3] * 4)), (4, 0)); + assert_eq!(bi(&timer, now, dur(SPANS[4])), (4, 0)); + assert_eq!(bi(&timer, now, dur(SPANS[4] * 100)), (4, 0)); + + // Increment the clock by 5 ticks. (1 tick ~= 1.07s) + mock.increment(dur(SPANS[0] * 5)); + let now = Instant::new(clock.now()); + timer.current = now; + + // Level 0: 1.07s + assert_eq!(bi(&timer, now, dur(SPANS[0] - 1)), (0, 5)); + assert_eq!(bi(&timer, now, dur(SPANS[0])), (0, 6)); + assert_eq!(bi(&timer, now, dur(SPANS[0] * 63)), (0, 4)); + + // Level 1: 1.14m + assert_eq!(bi(&timer, now, dur(SPANS[0] * 64)), (1, 1)); + assert_eq!(bi(&timer, now, dur(SPANS[1])), (1, 1)); + assert_eq!( + bi(&timer, now, dur(SPANS[1] * 63 + SPANS[0] * (63 - 5))), + (1, 63) + ); + + // Increment the clock by 61 ticks. (total 66 ticks) + mock.increment(dur(SPANS[0] * 61)); + let now = Instant::new(clock.now()); + timer.current = now; + + // Level 0: 1.07s + assert_eq!(bi(&timer, now, dur(SPANS[0] - 1)), (0, 2)); + assert_eq!(bi(&timer, now, dur(SPANS[0])), (0, 3)); + assert_eq!(bi(&timer, now, dur(SPANS[0] * 63)), (0, 1)); + + // Level 1: 1.14m + assert_eq!(bi(&timer, now, dur(SPANS[0] * 64)), (1, 2)); + assert_eq!(bi(&timer, now, dur(SPANS[1])), (1, 2)); + assert_eq!( + bi(&timer, now, dur(SPANS[1] * 63 + SPANS[0] * (63 - 2))), + (1, 0) + ); + } +}