Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: add state machine range and subscribe API #4608

Merged
merged 15 commits into from
Apr 1, 2022
Merged
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 95 additions & 11 deletions common/meta/raft-store/src/state_machine/sm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,42 @@ const SEQ_DATABASE_META_ID: &str = "database_meta_id";
// const TREE_META: &str = "meta";
const TREE_STATE_MACHINE: &str = "state_machine";

pub enum DataChangeType {
Create,
Update,
Drop,
}
lichuang marked this conversation as resolved.
Show resolved Hide resolved

/// StateMachine subscriber trait
#[async_trait::async_trait]
pub trait StateMachineSubscriber: Debug + Sync + Send {
async fn database_changed(
&self,
change_type: DataChangeType,
key: &DatabaseLookupKey,
prev: &Option<SeqV<DatabaseMeta>>,
meta: &Option<SeqV<DatabaseMeta>>,
);
lichuang marked this conversation as resolved.
Show resolved Hide resolved

async fn table_changed(
&self,
change_type: DataChangeType,
table_name: &str,
lichuang marked this conversation as resolved.
Show resolved Hide resolved
lichuang marked this conversation as resolved.
Show resolved Hide resolved
prev: &Option<SeqV<TableMeta>>,
meta: &Option<SeqV<TableMeta>>,
);

async fn kv_changed(
&self,
change_type: DataChangeType,
key: &str,
prev: &Option<SeqV>,
meta: &Option<SeqV>,
);
}

//dyn_clone::clone_trait_object!(StateMachineSubscriber);
lichuang marked this conversation as resolved.
Show resolved Hide resolved

/// The state machine of the `MemStore`.
/// It includes user data and two raft-related informations:
/// `last_applied_logs` and `client_serial_responses` to achieve idempotence.
Expand All @@ -99,6 +135,9 @@ pub struct StateMachine {
/// - Store initialization state and last applied in keyspace `StateMachineMeta`.
/// - Every other state is store in its own keyspace such as `Nodes`.
pub sm_tree: SledTree,

/// subscriber of statemachine data
pub subscriber: Option<Box<dyn StateMachineSubscriber>>,
}

/// A key-value pair in a snapshot is a vec of two `Vec<u8>`.
Expand Down Expand Up @@ -149,7 +188,10 @@ impl StateMachine {

let sm_tree = SledTree::open(&db, &tree_name, config.is_sync())?;

let sm = StateMachine { sm_tree };
let sm = StateMachine {
sm_tree,
subscriber: None,
};

let inited = {
let sm_meta = sm.sm_meta();
Expand All @@ -167,6 +209,10 @@ impl StateMachine {
}
}

pub fn set_subscriber(&mut self, subscriber: Box<dyn StateMachineSubscriber>) {
self.subscriber = Some(subscriber);
}

/// Create a snapshot.
///
/// Returns:
Expand Down Expand Up @@ -344,10 +390,10 @@ impl StateMachine {
let db_id = self.txn_incr_seq(SEQ_DATABASE_ID, txn_tree)?;

let db_lookup_tree = txn_tree.key_space::<DatabaseLookup>();

let db_key = DatabaseLookupKey::new(tenant.to_string(), name.to_string());
let (prev, result) = self.txn_sub_tree_upsert(
&db_lookup_tree,
&DatabaseLookupKey::new(tenant.to_string(), name.to_string()),
&db_key,
&MatchSeq::Exact(0),
Operation::Update(db_id),
None,
Expand Down Expand Up @@ -389,6 +435,15 @@ impl StateMachine {
result
);

if let Some(subscriber) = &self.subscriber {
let _ = subscriber.database_changed(
DataChangeType::Create,
&db_key,
&prev_meta,
&result_meta,
);
}

Ok(AppliedState::DatabaseMeta(Change::new_with_id(
db_id,
prev_meta,
Expand All @@ -405,13 +460,9 @@ impl StateMachine {
) -> MetaStorageResult<AppliedState> {
let dbs = txn_tree.key_space::<DatabaseLookup>();

let (prev, result) = self.txn_sub_tree_upsert(
&dbs,
&DatabaseLookupKey::new(tenant.to_string(), name.to_string()),
&MatchSeq::Any,
Operation::Delete,
None,
)?;
let db_key = DatabaseLookupKey::new(tenant.to_string(), name.to_string());
let (prev, result) =
self.txn_sub_tree_upsert(&dbs, &db_key, &MatchSeq::Any, Operation::Delete, None)?;

assert!(
result.is_none(),
Expand All @@ -431,6 +482,15 @@ impl StateMachine {

tracing::debug!("applied drop Database: {} {:?}", name, result);

if let Some(subscriber) = &self.subscriber {
let _ = subscriber.database_changed(
DataChangeType::Drop,
&db_key,
&prev_meta,
&result_meta,
);
}

return Ok(AppliedState::DatabaseMeta(Change::new_with_id(
db_id,
prev_meta,
Expand Down Expand Up @@ -466,6 +526,11 @@ impl StateMachine {
if result.is_some() {
self.txn_incr_seq(SEQ_DATABASE_META_ID, txn_tree)?;
}

if let Some(subscriber) = &self.subscriber {
let _ = subscriber.table_changed(DataChangeType::Create, table_name, &prev, &result);
}

Ok(AppliedState::TableMeta(Change::new_with_id(
table_id.unwrap(),
prev,
Expand All @@ -490,6 +555,9 @@ impl StateMachine {
if result.is_none() {
self.txn_incr_seq(SEQ_DATABASE_META_ID, txn_tree)?;
}
if let Some(subscriber) = &self.subscriber {
let _ = subscriber.table_changed(DataChangeType::Drop, table_name, &prev, &result);
}
Ok(Change::new_with_id(table_id.unwrap(), prev, result).into())
}

Expand Down Expand Up @@ -530,6 +598,16 @@ impl StateMachine {
new_db_name,
new_table_name
);

if let Some(subscriber) = &self.subscriber {
let _ = subscriber.table_changed(
DataChangeType::Update,
table_name,
&new_prev,
&new_result,
);
}

Ok(AppliedState::TableMeta(Change::new_with_id(
new_table_id.unwrap(),
prev,
Expand All @@ -547,15 +625,21 @@ impl StateMachine {
txn_tree: &TransactionSledTree,
) -> MetaStorageResult<AppliedState> {
let sub_tree = txn_tree.key_space::<GenericKV>();
let key_str = key.to_string();
let (prev, result) = self.txn_sub_tree_upsert(
&sub_tree,
&key.to_string(),
&key_str,
seq,
value_op.clone(),
value_meta.clone(),
)?;

tracing::debug!("applied UpsertKV: {} {:?}", key, result);

if let Some(subscriber) = &self.subscriber {
let _ = subscriber.kv_changed(DataChangeType::Update, &key_str, &prev, &result);
}

Ok(Change::new(prev, result).into())
}

Expand Down