Skip to content

Commit

Permalink
test: add more tests for KvStateStore
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Mar 29, 2024
1 parent 65b2271 commit ac1dddb
Showing 1 changed file with 80 additions and 0 deletions.
80 changes: 80 additions & 0 deletions src/common/meta/src/state_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,8 @@ mod tests {

use common_procedure::store::state_store::KeyValue;
use futures::TryStreamExt;
use rand::{Rng, RngCore};
use uuid::Uuid;

use super::*;
use crate::kv_backend::memory::MemoryKvBackend;
Expand Down Expand Up @@ -254,4 +256,82 @@ mod tests {
let data = walk_top_down("a/").await;
assert_eq!(vec![("a/1".into(), b"v1".to_vec()),], data);
}

struct TestCase {
prefix: String,
key: String,
value: Vec<u8>,
}

async fn test_meta_state_store_split_value_with_size_limit(
kv_backend: KvBackendRef,
size_limit: u32,
num_per_range: u32,
) {
let num_cases = rand::thread_rng().gen_range(1..=26);
let mut cases = Vec::with_capacity(num_cases);
for i in 0..num_cases {
let size = rand::thread_rng().gen_range(size_limit..=8192u32);
let mut large_value = vec![0u8; size as usize];
rand::thread_rng().fill_bytes(&mut large_value);

// Starts from `a`.
let prefix = format!("{}/", std::char::from_u32(97 + i as u32).unwrap());
cases.push(TestCase {
key: format!("{}{}.commit", prefix, Uuid::new_v4()),
prefix,
value: large_value,
})
}
let store = &KvStateStore {
kv_backend: kv_backend.clone(),
max_num_per_range: Some(num_per_range as usize), // for testing "more" in range
max_size_per_value: Some(size_limit as usize),
};
let walk_top_down = async move |path: &str| -> Vec<KeyValue> {
let mut data = store
.walk_top_down(path)
.await
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap();
data.sort_unstable_by(|a, b| a.0.cmp(&b.0));
data
};

// Puts the values
for TestCase { key, value, .. } in &cases {
store.put(key, value.clone()).await.unwrap();
}

// Validates the values
for TestCase { prefix, key, value } in &cases {
let data = walk_top_down(prefix).await;
assert_eq!(data.len(), 1);
let (keyset, got) = data.into_iter().next().unwrap();
let num_expected_keys = value.len().div_ceil(size_limit as usize);
assert_eq!(&got, value);
assert_eq!(keyset.key(), key);
assert_eq!(keyset.keys().len(), num_expected_keys);
}

// Deletes the values
for TestCase { prefix, .. } in &cases {
let data = walk_top_down(prefix).await;
let (keyset, _) = data.into_iter().next().unwrap();
// Deletes values
store.batch_delete(keyset.keys().as_slice()).await.unwrap();
let data = walk_top_down(prefix).await;
assert_eq!(data.len(), 0);
}
}

#[tokio::test]
async fn test_meta_state_store_split_value() {
let size_limit = rand::thread_rng().gen_range(128..=512);
let page_size = rand::thread_rng().gen_range(1..10);
let kv_backend = Arc::new(MemoryKvBackend::new());
test_meta_state_store_split_value_with_size_limit(kv_backend, size_limit, page_size).await;
}
}

0 comments on commit ac1dddb

Please sign in to comment.