feat(procedure): auto split large value to multiple values #3605

Merged · 15 commits · Apr 1, 2024
Changes from 12 commits
240 changes: 216 additions & 24 deletions src/common/meta/src/state_store.rs
@@ -17,8 +17,10 @@ use std::sync::Arc;
use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_procedure::error::{DeleteStatesSnafu, ListStateSnafu, PutStateSnafu};
use common_procedure::store::state_store::{KeyValueStream, StateStore};
use common_procedure::store::state_store::{KeySet, KeyValueStream, StateStore};
use common_procedure::store::util::{multiple_values_collector, MultipleValuesStream};
use common_procedure::Result as ProcedureResult;
use futures::future::try_join_all;
use futures::StreamExt;
use snafu::ResultExt;

@@ -42,16 +44,20 @@ fn strip_prefix(key: &str) -> String {

pub struct KvStateStore {
kv_backend: KvBackendRef,
// limit is set to 0, it is treated as no limit.
max_size_per_range: usize,
// The max number of keys to be returned in a range scan request.
// `None` stands for no limit.
max_num_per_range: Option<usize>,
// The max size of a value in bytes.
// `None` stands for no limit.
max_size_per_value: Option<usize>,
}

impl KvStateStore {
// `max_size_per_range` is set to 0, it is treated as no limit.
pub fn new(kv_backend: KvBackendRef) -> Self {
Self {
kv_backend,
max_size_per_range: 0,
max_num_per_range: None,
max_size_per_value: None,
}
}
}
@@ -64,20 +70,80 @@ fn decode_kv(kv: KeyValue) -> Result<(String, Vec<u8>)> {
Ok((key, value))
}

enum SplitValue<'a> {
Single(&'a [u8]),
Multiple(Vec<&'a [u8]>),
}

fn split_value(value: &[u8], max_size_per_value: Option<usize>) -> SplitValue<'_> {
if let Some(max_size_per_value) = max_size_per_value {
if value.len() <= max_size_per_value {
SplitValue::Single(value)
} else {
SplitValue::Multiple(value.chunks(max_size_per_value).collect::<Vec<_>>())
}
} else {
SplitValue::Single(value)
}
}
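
As a quick illustration of the splitting behavior (hypothetical sizes, not taken from the PR's tests): with a 4-byte limit, a 10-byte value is returned as three chunks of 4, 4, and 2 bytes, while passing `None` leaves it untouched.

#[test]
fn split_value_sketch() {
    let value = vec![1u8; 10];
    // Over the limit: chunked into ceil(10 / 4) = 3 slices.
    match split_value(&value, Some(4)) {
        SplitValue::Multiple(chunks) => {
            assert_eq!(chunks.len(), 3);
            assert_eq!(chunks.last().unwrap().len(), 2);
        }
        SplitValue::Single(_) => unreachable!(),
    }
    // No limit configured: the value is kept as a single slice.
    assert!(matches!(split_value(&value, None), SplitValue::Single(_)));
}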

#[async_trait]
impl StateStore for KvStateStore {
async fn put(&self, key: &str, value: Vec<u8>) -> ProcedureResult<()> {
let _ = self
.kv_backend
.put(PutRequest {
key: with_prefix(key).into_bytes(),
value,
..Default::default()
})
.await
.map_err(BoxedError::new)
.context(PutStateSnafu { key })?;
Ok(())
let split = split_value(&value, self.max_size_per_value);
let key = with_prefix(key);
match split {
SplitValue::Single(_) => {
self.kv_backend
.put(
PutRequest::new()
.with_key(key.to_string().into_bytes())
.with_value(value),
)
.await
.map_err(BoxedError::new)
.context(PutStateSnafu { key })?;
Ok(())
}
SplitValue::Multiple(values) => {
// Note:
// The length of values can be up to usize::MAX.
// The KeySet::with_segment_suffix method uses a 10-digit number to store the segment number,
// which is large enough for the usize type.

// The first segment key: "0b00001111"
// The 2nd segment key: "0b00001111/0000000001"
// The 3rd segment key: "0b00001111/0000000002"
let operations = values
.into_iter()
.enumerate()
.map(|(idx, value)| {
let key = if idx > 0 {
KeySet::with_segment_suffix(&key, idx)
} else {
key.to_string()
};
let kv_backend = self.kv_backend.clone();
async move {
kv_backend
.put(
PutRequest::new()
.with_key(key.into_bytes())
.with_value(value),
)
.await
}
})
.collect::<Vec<_>>();

try_join_all(operations)
.await
.map_err(BoxedError::new)
.context(PutStateSnafu { key })?;

Ok(())
}
}
}
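
For illustration, a minimal sketch of the segment-key convention described in the note above, assuming `KeySet::with_segment_suffix` zero-pads the segment index to 10 digits; the `segment_key` helper below is hypothetical, shown only to make the expected key shapes concrete.

// Hypothetical helper mirroring the segment-key layout from the note above.
fn segment_key(primary: &str, idx: usize) -> String {
    // Segment 0 keeps the primary key; later segments append "/NNNNNNNNNN".
    if idx == 0 {
        primary.to_string()
    } else {
        format!("{primary}/{idx:010}")
    }
}

#[test]
fn segment_key_layout_sketch() {
    assert_eq!(segment_key("0b00001111", 0), "0b00001111");
    assert_eq!(segment_key("0b00001111", 1), "0b00001111/0000000001");
    assert_eq!(segment_key("0b00001111", 2), "0b00001111/0000000002");
}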

async fn walk_top_down(&self, path: &str) -> ProcedureResult<KeyValueStream> {
@@ -90,7 +156,7 @@ impl StateStore for KvStateStore {
let stream = PaginationStream::new(
self.kv_backend.clone(),
req,
self.max_size_per_range,
self.max_num_per_range.unwrap_or_default(),
Arc::new(decode_kv),
);

@@ -100,6 +166,9 @@
.with_context(|_| ListStateSnafu { path })
});

let stream =
MultipleValuesStream::new(Box::pin(stream), Box::new(multiple_values_collector));

Ok(Box::pin(stream))
}

@@ -128,19 +197,26 @@ impl StateStore for KvStateStore {

#[cfg(test)]
mod tests {
use std::env;
use std::sync::Arc;

use common_procedure::store::state_store::KeyValue;
use common_telemetry::info;
use futures::TryStreamExt;
use rand::{Rng, RngCore};
use uuid::Uuid;

use super::*;
use crate::kv_backend::chroot::ChrootKvBackend;
use crate::kv_backend::etcd::EtcdStore;
use crate::kv_backend::memory::MemoryKvBackend;

#[tokio::test]
async fn test_meta_state_store() {
let store = &KvStateStore {
kv_backend: Arc::new(MemoryKvBackend::new()),
max_size_per_range: 1, // for testing "more" in range
max_num_per_range: Some(1), // for testing "more" in range
max_size_per_value: None,
};

let walk_top_down = async move |path: &str| -> Vec<KeyValue> {
@@ -165,18 +241,18 @@ mod tests {
let data = walk_top_down("/").await;
assert_eq!(
vec![
("a/1".to_string(), b"v1".to_vec()),
("a/2".to_string(), b"v2".to_vec()),
("b/1".to_string(), b"v3".to_vec())
("a/1".into(), b"v1".to_vec()),
("a/2".into(), b"v2".to_vec()),
("b/1".into(), b"v3".to_vec())
],
data
);

let data = walk_top_down("a/").await;
assert_eq!(
vec![
("a/1".to_string(), b"v1".to_vec()),
("a/2".to_string(), b"v2".to_vec()),
("a/1".into(), b"v1".to_vec()),
("a/2".into(), b"v2".to_vec()),
],
data
);
@@ -187,6 +263,122 @@
.unwrap();

let data = walk_top_down("a/").await;
assert_eq!(vec![("a/1".to_string(), b"v1".to_vec()),], data);
assert_eq!(vec![("a/1".into(), b"v1".to_vec()),], data);
}

struct TestCase {
prefix: String,
key: String,
value: Vec<u8>,
}

async fn test_meta_state_store_split_value_with_size_limit(
kv_backend: KvBackendRef,
size_limit: u32,
num_per_range: u32,
max_bytes: u32,
) {
let num_cases = rand::thread_rng().gen_range(1..=26);
let mut cases = Vec::with_capacity(num_cases);
for i in 0..num_cases {
let size = rand::thread_rng().gen_range(size_limit..=max_bytes);
let mut large_value = vec![0u8; size as usize];
rand::thread_rng().fill_bytes(&mut large_value);

// Starts from `a`.
let prefix = format!("{}/", std::char::from_u32(97 + i as u32).unwrap());
cases.push(TestCase {
key: format!("{}{}.commit", prefix, Uuid::new_v4()),
prefix,
value: large_value,
})
}
let store = &KvStateStore {
kv_backend: kv_backend.clone(),
max_num_per_range: Some(num_per_range as usize), // for testing "more" in range
max_size_per_value: Some(size_limit as usize),
};
let walk_top_down = async move |path: &str| -> Vec<KeyValue> {
let mut data = store
.walk_top_down(path)
.await
.unwrap()
.try_collect::<Vec<_>>()
.await
.unwrap();
data.sort_unstable_by(|a, b| a.0.cmp(&b.0));
data
};

// Puts the values
for TestCase { key, value, .. } in &cases {
store.put(key, value.clone()).await.unwrap();
}

// Validates the values
for TestCase { prefix, key, value } in &cases {
let data = walk_top_down(prefix).await;
assert_eq!(data.len(), 1);
let (keyset, got) = data.into_iter().next().unwrap();
let num_expected_keys = value.len().div_ceil(size_limit as usize);
assert_eq!(&got, value);
assert_eq!(keyset.key(), key);
assert_eq!(keyset.keys().len(), num_expected_keys);
}

// Deletes the values
for TestCase { prefix, .. } in &cases {
let data = walk_top_down(prefix).await;
let (keyset, _) = data.into_iter().next().unwrap();
// Deletes values
store.batch_delete(keyset.keys().as_slice()).await.unwrap();
let data = walk_top_down(prefix).await;
assert_eq!(data.len(), 0);
}
}

#[tokio::test]
async fn test_meta_state_store_split_value() {
let size_limit = rand::thread_rng().gen_range(128..=512);
let page_size = rand::thread_rng().gen_range(1..10);
let kv_backend = Arc::new(MemoryKvBackend::new());
test_meta_state_store_split_value_with_size_limit(kv_backend, size_limit, page_size, 8192)
.await;
}

#[tokio::test]
async fn test_etcd_store_split_value() {
common_telemetry::init_default_ut_logging();
let prefix = "test_etcd_store_split_value/";
let endpoints = env::var("GT_ETCD_ENDPOINTS").unwrap_or_default();
let kv_backend: KvBackendRef = if endpoints.is_empty() {
Arc::new(MemoryKvBackend::new())
} else {
let endpoints = endpoints
.split(',')
.map(|s| s.to_string())
.collect::<Vec<String>>();
let backend = EtcdStore::with_endpoints(endpoints, 128)
.await
.expect("malformed endpoints");
// Each retry requires a new isolation namespace.
let chroot = format!("{}{}", prefix, Uuid::new_v4());
info!("chroot length: {}", chroot.len());
Arc::new(ChrootKvBackend::new(chroot.into(), backend))
};

let key_preserve_size = 1024;
// The etcd default size limit for any request is 1.5 MiB.
// However, some KvBackends, such as the `ChrootKvBackend`, prepend a prefix to the `key`,
// so we don't know the exact size of the key.
let size_limit = 1536 * 1024 - key_preserve_size;
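// i.e. 1536 * 1024 = 1,572,864 bytes; reserving 1,024 bytes for the (prefixed) key
// leaves 1,571,840 bytes per value segment.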
let page_size = rand::thread_rng().gen_range(1..10);
test_meta_state_store_split_value_with_size_limit(
kv_backend,
size_limit,
page_size,
size_limit * 10,
)
.await;
}
}
16 changes: 15 additions & 1 deletion src/common/procedure/src/error.rs
@@ -134,6 +134,17 @@ pub enum Error {
source: Arc<Error>,
location: Location,
},

#[snafu(display("Failed to parse segment key: {key}"))]
ParseSegmentKey {
location: Location,
key: String,
#[snafu(source)]
error: std::num::ParseIntError,
},

#[snafu(display("Unexpected: {err_msg}"))]
Unexpected { location: Location, err_msg: String },
}

pub type Result<T> = std::result::Result<T, Error>;
@@ -156,7 +167,10 @@ impl ErrorExt for Error {
Error::LoaderConflict { .. } | Error::DuplicateProcedure { .. } => {
StatusCode::InvalidArguments
}
Error::ProcedurePanic { .. } | Error::CorruptedData { .. } => StatusCode::Unexpected,
Error::ProcedurePanic { .. }
| Error::CorruptedData { .. }
| Error::ParseSegmentKey { .. }
| Error::Unexpected { .. } => StatusCode::Unexpected,
Error::ProcedureExec { source, .. } => source.status_code(),
Error::StartRemoveOutdatedMetaTask { source, .. }
| Error::StopRemoveOutdatedMetaTask { source, .. } => source.status_code(),
15 changes: 9 additions & 6 deletions src/common/procedure/src/store.rs
@@ -25,6 +25,7 @@ pub(crate) use crate::store::state_store::StateStoreRef;
use crate::ProcedureId;

pub mod state_store;
pub mod util;

/// Key prefix of procedure store.
const PROC_PATH: &str = "procedure/";
Expand Down Expand Up @@ -143,16 +144,17 @@ impl ProcedureStore {
// 8 should be enough for most procedures.
let mut step_keys = Vec::with_capacity(8);
let mut finish_keys = Vec::new();
while let Some((key, _)) = key_values.try_next().await? {
let Some(curr_key) = ParsedKey::parse_str(&self.proc_path, &key) else {
while let Some((key_set, _)) = key_values.try_next().await? {
let key = key_set.key();
let Some(curr_key) = ParsedKey::parse_str(&self.proc_path, key) else {
logging::warn!("Unknown key while deleting procedures, key: {}", key);
continue;
};
if curr_key.key_type == KeyType::Step {
step_keys.push(key);
step_keys.extend(key_set.keys());
} else {
// .commit or .rollback
finish_keys.push(key);
finish_keys.extend(key_set.keys());
}
}

@@ -184,8 +186,9 @@

// Scan all procedures.
let mut key_values = self.store.walk_top_down(&self.proc_path).await?;
while let Some((key, value)) = key_values.try_next().await? {
let Some(curr_key) = ParsedKey::parse_str(&self.proc_path, &key) else {
while let Some((key_set, value)) = key_values.try_next().await? {
let key = key_set.key();
let Some(curr_key) = ParsedKey::parse_str(&self.proc_path, key) else {
logging::warn!("Unknown key while loading procedures, key: {}", key);
continue;
};