-
-
Notifications
You must be signed in to change notification settings - Fork 76
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Per-entry expiration #248
Per-entry expiration #248
Conversation
5567649
to
2ef3eb8
Compare
fwiw, mine is pretty low-level as I wanted to minimize time-space overhead, and could couple it to the implementation. It is a little harder to think through since I wanted to avoid an expensive division / modulus instruction in favor of bit manipulations and didn't want to rely on the compiler as I cannot assert its optimizations in Java. I couldn't find a good reference to crib from, though I did ask tokio-rs/tokio-timer#30 about their hashed wheel. I guess that inspired them since they rewrote it and theirs is more generalized, so it might be another reference or easy to borrow directly. |
Add `TimerWheel` to manage the per-entry expiration with amortized O(1) time.
2ef3eb8
to
c5d75db
Compare
- Continue implementing the `TimerWheel`. - Add `key_hash` and `expiration_time` fields to the `EntryInfo`.
- Fix compile errors when no default feature is enabled. - Fix Clippy warnings.
Continue implementing the `TimerWheel`.
Continue implementing the `TimerWheel`.
Change the iterator created by `TimerWheel::advance` method to iterate over `TimerEvent`s.
f3db798
to
d0caa64
Compare
Update internal `base_cache` module to use the timer wheel.
Update internal `base_cache` module to use the timer wheel.
- Remove `#![allow(unused)]` and fix the warnings. - Add a `drop` call for clarity.
- Add a public `Expiry` trait. - Update `base_cache` module to use it when reading from and writing to a cache. - Update `future::CacheBuilder` to allow to set an `Expiry` to a `future::Cache`.
9f37ab1
to
068083a
Compare
- Add a public `Expiry` trait. - Update `sync::CacheBuilder` to allow to set an `Expiry` to a `sync` `Cache` and `SegmentedCache`. - Add unit tests for the expiry to `future::Cache`.
Update the `expire_after_read_or_update` method of `BaseCache` to use all per-entry expiration time and cache-wide time-to-live and time-to-idle policies to calculate the `current_duration`.
Add sentinel nodes to the queues in the `TimerWheel` to avoid the `advance` method from going into an infinite loop.
Hi. The per-entry expiration feature is ready to play with. It has not been thoroughly tested yet, and is definitely not ready for a release yet, but I would like to get some feedback from you guys. Cargo.toml [dependencies]
moka = { git = "https://github.com/moka-rs/moka.git", branch = "per-entry-expiration", features = ["future"] } As for the API, I took the suggestion from @ben-manes, the creator of the Caffeine cache, to use a callback evaluator The pub trait Expiry<K, V> {
/// Specifies that the entry should be automatically removed from the cache once
/// the duration has elapsed after the entry's creation. Returning `None`
/// indicates no expiration for the entry.
/// ...
fn expire_after_create(&self, key: &K, value: &V, current_time: Instant) -> Option<Duration> {
None // No expiry.
}
/// Specifies that the entry should be automatically removed from the cache once
/// the duration has elapsed after its last read. Returning `None` indicates no
/// expiration for the entry. Returning `current_duration` will not modify the
/// expiration time.
/// ...
fn expire_after_read(
&self,
key: &K,
value: &V,
current_time: Instant,
// The duration until this entry expires.
current_duration: Option<Duration>,
// The time when this entry was modified (inserted or replaced).
last_modified_at: Instant,
) -> Option<Duration> {
current_duration // No change.
}
/// Specifies that the entry should be automatically removed from the cache once
/// the duration has elapsed after the replacement of its value. Returning `None`
/// indicates no expiration for the entry. Returning `current_duration` will not
/// modify the expiration time.
/// ...
fn expire_after_update(
&self,
key: &K,
value: &V,
current_time: Instant,
// The duration until this entry expires.
current_duration: Option<Duration>,
) -> Option<Duration> {
current_duration // No change.
}
} For more details about the methods, see the Here is a working example. You will create a type that implements the // Cargo.toml
// [dependencies]
// moka = { git = "https://github.com/moka-rs/moka.git", branch = "per-entry-expiration", features = ["future"] }
// tokio = { version = "1.27.0", features = ["rt-multi-thread", "time", "macros"] }
use moka::{future::Cache, Expiry};
use std::time::{Duration, Instant};
// In this example, we will create a `future::Cache` with `u32` as the key, and
// `(Expiration, String)` as the value. `Expiration` is an enum to represent the
// expiration of the value, and `String` is the application data of the value.
/// An enum to represent the expiration of a value.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Expiration {
/// The value never expires.
Never,
/// The value expires after a short time. (5 seconds in this example)
AfterShortTime,
/// The value expires after a long time. (15 seconds in this example)
AfterLongTime,
}
impl Expiration {
/// Returns the duration of this expiration.
pub fn as_duration(&self) -> Option<Duration> {
match self {
Expiration::Never => None,
Expiration::AfterShortTime => Some(Duration::from_secs(5)),
Expiration::AfterLongTime => Some(Duration::from_secs(15)),
}
}
}
/// An expiry that implements `moka::Expiry` trait. `Expiry` trait provides the
/// default implementations of three callback methods `expire_after_create`,
/// `expire_after_read`, and `expire_after_update`.
///
/// In this example, we only override the `expire_after_create` method.
pub struct MyExpiry;
impl Expiry<u32, (Expiration, String)> for MyExpiry {
/// Returns the duration of the expiration of the value that was just
/// created.
fn expire_after_create(
&self,
_key: &u32,
value: &(Expiration, String),
_current_time: Instant,
) -> Option<Duration> {
let duration = value.0.as_duration();
println!("MyExpiry: expire_after_create called with key {_key} and value {value:?}. Returning {duration:?}.");
duration
}
}
#[tokio::main]
async fn main() {
// Create a `Cache<u32, (Expiration, String)>` with an expiry `MyExpiry` and
// eviction listener.
let expiry = MyExpiry;
let eviction_listener = |key, _value, cause| {
println!("Evicted key {key}. Cause: {cause:?}");
};
let cache = Cache::builder()
.max_capacity(100)
.expire_after(expiry)
.eviction_listener_with_queued_delivery_mode(eviction_listener)
.build();
// Insert some entries into the cache with different expirations.
cache
.get_with(0, async { (Expiration::AfterShortTime, "a".to_string()) })
.await;
cache
.get_with(1, async { (Expiration::AfterLongTime, "b".to_string()) })
.await;
cache
.get_with(2, async { (Expiration::Never, "c".to_string()) })
.await;
// Verify that all the inserted entries exist.
assert!(cache.contains_key(&0));
assert!(cache.contains_key(&1));
assert!(cache.contains_key(&2));
// Sleep for 6 seconds. Key 0 should expire.
println!("\nSleeping for 6 seconds...\n");
tokio::time::sleep(Duration::from_secs(6)).await;
println!("Entry count: {}", cache.entry_count());
// Verify that key 0 has been evicted.
assert!(!cache.contains_key(&0));
assert!(cache.contains_key(&1));
assert!(cache.contains_key(&2));
// Sleep for 10 more seconds. Key 1 should expire.
println!("\nSleeping for 10 seconds...\n");
tokio::time::sleep(Duration::from_secs(10)).await;
println!("Entry count: {}", cache.entry_count());
// Verify that key 1 has been evicted.
assert!(!cache.contains_key(&1));
assert!(cache.contains_key(&2));
// Manually invalidate key 2.
cache.invalidate(&2).await;
assert!(!cache.contains_key(&2));
println!("\nSleeping for a second...\n");
tokio::time::sleep(Duration::from_secs(1)).await;
println!("Entry count: {}", cache.entry_count());
println!("\nDone!");
} Outputs MyExpiry: expire_after_create called with key 0 and value (AfterShortTime, "a"). Returning Some(5s).
MyExpiry: expire_after_create called with key 1 and value (AfterLongTime, "b"). Returning Some(15s).
MyExpiry: expire_after_create called with key 2 and value (Never, "c"). Returning None.
Sleeping for 6 seconds...
Evicted key 0. Cause: Expired
Entry count: 2
Sleeping for 10 seconds...
Evicted key 1. Cause: Expired
Entry count: 1
Sleeping for a second...
Evicted key 2. Cause: Explicit
Entry count: 0
Done! |
Refactoring: Replace two internal functions in `TimerWheel` with one.
- Add an argument `last_modified_at` to `Expiry::expire_after_read`. - Fix FIXME TODOs in the `TimerWheel`.
Just a heads-up. I added an argument |
OK. I fixed this bug via a commit 0842409. Make sure you run |
I could not reproduce the hung issue. I used the head of Please check if the hung issue remains (when you have time). If so, any additional information would be helpful. |
I agree that a callback-based API makes sense to have maximum flexibility and control. However it does introduce a bit of boilerplate, as I have to still keep an I have some infrastructure issues putting this into a canary deployment anyway, so I guess that was actually a good thing to have delayed me, given this seems to still be deadlocking. |
Sorry for the late reply, I've been quite busy for a few days. I will check it again with the latest branch when I have time and get back to you with the result and more information as soon as possible. Also, the expiry implementation looks like this. Because it's an internal project running in the product environment, it's a bit harder to test and unfortunately I can't provide you the full backtrace information. pub struct MyExpiry;
pub struct MyResp {
ttl: std::time::Duration,
}
impl Expiry<faststr::FastStr, Option<Arc<RwLock<MyResp>>>> for MyExpiry {
fn expire_after_create(
&self,
_key: &faststr::FastStr,
value: &Option<Arc<RwLock<MyResp>>>,
_current_time: std::time::Instant,
) -> Option<Duration> {
if let Some(resp) = value {
Some(resp.read().ttl)
} else {
Some(std::time::Duration::from_secs(60 * 60))
}
}
} |
Thank you for the updates and feedback. Millione, please take your time. And sorry about the quality. I have been running very intense, multi-threaded workloads using Mokabench for few hours, and things are stable now (no panics, no write stalls, etc.). My expiry implementation is here. I hope using the latest branch may help. Here are last four commits with fixes to prevent panics and an infinite loop.
On next few days, I will check if there are any memory leaks in unsafe codes. And also write more unit tests. |
Thank you very much for the great work you have done and for providing us with detailed information on how to solve the problem. It definitely helps a lot. Much appreciated. |
Replace `AtomicU8` fields in `TimerNode` with a `Option<(u8, u8)>` field. Interior mutability was not needed.
- Enable `miri` tests on the `TimerWheel` on CI. - Fix a `miri` error (potential UB) in the `TimerWheel`. - Add FIXME TODOs for some deque `peek_front` usages in `sync_base::Inner` (potential UBs).
Add/re-enable unit tests.
Add unit tests.
Write the docs.
Remove the key and hash from the nodes of the access-order and write-order queues. Use the key and hash in the `EntryInfo` instead.
Re-enable a unit test for 32-bit Linux platforms.
@tatsuya6502 Well, I have refactored my code and can no longer reproduce it. So I think it might be my own code problem. |
@Millione — Thanks a lot for testing it again! That is a good news. |
I checked it few days ago and looked okay.
I did not have chance to do it, and could not write doc with examples. I believe this PR got all the planned functionalities already. I am going to review and merge this into the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left some suggested changes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Merging.
Fixes #231.
Action Items
sync
andfuture
caches to use the new data structure for:insert
andinvalidate
.d. Update the(#227 comment)sync
andfuture
cache API to provideexpires_after
method.sync
andfuture
caches to be configured withexpire_after
at the cache creation time.expire_after
takes an expiry that implementsExpiry
trait. (#227 comment)