Skip to content

Commit

Permalink
feat(meta): auto-clean expired keys
Browse files Browse the repository at this point in the history
- Clean up to 32 expired keys before applying a raft-log.

- When modifying (insert/update/delete) a kv record, it emits at most two events:
  - If the existing key has expired, emit a `delete` event, because an
    expired key is seen and removed.
  - Then the event of the update follows.

- Fix: #8489
- Fix: #8490
- Fix: #8540
  • Loading branch information
drmingdrmer committed Oct 31, 2022
1 parent b6a1728 commit 32df67d
Show file tree
Hide file tree
Showing 5 changed files with 573 additions and 90 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

207 changes: 182 additions & 25 deletions src/meta/raft-store/src/state_machine/sm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,14 @@ use tracing::info;

use crate::config::RaftConfig;
use crate::sled_key_spaces::ClientLastResps;
use crate::sled_key_spaces::Expire;
use crate::sled_key_spaces::GenericKV;
use crate::sled_key_spaces::Nodes;
use crate::sled_key_spaces::Sequences;
use crate::sled_key_spaces::StateMachineMeta;
use crate::state_machine::ClientLastRespValue;
use crate::state_machine::ExpireKey;
use crate::state_machine::ExpireValue;
use crate::state_machine::MetaSnapshotId;
use crate::state_machine::StateMachineMetaKey;
use crate::state_machine::StateMachineMetaKey::Initialized;
Expand Down Expand Up @@ -259,19 +262,22 @@ impl StateMachine {
#[tracing::instrument(level = "debug", skip(self, entry), fields(log_id=%entry.log_id))]
pub async fn apply(&self, entry: &Entry<LogEntry>) -> Result<AppliedState, MetaStorageError> {
info!("apply: summary: {}", entry.summary(),);

let log_id = &entry.log_id;

debug!("sled tx start: {:?}", entry);

let log_id = &entry.log_id;
let log_time_ms = Self::get_log_time(entry);

let expired = self.list_expired_kvs(log_time_ms)?;
debug!("expired keys: {:?}", expired);

let kv_pairs = self.scan_prefix_if_needed(entry)?;

let result = self.sm_tree.txn(true, move |txn_tree| {
let txn_sm_meta = txn_tree.key_space::<StateMachineMeta>();
txn_sm_meta.insert(&LastApplied, &StateMachineMetaValue::LogId(*log_id))?;

self.clean_expired_kvs(&txn_tree, &expired)?;

match entry.payload {
EntryPayload::Blank => {
info!("apply: blank");
Expand Down Expand Up @@ -367,7 +373,7 @@ impl StateMachine {
key: &str,
txn_tree: &TransactionSledTree,
) -> Result<AppliedState, MetaStorageError> {
let r = self.txn_incr_seq(key, txn_tree)?;
let r = Self::txn_incr_seq(key, txn_tree)?;

Ok(r.into())
}
Expand Down Expand Up @@ -418,12 +424,17 @@ impl StateMachine {
) -> Result<AppliedState, MetaStorageError> {
debug!(upsert_kv = debug(upsert_kv), "apply_update_kv_cmd");

let (prev, result) = self.txn_upsert_kv(txn_tree, upsert_kv, log_time_ms)?;
let (expired, prev, result) = Self::txn_upsert_kv(txn_tree, upsert_kv, log_time_ms)?;

debug!("applied UpsertKV: {:?} {:?}", upsert_kv, result);

if let Some(subscriber) = &self.subscriber {
subscriber.kv_changed(&upsert_kv.key, prev.clone(), result.clone());
if expired.is_some() {
subscriber.kv_changed(&upsert_kv.key, expired, None);
}
if prev != result {
subscriber.kv_changed(&upsert_kv.key, prev.clone(), result.clone());
}
}

Ok(Change::new(prev, result).into())
Expand Down Expand Up @@ -551,7 +562,7 @@ impl StateMachine {
events: &mut Option<Vec<NotifyKVEvent>>,
log_time_ms: u64,
) -> Result<(), MetaStorageError> {
let (prev, result) = self.txn_upsert_kv(
let (expired, prev, result) = Self::txn_upsert_kv(
txn_tree,
&UpsertKV::update(&put.key, &put.value).with(KVMeta {
expire_at: put.expire_at,
Expand All @@ -560,7 +571,12 @@ impl StateMachine {
)?;

if let Some(events) = events {
events.push((put.key.to_string(), prev.clone(), result));
if expired.is_some() {
events.push((put.key.to_string(), expired, None));
}
if prev != result {
events.push((put.key.to_string(), prev.clone(), result));
}
}

let put_resp = TxnPutResponse {
Expand All @@ -587,11 +603,16 @@ impl StateMachine {
events: &mut Option<Vec<NotifyKVEvent>>,
log_time_ms: u64,
) -> Result<(), MetaStorageError> {
let (prev, result) =
self.txn_upsert_kv(txn_tree, &UpsertKV::delete(&delete.key), log_time_ms)?;
let (expired, prev, result) =
Self::txn_upsert_kv(txn_tree, &UpsertKV::delete(&delete.key), log_time_ms)?;

if let Some(events) = events {
events.push((delete.key.to_string(), prev.clone(), result));
if expired.is_some() {
events.push((delete.key.to_string(), expired, None));
}
if prev != result {
events.push((delete.key.to_string(), prev.clone(), result));
}
}

let del_resp = TxnDeleteResponse {
Expand Down Expand Up @@ -624,13 +645,18 @@ impl StateMachine {
if let Some(kv_pairs) = kv_pairs {
if let Some(kv_pairs) = kv_pairs.get(delete_by_prefix) {
for (key, _seq) in kv_pairs.iter() {
let ret = self.txn_upsert_kv(txn_tree, &UpsertKV::delete(key), log_time_ms);
let ret = Self::txn_upsert_kv(txn_tree, &UpsertKV::delete(key), log_time_ms);

if let Ok(ret) = ret {
count += 1;

if let Some(events) = events {
events.push((key.to_string(), ret.0.clone(), ret.1));
if ret.0.is_some() {
events.push((key.to_string(), ret.0.clone(), None));
}
if ret.1 != ret.2 {
events.push((key.to_string(), ret.1.clone(), ret.2));
}
}
}
}
Expand Down Expand Up @@ -791,11 +817,76 @@ impl StateMachine {
res
}

fn txn_incr_seq(
/// Collect a bounded batch of expired keys that should be purged before a log entry is applied.
///
/// The listing is done up front, outside the transaction, because applying runs in a
/// sled-txn tree, which does not provide a listing function.
///
/// At most 32 leading records of the `Expire` key space are examined; a record is
/// selected when its `time_ms` is strictly less than `log_time_ms`.
/// A `log_time_ms` of `0` short-circuits to an empty list.
///
/// Returns `(key, expire_key)` pairs: the kv key stored in the expiration record,
/// together with the expiration-index key it was found under.
///
/// NOTE(review): presumably the `Expire` key space iterates in ascending expiration
/// time, so the first records are the oldest — confirm against `ExpireKey` ordering.
#[tracing::instrument(level = "debug", skip_all)]
pub fn list_expired_kvs(
    &self,
    log_time_ms: u64,
) -> Result<Vec<(String, ExpireKey)>, MetaStorageError> {
    // Upper bound on how many expiration records are inspected per call.
    const AT_MOST: usize = 32;

    if log_time_ms == 0 {
        return Ok(Vec::new());
    }

    let mut to_clean = Vec::with_capacity(AT_MOST);

    info!("list_expired_kv, log_time_ts: {}", log_time_ms);

    let expire_index = self.sm_tree.key_space::<Expire>();

    for entry in expire_index.range(..)?.take(AT_MOST) {
        let record = entry?;
        let expire_key: ExpireKey = record.key()?;

        // Not yet expired at this log time: leave it alone.
        if expire_key.time_ms >= log_time_ms {
            continue;
        }

        // Decode the value only for records that are actually selected.
        let expire_value: ExpireValue = record.value()?;
        to_clean.push((expire_value.key, expire_key));
    }

    Ok(to_clean)
}

/// Remove expired key-values, and corresponding secondary expiration index record.
///
/// This should be done inside a sled-transaction.
#[tracing::instrument(level = "debug", skip_all)]
fn clean_expired_kvs(
&self,
key: &str,
txn_tree: &TransactionSledTree,
) -> Result<u64, MetaStorageError> {
expired: &[(String, ExpireKey)],
) -> Result<(), MetaStorageError> {
let kvs = txn_tree.key_space::<GenericKV>();
let expires = txn_tree.key_space::<Expire>();

for (key, expire_key) in expired.iter() {
let sv = kvs.get(key)?;

if let Some(seq_v) = &sv {
if expire_key.seq == seq_v.seq {
info!("clean expired: {}, {}", key, expire_key);

kvs.remove(key)?;
expires.remove(expire_key)?;

if let Some(subscriber) = &self.subscriber {
subscriber.kv_changed(key, sv, None);
}
continue;
}
}

unreachable!(
"trying to remove un-cleanable: {}, {}, kv-record: {:?}",
key, expire_key, sv
);
}
Ok(())
}

fn txn_incr_seq(key: &str, txn_tree: &TransactionSledTree) -> Result<u64, MetaStorageError> {
let seqs = txn_tree.key_space::<Sequences>();

let key = key.to_string();
Expand All @@ -809,43 +900,109 @@ impl StateMachine {
Ok(new_value.0)
}

/// Execute an upsert-kv operation on a transactional sled tree.
///
/// KV has two indexes:
/// - The primary index: `key -> (seq, meta(expire_time), value)`,
/// - and a secondary expiration index: `(expire_time, seq) -> key`.
///
/// Thus upserting a kv record is done in two steps:
/// update the primary index and optionally update the secondary index.
///
/// It returns 3 SeqV:
/// - `(None, None, x)`: upsert nonexistent key;
/// - `(None, Some, x)`: upsert existent and non-expired key;
/// - `(Some, None, x)`: upsert existent but expired key;
#[allow(clippy::type_complexity)]
fn txn_upsert_kv(
&self,
txn_tree: &TransactionSledTree,
upsert_kv: &UpsertKV,
log_time_ms: u64,
) -> Result<(Option<SeqV>, Option<SeqV>), MetaStorageError> {
) -> Result<(Option<SeqV>, Option<SeqV>, Option<SeqV>), MetaStorageError> {
let (expired, prev, res) =
Self::txn_upsert_kv_primary_index(txn_tree, upsert_kv, log_time_ms)?;

let expires = txn_tree.key_space::<Expire>();

if let Some(sv) = &expired {
if let Some(m) = &sv.meta {
if let Some(exp) = m.expire_at {
expires.remove(&ExpireKey::new(exp * 1000, sv.seq))?;
}
}
}

// No change, no need to update expiration index
if prev == res {
return Ok((expired, prev, res));
}

// Remove previous expiration index, add a new one.

if let Some(sv) = &prev {
if let Some(m) = &sv.meta {
if let Some(exp) = m.expire_at {
expires.remove(&ExpireKey::new(exp * 1000, sv.seq))?;
}
}
}

if let Some(sv) = &res {
if let Some(m) = &sv.meta {
if let Some(exp) = m.expire_at {
let k = ExpireKey::new(exp * 1000, sv.seq);
let v = ExpireValue {
key: upsert_kv.key.clone(),
};
expires.insert(&k, &v)?;
}
}
}

Ok((expired, prev, res))
}

/// It returns 3 SeqV:
/// - The first one is `Some` if an existent record expired.
/// - The second and the third represent the change that is made by the upsert operation.
///
/// Only one of the first and second can be `Some`.
#[allow(clippy::type_complexity)]
fn txn_upsert_kv_primary_index(
txn_tree: &TransactionSledTree,
upsert_kv: &UpsertKV,
log_time_ms: u64,
) -> Result<(Option<SeqV>, Option<SeqV>, Option<SeqV>), MetaStorageError> {
let kvs = txn_tree.key_space::<GenericKV>();

let prev = kvs.get(&upsert_kv.key)?;

// If prev is timed out, treat it as a None.
let (_original, prev) = Self::expire_seq_v(prev, log_time_ms);
// If prev is timed out, treat it as a None. But still keep the original value for cleaning it up.
let (expired, prev) = Self::expire_seq_v(prev, log_time_ms);

if upsert_kv.seq.match_seq(&prev).is_err() {
return Ok((prev.clone(), prev));
return Ok((expired, prev.clone(), prev));
}

let mut new_seq_v = match &upsert_kv.value {
Operation::Update(v) => SeqV::with_meta(0, upsert_kv.value_meta.clone(), v.clone()),
Operation::Delete => {
kvs.remove(&upsert_kv.key)?;
return Ok((prev, None));
return Ok((expired, prev, None));
}
Operation::AsIs => match prev {
None => return Ok((prev, None)),
None => return Ok((expired, prev, None)),
Some(ref prev_kv_value) => {
prev_kv_value.clone().set_meta(upsert_kv.value_meta.clone())
}
},
};

new_seq_v.seq = self.txn_incr_seq(GenericKV::NAME, txn_tree)?;
new_seq_v.seq = Self::txn_incr_seq(GenericKV::NAME, txn_tree)?;
kvs.insert(&upsert_kv.key, &new_seq_v)?;

debug!("applied upsert: {:?} res: {:?}", upsert_kv, new_seq_v);
Ok((prev, Some(new_seq_v)))
Ok((expired, prev, Some(new_seq_v)))
}

fn txn_client_last_resp_update(
Expand Down
Loading

0 comments on commit 32df67d

Please sign in to comment.