diff --git a/src/meta/raft-store/src/applier.rs b/src/meta/raft-store/src/applier.rs index e570d36128a5..1e9209328fdf 100644 --- a/src/meta/raft-store/src/applier.rs +++ b/src/meta/raft-store/src/applier.rs @@ -469,7 +469,7 @@ impl<'a> Applier<'a> { info!("to clean expired kvs, log_time_ts: {}", log_time_ms); let mut to_clean = vec![]; - let mut strm = self.sm.list_expire_index().await?; + let mut strm = self.sm.list_expire_index(log_time_ms).await?; { let mut strm = std::pin::pin!(strm); diff --git a/src/meta/raft-store/src/sm_v002/leveled_store/level.rs b/src/meta/raft-store/src/sm_v002/leveled_store/level.rs index ac560b330f58..760151d70e37 100644 --- a/src/meta/raft-store/src/sm_v002/leveled_store/level.rs +++ b/src/meta/raft-store/src/sm_v002/leveled_store/level.rs @@ -19,6 +19,7 @@ use std::ops::RangeBounds; use databend_common_meta_types::KVMeta; use futures_util::StreamExt; +use log::warn; use crate::sm_v002::leveled_store::map_api::AsMap; use crate::sm_v002::leveled_store::map_api::KVResultStream; @@ -109,6 +110,13 @@ impl MapApiRO for Level { .map(|(k, v)| (k.clone(), v.clone())) .collect::>(); + if vec.len() > 1000 { + warn!( + "Level::::range() returns big range of len={}", + vec.len() + ); + } + let strm = futures::stream::iter(vec).map(Ok).boxed(); Ok(strm) } @@ -159,6 +167,13 @@ impl MapApiRO for Level { .map(|(k, v)| (*k, v.clone())) .collect::>(); + if vec.len() > 1000 { + warn!( + "Level::::range() returns big range of len={}", + vec.len() + ); + } + let strm = futures::stream::iter(vec).map(Ok).boxed(); Ok(strm) } diff --git a/src/meta/raft-store/src/sm_v002/sm_v002.rs b/src/meta/raft-store/src/sm_v002/sm_v002.rs index ed24e0d180fd..2cab6181d335 100644 --- a/src/meta/raft-store/src/sm_v002/sm_v002.rs +++ b/src/meta/raft-store/src/sm_v002/sm_v002.rs @@ -299,12 +299,26 @@ impl SMV002 { self.expire_cursor = ExpireKey::new(log_time_ms, 0); } - /// List expiration index by expiration time. + /// List expiration index by expiration time, + /// upto current time(exclusive) in milli seconds. + /// + /// Only records with expire time less than current time will be returned. + /// Expire time that equals to current time is not considered expired. pub(crate) async fn list_expire_index( &self, + curr_time_ms: u64, ) -> Result> + '_, io::Error> { let start = self.expire_cursor; - let strm = self.levels.expire_map().range(start..).await?; + + // curr_time > expire_at => expired + let end = ExpireKey::new(curr_time_ms, 0); + + // There is chance the raft leader produce smaller timestamp for later logs. + if start >= end { + return Ok(futures::stream::empty().boxed()); + } + + let strm = self.levels.expire_map().range(start..end).await?; let strm = strm // Return only non-deleted records @@ -313,7 +327,7 @@ impl SMV002 { future::ready(Ok(expire_entry)) }); - Ok(strm) + Ok(strm.boxed()) } pub fn sys_data_ref(&self) -> &SysData { diff --git a/src/meta/raft-store/src/sm_v002/sm_v002_test.rs b/src/meta/raft-store/src/sm_v002/sm_v002_test.rs index 4633d0434188..0bf37600a64a 100644 --- a/src/meta/raft-store/src/sm_v002/sm_v002_test.rs +++ b/src/meta/raft-store/src/sm_v002/sm_v002_test.rs @@ -221,8 +221,25 @@ async fn test_internal_expire_index() -> anyhow::Result<()> { async fn test_list_expire_index() -> anyhow::Result<()> { let mut sm = build_sm_with_expire().await?; + let curr_time_ms = 5000; let got = sm - .list_expire_index() + .list_expire_index(curr_time_ms) + .await? + .try_collect::>() + .await?; + assert!(got.is_empty()); + + let curr_time_ms = 5001; + let got = sm + .list_expire_index(curr_time_ms) + .await? + .try_collect::>() + .await?; + assert_eq!(got, vec![(ExpireKey::new(5000, 2), s("b")),]); + + let curr_time_ms = 20_001; + let got = sm + .list_expire_index(curr_time_ms) .await? .try_collect::>() .await?; @@ -234,8 +251,9 @@ async fn test_list_expire_index() -> anyhow::Result<()> { sm.update_expire_cursor(15000); + let curr_time_ms = 20_001; let got = sm - .list_expire_index() + .list_expire_index(curr_time_ms) .await? .try_collect::>() .await?; @@ -260,8 +278,16 @@ async fn test_inserting_expired_becomes_deleting() -> anyhow::Result<()> { assert_eq!(sm.get_maybe_expired_kv("a").await?, None, "a is expired"); + // List until 20_000 ms + let got = sm + .list_expire_index(20_000) + .await? + .try_collect::>() + .await?; + assert!(got.is_empty()); + let got = sm - .list_expire_index() + .list_expire_index(20_001) .await? .try_collect::>() .await?;