From 1832b548421f47ca89b3a7a2c9da413e44c60bce Mon Sep 17 00:00:00 2001 From: lichuang Date: Mon, 28 Mar 2022 15:24:39 +0800 Subject: [PATCH 1/8] Feature: add state machine range and subscribe API --- common/meta/raft-store/Cargo.toml | 1 + .../meta/raft-store/src/state_machine/sm.rs | 168 ++++++++++++++++-- 2 files changed, 158 insertions(+), 11 deletions(-) diff --git a/common/meta/raft-store/Cargo.toml b/common/meta/raft-store/Cargo.toml index 4ce0eb9788cf..7ed541e94d51 100644 --- a/common/meta/raft-store/Cargo.toml +++ b/common/meta/raft-store/Cargo.toml @@ -26,6 +26,7 @@ async-trait = "0.1.52" bytes = "1.1.0" clap = { version = "3.1.3", features = ["derive", "env"] } derive_more = "0.99.17" +dyn-clone = "1.0.4" hostname = "0.3.1" maplit = "1.0.2" once_cell = "1.9.0" diff --git a/common/meta/raft-store/src/state_machine/sm.rs b/common/meta/raft-store/src/state_machine/sm.rs index dae8605c6b40..a88abe29c9c2 100644 --- a/common/meta/raft-store/src/state_machine/sm.rs +++ b/common/meta/raft-store/src/state_machine/sm.rs @@ -14,6 +14,8 @@ use std::convert::TryInto; use std::fmt::Debug; +use std::fmt::Display; +use std::ops::RangeBounds; use std::time::SystemTime; use std::time::UNIX_EPOCH; @@ -53,6 +55,7 @@ use common_meta_types::UnknownDatabaseId; use common_meta_types::UnknownTable; use common_meta_types::UnknownTableId; use common_tracing::tracing; +use dyn_clone::DynClone; use openraft::raft::Entry; use openraft::raft::EntryPayload; use serde::Deserialize; @@ -90,6 +93,45 @@ const SEQ_DATABASE_META_ID: &str = "database_meta_id"; // const TREE_META: &str = "meta"; const TREE_STATE_MACHINE: &str = "state_machine"; +/// database meta data kev-value type +pub type DatabaseMetaKV = (DatabaseLookupKey, DatabaseMeta); + +pub enum DataChangeType { + Create, + Update, + Drop, +} + +/// StateMachine subscriber trait +#[async_trait::async_trait] +pub trait StateMachineSubscriber: Display + Debug + Sync + Send + DynClone { + async fn database_changed( + &self, + change_type: DataChangeType, + key: &DatabaseLookupKey, + prev: &Option>, + meta: &Option>, + ); + + async fn table_changed( + &self, + change_type: DataChangeType, + table_name: &str, + prev: &Option>, + meta: &Option>, + ); + + async fn kv_changed( + &self, + change_type: DataChangeType, + key: &str, + prev: &Option, + meta: &Option, + ); +} + +dyn_clone::clone_trait_object!(StateMachineSubscriber); + /// The state machine of the `MemStore`. /// It includes user data and two raft-related informations: /// `last_applied_logs` and `client_serial_responses` to achieve idempotence. @@ -99,6 +141,9 @@ pub struct StateMachine { /// - Store initialization state and last applied in keyspace `StateMachineMeta`. /// - Every other state is store in its own keyspace such as `Nodes`. pub sm_tree: SledTree, + + /// subscriber of statemachine data + pub subscriber: Option>, } /// A key-value pair in a snapshot is a vec of two `Vec`. @@ -149,7 +194,10 @@ impl StateMachine { let sm_tree = SledTree::open(&db, &tree_name, config.is_sync())?; - let sm = StateMachine { sm_tree }; + let sm = StateMachine { + sm_tree, + subscriber: None, + }; let inited = { let sm_meta = sm.sm_meta(); @@ -167,6 +215,10 @@ impl StateMachine { } } + pub fn set_subscriber(&mut self, subscriber: Box) { + self.subscriber = Some(subscriber); + } + /// Create a snapshot. /// /// Returns: @@ -344,10 +396,10 @@ impl StateMachine { let db_id = self.txn_incr_seq(SEQ_DATABASE_ID, txn_tree)?; let db_lookup_tree = txn_tree.key_space::(); - + let db_key = DatabaseLookupKey::new(tenant.to_string(), name.to_string()); let (prev, result) = self.txn_sub_tree_upsert( &db_lookup_tree, - &DatabaseLookupKey::new(tenant.to_string(), name.to_string()), + &db_key, &MatchSeq::Exact(0), Operation::Update(db_id), None, @@ -389,6 +441,15 @@ impl StateMachine { result ); + if let Some(subscriber) = &self.subscriber { + let _ = subscriber.database_changed( + DataChangeType::Create, + &db_key, + &prev_meta, + &result_meta, + ); + } + Ok(AppliedState::DatabaseMeta(Change::new_with_id( db_id, prev_meta, @@ -405,13 +466,9 @@ impl StateMachine { ) -> MetaStorageResult { let dbs = txn_tree.key_space::(); - let (prev, result) = self.txn_sub_tree_upsert( - &dbs, - &DatabaseLookupKey::new(tenant.to_string(), name.to_string()), - &MatchSeq::Any, - Operation::Delete, - None, - )?; + let db_key = DatabaseLookupKey::new(tenant.to_string(), name.to_string()); + let (prev, result) = + self.txn_sub_tree_upsert(&dbs, &db_key, &MatchSeq::Any, Operation::Delete, None)?; assert!( result.is_none(), @@ -431,6 +488,15 @@ impl StateMachine { tracing::debug!("applied drop Database: {} {:?}", name, result); + if let Some(subscriber) = &self.subscriber { + let _ = subscriber.database_changed( + DataChangeType::Drop, + &db_key, + &prev_meta, + &result_meta, + ); + } + return Ok(AppliedState::DatabaseMeta(Change::new_with_id( db_id, prev_meta, @@ -466,6 +532,11 @@ impl StateMachine { if result.is_some() { self.txn_incr_seq(SEQ_DATABASE_META_ID, txn_tree)?; } + + if let Some(subscriber) = &self.subscriber { + let _ = subscriber.table_changed(DataChangeType::Create, table_name, &prev, &result); + } + Ok(AppliedState::TableMeta(Change::new_with_id( table_id.unwrap(), prev, @@ -490,6 +561,9 @@ impl StateMachine { if result.is_none() { self.txn_incr_seq(SEQ_DATABASE_META_ID, txn_tree)?; } + if let Some(subscriber) = &self.subscriber { + let _ = subscriber.table_changed(DataChangeType::Drop, table_name, &prev, &result); + } Ok(Change::new_with_id(table_id.unwrap(), prev, result).into()) } @@ -530,6 +604,16 @@ impl StateMachine { new_db_name, new_table_name ); + + if let Some(subscriber) = &self.subscriber { + let _ = subscriber.table_changed( + DataChangeType::Update, + table_name, + &new_prev, + &new_result, + ); + } + Ok(AppliedState::TableMeta(Change::new_with_id( new_table_id.unwrap(), prev, @@ -547,18 +631,34 @@ impl StateMachine { txn_tree: &TransactionSledTree, ) -> MetaStorageResult { let sub_tree = txn_tree.key_space::(); + let key_str = key.to_string(); let (prev, result) = self.txn_sub_tree_upsert( &sub_tree, - &key.to_string(), + &key_str, seq, value_op.clone(), value_meta.clone(), )?; tracing::debug!("applied UpsertKV: {} {:?}", key, result); + + if let Some(subscriber) = &self.subscriber { + let _ = subscriber.kv_changed(DataChangeType::Update, &key_str, &prev, &result); + } + Ok(Change::new(prev, result).into()) } + pub fn get_kv_by_range( + &self, + range: R, + ) -> MetaStorageResult>)>> + where + R: RangeBounds, + { + self.kvs().range_kvs(range) + } + #[tracing::instrument(level = "debug", skip(self, txn_tree))] fn apply_upsert_table_options_cmd( &self, @@ -1025,6 +1125,29 @@ impl StateMachine { } } + pub fn get_database_meta_by_range( + &self, + range: R, + ) -> MetaStorageResult> + where + R: RangeBounds, + { + let sm_db_ids = self.database_lookup(); + let lookup_keys = sm_db_ids.range_kvs(range)?; + let mut ret = Vec::new(); + for lookup_key in &lookup_keys { + let key = &lookup_key.0; + let id = &lookup_key.1; + let meta = self.get_database_meta_by_id(&id.data); + match meta { + Ok(meta) => ret.push((key.clone(), meta.data)), + Err(_) => continue, + } + } + + Ok(ret) + } + pub fn get_database_meta_by_id(&self, db_id: &u64) -> MetaStorageResult> { let x = self.databases().get(db_id)?.ok_or_else(|| { MetaStorageError::AppError(AppError::UnknownDatabaseId(UnknownDatabaseId::new( @@ -1058,6 +1181,29 @@ impl StateMachine { Ok(x) } + pub fn get_table_meta_by_range( + &self, + range: R, + ) -> MetaStorageResult>)>> + where + R: RangeBounds, + { + let sm_table_ids = self.table_lookup(); + let lookup_keys = sm_table_ids.range_kvs(range)?; + let mut ret = Vec::new(); + for lookup_key in &lookup_keys { + let key = &lookup_key.0; + let id = &lookup_key.1; + let meta = self.get_table_meta_by_id(&id.data.0); + match meta { + Ok(meta) => ret.push((key.clone(), meta)), + Err(_) => continue, + } + } + + Ok(ret) + } + pub fn txn_get_table_meta_by_id( &self, tid: &u64, From efe7f8b256af92863e6167d4c11d35bd3e756467 Mon Sep 17 00:00:00 2001 From: lichuang Date: Mon, 28 Mar 2022 15:33:23 +0800 Subject: [PATCH 2/8] Feature: add state machine range and subscribe API --- common/meta/raft-store/src/state_machine/sm.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/common/meta/raft-store/src/state_machine/sm.rs b/common/meta/raft-store/src/state_machine/sm.rs index a88abe29c9c2..f7851fdb602e 100644 --- a/common/meta/raft-store/src/state_machine/sm.rs +++ b/common/meta/raft-store/src/state_machine/sm.rs @@ -93,9 +93,6 @@ const SEQ_DATABASE_META_ID: &str = "database_meta_id"; // const TREE_META: &str = "meta"; const TREE_STATE_MACHINE: &str = "state_machine"; -/// database meta data kev-value type -pub type DatabaseMetaKV = (DatabaseLookupKey, DatabaseMeta); - pub enum DataChangeType { Create, Update, From 6f34bbed7f3e7086ea2abd29c675103cd30572b1 Mon Sep 17 00:00:00 2001 From: lichuang Date: Tue, 29 Mar 2022 10:06:03 +0800 Subject: [PATCH 3/8] Feature: add state machine range and subscribe API --- common/meta/raft-store/Cargo.toml | 1 - .../meta/raft-store/src/state_machine/sm.rs | 63 +------------------ 2 files changed, 2 insertions(+), 62 deletions(-) diff --git a/common/meta/raft-store/Cargo.toml b/common/meta/raft-store/Cargo.toml index 7ed541e94d51..4ce0eb9788cf 100644 --- a/common/meta/raft-store/Cargo.toml +++ b/common/meta/raft-store/Cargo.toml @@ -26,7 +26,6 @@ async-trait = "0.1.52" bytes = "1.1.0" clap = { version = "3.1.3", features = ["derive", "env"] } derive_more = "0.99.17" -dyn-clone = "1.0.4" hostname = "0.3.1" maplit = "1.0.2" once_cell = "1.9.0" diff --git a/common/meta/raft-store/src/state_machine/sm.rs b/common/meta/raft-store/src/state_machine/sm.rs index f7851fdb602e..ad76683c6c55 100644 --- a/common/meta/raft-store/src/state_machine/sm.rs +++ b/common/meta/raft-store/src/state_machine/sm.rs @@ -14,8 +14,6 @@ use std::convert::TryInto; use std::fmt::Debug; -use std::fmt::Display; -use std::ops::RangeBounds; use std::time::SystemTime; use std::time::UNIX_EPOCH; @@ -55,7 +53,6 @@ use common_meta_types::UnknownDatabaseId; use common_meta_types::UnknownTable; use common_meta_types::UnknownTableId; use common_tracing::tracing; -use dyn_clone::DynClone; use openraft::raft::Entry; use openraft::raft::EntryPayload; use serde::Deserialize; @@ -101,7 +98,7 @@ pub enum DataChangeType { /// StateMachine subscriber trait #[async_trait::async_trait] -pub trait StateMachineSubscriber: Display + Debug + Sync + Send + DynClone { +pub trait StateMachineSubscriber: Debug + Sync + Send { async fn database_changed( &self, change_type: DataChangeType, @@ -127,7 +124,7 @@ pub trait StateMachineSubscriber: Display + Debug + Sync + Send + DynClone { ); } -dyn_clone::clone_trait_object!(StateMachineSubscriber); +//dyn_clone::clone_trait_object!(StateMachineSubscriber); /// The state machine of the `MemStore`. /// It includes user data and two raft-related informations: @@ -646,16 +643,6 @@ impl StateMachine { Ok(Change::new(prev, result).into()) } - pub fn get_kv_by_range( - &self, - range: R, - ) -> MetaStorageResult>)>> - where - R: RangeBounds, - { - self.kvs().range_kvs(range) - } - #[tracing::instrument(level = "debug", skip(self, txn_tree))] fn apply_upsert_table_options_cmd( &self, @@ -1122,29 +1109,6 @@ impl StateMachine { } } - pub fn get_database_meta_by_range( - &self, - range: R, - ) -> MetaStorageResult> - where - R: RangeBounds, - { - let sm_db_ids = self.database_lookup(); - let lookup_keys = sm_db_ids.range_kvs(range)?; - let mut ret = Vec::new(); - for lookup_key in &lookup_keys { - let key = &lookup_key.0; - let id = &lookup_key.1; - let meta = self.get_database_meta_by_id(&id.data); - match meta { - Ok(meta) => ret.push((key.clone(), meta.data)), - Err(_) => continue, - } - } - - Ok(ret) - } - pub fn get_database_meta_by_id(&self, db_id: &u64) -> MetaStorageResult> { let x = self.databases().get(db_id)?.ok_or_else(|| { MetaStorageError::AppError(AppError::UnknownDatabaseId(UnknownDatabaseId::new( @@ -1178,29 +1142,6 @@ impl StateMachine { Ok(x) } - pub fn get_table_meta_by_range( - &self, - range: R, - ) -> MetaStorageResult>)>> - where - R: RangeBounds, - { - let sm_table_ids = self.table_lookup(); - let lookup_keys = sm_table_ids.range_kvs(range)?; - let mut ret = Vec::new(); - for lookup_key in &lookup_keys { - let key = &lookup_key.0; - let id = &lookup_key.1; - let meta = self.get_table_meta_by_id(&id.data.0); - match meta { - Ok(meta) => ret.push((key.clone(), meta)), - Err(_) => continue, - } - } - - Ok(ret) - } - pub fn txn_get_table_meta_by_id( &self, tid: &u64, From df3bcca18774eccb75e41069a2d69a5f18f46585 Mon Sep 17 00:00:00 2001 From: lichuang Date: Wed, 30 Mar 2022 10:29:36 +0800 Subject: [PATCH 4/8] Feature: add state machine range and subscribe API --- .../meta/raft-store/src/state_machine/sm.rs | 133 +----------------- 1 file changed, 5 insertions(+), 128 deletions(-) diff --git a/common/meta/raft-store/src/state_machine/sm.rs b/common/meta/raft-store/src/state_machine/sm.rs index a88abe29c9c2..a2753ca08dc4 100644 --- a/common/meta/raft-store/src/state_machine/sm.rs +++ b/common/meta/raft-store/src/state_machine/sm.rs @@ -14,8 +14,6 @@ use std::convert::TryInto; use std::fmt::Debug; -use std::fmt::Display; -use std::ops::RangeBounds; use std::time::SystemTime; use std::time::UNIX_EPOCH; @@ -55,7 +53,6 @@ use common_meta_types::UnknownDatabaseId; use common_meta_types::UnknownTable; use common_meta_types::UnknownTableId; use common_tracing::tracing; -use dyn_clone::DynClone; use openraft::raft::Entry; use openraft::raft::EntryPayload; use serde::Deserialize; @@ -93,44 +90,13 @@ const SEQ_DATABASE_META_ID: &str = "database_meta_id"; // const TREE_META: &str = "meta"; const TREE_STATE_MACHINE: &str = "state_machine"; -/// database meta data kev-value type -pub type DatabaseMetaKV = (DatabaseLookupKey, DatabaseMeta); - -pub enum DataChangeType { - Create, - Update, - Drop, -} - /// StateMachine subscriber trait #[async_trait::async_trait] -pub trait StateMachineSubscriber: Display + Debug + Sync + Send + DynClone { - async fn database_changed( - &self, - change_type: DataChangeType, - key: &DatabaseLookupKey, - prev: &Option>, - meta: &Option>, - ); - - async fn table_changed( - &self, - change_type: DataChangeType, - table_name: &str, - prev: &Option>, - meta: &Option>, - ); - - async fn kv_changed( - &self, - change_type: DataChangeType, - key: &str, - prev: &Option, - meta: &Option, - ); +pub trait StateMachineSubscriber: Debug + Sync + Send { + async fn kv_changed(&self, key: &str, prev: &Option, meta: &Option); } -dyn_clone::clone_trait_object!(StateMachineSubscriber); +//dyn_clone::clone_trait_object!(StateMachineSubscriber); /// The state machine of the `MemStore`. /// It includes user data and two raft-related informations: @@ -441,15 +407,6 @@ impl StateMachine { result ); - if let Some(subscriber) = &self.subscriber { - let _ = subscriber.database_changed( - DataChangeType::Create, - &db_key, - &prev_meta, - &result_meta, - ); - } - Ok(AppliedState::DatabaseMeta(Change::new_with_id( db_id, prev_meta, @@ -488,15 +445,6 @@ impl StateMachine { tracing::debug!("applied drop Database: {} {:?}", name, result); - if let Some(subscriber) = &self.subscriber { - let _ = subscriber.database_changed( - DataChangeType::Drop, - &db_key, - &prev_meta, - &result_meta, - ); - } - return Ok(AppliedState::DatabaseMeta(Change::new_with_id( db_id, prev_meta, @@ -533,10 +481,6 @@ impl StateMachine { self.txn_incr_seq(SEQ_DATABASE_META_ID, txn_tree)?; } - if let Some(subscriber) = &self.subscriber { - let _ = subscriber.table_changed(DataChangeType::Create, table_name, &prev, &result); - } - Ok(AppliedState::TableMeta(Change::new_with_id( table_id.unwrap(), prev, @@ -561,9 +505,7 @@ impl StateMachine { if result.is_none() { self.txn_incr_seq(SEQ_DATABASE_META_ID, txn_tree)?; } - if let Some(subscriber) = &self.subscriber { - let _ = subscriber.table_changed(DataChangeType::Drop, table_name, &prev, &result); - } + Ok(Change::new_with_id(table_id.unwrap(), prev, result).into()) } @@ -605,15 +547,6 @@ impl StateMachine { new_table_name ); - if let Some(subscriber) = &self.subscriber { - let _ = subscriber.table_changed( - DataChangeType::Update, - table_name, - &new_prev, - &new_result, - ); - } - Ok(AppliedState::TableMeta(Change::new_with_id( new_table_id.unwrap(), prev, @@ -643,22 +576,12 @@ impl StateMachine { tracing::debug!("applied UpsertKV: {} {:?}", key, result); if let Some(subscriber) = &self.subscriber { - let _ = subscriber.kv_changed(DataChangeType::Update, &key_str, &prev, &result); + let _ = subscriber.kv_changed(&key_str, &prev, &result); } Ok(Change::new(prev, result).into()) } - pub fn get_kv_by_range( - &self, - range: R, - ) -> MetaStorageResult>)>> - where - R: RangeBounds, - { - self.kvs().range_kvs(range) - } - #[tracing::instrument(level = "debug", skip(self, txn_tree))] fn apply_upsert_table_options_cmd( &self, @@ -1125,29 +1048,6 @@ impl StateMachine { } } - pub fn get_database_meta_by_range( - &self, - range: R, - ) -> MetaStorageResult> - where - R: RangeBounds, - { - let sm_db_ids = self.database_lookup(); - let lookup_keys = sm_db_ids.range_kvs(range)?; - let mut ret = Vec::new(); - for lookup_key in &lookup_keys { - let key = &lookup_key.0; - let id = &lookup_key.1; - let meta = self.get_database_meta_by_id(&id.data); - match meta { - Ok(meta) => ret.push((key.clone(), meta.data)), - Err(_) => continue, - } - } - - Ok(ret) - } - pub fn get_database_meta_by_id(&self, db_id: &u64) -> MetaStorageResult> { let x = self.databases().get(db_id)?.ok_or_else(|| { MetaStorageError::AppError(AppError::UnknownDatabaseId(UnknownDatabaseId::new( @@ -1181,29 +1081,6 @@ impl StateMachine { Ok(x) } - pub fn get_table_meta_by_range( - &self, - range: R, - ) -> MetaStorageResult>)>> - where - R: RangeBounds, - { - let sm_table_ids = self.table_lookup(); - let lookup_keys = sm_table_ids.range_kvs(range)?; - let mut ret = Vec::new(); - for lookup_key in &lookup_keys { - let key = &lookup_key.0; - let id = &lookup_key.1; - let meta = self.get_table_meta_by_id(&id.data.0); - match meta { - Ok(meta) => ret.push((key.clone(), meta)), - Err(_) => continue, - } - } - - Ok(ret) - } - pub fn txn_get_table_meta_by_id( &self, tid: &u64, From 447a88b5f6f72360b5107fe93fc026a4b78b797d Mon Sep 17 00:00:00 2001 From: lichuang Date: Wed, 30 Mar 2022 11:41:39 +0800 Subject: [PATCH 5/8] Feature: remove unused comment --- common/meta/raft-store/src/state_machine/sm.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/common/meta/raft-store/src/state_machine/sm.rs b/common/meta/raft-store/src/state_machine/sm.rs index a2753ca08dc4..d2d5c915cb46 100644 --- a/common/meta/raft-store/src/state_machine/sm.rs +++ b/common/meta/raft-store/src/state_machine/sm.rs @@ -96,8 +96,6 @@ pub trait StateMachineSubscriber: Debug + Sync + Send { async fn kv_changed(&self, key: &str, prev: &Option, meta: &Option); } -//dyn_clone::clone_trait_object!(StateMachineSubscriber); - /// The state machine of the `MemStore`. /// It includes user data and two raft-related informations: /// `last_applied_logs` and `client_serial_responses` to achieve idempotence. From d43e3f7c3475fa5328242abbc16046e47625b83c Mon Sep 17 00:00:00 2001 From: lichuang Date: Mon, 28 Mar 2022 15:24:39 +0800 Subject: [PATCH 6/8] Feature: add state machine range and subscribe API Feature: add state machine range and subscribe API Feature: remove unused comment --- .../meta/raft-store/src/state_machine/sm.rs | 43 ++++++++++++++----- .../pipelines/new/executor/executor_notify.rs | 7 +++ .../pipelines/new/executor/executor_tasks.rs | 9 ++++ 3 files changed, 48 insertions(+), 11 deletions(-) diff --git a/common/meta/raft-store/src/state_machine/sm.rs b/common/meta/raft-store/src/state_machine/sm.rs index dae8605c6b40..d2d5c915cb46 100644 --- a/common/meta/raft-store/src/state_machine/sm.rs +++ b/common/meta/raft-store/src/state_machine/sm.rs @@ -90,6 +90,12 @@ const SEQ_DATABASE_META_ID: &str = "database_meta_id"; // const TREE_META: &str = "meta"; const TREE_STATE_MACHINE: &str = "state_machine"; +/// StateMachine subscriber trait +#[async_trait::async_trait] +pub trait StateMachineSubscriber: Debug + Sync + Send { + async fn kv_changed(&self, key: &str, prev: &Option, meta: &Option); +} + /// The state machine of the `MemStore`. /// It includes user data and two raft-related informations: /// `last_applied_logs` and `client_serial_responses` to achieve idempotence. @@ -99,6 +105,9 @@ pub struct StateMachine { /// - Store initialization state and last applied in keyspace `StateMachineMeta`. /// - Every other state is store in its own keyspace such as `Nodes`. pub sm_tree: SledTree, + + /// subscriber of statemachine data + pub subscriber: Option>, } /// A key-value pair in a snapshot is a vec of two `Vec`. @@ -149,7 +158,10 @@ impl StateMachine { let sm_tree = SledTree::open(&db, &tree_name, config.is_sync())?; - let sm = StateMachine { sm_tree }; + let sm = StateMachine { + sm_tree, + subscriber: None, + }; let inited = { let sm_meta = sm.sm_meta(); @@ -167,6 +179,10 @@ impl StateMachine { } } + pub fn set_subscriber(&mut self, subscriber: Box) { + self.subscriber = Some(subscriber); + } + /// Create a snapshot. /// /// Returns: @@ -344,10 +360,10 @@ impl StateMachine { let db_id = self.txn_incr_seq(SEQ_DATABASE_ID, txn_tree)?; let db_lookup_tree = txn_tree.key_space::(); - + let db_key = DatabaseLookupKey::new(tenant.to_string(), name.to_string()); let (prev, result) = self.txn_sub_tree_upsert( &db_lookup_tree, - &DatabaseLookupKey::new(tenant.to_string(), name.to_string()), + &db_key, &MatchSeq::Exact(0), Operation::Update(db_id), None, @@ -405,13 +421,9 @@ impl StateMachine { ) -> MetaStorageResult { let dbs = txn_tree.key_space::(); - let (prev, result) = self.txn_sub_tree_upsert( - &dbs, - &DatabaseLookupKey::new(tenant.to_string(), name.to_string()), - &MatchSeq::Any, - Operation::Delete, - None, - )?; + let db_key = DatabaseLookupKey::new(tenant.to_string(), name.to_string()); + let (prev, result) = + self.txn_sub_tree_upsert(&dbs, &db_key, &MatchSeq::Any, Operation::Delete, None)?; assert!( result.is_none(), @@ -466,6 +478,7 @@ impl StateMachine { if result.is_some() { self.txn_incr_seq(SEQ_DATABASE_META_ID, txn_tree)?; } + Ok(AppliedState::TableMeta(Change::new_with_id( table_id.unwrap(), prev, @@ -490,6 +503,7 @@ impl StateMachine { if result.is_none() { self.txn_incr_seq(SEQ_DATABASE_META_ID, txn_tree)?; } + Ok(Change::new_with_id(table_id.unwrap(), prev, result).into()) } @@ -530,6 +544,7 @@ impl StateMachine { new_db_name, new_table_name ); + Ok(AppliedState::TableMeta(Change::new_with_id( new_table_id.unwrap(), prev, @@ -547,15 +562,21 @@ impl StateMachine { txn_tree: &TransactionSledTree, ) -> MetaStorageResult { let sub_tree = txn_tree.key_space::(); + let key_str = key.to_string(); let (prev, result) = self.txn_sub_tree_upsert( &sub_tree, - &key.to_string(), + &key_str, seq, value_op.clone(), value_meta.clone(), )?; tracing::debug!("applied UpsertKV: {} {:?}", key, result); + + if let Some(subscriber) = &self.subscriber { + let _ = subscriber.kv_changed(&key_str, &prev, &result); + } + Ok(Change::new(prev, result).into()) } diff --git a/query/src/pipelines/new/executor/executor_notify.rs b/query/src/pipelines/new/executor/executor_notify.rs index 93a6b89fad25..6da7926e9a8a 100644 --- a/query/src/pipelines/new/executor/executor_notify.rs +++ b/query/src/pipelines/new/executor/executor_notify.rs @@ -37,6 +37,7 @@ struct WorkersNotifyMutable { } pub struct WorkersNotify { + workers: usize, mutable_state: Mutex, workers_notify: Vec, } @@ -52,6 +53,7 @@ impl WorkersNotify { } Arc::new(WorkersNotify { + workers, workers_notify, mutable_state: Mutex::new(WorkersNotifyMutable { waiting_size: 0, @@ -60,6 +62,11 @@ impl WorkersNotify { }) } + pub fn active_workers(&self) -> usize { + let mutable_state = self.mutable_state.lock(); + self.workers - mutable_state.waiting_size + } + pub fn is_empty(&self) -> bool { let mutable_state = self.mutable_state.lock(); mutable_state.waiting_size == 0 diff --git a/query/src/pipelines/new/executor/executor_tasks.rs b/query/src/pipelines/new/executor/executor_tasks.rs index c4878e1d97e2..2e966627cea1 100644 --- a/query/src/pipelines/new/executor/executor_tasks.rs +++ b/query/src/pipelines/new/executor/executor_tasks.rs @@ -54,6 +54,7 @@ impl ExecutorTasksQueue { pub unsafe fn steal_task_to_context(&self, context: &mut ExecutorWorkerContext) { { let mut workers_tasks = self.workers_tasks.lock(); + if !workers_tasks.is_empty() { let task = workers_tasks.pop_task(context.get_worker_num()); context.set_task(task); @@ -70,6 +71,14 @@ impl ExecutorTasksQueue { } } + // When tasks queue is empty and all workers are waiting, no new tasks will be generated. + let workers_notify = context.get_workers_notify(); + if workers_notify.active_workers() <= 1 { + self.finish(); + workers_notify.wakeup_all(); + return; + } + context.get_workers_notify().wait(context.get_worker_num()); } From f6b16fe51f9afa36d5bd463e98b26be116f47ecd Mon Sep 17 00:00:00 2001 From: lichuang Date: Mon, 28 Mar 2022 15:24:39 +0800 Subject: [PATCH 7/8] Feature: add state machine range and subscribe API --- .../meta/raft-store/src/state_machine/sm.rs | 99 ++++++++++++++++--- .../pipelines/new/executor/executor_notify.rs | 7 ++ .../pipelines/new/executor/executor_tasks.rs | 9 ++ 3 files changed, 104 insertions(+), 11 deletions(-) diff --git a/common/meta/raft-store/src/state_machine/sm.rs b/common/meta/raft-store/src/state_machine/sm.rs index dae8605c6b40..beca591b2e8d 100644 --- a/common/meta/raft-store/src/state_machine/sm.rs +++ b/common/meta/raft-store/src/state_machine/sm.rs @@ -14,6 +14,7 @@ use std::convert::TryInto; use std::fmt::Debug; +use std::ops::RangeBounds; use std::time::SystemTime; use std::time::UNIX_EPOCH; @@ -90,6 +91,12 @@ const SEQ_DATABASE_META_ID: &str = "database_meta_id"; // const TREE_META: &str = "meta"; const TREE_STATE_MACHINE: &str = "state_machine"; +/// StateMachine subscriber trait +#[async_trait::async_trait] +pub trait StateMachineSubscriber: Debug + Sync + Send { + async fn kv_changed(&self, key: &str, prev: &Option, meta: &Option); +} + /// The state machine of the `MemStore`. /// It includes user data and two raft-related informations: /// `last_applied_logs` and `client_serial_responses` to achieve idempotence. @@ -99,6 +106,9 @@ pub struct StateMachine { /// - Store initialization state and last applied in keyspace `StateMachineMeta`. /// - Every other state is store in its own keyspace such as `Nodes`. pub sm_tree: SledTree, + + /// subscriber of statemachine data + pub subscriber: Option>, } /// A key-value pair in a snapshot is a vec of two `Vec`. @@ -149,7 +159,10 @@ impl StateMachine { let sm_tree = SledTree::open(&db, &tree_name, config.is_sync())?; - let sm = StateMachine { sm_tree }; + let sm = StateMachine { + sm_tree, + subscriber: None, + }; let inited = { let sm_meta = sm.sm_meta(); @@ -167,6 +180,10 @@ impl StateMachine { } } + pub fn set_subscriber(&mut self, subscriber: Box) { + self.subscriber = Some(subscriber); + } + /// Create a snapshot. /// /// Returns: @@ -344,10 +361,10 @@ impl StateMachine { let db_id = self.txn_incr_seq(SEQ_DATABASE_ID, txn_tree)?; let db_lookup_tree = txn_tree.key_space::(); - + let db_key = DatabaseLookupKey::new(tenant.to_string(), name.to_string()); let (prev, result) = self.txn_sub_tree_upsert( &db_lookup_tree, - &DatabaseLookupKey::new(tenant.to_string(), name.to_string()), + &db_key, &MatchSeq::Exact(0), Operation::Update(db_id), None, @@ -405,13 +422,9 @@ impl StateMachine { ) -> MetaStorageResult { let dbs = txn_tree.key_space::(); - let (prev, result) = self.txn_sub_tree_upsert( - &dbs, - &DatabaseLookupKey::new(tenant.to_string(), name.to_string()), - &MatchSeq::Any, - Operation::Delete, - None, - )?; + let db_key = DatabaseLookupKey::new(tenant.to_string(), name.to_string()); + let (prev, result) = + self.txn_sub_tree_upsert(&dbs, &db_key, &MatchSeq::Any, Operation::Delete, None)?; assert!( result.is_none(), @@ -466,6 +479,7 @@ impl StateMachine { if result.is_some() { self.txn_incr_seq(SEQ_DATABASE_META_ID, txn_tree)?; } + Ok(AppliedState::TableMeta(Change::new_with_id( table_id.unwrap(), prev, @@ -530,6 +544,7 @@ impl StateMachine { new_db_name, new_table_name ); + Ok(AppliedState::TableMeta(Change::new_with_id( new_table_id.unwrap(), prev, @@ -547,18 +562,34 @@ impl StateMachine { txn_tree: &TransactionSledTree, ) -> MetaStorageResult { let sub_tree = txn_tree.key_space::(); + let key_str = key.to_string(); let (prev, result) = self.txn_sub_tree_upsert( &sub_tree, - &key.to_string(), + &key_str, seq, value_op.clone(), value_meta.clone(), )?; tracing::debug!("applied UpsertKV: {} {:?}", key, result); + + if let Some(subscriber) = &self.subscriber { + let _ = subscriber.kv_changed(&key_str, &prev, &result); + } + Ok(Change::new(prev, result).into()) } + pub fn get_kv_by_range( + &self, + range: R, + ) -> MetaStorageResult>)>> + where + R: RangeBounds, + { + self.kvs().range_kvs(range) + } + #[tracing::instrument(level = "debug", skip(self, txn_tree))] fn apply_upsert_table_options_cmd( &self, @@ -1025,6 +1056,29 @@ impl StateMachine { } } + pub fn get_database_meta_by_range( + &self, + range: R, + ) -> MetaStorageResult> + where + R: RangeBounds, + { + let sm_db_ids = self.database_lookup(); + let lookup_keys = sm_db_ids.range_kvs(range)?; + let mut ret = Vec::new(); + for lookup_key in &lookup_keys { + let key = &lookup_key.0; + let id = &lookup_key.1; + let meta = self.get_database_meta_by_id(&id.data); + match meta { + Ok(meta) => ret.push((key.clone(), meta.data)), + Err(_) => continue, + } + } + + Ok(ret) + } + pub fn get_database_meta_by_id(&self, db_id: &u64) -> MetaStorageResult> { let x = self.databases().get(db_id)?.ok_or_else(|| { MetaStorageError::AppError(AppError::UnknownDatabaseId(UnknownDatabaseId::new( @@ -1058,6 +1112,29 @@ impl StateMachine { Ok(x) } + pub fn get_table_meta_by_range( + &self, + range: R, + ) -> MetaStorageResult>)>> + where + R: RangeBounds, + { + let sm_table_ids = self.table_lookup(); + let lookup_keys = sm_table_ids.range_kvs(range)?; + let mut ret = Vec::new(); + for lookup_key in &lookup_keys { + let key = &lookup_key.0; + let id = &lookup_key.1; + let meta = self.get_table_meta_by_id(&id.data.0); + match meta { + Ok(meta) => ret.push((key.clone(), meta)), + Err(_) => continue, + } + } + + Ok(ret) + } + pub fn txn_get_table_meta_by_id( &self, tid: &u64, diff --git a/query/src/pipelines/new/executor/executor_notify.rs b/query/src/pipelines/new/executor/executor_notify.rs index 93a6b89fad25..6da7926e9a8a 100644 --- a/query/src/pipelines/new/executor/executor_notify.rs +++ b/query/src/pipelines/new/executor/executor_notify.rs @@ -37,6 +37,7 @@ struct WorkersNotifyMutable { } pub struct WorkersNotify { + workers: usize, mutable_state: Mutex, workers_notify: Vec, } @@ -52,6 +53,7 @@ impl WorkersNotify { } Arc::new(WorkersNotify { + workers, workers_notify, mutable_state: Mutex::new(WorkersNotifyMutable { waiting_size: 0, @@ -60,6 +62,11 @@ impl WorkersNotify { }) } + pub fn active_workers(&self) -> usize { + let mutable_state = self.mutable_state.lock(); + self.workers - mutable_state.waiting_size + } + pub fn is_empty(&self) -> bool { let mutable_state = self.mutable_state.lock(); mutable_state.waiting_size == 0 diff --git a/query/src/pipelines/new/executor/executor_tasks.rs b/query/src/pipelines/new/executor/executor_tasks.rs index c4878e1d97e2..2e966627cea1 100644 --- a/query/src/pipelines/new/executor/executor_tasks.rs +++ b/query/src/pipelines/new/executor/executor_tasks.rs @@ -54,6 +54,7 @@ impl ExecutorTasksQueue { pub unsafe fn steal_task_to_context(&self, context: &mut ExecutorWorkerContext) { { let mut workers_tasks = self.workers_tasks.lock(); + if !workers_tasks.is_empty() { let task = workers_tasks.pop_task(context.get_worker_num()); context.set_task(task); @@ -70,6 +71,14 @@ impl ExecutorTasksQueue { } } + // When tasks queue is empty and all workers are waiting, no new tasks will be generated. + let workers_notify = context.get_workers_notify(); + if workers_notify.active_workers() <= 1 { + self.finish(); + workers_notify.wakeup_all(); + return; + } + context.get_workers_notify().wait(context.get_worker_num()); } From 281b5b221949fb539370169859e12b3b20593f3c Mon Sep 17 00:00:00 2001 From: lichuang Date: Thu, 31 Mar 2022 13:38:38 +0800 Subject: [PATCH 8/8] Fix: remove range API --- .../meta/raft-store/src/state_machine/sm.rs | 57 ------------------- 1 file changed, 57 deletions(-) diff --git a/common/meta/raft-store/src/state_machine/sm.rs b/common/meta/raft-store/src/state_machine/sm.rs index 7b855ccd7ae8..d2d5c915cb46 100644 --- a/common/meta/raft-store/src/state_machine/sm.rs +++ b/common/meta/raft-store/src/state_machine/sm.rs @@ -14,7 +14,6 @@ use std::convert::TryInto; use std::fmt::Debug; -use std::ops::RangeBounds; use std::time::SystemTime; use std::time::UNIX_EPOCH; @@ -581,16 +580,6 @@ impl StateMachine { Ok(Change::new(prev, result).into()) } - pub fn get_kv_by_range( - &self, - range: R, - ) -> MetaStorageResult>)>> - where - R: RangeBounds, - { - self.kvs().range_kvs(range) - } - #[tracing::instrument(level = "debug", skip(self, txn_tree))] fn apply_upsert_table_options_cmd( &self, @@ -1057,29 +1046,6 @@ impl StateMachine { } } - pub fn get_database_meta_by_range( - &self, - range: R, - ) -> MetaStorageResult> - where - R: RangeBounds, - { - let sm_db_ids = self.database_lookup(); - let lookup_keys = sm_db_ids.range_kvs(range)?; - let mut ret = Vec::new(); - for lookup_key in &lookup_keys { - let key = &lookup_key.0; - let id = &lookup_key.1; - let meta = self.get_database_meta_by_id(&id.data); - match meta { - Ok(meta) => ret.push((key.clone(), meta.data)), - Err(_) => continue, - } - } - - Ok(ret) - } - pub fn get_database_meta_by_id(&self, db_id: &u64) -> MetaStorageResult> { let x = self.databases().get(db_id)?.ok_or_else(|| { MetaStorageError::AppError(AppError::UnknownDatabaseId(UnknownDatabaseId::new( @@ -1113,29 +1079,6 @@ impl StateMachine { Ok(x) } - pub fn get_table_meta_by_range( - &self, - range: R, - ) -> MetaStorageResult>)>> - where - R: RangeBounds, - { - let sm_table_ids = self.table_lookup(); - let lookup_keys = sm_table_ids.range_kvs(range)?; - let mut ret = Vec::new(); - for lookup_key in &lookup_keys { - let key = &lookup_key.0; - let id = &lookup_key.1; - let meta = self.get_table_meta_by_id(&id.data.0); - match meta { - Ok(meta) => ret.push((key.clone(), meta)), - Err(_) => continue, - } - } - - Ok(ret) - } - pub fn txn_get_table_meta_by_id( &self, tid: &u64,