diff --git a/common/meta/raft-store/src/state_machine/sm.rs b/common/meta/raft-store/src/state_machine/sm.rs index dae8605c6b40a..d2d5c915cb46a 100644 --- a/common/meta/raft-store/src/state_machine/sm.rs +++ b/common/meta/raft-store/src/state_machine/sm.rs @@ -90,6 +90,12 @@ const SEQ_DATABASE_META_ID: &str = "database_meta_id"; // const TREE_META: &str = "meta"; const TREE_STATE_MACHINE: &str = "state_machine"; +/// StateMachine subscriber trait +#[async_trait::async_trait] +pub trait StateMachineSubscriber: Debug + Sync + Send { + async fn kv_changed(&self, key: &str, prev: &Option, meta: &Option); +} + /// The state machine of the `MemStore`. /// It includes user data and two raft-related informations: /// `last_applied_logs` and `client_serial_responses` to achieve idempotence. @@ -99,6 +105,9 @@ pub struct StateMachine { /// - Store initialization state and last applied in keyspace `StateMachineMeta`. /// - Every other state is store in its own keyspace such as `Nodes`. pub sm_tree: SledTree, + + /// subscriber of statemachine data + pub subscriber: Option>, } /// A key-value pair in a snapshot is a vec of two `Vec`. @@ -149,7 +158,10 @@ impl StateMachine { let sm_tree = SledTree::open(&db, &tree_name, config.is_sync())?; - let sm = StateMachine { sm_tree }; + let sm = StateMachine { + sm_tree, + subscriber: None, + }; let inited = { let sm_meta = sm.sm_meta(); @@ -167,6 +179,10 @@ impl StateMachine { } } + pub fn set_subscriber(&mut self, subscriber: Box) { + self.subscriber = Some(subscriber); + } + /// Create a snapshot. /// /// Returns: @@ -344,10 +360,10 @@ impl StateMachine { let db_id = self.txn_incr_seq(SEQ_DATABASE_ID, txn_tree)?; let db_lookup_tree = txn_tree.key_space::(); - + let db_key = DatabaseLookupKey::new(tenant.to_string(), name.to_string()); let (prev, result) = self.txn_sub_tree_upsert( &db_lookup_tree, - &DatabaseLookupKey::new(tenant.to_string(), name.to_string()), + &db_key, &MatchSeq::Exact(0), Operation::Update(db_id), None, @@ -405,13 +421,9 @@ impl StateMachine { ) -> MetaStorageResult { let dbs = txn_tree.key_space::(); - let (prev, result) = self.txn_sub_tree_upsert( - &dbs, - &DatabaseLookupKey::new(tenant.to_string(), name.to_string()), - &MatchSeq::Any, - Operation::Delete, - None, - )?; + let db_key = DatabaseLookupKey::new(tenant.to_string(), name.to_string()); + let (prev, result) = + self.txn_sub_tree_upsert(&dbs, &db_key, &MatchSeq::Any, Operation::Delete, None)?; assert!( result.is_none(), @@ -466,6 +478,7 @@ impl StateMachine { if result.is_some() { self.txn_incr_seq(SEQ_DATABASE_META_ID, txn_tree)?; } + Ok(AppliedState::TableMeta(Change::new_with_id( table_id.unwrap(), prev, @@ -490,6 +503,7 @@ impl StateMachine { if result.is_none() { self.txn_incr_seq(SEQ_DATABASE_META_ID, txn_tree)?; } + Ok(Change::new_with_id(table_id.unwrap(), prev, result).into()) } @@ -530,6 +544,7 @@ impl StateMachine { new_db_name, new_table_name ); + Ok(AppliedState::TableMeta(Change::new_with_id( new_table_id.unwrap(), prev, @@ -547,15 +562,21 @@ impl StateMachine { txn_tree: &TransactionSledTree, ) -> MetaStorageResult { let sub_tree = txn_tree.key_space::(); + let key_str = key.to_string(); let (prev, result) = self.txn_sub_tree_upsert( &sub_tree, - &key.to_string(), + &key_str, seq, value_op.clone(), value_meta.clone(), )?; tracing::debug!("applied UpsertKV: {} {:?}", key, result); + + if let Some(subscriber) = &self.subscriber { + let _ = subscriber.kv_changed(&key_str, &prev, &result); + } + Ok(Change::new(prev, result).into()) }