diff --git a/Cargo.lock b/Cargo.lock index 22233c207c76..c77862c7d46d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5395,7 +5395,6 @@ dependencies = [ "databend-storages-common-index", "databend-storages-common-table-meta", "log", - "parking_lot 0.12.1", ] [[package]] diff --git a/src/meta/raft-store/src/sm_v003/sm_v003.rs b/src/meta/raft-store/src/sm_v003/sm_v003.rs index fe12c10ea71f..7edebfb7630b 100644 --- a/src/meta/raft-store/src/sm_v003/sm_v003.rs +++ b/src/meta/raft-store/src/sm_v003/sm_v003.rs @@ -53,7 +53,6 @@ use crate::leveled_store::map_api::MapApiExt; use crate::leveled_store::map_api::MapApiRO; use crate::leveled_store::sys_data_api::SysDataApiRO; use crate::marked::Marked; -use crate::state_machine::sm::BlockingConfig; use crate::state_machine::ExpireKey; use crate::state_machine::StateMachineSubscriber; use crate::utils::prefix_right_bound; @@ -117,8 +116,6 @@ impl<'a> SMV003KVApi<'a> { pub struct SMV003 { pub(in crate::sm_v003) levels: LeveledMap, - blocking_config: BlockingConfig, - /// The expiration key since which for next clean. pub(in crate::sm_v003) expire_cursor: ExpireKey, @@ -172,15 +169,6 @@ impl SMV003 { self.levels.persisted().cloned() } - /// Return a Arc of the blocking config. It is only used for testing. - pub fn blocking_config_mut(&mut self) -> &mut BlockingConfig { - &mut self.blocking_config - } - - pub fn blocking_config(&self) -> &BlockingConfig { - &self.blocking_config - } - #[allow(dead_code)] pub(crate) fn new_applier(&mut self) -> Applier { Applier::new(self) @@ -257,7 +245,7 @@ impl SMV003 { } /// List expiration index by expiration time, - /// upto current time(exclusive) in milli seconds. + /// upto current time(exclusive) in milliseconds. /// /// Only records with expire time less than current time will be returned. /// Expire time that equals to current time is not considered expired. @@ -323,7 +311,7 @@ impl SMV003 { self.levels.acquire_compactor().await } - /// Replace all of the state machine data with the given one. + /// Replace all the state machine data with the given one. /// The input is a multi-level data. pub fn replace(&mut self, level: LeveledMap) { let applied = self.levels.writable_ref().last_applied_ref(); @@ -338,8 +326,8 @@ impl SMV003 { self.levels = level; - // The installed data may not cleaned up all expired keys, if it is built with an older state machine. - // So we need to reset the cursor then the next time applying a log it will cleanup all expired. + // The installed data may not clean up all expired keys, if it is built with an older state machine. + // So we need to reset the cursor then the next time applying a log it will clean up all expired. self.expire_cursor = ExpireKey::new(0, 0); } diff --git a/src/meta/raft-store/src/state_machine/sm.rs b/src/meta/raft-store/src/state_machine/sm.rs index e02fa27d6f3b..06b7687c508b 100644 --- a/src/meta/raft-store/src/state_machine/sm.rs +++ b/src/meta/raft-store/src/state_machine/sm.rs @@ -107,8 +107,6 @@ pub struct StateMachine { /// - Every other state is store in its own keyspace such as `Nodes`. pub sm_tree: SledTree, - blocking_config: BlockingConfig, - /// subscriber of state machine data pub subscriber: Option>, } @@ -135,23 +133,7 @@ impl SerializableSnapshot { } } -/// Configuration of what operation to block for testing purpose. -#[derive(Debug, Clone, Default)] -pub struct BlockingConfig { - pub write_snapshot: Duration, - pub compact_snapshot: Duration, -} - impl StateMachine { - /// Return a Arc of the blocking config. It is only used for testing. - pub fn blocking_config_mut(&mut self) -> &mut BlockingConfig { - &mut self.blocking_config - } - - pub fn blocking_config(&self) -> &BlockingConfig { - &self.blocking_config - } - pub fn tree_name(config: &RaftConfig, sm_id: u64) -> String { config.tree_name(format!("{}/{}", TREE_STATE_MACHINE, sm_id)) } @@ -167,7 +149,6 @@ impl StateMachine { let sm = StateMachine { sm_tree, - blocking_config: BlockingConfig::default(), subscriber: None, }; diff --git a/src/meta/service/src/api/http/v1/ctrl.rs b/src/meta/service/src/api/http/v1/ctrl.rs index 7b6d5061fc3d..2578db12c70b 100644 --- a/src/meta/service/src/api/http/v1/ctrl.rs +++ b/src/meta/service/src/api/http/v1/ctrl.rs @@ -13,7 +13,6 @@ // limitations under the License. use std::sync::Arc; -use std::time::Duration; use databend_common_meta_sled_store::openraft::async_runtime::watch::WatchReceiver; use databend_common_meta_types::NodeId; @@ -128,21 +127,3 @@ pub async fn trigger_transfer_leader( voter_ids, })) } - -#[poem::handler] -pub async fn block_write_snapshot( - meta_node: Data<&Arc>, -) -> poem::Result { - let mut sm = meta_node.sto.get_state_machine().await; - sm.blocking_config_mut().write_snapshot = Duration::from_millis(1_000_000); - Ok(Json(())) -} - -#[poem::handler] -pub async fn block_compact_snapshot( - meta_node: Data<&Arc>, -) -> poem::Result { - let mut sm = meta_node.sto.get_state_machine().await; - sm.blocking_config_mut().compact_snapshot = Duration::from_millis(1_000_000); - Ok(Json(())) -} diff --git a/src/meta/service/src/api/http_service.rs b/src/meta/service/src/api/http_service.rs index 8eba7671e54c..d376a77698cd 100644 --- a/src/meta/service/src/api/http_service.rs +++ b/src/meta/service/src/api/http_service.rs @@ -65,14 +65,6 @@ impl HttpService { "/v1/ctrl/trigger_transfer_leader", get(super::http::v1::ctrl::trigger_transfer_leader), ) - .at( - "/v1/ctrl/block_write_snapshot", - get(super::http::v1::ctrl::block_write_snapshot), - ) - .at( - "/v1/ctrl/block_compact_snapshot", - get(super::http::v1::ctrl::block_compact_snapshot), - ) .at( "/v1/cluster/nodes", get(super::http::v1::cluster_state::nodes_handler), diff --git a/src/meta/service/tests/it/meta_node/meta_node_raft_api.rs b/src/meta/service/tests/it/meta_node/meta_node_raft_api.rs deleted file mode 100644 index d30479995058..000000000000 --- a/src/meta/service/tests/it/meta_node/meta_node_raft_api.rs +++ /dev/null @@ -1,120 +0,0 @@ -// Copyright 2023 Datafuse Labs. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//! Test raft protocol behaviors - -use std::time::Duration; - -use databend_common_base::base::tokio; -use databend_common_meta_types::Cmd; -use databend_common_meta_types::LogEntry; -use databend_common_meta_types::UpsertKV; -use log::info; -use maplit::btreeset; -use test_harness::test; - -use crate::testing::meta_service_test_harness; -use crate::tests::meta_node::start_meta_node_cluster; - -/// When a follower is dumping a snapshot, it should not block append entries request. -/// Thus heartbeat should still be processed, and logs can be committed by leader(but not by followers). -/// -/// Building a snapshot includes two steps: -/// 1. Dumping the state machine to a in-memory struct. -/// 2. Serialize the dumped data. -#[test(harness = meta_service_test_harness)] -#[fastrace::trace] -async fn test_meta_node_dumping_snapshot_does_not_block_append_entries() -> anyhow::Result<()> { - info!("--- initialize cluster 2 voters"); - let (mut _log_index, mut tcs) = start_meta_node_cluster(btreeset![0, 1], btreeset![]).await?; - - let tc0 = tcs.remove(0); - let tc1 = tcs.remove(0); - - let mn0 = tc0.meta_node.clone().unwrap(); - let mn1 = tc1.meta_node.clone().unwrap(); - - info!("--- block dumping snapshot from state machine for 5 seconds"); - { - let mut sm = mn1.sto.get_state_machine().await; - let blocking_config = sm.blocking_config_mut(); - blocking_config.write_snapshot = Duration::from_secs(5); - } - - info!("--- trigger building snapshot"); - mn1.raft.trigger().snapshot().await?; - - info!("--- Wait 500 ms for snapshot to be begin building"); - tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; - - info!("--- With snapshot being blocked, leader can still write"); - let key = "foo"; - mn0.assume_leader() - .await? - .write(LogEntry { - txid: None, - time_ms: None, - cmd: Cmd::UpsertKV(UpsertKV::update(key, key.as_bytes())), - }) - .await?; - info!("--- Write done"); - - Ok(()) -} - -/// When a follower is serializing a snapshot, it should not block append entries request. -/// Thus heartbeat should still be processed, and logs can be committed by leader(but not by followers). -/// -/// Building a snapshot includes two steps: -/// 1. Dumping the state machine to a in-memory struct. -/// 2. Serialize the dumped data. -#[test(harness = meta_service_test_harness)] -#[fastrace::trace] -async fn test_meta_node_serializing_snapshot_does_not_block_append_entries() -> anyhow::Result<()> { - info!("--- initialize cluster 2 voters"); - let (mut _log_index, mut tcs) = start_meta_node_cluster(btreeset![0, 1], btreeset![]).await?; - - let tc0 = tcs.remove(0); - let tc1 = tcs.remove(0); - - let mn0 = tc0.meta_node.clone().unwrap(); - let mn1 = tc1.meta_node.clone().unwrap(); - - info!("--- block dumping snapshot from state machine for 5 seconds"); - { - let mut sm = mn1.sto.get_state_machine().await; - let blocking_config = sm.blocking_config_mut(); - blocking_config.compact_snapshot = Duration::from_secs(5); - } - - info!("--- trigger building snapshot"); - mn1.raft.trigger().snapshot().await?; - - info!("--- Wait 500 ms for snapshot to be begin building"); - tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; - - info!("--- With snapshot being blocked, leader can still write"); - let key = "foo"; - mn0.assume_leader() - .await? - .write(LogEntry { - txid: None, - time_ms: None, - cmd: Cmd::UpsertKV(UpsertKV::update(key, key.as_bytes())), - }) - .await?; - info!("--- Write done"); - - Ok(()) -} diff --git a/src/meta/service/tests/it/meta_node/mod.rs b/src/meta/service/tests/it/meta_node/mod.rs index 0469da7482fc..6c02500ee58d 100644 --- a/src/meta/service/tests/it/meta_node/mod.rs +++ b/src/meta/service/tests/it/meta_node/mod.rs @@ -15,6 +15,5 @@ pub(crate) mod meta_node_kv_api; pub(crate) mod meta_node_kv_api_expire; pub(crate) mod meta_node_lifecycle; -pub(crate) mod meta_node_raft_api; pub(crate) mod meta_node_replication; pub(crate) mod meta_node_request_forwarding;