diff --git a/src/common/meta/src/state_store.rs b/src/common/meta/src/state_store.rs index 9b49d75ba622..3dd81f4d514e 100644 --- a/src/common/meta/src/state_store.rs +++ b/src/common/meta/src/state_store.rs @@ -196,6 +196,8 @@ mod tests { use common_procedure::store::state_store::KeyValue; use futures::TryStreamExt; + use rand::{Rng, RngCore}; + use uuid::Uuid; use super::*; use crate::kv_backend::memory::MemoryKvBackend; @@ -254,4 +256,82 @@ mod tests { let data = walk_top_down("a/").await; assert_eq!(vec![("a/1".into(), b"v1".to_vec()),], data); } + + struct TestCase { + prefix: String, + key: String, + value: Vec, + } + + async fn test_meta_state_store_split_value_with_size_limit( + kv_backend: KvBackendRef, + size_limit: u32, + num_per_range: u32, + ) { + let num_cases = rand::thread_rng().gen_range(1..=26); + let mut cases = Vec::with_capacity(num_cases); + for i in 0..num_cases { + let size = rand::thread_rng().gen_range(size_limit..=8192u32); + let mut large_value = vec![0u8; size as usize]; + rand::thread_rng().fill_bytes(&mut large_value); + + // Starts from `a`. + let prefix = format!("{}/", std::char::from_u32(97 + i as u32).unwrap()); + cases.push(TestCase { + key: format!("{}{}.commit", prefix, Uuid::new_v4()), + prefix, + value: large_value, + }) + } + let store = &KvStateStore { + kv_backend: kv_backend.clone(), + max_num_per_range: Some(num_per_range as usize), // for testing "more" in range + max_size_per_value: Some(size_limit as usize), + }; + let walk_top_down = async move |path: &str| -> Vec { + let mut data = store + .walk_top_down(path) + .await + .unwrap() + .try_collect::>() + .await + .unwrap(); + data.sort_unstable_by(|a, b| a.0.cmp(&b.0)); + data + }; + + // Puts the values + for TestCase { key, value, .. } in &cases { + store.put(key, value.clone()).await.unwrap(); + } + + // Validates the values + for TestCase { prefix, key, value } in &cases { + let data = walk_top_down(prefix).await; + assert_eq!(data.len(), 1); + let (keyset, got) = data.into_iter().next().unwrap(); + let num_expected_keys = value.len().div_ceil(size_limit as usize); + assert_eq!(&got, value); + assert_eq!(keyset.key(), key); + assert_eq!(keyset.keys().len(), num_expected_keys); + } + + // Deletes the values + for TestCase { prefix, .. } in &cases { + let data = walk_top_down(prefix).await; + let (keyset, _) = data.into_iter().next().unwrap(); + // Deletes values + store.batch_delete(keyset.keys().as_slice()).await.unwrap(); + let data = walk_top_down(prefix).await; + assert_eq!(data.len(), 0); + } + } + + #[tokio::test] + async fn test_meta_state_store_split_value() { + let size_limit = rand::thread_rng().gen_range(128..=512); + let page_size = rand::thread_rng().gen_range(1..10); + let kv_backend = Arc::new(MemoryKvBackend::new()); + test_meta_state_store_split_value_with_size_limit(kv_backend, size_limit, page_size).await; + } }