Skip to content

Commit

Permalink
refactor: simplify MapApi
Browse files Browse the repository at this point in the history
- Add trait `MapKey` and `MapValue` to define behavior of key values
  that are used in `MapApi`.

- Add Ref and RefMut as container of reference to leveled data.

- Collect get() and range() implementation into function.

The MapApi trait can not be generalized to adapt arbitrary lifetime,
such as using `self` instead of `&self`: `MapApiRO { fn get(self, key) }`.
Because there is a known limitation with rust GAT and hihger ranked
lifetime: See: rust-lang/rust#114046

```
error: implementation of `MapApiRO` is not general enough
  --> src/meta/raft-store/src/sm_v002/sm_v002.rs:80:74
   |
80 |       async fn get_kv(&self, key: &str) -> Result<GetKVReply, Self::Error> {
   |  __________________________________________________________________________^
81 | |         let got = self.sm.get_kv(key).await;
82 | |
83 | |         let local_now_ms = SeqV::<()>::now_ms();
84 | |         let got = Self::non_expired(got, local_now_ms);
85 | |         Ok(got)
86 | |     }
   | |_____^ implementation of `MapApiRO` is not general enough
   |
   = note: `MapApiRO<'1, std::string::String>` would have to be implemented for the type `&'0 LevelData`, for any two lifetimes `'0` and `'1`...
   = note: ...but `MapApiRO<'2, std::string::String>` is actually implemented for the type `&'2 LevelData`, for some specific lifetime `'2`
```
  • Loading branch information
drmingdrmer committed Sep 29, 2023
1 parent b17e271 commit c7a53db
Show file tree
Hide file tree
Showing 11 changed files with 421 additions and 178 deletions.
20 changes: 14 additions & 6 deletions src/meta/raft-store/src/applier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,22 @@ impl<'a> Applier<'a> {
Change::new(prev, result).into()
}

// TODO(1): when get an applier, pass in a now_ms to ensure all expired are cleaned.
/// Update or insert a kv entry.
///
/// If the input entry has expired, it performs a delete operation.
#[minitrace::trace]
async fn upsert_kv(&mut self, upsert_kv: &UpsertKV) -> (Option<SeqV>, Option<SeqV>) {
pub(crate) async fn upsert_kv(&mut self, upsert_kv: &UpsertKV) -> (Option<SeqV>, Option<SeqV>) {
debug!(upsert_kv = as_debug!(upsert_kv); "upsert_kv");

let (prev, result) = self.sm.upsert_kv(upsert_kv.clone()).await;
let (prev, result) = self.sm.upsert_kv_primary_index(upsert_kv).await;

self.sm
.update_expire_index(&upsert_kv.key, &prev, &result)
.await;

let prev = Into::<Option<SeqV>>::into(prev);
let result = Into::<Option<SeqV>>::into(result);

debug!(
"applied UpsertKV: {:?}; prev: {:?}; result: {:?}",
Expand All @@ -209,7 +220,6 @@ impl<'a> Applier<'a> {
}

#[minitrace::trace]

async fn apply_txn(&mut self, req: &TxnRequest) -> AppliedState {
debug!(txn = as_display!(req); "apply txn cmd");

Expand Down Expand Up @@ -445,9 +455,7 @@ impl<'a> Applier<'a> {
assert_eq!(expire_key.seq, seq_v.seq);
info!("clean expired: {}, {}", key, expire_key);

self.sm.upsert_kv(UpsertKV::delete(key.clone())).await;
// dbg!("clean_expired", &key, &curr);
self.push_change(key, curr, None);
self.upsert_kv(&UpsertKV::delete(key.clone())).await;
} else {
unreachable!(
"trying to remove un-cleanable: {}, {}, kv-entry: {:?}",
Expand Down
1 change: 1 addition & 0 deletions src/meta/raft-store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#![allow(clippy::uninlined_format_args)]
#![feature(impl_trait_in_assoc_type)]
// #![feature(type_alias_impl_trait)]

// #![allow(incomplete_features)]
Expand Down
59 changes: 34 additions & 25 deletions src/meta/raft-store/src/sm_v002/leveled_store/level_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,19 @@ use futures_util::StreamExt;

use crate::sm_v002::leveled_store::map_api::MapApi;
use crate::sm_v002::leveled_store::map_api::MapApiRO;
use crate::sm_v002::leveled_store::map_api::MapKey;
use crate::sm_v002::leveled_store::sys_data::SysData;
use crate::sm_v002::leveled_store::sys_data_api::SysDataApiRO;
use crate::sm_v002::marked::Marked;
use crate::state_machine::ExpireKey;

impl MapKey for String {
type V = Vec<u8>;
}
impl MapKey for ExpireKey {
type V = String;
}

/// A single level of state machine data.
///
/// State machine data is composed of multiple levels.
Expand Down Expand Up @@ -75,24 +83,25 @@ impl LevelData {

#[async_trait::async_trait]
impl MapApiRO<String> for LevelData {
type V = Vec<u8>;

async fn get<Q>(&self, key: &Q) -> Marked<Self::V>
async fn get<Q>(&self, key: &Q) -> Marked<<String as MapKey>::V>
where
String: Borrow<Q>,
Q: Ord + Send + Sync + ?Sized,
{
self.kv.get(key).cloned().unwrap_or(Marked::empty())
}

async fn range<'a, T, R>(&'a self, range: R) -> BoxStream<'a, (String, Marked)>
async fn range<'f, Q, R>(
&'f self,
range: R,
) -> BoxStream<'f, (String, Marked<<String as MapKey>::V>)>
where
String: 'a,
String: Borrow<T>,
T: Ord + ?Sized,
R: RangeBounds<T> + Send,
String: Borrow<Q>,
Q: Ord + Send + Sync + ?Sized,
R: RangeBounds<Q> + Clone + Send + Sync,
{
futures::stream::iter(self.kv.range(range).map(|(k, v)| (k.clone(), v.clone()))).boxed()
let it = self.kv.range(range).map(|(k, v)| (k.clone(), v.clone()));
futures::stream::iter(it).boxed()
}
}

Expand All @@ -101,8 +110,8 @@ impl MapApi<String> for LevelData {
async fn set(
&mut self,
key: String,
value: Option<(Self::V, Option<KVMeta>)>,
) -> (Marked<Self::V>, Marked<Self::V>) {
value: Option<(<String as MapKey>::V, Option<KVMeta>)>,
) -> (Marked<<String as MapKey>::V>, Marked<<String as MapKey>::V>) {
// The chance it is the bottom level is very low in a loaded system.
// Thus we always tombstone the key if it is None.

Expand All @@ -117,33 +126,30 @@ impl MapApi<String> for LevelData {
Marked::new_tomb_stone(seq)
};

let prev = MapApiRO::<String>::get(self, key.as_str()).await;
let prev = MapApiRO::<String>::get(&*self, key.as_str()).await;
self.kv.insert(key, marked.clone());
(prev, marked)
}
}

#[async_trait::async_trait]
impl MapApiRO<ExpireKey> for LevelData {
type V = String;

async fn get<Q>(&self, key: &Q) -> Marked<Self::V>
async fn get<Q>(&self, key: &Q) -> Marked<<ExpireKey as MapKey>::V>
where
ExpireKey: Borrow<Q>,
Q: Ord + Send + Sync + ?Sized,
{
self.expire.get(key).cloned().unwrap_or(Marked::empty())
}

async fn range<'a, T: ?Sized, R>(
&'a self,
async fn range<'f, Q, R>(
&'f self,
range: R,
) -> BoxStream<'a, (ExpireKey, Marked<String>)>
) -> BoxStream<'f, (ExpireKey, Marked<<ExpireKey as MapKey>::V>)>
where
ExpireKey: 'a,
ExpireKey: Borrow<T>,
T: Ord,
R: RangeBounds<T> + Send,
ExpireKey: Borrow<Q>,
Q: Ord + Send + Sync + ?Sized,
R: RangeBounds<Q> + Clone + Send + Sync,
{
let it = self
.expire
Expand All @@ -159,8 +165,11 @@ impl MapApi<ExpireKey> for LevelData {
async fn set(
&mut self,
key: ExpireKey,
value: Option<(Self::V, Option<KVMeta>)>,
) -> (Marked<Self::V>, Marked<Self::V>) {
value: Option<(<ExpireKey as MapKey>::V, Option<KVMeta>)>,
) -> (
Marked<<ExpireKey as MapKey>::V>,
Marked<<ExpireKey as MapKey>::V>,
) {
// dbg!("set expire", &key, &value);

let seq = self.curr_seq();
Expand All @@ -171,7 +180,7 @@ impl MapApi<ExpireKey> for LevelData {
Marked::TombStone { internal_seq: seq }
};

let prev = MapApiRO::<ExpireKey>::get(self, &key).await;
let prev = MapApiRO::<ExpireKey>::get(&*self, &key).await;
self.expire.insert(key, marked.clone());
(prev, marked)
}
Expand Down
Loading

0 comments on commit c7a53db

Please sign in to comment.