Skip to content

Commit

Permalink
Merge pull request #6526 from lichuang/fix_txn_watch
Browse files Browse the repository at this point in the history
fix: add watch txn unit test
  • Loading branch information
BohuTANG authored Jul 8, 2022
2 parents b0338bc + 146645e commit 68f4053
Show file tree
Hide file tree
Showing 9 changed files with 363 additions and 3 deletions.
1 change: 1 addition & 0 deletions common/base/src/base/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ pub use stop_handle::StopHandle;
pub use stoppable::Stoppable;
pub use string_func::escape_for_key;
pub use string_func::mask_string;
pub use string_func::replace_nth_char;
pub use string_func::unescape_for_key;
pub use thread::Thread;
pub use tokio;
Expand Down
11 changes: 11 additions & 0 deletions common/base/src/base/string_func.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,14 @@ pub fn mask_string(s: &str, unmask_len: usize) -> String {
ret
}
}

/// Replace idx-th char as new char
/// If idx is out of len(s) range, then no replacement is performed.
/// replace_nth_char("a13", 1, '2') -> 'a23'
/// replace_nth_char("a13", 10, '2') -> 'a13'
pub fn replace_nth_char(s: &str, idx: usize, newchar: char) -> String {
s.chars()
.enumerate()
.map(|(i, c)| if i == idx { newchar } else { c })
.collect()
}
41 changes: 41 additions & 0 deletions common/base/tests/it/range_map_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,45 @@ fn test_range_set() {
assert_eq!(a.get_by_point(&a1), vec![r11]);
assert_eq!(a.get_by_point(&a2), vec![r24, r26]);
}
// test get_by_point for string prefix
{
let mut a = RangeMap::new();

let a1 = "11".to_string();
let a2 = "12".to_string();

a.insert(a1..a2, 11, 11);
assert!(!a.get_by_point(&"11".to_string()).is_empty());
assert!(!a.get_by_point(&"111".to_string()).is_empty());
assert!(!a.get_by_point(&"11z".to_string()).is_empty());
assert!(!a.get_by_point(&"11/".to_string()).is_empty());
assert!(!a.get_by_point(&"11*".to_string()).is_empty());
assert!(a.get_by_point(&"12".to_string()).is_empty());
}
// test get_by_point for char upbound limit string prefix
{
let mut a = RangeMap::new();

let a1 = format!("{}", 255 as char);
let a2 = format!("{}{}", 255 as char, 255 as char);

a.insert(a1..a2, 11, 11);
assert!(!a.get_by_point(&format!("{}", 255 as char)).is_empty());
assert!(!a.get_by_point(&format!("{}z", 255 as char)).is_empty());
assert!(!a.get_by_point(&format!("{}/", 255 as char)).is_empty());
assert!(!a.get_by_point(&format!("{}*", 255 as char)).is_empty());
}
// test get_by_point for char upbound limit string prefix
{
let mut a = RangeMap::new();

let a1 = "1".to_string();
let a2 = format!("{}{}", a1, 255 as char);

a.insert(a1.clone()..a2, 11, 11);
assert!(!a.get_by_point(&a1).is_empty());
assert!(!a.get_by_point(&format!("{}z", a1)).is_empty());
assert!(!a.get_by_point(&format!("{}*", a1)).is_empty());
assert!(!a.get_by_point(&format!("{}/", a1)).is_empty());
}
}
6 changes: 6 additions & 0 deletions common/base/tests/it/string_func.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,9 @@ fn mask_string_test() {
assert_eq!(mask_string("string", 3), "******ing".to_string());
assert_eq!(mask_string("string", 20), "string".to_string());
}

#[test]
fn replace_nth_char_test() {
assert_eq!("a23".to_string(), replace_nth_char("a13", 1, '2'));
assert_eq!("a13".to_string(), replace_nth_char("a13", 10, '2'));
}
2 changes: 2 additions & 0 deletions common/exception/src/exception_code.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,11 +226,13 @@ build_exceptions! {

// Variable error codes.
UnknownVariable(2801),
OnlySupportAsciiChars(2802),

// Tenant quota error codes.
IllegalTenantQuotaFormat(2901),
TenantQuotaUnknown(2902),
TenantQuotaExceeded(2903),

}

// Storage errors [3001, 4000].
Expand Down
35 changes: 35 additions & 0 deletions common/meta/api/src/kv_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
use std::ops::Deref;

use async_trait::async_trait;
use common_base::base::replace_nth_char;
use common_exception::ErrorCode;
use common_meta_types::GetKVReply;
use common_meta_types::ListKVReply;
use common_meta_types::MGetKVReply;
Expand All @@ -35,6 +37,39 @@ pub trait ApiBuilder<T>: Clone {
async fn build_cluster(&self) -> Vec<T>;
}

/// Return a string that bigger than all the string prefix with input string(only support ASCII char).
/// "a" -> "b"
/// "1" -> "2"
/// [96,97,127] -> [96,98,127]
/// [127] -> [127, 127]
/// [127,127,127, 127] -> [127,127,127, 127, 127]
pub fn prefix_of_string(s: &str) -> common_exception::Result<String> {
for c in s.chars() {
if !c.is_ascii() {
return common_exception::Result::Err(ErrorCode::OnlySupportAsciiChars(format!(
"Only support ASCII characters: {}",
c
)));
}
}
let mut l = s.len();
while l > 0 {
l -= 1;
if let Some(c) = s.chars().nth(l) {
if c == 127 as char {
continue;
}
return Ok(replace_nth_char(s, l, (c as u8 + 1) as char));
}
}
Ok(format!("{}{}", s, 127 as char))
}

// return watch prefix (start, end) tuple(only support ASCII characters)
pub fn get_start_and_end_of_prefix(prefix: &str) -> common_exception::Result<(String, String)> {
Ok((prefix.to_string(), prefix_of_string(prefix)?))
}

#[async_trait]
pub trait KVApi: Send + Sync {
async fn upsert_kv(&self, req: UpsertKVReq) -> Result<UpsertKVReply, MetaError>;
Expand Down
2 changes: 2 additions & 0 deletions common/meta/api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ mod schema_api_impl;
mod schema_api_keys;
mod schema_api_test_suite;

pub use kv_api::get_start_and_end_of_prefix;
pub use kv_api::prefix_of_string;
pub use kv_api::ApiBuilder;
pub use kv_api::AsKVApi;
pub use kv_api::KVApi;
Expand Down
6 changes: 4 additions & 2 deletions common/meta/types/proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,10 @@ message WatchRequest {
// key is the key to register for watching.
string key = 1;

// key_end is the end of the range [key, key_end) to watch.
// if key_end is None, then watch only key.
// `key_end`` is the end of the range [key, key_end) to watch.
// If key_end is None, then watch only key.
// If want to watch prefix of key, use `get_start_and_end_of_prefix` to
// generate [key, key_end).
optional string key_end = 2;

enum FilterType {
Expand Down
Loading

1 comment on commit 68f4053

@vercel
Copy link

@vercel vercel bot commented on 68f4053 Jul 8, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

databend – ./

databend-git-main-databend.vercel.app
databend.rs
databend-databend.vercel.app
databend.vercel.app

Please sign in to comment.