Skip to content

Commit

Permalink
Merge pull request #4608 from lichuang/add_sm_range_subscribe_api
Browse files Browse the repository at this point in the history
Feature: add state machine range and subscribe API
  • Loading branch information
BohuTANG authored Apr 1, 2022
2 parents 26fc9f0 + 281b5b2 commit 0278dfd
Showing 1 changed file with 32 additions and 11 deletions.
43 changes: 32 additions & 11 deletions common/meta/raft-store/src/state_machine/sm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,12 @@ const SEQ_DATABASE_META_ID: &str = "database_meta_id";
// const TREE_META: &str = "meta";
const TREE_STATE_MACHINE: &str = "state_machine";

/// StateMachine subscriber trait
#[async_trait::async_trait]
pub trait StateMachineSubscriber: Debug + Sync + Send {
async fn kv_changed(&self, key: &str, prev: &Option<SeqV>, meta: &Option<SeqV>);
}

/// The state machine of the `MemStore`.
/// It includes user data and two raft-related informations:
/// `last_applied_logs` and `client_serial_responses` to achieve idempotence.
Expand All @@ -99,6 +105,9 @@ pub struct StateMachine {
/// - Store initialization state and last applied in keyspace `StateMachineMeta`.
/// - Every other state is store in its own keyspace such as `Nodes`.
pub sm_tree: SledTree,

/// subscriber of statemachine data
pub subscriber: Option<Box<dyn StateMachineSubscriber>>,
}

/// A key-value pair in a snapshot is a vec of two `Vec<u8>`.
Expand Down Expand Up @@ -149,7 +158,10 @@ impl StateMachine {

let sm_tree = SledTree::open(&db, &tree_name, config.is_sync())?;

let sm = StateMachine { sm_tree };
let sm = StateMachine {
sm_tree,
subscriber: None,
};

let inited = {
let sm_meta = sm.sm_meta();
Expand All @@ -167,6 +179,10 @@ impl StateMachine {
}
}

pub fn set_subscriber(&mut self, subscriber: Box<dyn StateMachineSubscriber>) {
self.subscriber = Some(subscriber);
}

/// Create a snapshot.
///
/// Returns:
Expand Down Expand Up @@ -344,10 +360,10 @@ impl StateMachine {
let db_id = self.txn_incr_seq(SEQ_DATABASE_ID, txn_tree)?;

let db_lookup_tree = txn_tree.key_space::<DatabaseLookup>();

let db_key = DatabaseLookupKey::new(tenant.to_string(), name.to_string());
let (prev, result) = self.txn_sub_tree_upsert(
&db_lookup_tree,
&DatabaseLookupKey::new(tenant.to_string(), name.to_string()),
&db_key,
&MatchSeq::Exact(0),
Operation::Update(db_id),
None,
Expand Down Expand Up @@ -405,13 +421,9 @@ impl StateMachine {
) -> MetaStorageResult<AppliedState> {
let dbs = txn_tree.key_space::<DatabaseLookup>();

let (prev, result) = self.txn_sub_tree_upsert(
&dbs,
&DatabaseLookupKey::new(tenant.to_string(), name.to_string()),
&MatchSeq::Any,
Operation::Delete,
None,
)?;
let db_key = DatabaseLookupKey::new(tenant.to_string(), name.to_string());
let (prev, result) =
self.txn_sub_tree_upsert(&dbs, &db_key, &MatchSeq::Any, Operation::Delete, None)?;

assert!(
result.is_none(),
Expand Down Expand Up @@ -466,6 +478,7 @@ impl StateMachine {
if result.is_some() {
self.txn_incr_seq(SEQ_DATABASE_META_ID, txn_tree)?;
}

Ok(AppliedState::TableMeta(Change::new_with_id(
table_id.unwrap(),
prev,
Expand All @@ -490,6 +503,7 @@ impl StateMachine {
if result.is_none() {
self.txn_incr_seq(SEQ_DATABASE_META_ID, txn_tree)?;
}

Ok(Change::new_with_id(table_id.unwrap(), prev, result).into())
}

Expand Down Expand Up @@ -530,6 +544,7 @@ impl StateMachine {
new_db_name,
new_table_name
);

Ok(AppliedState::TableMeta(Change::new_with_id(
new_table_id.unwrap(),
prev,
Expand All @@ -547,15 +562,21 @@ impl StateMachine {
txn_tree: &TransactionSledTree,
) -> MetaStorageResult<AppliedState> {
let sub_tree = txn_tree.key_space::<GenericKV>();
let key_str = key.to_string();
let (prev, result) = self.txn_sub_tree_upsert(
&sub_tree,
&key.to_string(),
&key_str,
seq,
value_op.clone(),
value_meta.clone(),
)?;

tracing::debug!("applied UpsertKV: {} {:?}", key, result);

if let Some(subscriber) = &self.subscriber {
let _ = subscriber.kv_changed(&key_str, &prev, &result);
}

Ok(Change::new(prev, result).into())
}

Expand Down

0 comments on commit 0278dfd

Please sign in to comment.