Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: add state machine range and subscribe API #4608

Merged
merged 15 commits into from
Apr 1, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 32 additions & 11 deletions common/meta/raft-store/src/state_machine/sm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,12 @@ const SEQ_DATABASE_META_ID: &str = "database_meta_id";
// const TREE_META: &str = "meta";
const TREE_STATE_MACHINE: &str = "state_machine";

/// StateMachine subscriber trait
#[async_trait::async_trait]
pub trait StateMachineSubscriber: Debug + Sync + Send {
async fn kv_changed(&self, key: &str, prev: &Option<SeqV>, meta: &Option<SeqV>);
}

/// The state machine of the `MemStore`.
/// It includes user data and two raft-related informations:
/// `last_applied_logs` and `client_serial_responses` to achieve idempotence.
Expand All @@ -99,6 +105,9 @@ pub struct StateMachine {
/// - Store initialization state and last applied in keyspace `StateMachineMeta`.
/// - Every other state is store in its own keyspace such as `Nodes`.
pub sm_tree: SledTree,

/// subscriber of statemachine data
pub subscriber: Option<Box<dyn StateMachineSubscriber>>,
}

/// A key-value pair in a snapshot is a vec of two `Vec<u8>`.
Expand Down Expand Up @@ -149,7 +158,10 @@ impl StateMachine {

let sm_tree = SledTree::open(&db, &tree_name, config.is_sync())?;

let sm = StateMachine { sm_tree };
let sm = StateMachine {
sm_tree,
subscriber: None,
};

let inited = {
let sm_meta = sm.sm_meta();
Expand All @@ -167,6 +179,10 @@ impl StateMachine {
}
}

pub fn set_subscriber(&mut self, subscriber: Box<dyn StateMachineSubscriber>) {
self.subscriber = Some(subscriber);
}

/// Create a snapshot.
///
/// Returns:
Expand Down Expand Up @@ -344,10 +360,10 @@ impl StateMachine {
let db_id = self.txn_incr_seq(SEQ_DATABASE_ID, txn_tree)?;

let db_lookup_tree = txn_tree.key_space::<DatabaseLookup>();

let db_key = DatabaseLookupKey::new(tenant.to_string(), name.to_string());
let (prev, result) = self.txn_sub_tree_upsert(
&db_lookup_tree,
&DatabaseLookupKey::new(tenant.to_string(), name.to_string()),
&db_key,
&MatchSeq::Exact(0),
Operation::Update(db_id),
None,
Expand Down Expand Up @@ -405,13 +421,9 @@ impl StateMachine {
) -> MetaStorageResult<AppliedState> {
let dbs = txn_tree.key_space::<DatabaseLookup>();

let (prev, result) = self.txn_sub_tree_upsert(
&dbs,
&DatabaseLookupKey::new(tenant.to_string(), name.to_string()),
&MatchSeq::Any,
Operation::Delete,
None,
)?;
let db_key = DatabaseLookupKey::new(tenant.to_string(), name.to_string());
let (prev, result) =
self.txn_sub_tree_upsert(&dbs, &db_key, &MatchSeq::Any, Operation::Delete, None)?;

assert!(
result.is_none(),
Expand Down Expand Up @@ -466,6 +478,7 @@ impl StateMachine {
if result.is_some() {
self.txn_incr_seq(SEQ_DATABASE_META_ID, txn_tree)?;
}

Ok(AppliedState::TableMeta(Change::new_with_id(
table_id.unwrap(),
prev,
Expand All @@ -490,6 +503,7 @@ impl StateMachine {
if result.is_none() {
self.txn_incr_seq(SEQ_DATABASE_META_ID, txn_tree)?;
}

Ok(Change::new_with_id(table_id.unwrap(), prev, result).into())
}

Expand Down Expand Up @@ -530,6 +544,7 @@ impl StateMachine {
new_db_name,
new_table_name
);

Ok(AppliedState::TableMeta(Change::new_with_id(
new_table_id.unwrap(),
prev,
Expand All @@ -547,15 +562,21 @@ impl StateMachine {
txn_tree: &TransactionSledTree,
) -> MetaStorageResult<AppliedState> {
let sub_tree = txn_tree.key_space::<GenericKV>();
let key_str = key.to_string();
let (prev, result) = self.txn_sub_tree_upsert(
&sub_tree,
&key.to_string(),
&key_str,
seq,
value_op.clone(),
value_meta.clone(),
)?;

tracing::debug!("applied UpsertKV: {} {:?}", key, result);

if let Some(subscriber) = &self.subscriber {
let _ = subscriber.kv_changed(&key_str, &prev, &result);
}

Ok(Change::new(prev, result).into())
}

Expand Down