Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: simplify MapApi #13063

Merged
merged 5 commits into from
Sep 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions src/meta/raft-store/src/applier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,22 @@ impl<'a> Applier<'a> {
Change::new(prev, result).into()
}

// TODO(1): when get an applier, pass in a now_ms to ensure all expired are cleaned.
/// Update or insert a kv entry.
///
/// If the input entry has expired, it performs a delete operation.
#[minitrace::trace]
async fn upsert_kv(&mut self, upsert_kv: &UpsertKV) -> (Option<SeqV>, Option<SeqV>) {
pub(crate) async fn upsert_kv(&mut self, upsert_kv: &UpsertKV) -> (Option<SeqV>, Option<SeqV>) {
debug!(upsert_kv = as_debug!(upsert_kv); "upsert_kv");

let (prev, result) = self.sm.upsert_kv(upsert_kv.clone()).await;
let (prev, result) = self.sm.upsert_kv_primary_index(upsert_kv).await;

self.sm
.update_expire_index(&upsert_kv.key, &prev, &result)
.await;

let prev = Into::<Option<SeqV>>::into(prev);
let result = Into::<Option<SeqV>>::into(result);

debug!(
"applied UpsertKV: {:?}; prev: {:?}; result: {:?}",
Expand All @@ -209,7 +220,6 @@ impl<'a> Applier<'a> {
}

#[minitrace::trace]

async fn apply_txn(&mut self, req: &TxnRequest) -> AppliedState {
debug!(txn = as_display!(req); "apply txn cmd");

Expand Down Expand Up @@ -445,9 +455,7 @@ impl<'a> Applier<'a> {
assert_eq!(expire_key.seq, seq_v.seq);
info!("clean expired: {}, {}", key, expire_key);

self.sm.upsert_kv(UpsertKV::delete(key.clone())).await;
// dbg!("clean_expired", &key, &curr);
self.push_change(key, curr, None);
self.upsert_kv(&UpsertKV::delete(key.clone())).await;
} else {
unreachable!(
"trying to remove un-cleanable: {}, {}, kv-entry: {:?}",
Expand Down
1 change: 1 addition & 0 deletions src/meta/raft-store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#![allow(clippy::uninlined_format_args)]
#![feature(impl_trait_in_assoc_type)]
// #![feature(type_alias_impl_trait)]

// #![allow(incomplete_features)]
Expand Down
6 changes: 3 additions & 3 deletions src/meta/raft-store/src/sm_v002/importer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use common_meta_types::LogId;
use common_meta_types::StoredMembership;

use crate::key_spaces::RaftStoreEntry;
use crate::sm_v002::leveled_store::level_data::LevelData;
use crate::sm_v002::leveled_store::level::Level;
use crate::sm_v002::leveled_store::sys_data_api::SysDataApiRO;
use crate::sm_v002::marked::Marked;
use crate::state_machine::ExpireKey;
Expand All @@ -29,7 +29,7 @@ use crate::state_machine::StateMachineMetaKey;
/// A container of temp data that are imported to a LevelData.
#[derive(Debug, Default)]
pub struct Importer {
level_data: LevelData,
level_data: Level,

kv: BTreeMap<String, Marked>,
expire: BTreeMap<ExpireKey, Marked<String>>,
Expand Down Expand Up @@ -109,7 +109,7 @@ impl Importer {
Ok(())
}

pub fn commit(mut self) -> LevelData {
pub fn commit(mut self) -> Level {
let d = &mut self.level_data;

d.replace_kv(self.kv);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,24 @@ use futures_util::StreamExt;

use crate::sm_v002::leveled_store::map_api::MapApi;
use crate::sm_v002::leveled_store::map_api::MapApiRO;
use crate::sm_v002::leveled_store::map_api::MapKey;
use crate::sm_v002::leveled_store::sys_data::SysData;
use crate::sm_v002::leveled_store::sys_data_api::SysDataApiRO;
use crate::sm_v002::marked::Marked;
use crate::state_machine::ExpireKey;

impl MapKey for String {
type V = Vec<u8>;
}
impl MapKey for ExpireKey {
type V = String;
}

/// A single level of state machine data.
///
/// State machine data is composed of multiple levels.
#[derive(Debug, Default)]
pub struct LevelData {
pub struct Level {
/// System data(non-user data).
sys_data: SysData,

Expand All @@ -42,7 +50,7 @@ pub struct LevelData {
expire: BTreeMap<ExpireKey, Marked<String>>,
}

impl LevelData {
impl Level {
/// Create a new level that is based on this level.
pub(crate) fn new_level(&self) -> Self {
Self {
Expand Down Expand Up @@ -74,35 +82,36 @@ impl LevelData {
}

#[async_trait::async_trait]
impl MapApiRO<String> for LevelData {
type V = Vec<u8>;

async fn get<Q>(&self, key: &Q) -> Marked<Self::V>
impl MapApiRO<String> for Level {
async fn get<Q>(&self, key: &Q) -> Marked<<String as MapKey>::V>
where
String: Borrow<Q>,
Q: Ord + Send + Sync + ?Sized,
{
self.kv.get(key).cloned().unwrap_or(Marked::empty())
}

async fn range<'a, T, R>(&'a self, range: R) -> BoxStream<'a, (String, Marked)>
async fn range<'f, Q, R>(
&'f self,
range: R,
) -> BoxStream<'f, (String, Marked<<String as MapKey>::V>)>
where
String: 'a,
String: Borrow<T>,
T: Ord + ?Sized,
R: RangeBounds<T> + Send,
String: Borrow<Q>,
Q: Ord + Send + Sync + ?Sized,
R: RangeBounds<Q> + Clone + Send + Sync,
{
futures::stream::iter(self.kv.range(range).map(|(k, v)| (k.clone(), v.clone()))).boxed()
let it = self.kv.range(range).map(|(k, v)| (k.clone(), v.clone()));
futures::stream::iter(it).boxed()
}
}

#[async_trait::async_trait]
impl MapApi<String> for LevelData {
impl MapApi<String> for Level {
async fn set(
&mut self,
key: String,
value: Option<(Self::V, Option<KVMeta>)>,
) -> (Marked<Self::V>, Marked<Self::V>) {
value: Option<(<String as MapKey>::V, Option<KVMeta>)>,
) -> (Marked<<String as MapKey>::V>, Marked<<String as MapKey>::V>) {
// The chance it is the bottom level is very low in a loaded system.
// Thus we always tombstone the key if it is None.

Expand All @@ -117,33 +126,30 @@ impl MapApi<String> for LevelData {
Marked::new_tomb_stone(seq)
};

let prev = MapApiRO::<String>::get(self, key.as_str()).await;
let prev = MapApiRO::<String>::get(&*self, key.as_str()).await;
self.kv.insert(key, marked.clone());
(prev, marked)
}
}

#[async_trait::async_trait]
impl MapApiRO<ExpireKey> for LevelData {
type V = String;

async fn get<Q>(&self, key: &Q) -> Marked<Self::V>
impl MapApiRO<ExpireKey> for Level {
async fn get<Q>(&self, key: &Q) -> Marked<<ExpireKey as MapKey>::V>
where
ExpireKey: Borrow<Q>,
Q: Ord + Send + Sync + ?Sized,
{
self.expire.get(key).cloned().unwrap_or(Marked::empty())
}

async fn range<'a, T: ?Sized, R>(
&'a self,
async fn range<'f, Q, R>(
&'f self,
range: R,
) -> BoxStream<'a, (ExpireKey, Marked<String>)>
) -> BoxStream<'f, (ExpireKey, Marked<<ExpireKey as MapKey>::V>)>
where
ExpireKey: 'a,
ExpireKey: Borrow<T>,
T: Ord,
R: RangeBounds<T> + Send,
ExpireKey: Borrow<Q>,
Q: Ord + Send + Sync + ?Sized,
R: RangeBounds<Q> + Clone + Send + Sync,
{
let it = self
.expire
Expand All @@ -155,12 +161,15 @@ impl MapApiRO<ExpireKey> for LevelData {
}

#[async_trait::async_trait]
impl MapApi<ExpireKey> for LevelData {
impl MapApi<ExpireKey> for Level {
async fn set(
&mut self,
key: ExpireKey,
value: Option<(Self::V, Option<KVMeta>)>,
) -> (Marked<Self::V>, Marked<Self::V>) {
value: Option<(<ExpireKey as MapKey>::V, Option<KVMeta>)>,
) -> (
Marked<<ExpireKey as MapKey>::V>,
Marked<<ExpireKey as MapKey>::V>,
) {
// dbg!("set expire", &key, &value);

let seq = self.curr_seq();
Expand All @@ -171,13 +180,13 @@ impl MapApi<ExpireKey> for LevelData {
Marked::TombStone { internal_seq: seq }
};

let prev = MapApiRO::<ExpireKey>::get(self, &key).await;
let prev = MapApiRO::<ExpireKey>::get(&*self, &key).await;
self.expire.insert(key, marked.clone());
(prev, marked)
}
}

impl AsRef<SysData> for LevelData {
impl AsRef<SysData> for Level {
fn as_ref(&self) -> &SysData {
&self.sys_data
}
Expand Down
Loading
Loading