Skip to content

Commit

Permalink
refactor: add right bound when listing expired keys to avoid unnecess…
Browse files Browse the repository at this point in the history
…ary data copy (#15334)
  • Loading branch information
drmingdrmer authored Apr 25, 2024
1 parent 6abee1b commit 97277ad
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 7 deletions.
2 changes: 1 addition & 1 deletion src/meta/raft-store/src/applier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ impl<'a> Applier<'a> {
info!("to clean expired kvs, log_time_ts: {}", log_time_ms);

let mut to_clean = vec![];
let mut strm = self.sm.list_expire_index().await?;
let mut strm = self.sm.list_expire_index(log_time_ms).await?;

{
let mut strm = std::pin::pin!(strm);
Expand Down
15 changes: 15 additions & 0 deletions src/meta/raft-store/src/sm_v002/leveled_store/level.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::ops::RangeBounds;

use databend_common_meta_types::KVMeta;
use futures_util::StreamExt;
use log::warn;

use crate::sm_v002::leveled_store::map_api::AsMap;
use crate::sm_v002::leveled_store::map_api::KVResultStream;
Expand Down Expand Up @@ -109,6 +110,13 @@ impl MapApiRO<String> for Level {
.map(|(k, v)| (k.clone(), v.clone()))
.collect::<Vec<_>>();

if vec.len() > 1000 {
warn!(
"Level::<ExpireKey>::range() returns big range of len={}",
vec.len()
);
}

let strm = futures::stream::iter(vec).map(Ok).boxed();
Ok(strm)
}
Expand Down Expand Up @@ -159,6 +167,13 @@ impl MapApiRO<ExpireKey> for Level {
.map(|(k, v)| (*k, v.clone()))
.collect::<Vec<_>>();

if vec.len() > 1000 {
warn!(
"Level::<ExpireKey>::range() returns big range of len={}",
vec.len()
);
}

let strm = futures::stream::iter(vec).map(Ok).boxed();
Ok(strm)
}
Expand Down
20 changes: 17 additions & 3 deletions src/meta/raft-store/src/sm_v002/sm_v002.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,12 +299,26 @@ impl SMV002 {
self.expire_cursor = ExpireKey::new(log_time_ms, 0);
}

/// List expiration index by expiration time.
/// List expiration index by expiration time,
/// upto current time(exclusive) in milli seconds.
///
/// Only records with expire time less than current time will be returned.
/// Expire time that equals to current time is not considered expired.
pub(crate) async fn list_expire_index(
&self,
curr_time_ms: u64,
) -> Result<impl Stream<Item = Result<(ExpireKey, String), io::Error>> + '_, io::Error> {
let start = self.expire_cursor;
let strm = self.levels.expire_map().range(start..).await?;

// curr_time > expire_at => expired
let end = ExpireKey::new(curr_time_ms, 0);

// There is chance the raft leader produce smaller timestamp for later logs.
if start >= end {
return Ok(futures::stream::empty().boxed());
}

let strm = self.levels.expire_map().range(start..end).await?;

let strm = strm
// Return only non-deleted records
Expand All @@ -313,7 +327,7 @@ impl SMV002 {
future::ready(Ok(expire_entry))
});

Ok(strm)
Ok(strm.boxed())
}

pub fn sys_data_ref(&self) -> &SysData {
Expand Down
32 changes: 29 additions & 3 deletions src/meta/raft-store/src/sm_v002/sm_v002_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,25 @@ async fn test_internal_expire_index() -> anyhow::Result<()> {
async fn test_list_expire_index() -> anyhow::Result<()> {
let mut sm = build_sm_with_expire().await?;

let curr_time_ms = 5000;
let got = sm
.list_expire_index()
.list_expire_index(curr_time_ms)
.await?
.try_collect::<Vec<_>>()
.await?;
assert!(got.is_empty());

let curr_time_ms = 5001;
let got = sm
.list_expire_index(curr_time_ms)
.await?
.try_collect::<Vec<_>>()
.await?;
assert_eq!(got, vec![(ExpireKey::new(5000, 2), s("b")),]);

let curr_time_ms = 20_001;
let got = sm
.list_expire_index(curr_time_ms)
.await?
.try_collect::<Vec<_>>()
.await?;
Expand All @@ -234,8 +251,9 @@ async fn test_list_expire_index() -> anyhow::Result<()> {

sm.update_expire_cursor(15000);

let curr_time_ms = 20_001;
let got = sm
.list_expire_index()
.list_expire_index(curr_time_ms)
.await?
.try_collect::<Vec<_>>()
.await?;
Expand All @@ -260,8 +278,16 @@ async fn test_inserting_expired_becomes_deleting() -> anyhow::Result<()> {

assert_eq!(sm.get_maybe_expired_kv("a").await?, None, "a is expired");

// List until 20_000 ms
let got = sm
.list_expire_index(20_000)
.await?
.try_collect::<Vec<_>>()
.await?;
assert!(got.is_empty());

let got = sm
.list_expire_index()
.list_expire_index(20_001)
.await?
.try_collect::<Vec<_>>()
.await?;
Expand Down

0 comments on commit 97277ad

Please sign in to comment.