From 749db6769aa5b6c012d22882320159e3ce7193dd Mon Sep 17 00:00:00 2001 From: lichuang Date: Thu, 7 Jul 2022 11:25:37 +0800 Subject: [PATCH 1/9] fix: add watch txn unit test --- common/meta/api/src/lib.rs | 1 + common/meta/api/src/schema_api_keys.rs | 11 ++ metasrv/tests/it/grpc/metasrv_grpc_watch.rs | 180 ++++++++++++++++++++ 3 files changed, 192 insertions(+) diff --git a/common/meta/api/src/lib.rs b/common/meta/api/src/lib.rs index fb092b765891..fd2837a26705 100644 --- a/common/meta/api/src/lib.rs +++ b/common/meta/api/src/lib.rs @@ -35,6 +35,7 @@ pub(crate) use schema_api_impl::serialize_struct; pub use schema_api_impl::txn_cond_seq; pub use schema_api_impl::txn_op_del; pub use schema_api_impl::txn_op_put; +pub use schema_api_keys::get_start_and_end_of_prefix; pub use schema_api_keys::DatabaseIdGen; pub use schema_api_keys::TableIdGen; pub use schema_api_test_suite::SchemaApiTestSuite; diff --git a/common/meta/api/src/schema_api_keys.rs b/common/meta/api/src/schema_api_keys.rs index 9590769dbbbb..d925405b7514 100644 --- a/common/meta/api/src/schema_api_keys.rs +++ b/common/meta/api/src/schema_api_keys.rs @@ -276,3 +276,14 @@ impl KVApiKey for CountTablesKey { Ok(CountTablesKey { tenant }) } } + +fn prefix_of_string(s: &str) -> String { + let l = s.len(); + let a = s.chars().nth(l - 1).unwrap(); + return format!("{}{}", s[..l - 2].to_string(), (a as u8 + 1) as char); +} + +// return watch prefix (start, end) tuple +pub fn get_start_and_end_of_prefix(prefix: &str) -> (String, String) { + return (prefix.to_string(), prefix_of_string(prefix)); +} diff --git a/metasrv/tests/it/grpc/metasrv_grpc_watch.rs b/metasrv/tests/it/grpc/metasrv_grpc_watch.rs index d165ba2a96f7..c97163dc6a14 100644 --- a/metasrv/tests/it/grpc/metasrv_grpc_watch.rs +++ b/metasrv/tests/it/grpc/metasrv_grpc_watch.rs @@ -15,14 +15,24 @@ use std::time::Duration; use common_base::base::tokio; +use common_meta_api::get_start_and_end_of_prefix; use common_meta_api::KVApi; use common_meta_grpc::MetaGrpcClient; use common_meta_types::protobuf::watch_request::FilterType; use common_meta_types::protobuf::Event; use common_meta_types::protobuf::SeqV; +use common_meta_types::protobuf::TxnRequest; use common_meta_types::protobuf::WatchRequest; +use common_meta_types::txn_condition; +use common_meta_types::txn_op; +use common_meta_types::ConditionResult; use common_meta_types::MatchSeq; use common_meta_types::Operation; +use common_meta_types::TxnCondition; +use common_meta_types::TxnDeleteByPrefixRequest; +use common_meta_types::TxnDeleteRequest; +use common_meta_types::TxnOp; +use common_meta_types::TxnPutRequest; use common_meta_types::UpsertKVReq; use common_tracing::tracing; @@ -46,6 +56,21 @@ async fn upsert_kv_client_main(addr: String, updates: Vec) -> anyho Ok(()) } +async fn txn_client_main(addr: String, txn: TxnRequest) -> anyhow::Result<()> { + let client = MetaGrpcClient::try_create( + vec![addr], + "root", + "xxx", + None, + Some(Duration::from_secs(10)), + None, + )?; + + let _ = client.transaction(txn).await; + + Ok(()) +} + async fn test_watch_main( addr: String, watch: WatchRequest, @@ -85,6 +110,43 @@ async fn test_watch_main( Ok(()) } +async fn test_watch_txn_main( + addr: String, + watch: WatchRequest, + mut watch_events: Vec, + txn: TxnRequest, +) -> anyhow::Result<()> { + let client = MetaGrpcClient::try_create( + vec![addr.clone()], + "root", + "xxx", + None, + Some(Duration::from_secs(10)), + None, + )?; + + let mut client_stream = client.request(watch).await?; + + let _h = tokio::spawn(txn_client_main(addr, txn)); + + loop { + if let Ok(Some(resp)) = client_stream.message().await { + if let Some(event) = resp.event { + assert!(!watch_events.is_empty()); + + assert_eq!(watch_events.get(0), Some(&event)); + watch_events.remove(0); + + if watch_events.is_empty() { + break; + } + } + } + } + + Ok(()) +} + #[async_entry::test(worker_threads = 3, init = "init_meta_ut!()", tracing_span = "debug")] async fn test_watch() -> anyhow::Result<()> { // - Start a metasrv server. @@ -211,6 +273,124 @@ async fn test_watch() -> anyhow::Result<()> { ]; test_watch_main(addr.clone(), watch, watch_events, updates).await?; } + // 3. test watch transaction + { + // first construct test kv + let delete_key = "watch_delete_key"; + let watch_delete_by_prefix_key = "watch_delete_by_prefix_key"; + + { + let client = MetaGrpcClient::try_create( + vec![addr.clone()], + "root", + "xxx", + None, + Some(Duration::from_secs(10)), + None, + )?; + + let updates = vec![ + UpsertKVReq::new( + delete_key, + MatchSeq::Any, + Operation::Update(delete_key.as_bytes().to_vec()), + None, + ), + UpsertKVReq::new( + watch_delete_by_prefix_key, + MatchSeq::Any, + Operation::Update(watch_delete_by_prefix_key.as_bytes().to_vec()), + None, + ), + ]; + + for update in updates { + let _ = client.upsert_kv(update.clone()).await; + } + } + seq += 2; + + let watch_prefix = "watch"; + + let k1 = "watch_txn_key"; + + let txn_key = k1.to_string(); + let txn_val = "txn_val".as_bytes().to_vec(); + + let (start, end) = get_start_and_end_of_prefix(watch_prefix); + + let watch = WatchRequest { + key: start, + key_end: Some(end), + filter_type: FilterType::All.into(), + }; + + let conditions = vec![TxnCondition { + key: txn_key.clone(), + expected: ConditionResult::Eq as i32, + target: Some(txn_condition::Target::Seq(0)), + }]; + + let if_then: Vec = vec![ + TxnOp { + request: Some(txn_op::Request::Put(TxnPutRequest { + key: txn_key.clone(), + value: txn_val.clone(), + prev_value: true, + })), + }, + TxnOp { + request: Some(txn_op::Request::Delete(TxnDeleteRequest { + key: delete_key.to_string(), + prev_value: true, + })), + }, + TxnOp { + request: Some(txn_op::Request::DeleteByPrefix(TxnDeleteByPrefixRequest { + prefix: watch_delete_by_prefix_key.to_string(), + })), + }, + ]; + + let else_then: Vec = vec![]; + + let txn = TxnRequest { + condition: conditions, + if_then, + else_then, + }; + + seq += 1; + + let watch_events = vec![ + Event { + key: txn_key.clone(), + current: Some(SeqV { + seq: seq + 2, + data: txn_val, + }), + prev: None, + }, + Event { + key: delete_key.to_string(), + prev: Some(SeqV { + seq: seq, + data: delete_key.as_bytes().to_vec(), + }), + current: None, + }, + Event { + key: watch_delete_by_prefix_key.to_string(), + prev: Some(SeqV { + seq: seq + 1, + data: watch_delete_by_prefix_key.as_bytes().to_vec(), + }), + current: None, + }, + ]; + + test_watch_txn_main(addr.clone(), watch, watch_events, txn).await?; + } Ok(()) } From 79b17ef423562027215401fd1bc2adc7a327a20f Mon Sep 17 00:00:00 2001 From: lichuang Date: Thu, 7 Jul 2022 11:32:16 +0800 Subject: [PATCH 2/9] fix: add watch txn unit test --- common/meta/api/src/kv_api.rs | 11 +++++++++++ common/meta/api/src/lib.rs | 2 +- common/meta/api/src/schema_api_keys.rs | 11 ----------- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/common/meta/api/src/kv_api.rs b/common/meta/api/src/kv_api.rs index 2debc264da23..f848c03465d6 100644 --- a/common/meta/api/src/kv_api.rs +++ b/common/meta/api/src/kv_api.rs @@ -35,6 +35,17 @@ pub trait ApiBuilder: Clone { async fn build_cluster(&self) -> Vec; } +fn prefix_of_string(s: &str) -> String { + let l = s.len(); + let a = s.chars().nth(l - 1).unwrap(); + return format!("{}{}", s[..l - 2].to_string(), (a as u8 + 1) as char); +} + +// return watch prefix (start, end) tuple +pub fn get_start_and_end_of_prefix(prefix: &str) -> (String, String) { + return (prefix.to_string(), prefix_of_string(prefix)); +} + #[async_trait] pub trait KVApi: Send + Sync { async fn upsert_kv(&self, req: UpsertKVReq) -> Result; diff --git a/common/meta/api/src/lib.rs b/common/meta/api/src/lib.rs index fd2837a26705..fe97e9f52b5b 100644 --- a/common/meta/api/src/lib.rs +++ b/common/meta/api/src/lib.rs @@ -23,6 +23,7 @@ mod schema_api_impl; mod schema_api_keys; mod schema_api_test_suite; +pub use kv_api::get_start_and_end_of_prefix; pub use kv_api::ApiBuilder; pub use kv_api::AsKVApi; pub use kv_api::KVApi; @@ -35,7 +36,6 @@ pub(crate) use schema_api_impl::serialize_struct; pub use schema_api_impl::txn_cond_seq; pub use schema_api_impl::txn_op_del; pub use schema_api_impl::txn_op_put; -pub use schema_api_keys::get_start_and_end_of_prefix; pub use schema_api_keys::DatabaseIdGen; pub use schema_api_keys::TableIdGen; pub use schema_api_test_suite::SchemaApiTestSuite; diff --git a/common/meta/api/src/schema_api_keys.rs b/common/meta/api/src/schema_api_keys.rs index d925405b7514..9590769dbbbb 100644 --- a/common/meta/api/src/schema_api_keys.rs +++ b/common/meta/api/src/schema_api_keys.rs @@ -276,14 +276,3 @@ impl KVApiKey for CountTablesKey { Ok(CountTablesKey { tenant }) } } - -fn prefix_of_string(s: &str) -> String { - let l = s.len(); - let a = s.chars().nth(l - 1).unwrap(); - return format!("{}{}", s[..l - 2].to_string(), (a as u8 + 1) as char); -} - -// return watch prefix (start, end) tuple -pub fn get_start_and_end_of_prefix(prefix: &str) -> (String, String) { - return (prefix.to_string(), prefix_of_string(prefix)); -} From 19de25241095b0a3903dd0ca9f20f0ab60747db6 Mon Sep 17 00:00:00 2001 From: lichuang Date: Thu, 7 Jul 2022 11:33:58 +0800 Subject: [PATCH 3/9] fix: add watch txn unit test --- common/meta/api/src/kv_api.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/common/meta/api/src/kv_api.rs b/common/meta/api/src/kv_api.rs index f848c03465d6..87e540754dd0 100644 --- a/common/meta/api/src/kv_api.rs +++ b/common/meta/api/src/kv_api.rs @@ -38,12 +38,12 @@ pub trait ApiBuilder: Clone { fn prefix_of_string(s: &str) -> String { let l = s.len(); let a = s.chars().nth(l - 1).unwrap(); - return format!("{}{}", s[..l - 2].to_string(), (a as u8 + 1) as char); + format!("{}{}", s[..l - 2].to_string(), (a as u8 + 1) as char) } // return watch prefix (start, end) tuple pub fn get_start_and_end_of_prefix(prefix: &str) -> (String, String) { - return (prefix.to_string(), prefix_of_string(prefix)); + (prefix.to_string(), prefix_of_string(prefix)) } #[async_trait] From f2c88e8940a529205553de8dface21418eed26ff Mon Sep 17 00:00:00 2001 From: lichuang Date: Thu, 7 Jul 2022 14:51:53 +0800 Subject: [PATCH 4/9] fix: add watch txn unit test --- common/meta/api/src/kv_api.rs | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/common/meta/api/src/kv_api.rs b/common/meta/api/src/kv_api.rs index 87e540754dd0..13755f0b66f7 100644 --- a/common/meta/api/src/kv_api.rs +++ b/common/meta/api/src/kv_api.rs @@ -36,9 +36,21 @@ pub trait ApiBuilder: Clone { } fn prefix_of_string(s: &str) -> String { - let l = s.len(); - let a = s.chars().nth(l - 1).unwrap(); - format!("{}{}", s[..l - 2].to_string(), (a as u8 + 1) as char) + let mut ret = s.to_string(); + let mut l = s.len(); + let bytes = s.as_bytes(); + while l > 0 { + l -= 1; + let a = bytes[l]; + if a == 255 { + continue; + } + unsafe { + ret.as_mut_str().as_bytes_mut()[l] = a + 1; + } + return ret; + } + format!("{}{}", 255 as char, s) } // return watch prefix (start, end) tuple From 8702de9f9f16cbb20cae8f079cdc7e5367faef31 Mon Sep 17 00:00:00 2001 From: lichuang Date: Thu, 7 Jul 2022 14:52:35 +0800 Subject: [PATCH 5/9] fix: add watch txn unit test --- common/meta/api/src/kv_api.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/meta/api/src/kv_api.rs b/common/meta/api/src/kv_api.rs index 13755f0b66f7..d799c0b8fa30 100644 --- a/common/meta/api/src/kv_api.rs +++ b/common/meta/api/src/kv_api.rs @@ -50,7 +50,7 @@ fn prefix_of_string(s: &str) -> String { } return ret; } - format!("{}{}", 255 as char, s) + format!("{}{}", s, 255 as char) } // return watch prefix (start, end) tuple From 5f80023e7424e3ed25ba22ca85fd2241b252fca2 Mon Sep 17 00:00:00 2001 From: lichuang Date: Thu, 7 Jul 2022 21:36:11 +0800 Subject: [PATCH 6/9] fix: add get_start_and_end_of_prefix,prefix_of_string unit test --- common/base/src/base/mod.rs | 2 + common/base/src/base/string_func.rs | 31 ++++++++++++++++ common/base/tests/it/range_map_test.rs | 41 +++++++++++++++++++++ common/base/tests/it/string_func.rs | 29 +++++++++++++++ common/meta/api/src/kv_api.rs | 19 +--------- metasrv/tests/it/grpc/metasrv_grpc_watch.rs | 23 +++++++++++- 6 files changed, 126 insertions(+), 19 deletions(-) diff --git a/common/base/src/base/mod.rs b/common/base/src/base/mod.rs index 38367af68bcd..150aaf26d3d6 100644 --- a/common/base/src/base/mod.rs +++ b/common/base/src/base/mod.rs @@ -42,6 +42,8 @@ pub use stop_handle::StopHandle; pub use stoppable::Stoppable; pub use string_func::escape_for_key; pub use string_func::mask_string; +pub use string_func::prefix_of_string; +pub use string_func::replace_nth_char; pub use string_func::unescape_for_key; pub use thread::Thread; pub use tokio; diff --git a/common/base/src/base/string_func.rs b/common/base/src/base/string_func.rs index 53b6684a542d..237b8f03f1fc 100644 --- a/common/base/src/base/string_func.rs +++ b/common/base/src/base/string_func.rs @@ -103,3 +103,34 @@ pub fn mask_string(s: &str, unmask_len: usize) -> String { ret } } + +/// Replace idx-th char as new char +/// If idx is out of len(s) range, then no replacement is performed. +/// replace_nth_char("a13", 1, '2') -> 'a23' +/// replace_nth_char("a13", 10, '2') -> 'a13' +pub fn replace_nth_char(s: &str, idx: usize, newchar: char) -> String { + s.chars() + .enumerate() + .map(|(i, c)| if i == idx { newchar } else { c }) + .collect() +} + +/// Return prefix of string. +/// "a" -> "b" +/// "1" -> "2" +/// [96,97,127] -> [96,98,127] +/// [127] -> [127, 127] +/// [127,127,127, 127] -> [127,127,127, 127, 127] +pub fn prefix_of_string(s: &str) -> String { + let mut l = s.len(); + while l > 0 { + l -= 1; + if let Some(c) = s.chars().nth(l) { + if c == 127 as char { + continue; + } + return replace_nth_char(s, l, (c as u8 + 1) as char); + } + } + format!("{}{}", s, 127 as char) +} diff --git a/common/base/tests/it/range_map_test.rs b/common/base/tests/it/range_map_test.rs index 4c6632724ad9..aff562ec15a9 100644 --- a/common/base/tests/it/range_map_test.rs +++ b/common/base/tests/it/range_map_test.rs @@ -67,4 +67,45 @@ fn test_range_set() { assert_eq!(a.get_by_point(&a1), vec![r11]); assert_eq!(a.get_by_point(&a2), vec![r24, r26]); } + // test get_by_point for string prefix + { + let mut a = RangeMap::new(); + + let a1 = "11".to_string(); + let a2 = "12".to_string(); + + a.insert(a1.clone()..a2.clone(), 11, 11); + assert!(a.get_by_point(&"11".to_string()).len() > 0); + assert!(a.get_by_point(&"111".to_string()).len() > 0); + assert!(a.get_by_point(&"11z".to_string()).len() > 0); + assert!(a.get_by_point(&"11/".to_string()).len() > 0); + assert!(a.get_by_point(&"11*".to_string()).len() > 0); + assert!(a.get_by_point(&"12".to_string()).len() == 0); + } + // test get_by_point for char upbound limit string prefix + { + let mut a = RangeMap::new(); + + let a1 = format!("{}", 255 as char); + let a2 = format!("{}{}", 255 as char, 255 as char); + + a.insert(a1.clone()..a2.clone(), 11, 11); + assert!(a.get_by_point(&format!("{}", 255 as char)).len() > 0); + assert!(a.get_by_point(&format!("{}z", 255 as char)).len() > 0); + assert!(a.get_by_point(&format!("{}/", 255 as char)).len() > 0); + assert!(a.get_by_point(&format!("{}*", 255 as char)).len() > 0); + } + // test get_by_point for char upbound limit string prefix + { + let mut a = RangeMap::new(); + + let a1 = "1".to_string(); + let a2 = format!("{}{}", a1, 255 as char); + + a.insert(a1.clone()..a2.clone(), 11, 11); + assert!(a.get_by_point(&a1).len() > 0); + assert!(a.get_by_point(&format!("{}z", a1)).len() > 0); + assert!(a.get_by_point(&format!("{}*", a1)).len() > 0); + assert!(a.get_by_point(&format!("{}/", a1)).len() > 0); + } } diff --git a/common/base/tests/it/string_func.rs b/common/base/tests/it/string_func.rs index e6183ffd6ef4..3a63468efa0c 100644 --- a/common/base/tests/it/string_func.rs +++ b/common/base/tests/it/string_func.rs @@ -36,3 +36,32 @@ fn mask_string_test() { assert_eq!(mask_string("string", 3), "******ing".to_string()); assert_eq!(mask_string("string", 20), "string".to_string()); } + +#[test] +fn prefix_of_string_test() { + assert_eq!("b".to_string(), prefix_of_string("a")); + assert_eq!("2".to_string(), prefix_of_string("1")); + assert_eq!( + "__fd_table_by_ie".to_string(), + prefix_of_string("__fd_table_by_id") + ); + { + let str = 127 as char; + let s = str.to_string(); + let ret = prefix_of_string(&s); + for byte in ret.as_bytes() { + assert_eq!(*byte, 127 as u8); + } + } + { + let s = format!("ab{}", 127 as char); + let ret = prefix_of_string(&s); + assert_eq!(ret, format!("ac{}", 127 as char)); + } +} + +#[test] +fn replace_nth_char_test() { + assert_eq!("a23".to_string(), replace_nth_char("a13", 1, '2')); + assert_eq!("a13".to_string(), replace_nth_char("a13", 10, '2')); +} diff --git a/common/meta/api/src/kv_api.rs b/common/meta/api/src/kv_api.rs index d799c0b8fa30..20ff50dd3a1c 100644 --- a/common/meta/api/src/kv_api.rs +++ b/common/meta/api/src/kv_api.rs @@ -16,6 +16,7 @@ use std::ops::Deref; use async_trait::async_trait; +use common_base::base::prefix_of_string; use common_meta_types::GetKVReply; use common_meta_types::ListKVReply; use common_meta_types::MGetKVReply; @@ -35,24 +36,6 @@ pub trait ApiBuilder: Clone { async fn build_cluster(&self) -> Vec; } -fn prefix_of_string(s: &str) -> String { - let mut ret = s.to_string(); - let mut l = s.len(); - let bytes = s.as_bytes(); - while l > 0 { - l -= 1; - let a = bytes[l]; - if a == 255 { - continue; - } - unsafe { - ret.as_mut_str().as_bytes_mut()[l] = a + 1; - } - return ret; - } - format!("{}{}", s, 255 as char) -} - // return watch prefix (start, end) tuple pub fn get_start_and_end_of_prefix(prefix: &str) -> (String, String) { (prefix.to_string(), prefix_of_string(prefix)) diff --git a/metasrv/tests/it/grpc/metasrv_grpc_watch.rs b/metasrv/tests/it/grpc/metasrv_grpc_watch.rs index c97163dc6a14..a4b9a362d088 100644 --- a/metasrv/tests/it/grpc/metasrv_grpc_watch.rs +++ b/metasrv/tests/it/grpc/metasrv_grpc_watch.rs @@ -374,7 +374,7 @@ async fn test_watch() -> anyhow::Result<()> { Event { key: delete_key.to_string(), prev: Some(SeqV { - seq: seq, + seq, data: delete_key.as_bytes().to_vec(), }), current: None, @@ -394,3 +394,24 @@ async fn test_watch() -> anyhow::Result<()> { Ok(()) } + +#[test] +fn test_get_start_and_end_of_prefix() -> anyhow::Result<()> { + assert_eq!( + ("aa".to_string(), "ab".to_string()), + get_start_and_end_of_prefix("aa") + ); + assert_eq!( + ("a1".to_string(), "a2".to_string()), + get_start_and_end_of_prefix("a1") + ); + { + let str = 127 as char; + let s = str.to_string(); + let (_, end) = get_start_and_end_of_prefix(&s); + for byte in end.as_bytes() { + assert_eq!(*byte, 127 as u8); + } + } + Ok(()) +} From f1a0ea68135a639c2cab576cf1c6389382da27be Mon Sep 17 00:00:00 2001 From: lichuang Date: Thu, 7 Jul 2022 22:12:44 +0800 Subject: [PATCH 7/9] make clippy happy --- common/base/tests/it/range_map_test.rs | 34 ++++++++++----------- common/base/tests/it/string_func.rs | 2 +- metasrv/tests/it/grpc/metasrv_grpc_watch.rs | 2 +- 3 files changed, 19 insertions(+), 19 deletions(-) diff --git a/common/base/tests/it/range_map_test.rs b/common/base/tests/it/range_map_test.rs index aff562ec15a9..5281d93c604d 100644 --- a/common/base/tests/it/range_map_test.rs +++ b/common/base/tests/it/range_map_test.rs @@ -74,13 +74,13 @@ fn test_range_set() { let a1 = "11".to_string(); let a2 = "12".to_string(); - a.insert(a1.clone()..a2.clone(), 11, 11); - assert!(a.get_by_point(&"11".to_string()).len() > 0); - assert!(a.get_by_point(&"111".to_string()).len() > 0); - assert!(a.get_by_point(&"11z".to_string()).len() > 0); - assert!(a.get_by_point(&"11/".to_string()).len() > 0); - assert!(a.get_by_point(&"11*".to_string()).len() > 0); - assert!(a.get_by_point(&"12".to_string()).len() == 0); + a.insert(a1..a2, 11, 11); + assert!(!a.get_by_point(&"11".to_string()).is_empty()); + assert!(!a.get_by_point(&"111".to_string()).is_empty()); + assert!(!a.get_by_point(&"11z".to_string()).is_empty()); + assert!(!a.get_by_point(&"11/".to_string()).is_empty()); + assert!(!a.get_by_point(&"11*".to_string()).is_empty()); + assert!(a.get_by_point(&"12".to_string()).is_empty()); } // test get_by_point for char upbound limit string prefix { @@ -89,11 +89,11 @@ fn test_range_set() { let a1 = format!("{}", 255 as char); let a2 = format!("{}{}", 255 as char, 255 as char); - a.insert(a1.clone()..a2.clone(), 11, 11); - assert!(a.get_by_point(&format!("{}", 255 as char)).len() > 0); - assert!(a.get_by_point(&format!("{}z", 255 as char)).len() > 0); - assert!(a.get_by_point(&format!("{}/", 255 as char)).len() > 0); - assert!(a.get_by_point(&format!("{}*", 255 as char)).len() > 0); + a.insert(a1..a2, 11, 11); + assert!(!a.get_by_point(&format!("{}", 255 as char)).is_empty()); + assert!(!a.get_by_point(&format!("{}z", 255 as char)).is_empty()); + assert!(!a.get_by_point(&format!("{}/", 255 as char)).is_empty()); + assert!(!a.get_by_point(&format!("{}*", 255 as char)).is_empty()); } // test get_by_point for char upbound limit string prefix { @@ -102,10 +102,10 @@ fn test_range_set() { let a1 = "1".to_string(); let a2 = format!("{}{}", a1, 255 as char); - a.insert(a1.clone()..a2.clone(), 11, 11); - assert!(a.get_by_point(&a1).len() > 0); - assert!(a.get_by_point(&format!("{}z", a1)).len() > 0); - assert!(a.get_by_point(&format!("{}*", a1)).len() > 0); - assert!(a.get_by_point(&format!("{}/", a1)).len() > 0); + a.insert(a1.clone()..a2, 11, 11); + assert!(!a.get_by_point(&a1).is_empty()); + assert!(!a.get_by_point(&format!("{}z", a1)).is_empty()); + assert!(!a.get_by_point(&format!("{}*", a1)).is_empty()); + assert!(!a.get_by_point(&format!("{}/", a1)).is_empty()); } } diff --git a/common/base/tests/it/string_func.rs b/common/base/tests/it/string_func.rs index 3a63468efa0c..8b0a8f2037a1 100644 --- a/common/base/tests/it/string_func.rs +++ b/common/base/tests/it/string_func.rs @@ -50,7 +50,7 @@ fn prefix_of_string_test() { let s = str.to_string(); let ret = prefix_of_string(&s); for byte in ret.as_bytes() { - assert_eq!(*byte, 127 as u8); + assert_eq!(*byte, 127_u8); } } { diff --git a/metasrv/tests/it/grpc/metasrv_grpc_watch.rs b/metasrv/tests/it/grpc/metasrv_grpc_watch.rs index a4b9a362d088..54300529d050 100644 --- a/metasrv/tests/it/grpc/metasrv_grpc_watch.rs +++ b/metasrv/tests/it/grpc/metasrv_grpc_watch.rs @@ -410,7 +410,7 @@ fn test_get_start_and_end_of_prefix() -> anyhow::Result<()> { let s = str.to_string(); let (_, end) = get_start_and_end_of_prefix(&s); for byte in end.as_bytes() { - assert_eq!(*byte, 127 as u8); + assert_eq!(*byte, 127_u8); } } Ok(()) From 673e8f4f127aea6ae16110de29dae0a373d18e03 Mon Sep 17 00:00:00 2001 From: lichuang Date: Fri, 8 Jul 2022 11:34:40 +0800 Subject: [PATCH 8/9] prefix only support ascii code --- common/base/src/base/string_func.rs | 18 +++++++++--- common/base/tests/it/string_func.rs | 31 +++++++++++++++++---- common/exception/src/exception_code.rs | 2 ++ common/meta/api/src/kv_api.rs | 6 ++-- metasrv/tests/it/grpc/metasrv_grpc_watch.rs | 28 +++++++++++++++---- 5 files changed, 66 insertions(+), 19 deletions(-) diff --git a/common/base/src/base/string_func.rs b/common/base/src/base/string_func.rs index 237b8f03f1fc..9e4a0d178580 100644 --- a/common/base/src/base/string_func.rs +++ b/common/base/src/base/string_func.rs @@ -14,6 +14,8 @@ use std::string::FromUtf8Error; +use common_exception::ErrorCode; + /// Function that escapes special characters in a string. /// /// All characters except digit, alphabet and '_' are treated as special characters. @@ -115,13 +117,21 @@ pub fn replace_nth_char(s: &str, idx: usize, newchar: char) -> String { .collect() } -/// Return prefix of string. +/// Return prefix of string(only support ASCII char). /// "a" -> "b" /// "1" -> "2" /// [96,97,127] -> [96,98,127] /// [127] -> [127, 127] /// [127,127,127, 127] -> [127,127,127, 127, 127] -pub fn prefix_of_string(s: &str) -> String { +pub fn prefix_of_string(s: &str) -> common_exception::Result { + for c in s.chars() { + if !c.is_ascii() { + return common_exception::Result::Err(ErrorCode::OnlySupportAsciiChars(format!( + "Only support ASCII characters: {}", + c + ))); + } + } let mut l = s.len(); while l > 0 { l -= 1; @@ -129,8 +139,8 @@ pub fn prefix_of_string(s: &str) -> String { if c == 127 as char { continue; } - return replace_nth_char(s, l, (c as u8 + 1) as char); + return Ok(replace_nth_char(s, l, (c as u8 + 1) as char)); } } - format!("{}{}", s, 127 as char) + Ok(format!("{}{}", s, 127 as char)) } diff --git a/common/base/tests/it/string_func.rs b/common/base/tests/it/string_func.rs index 8b0a8f2037a1..9a5fe31ed2e3 100644 --- a/common/base/tests/it/string_func.rs +++ b/common/base/tests/it/string_func.rs @@ -38,26 +38,45 @@ fn mask_string_test() { } #[test] -fn prefix_of_string_test() { - assert_eq!("b".to_string(), prefix_of_string("a")); - assert_eq!("2".to_string(), prefix_of_string("1")); +fn prefix_of_string_test() -> common_exception::Result<()> { + assert_eq!("b".to_string(), prefix_of_string("a")?); + assert_eq!("2".to_string(), prefix_of_string("1")?); assert_eq!( "__fd_table_by_ie".to_string(), - prefix_of_string("__fd_table_by_id") + prefix_of_string("__fd_table_by_id")? ); { let str = 127 as char; let s = str.to_string(); - let ret = prefix_of_string(&s); + let ret = prefix_of_string(&s)?; for byte in ret.as_bytes() { assert_eq!(*byte, 127_u8); } } { let s = format!("ab{}", 127 as char); - let ret = prefix_of_string(&s); + let ret = prefix_of_string(&s)?; assert_eq!(ret, format!("ac{}", 127 as char)); } + { + let s = "我".to_string(); + let ret = prefix_of_string(&s); + match ret { + Err(e) => { + assert_eq!( + e.to_string(), + common_exception::ErrorCode::OnlySupportAsciiChars(format!( + "Only support ASCII characters: {}", + "我" + )) + .to_string() + ); + } + Ok(_) => panic!("MUST return error "), + } + } + + Ok(()) } #[test] diff --git a/common/exception/src/exception_code.rs b/common/exception/src/exception_code.rs index d7a4ae0311af..9a7833a9f709 100644 --- a/common/exception/src/exception_code.rs +++ b/common/exception/src/exception_code.rs @@ -225,11 +225,13 @@ build_exceptions! { // Variable error codes. UnknownVariable(2801), + OnlySupportAsciiChars(2802), // Tenant quota error codes. IllegalTenantQuotaFormat(2901), TenantQuotaUnknown(2902), TenantQuotaExceeded(2903), + } // Storage errors [3001, 4000]. diff --git a/common/meta/api/src/kv_api.rs b/common/meta/api/src/kv_api.rs index 20ff50dd3a1c..57e93f91e8ad 100644 --- a/common/meta/api/src/kv_api.rs +++ b/common/meta/api/src/kv_api.rs @@ -36,9 +36,9 @@ pub trait ApiBuilder: Clone { async fn build_cluster(&self) -> Vec; } -// return watch prefix (start, end) tuple -pub fn get_start_and_end_of_prefix(prefix: &str) -> (String, String) { - (prefix.to_string(), prefix_of_string(prefix)) +// return watch prefix (start, end) tuple(only support ASCII characters) +pub fn get_start_and_end_of_prefix(prefix: &str) -> common_exception::Result<(String, String)> { + Ok((prefix.to_string(), prefix_of_string(prefix)?)) } #[async_trait] diff --git a/metasrv/tests/it/grpc/metasrv_grpc_watch.rs b/metasrv/tests/it/grpc/metasrv_grpc_watch.rs index 54300529d050..eee58ffd9cda 100644 --- a/metasrv/tests/it/grpc/metasrv_grpc_watch.rs +++ b/metasrv/tests/it/grpc/metasrv_grpc_watch.rs @@ -148,7 +148,7 @@ async fn test_watch_txn_main( } #[async_entry::test(worker_threads = 3, init = "init_meta_ut!()", tracing_span = "debug")] -async fn test_watch() -> anyhow::Result<()> { +async fn test_watch() -> common_exception::Result<()> { // - Start a metasrv server. // - Watch some key. // - Write some data. @@ -317,7 +317,7 @@ async fn test_watch() -> anyhow::Result<()> { let txn_key = k1.to_string(); let txn_val = "txn_val".as_bytes().to_vec(); - let (start, end) = get_start_and_end_of_prefix(watch_prefix); + let (start, end) = get_start_and_end_of_prefix(watch_prefix)?; let watch = WatchRequest { key: start, @@ -396,22 +396,38 @@ async fn test_watch() -> anyhow::Result<()> { } #[test] -fn test_get_start_and_end_of_prefix() -> anyhow::Result<()> { +fn test_get_start_and_end_of_prefix() -> common_exception::Result<()> { assert_eq!( ("aa".to_string(), "ab".to_string()), - get_start_and_end_of_prefix("aa") + get_start_and_end_of_prefix("aa")? ); assert_eq!( ("a1".to_string(), "a2".to_string()), - get_start_and_end_of_prefix("a1") + get_start_and_end_of_prefix("a1")? ); { let str = 127 as char; let s = str.to_string(); - let (_, end) = get_start_and_end_of_prefix(&s); + let (_, end) = get_start_and_end_of_prefix(&s)?; for byte in end.as_bytes() { assert_eq!(*byte, 127_u8); } } + { + let ret = get_start_and_end_of_prefix("我"); + match ret { + Err(e) => { + assert_eq!( + e.to_string(), + common_exception::ErrorCode::OnlySupportAsciiChars(format!( + "Only support ASCII characters: {}", + "我" + )) + .to_string() + ); + } + Ok(_) => panic!("MUST return error "), + } + } Ok(()) } From 11e12473820ca23b1b1ee181578ddd207fed0522 Mon Sep 17 00:00:00 2001 From: lichuang Date: Fri, 8 Jul 2022 14:56:30 +0800 Subject: [PATCH 9/9] mv prefix_of_string to kv_api --- common/base/src/base/mod.rs | 1 - common/base/src/base/string_func.rs | 30 -------------- common/base/tests/it/string_func.rs | 42 -------------------- common/meta/api/src/kv_api.rs | 31 ++++++++++++++- common/meta/api/src/lib.rs | 1 + common/meta/types/proto/meta.proto | 6 ++- metasrv/tests/it/grpc/metasrv_grpc_watch.rs | 43 +++++++++++++++++++++ 7 files changed, 78 insertions(+), 76 deletions(-) diff --git a/common/base/src/base/mod.rs b/common/base/src/base/mod.rs index 150aaf26d3d6..aa6b8df27cee 100644 --- a/common/base/src/base/mod.rs +++ b/common/base/src/base/mod.rs @@ -42,7 +42,6 @@ pub use stop_handle::StopHandle; pub use stoppable::Stoppable; pub use string_func::escape_for_key; pub use string_func::mask_string; -pub use string_func::prefix_of_string; pub use string_func::replace_nth_char; pub use string_func::unescape_for_key; pub use thread::Thread; diff --git a/common/base/src/base/string_func.rs b/common/base/src/base/string_func.rs index 9e4a0d178580..c36ed9f1070f 100644 --- a/common/base/src/base/string_func.rs +++ b/common/base/src/base/string_func.rs @@ -14,8 +14,6 @@ use std::string::FromUtf8Error; -use common_exception::ErrorCode; - /// Function that escapes special characters in a string. /// /// All characters except digit, alphabet and '_' are treated as special characters. @@ -116,31 +114,3 @@ pub fn replace_nth_char(s: &str, idx: usize, newchar: char) -> String { .map(|(i, c)| if i == idx { newchar } else { c }) .collect() } - -/// Return prefix of string(only support ASCII char). -/// "a" -> "b" -/// "1" -> "2" -/// [96,97,127] -> [96,98,127] -/// [127] -> [127, 127] -/// [127,127,127, 127] -> [127,127,127, 127, 127] -pub fn prefix_of_string(s: &str) -> common_exception::Result { - for c in s.chars() { - if !c.is_ascii() { - return common_exception::Result::Err(ErrorCode::OnlySupportAsciiChars(format!( - "Only support ASCII characters: {}", - c - ))); - } - } - let mut l = s.len(); - while l > 0 { - l -= 1; - if let Some(c) = s.chars().nth(l) { - if c == 127 as char { - continue; - } - return Ok(replace_nth_char(s, l, (c as u8 + 1) as char)); - } - } - Ok(format!("{}{}", s, 127 as char)) -} diff --git a/common/base/tests/it/string_func.rs b/common/base/tests/it/string_func.rs index 9a5fe31ed2e3..cf1ffaeadd79 100644 --- a/common/base/tests/it/string_func.rs +++ b/common/base/tests/it/string_func.rs @@ -37,48 +37,6 @@ fn mask_string_test() { assert_eq!(mask_string("string", 20), "string".to_string()); } -#[test] -fn prefix_of_string_test() -> common_exception::Result<()> { - assert_eq!("b".to_string(), prefix_of_string("a")?); - assert_eq!("2".to_string(), prefix_of_string("1")?); - assert_eq!( - "__fd_table_by_ie".to_string(), - prefix_of_string("__fd_table_by_id")? - ); - { - let str = 127 as char; - let s = str.to_string(); - let ret = prefix_of_string(&s)?; - for byte in ret.as_bytes() { - assert_eq!(*byte, 127_u8); - } - } - { - let s = format!("ab{}", 127 as char); - let ret = prefix_of_string(&s)?; - assert_eq!(ret, format!("ac{}", 127 as char)); - } - { - let s = "我".to_string(); - let ret = prefix_of_string(&s); - match ret { - Err(e) => { - assert_eq!( - e.to_string(), - common_exception::ErrorCode::OnlySupportAsciiChars(format!( - "Only support ASCII characters: {}", - "我" - )) - .to_string() - ); - } - Ok(_) => panic!("MUST return error "), - } - } - - Ok(()) -} - #[test] fn replace_nth_char_test() { assert_eq!("a23".to_string(), replace_nth_char("a13", 1, '2')); diff --git a/common/meta/api/src/kv_api.rs b/common/meta/api/src/kv_api.rs index 57e93f91e8ad..c10485a2ac38 100644 --- a/common/meta/api/src/kv_api.rs +++ b/common/meta/api/src/kv_api.rs @@ -16,7 +16,8 @@ use std::ops::Deref; use async_trait::async_trait; -use common_base::base::prefix_of_string; +use common_base::base::replace_nth_char; +use common_exception::ErrorCode; use common_meta_types::GetKVReply; use common_meta_types::ListKVReply; use common_meta_types::MGetKVReply; @@ -36,6 +37,34 @@ pub trait ApiBuilder: Clone { async fn build_cluster(&self) -> Vec; } +/// Return a string that bigger than all the string prefix with input string(only support ASCII char). +/// "a" -> "b" +/// "1" -> "2" +/// [96,97,127] -> [96,98,127] +/// [127] -> [127, 127] +/// [127,127,127, 127] -> [127,127,127, 127, 127] +pub fn prefix_of_string(s: &str) -> common_exception::Result { + for c in s.chars() { + if !c.is_ascii() { + return common_exception::Result::Err(ErrorCode::OnlySupportAsciiChars(format!( + "Only support ASCII characters: {}", + c + ))); + } + } + let mut l = s.len(); + while l > 0 { + l -= 1; + if let Some(c) = s.chars().nth(l) { + if c == 127 as char { + continue; + } + return Ok(replace_nth_char(s, l, (c as u8 + 1) as char)); + } + } + Ok(format!("{}{}", s, 127 as char)) +} + // return watch prefix (start, end) tuple(only support ASCII characters) pub fn get_start_and_end_of_prefix(prefix: &str) -> common_exception::Result<(String, String)> { Ok((prefix.to_string(), prefix_of_string(prefix)?)) diff --git a/common/meta/api/src/lib.rs b/common/meta/api/src/lib.rs index fe97e9f52b5b..bc54e19fb0b1 100644 --- a/common/meta/api/src/lib.rs +++ b/common/meta/api/src/lib.rs @@ -24,6 +24,7 @@ mod schema_api_keys; mod schema_api_test_suite; pub use kv_api::get_start_and_end_of_prefix; +pub use kv_api::prefix_of_string; pub use kv_api::ApiBuilder; pub use kv_api::AsKVApi; pub use kv_api::KVApi; diff --git a/common/meta/types/proto/meta.proto b/common/meta/types/proto/meta.proto index 3ae602d5028c..589b859849d3 100644 --- a/common/meta/types/proto/meta.proto +++ b/common/meta/types/proto/meta.proto @@ -48,8 +48,10 @@ message WatchRequest { // key is the key to register for watching. string key = 1; - // key_end is the end of the range [key, key_end) to watch. - // if key_end is None, then watch only key. + // `key_end`` is the end of the range [key, key_end) to watch. + // If key_end is None, then watch only key. + // If want to watch prefix of key, use `get_start_and_end_of_prefix` to + // generate [key, key_end). optional string key_end = 2; enum FilterType { diff --git a/metasrv/tests/it/grpc/metasrv_grpc_watch.rs b/metasrv/tests/it/grpc/metasrv_grpc_watch.rs index eee58ffd9cda..6a83cc3d34df 100644 --- a/metasrv/tests/it/grpc/metasrv_grpc_watch.rs +++ b/metasrv/tests/it/grpc/metasrv_grpc_watch.rs @@ -16,6 +16,7 @@ use std::time::Duration; use common_base::base::tokio; use common_meta_api::get_start_and_end_of_prefix; +use common_meta_api::prefix_of_string; use common_meta_api::KVApi; use common_meta_grpc::MetaGrpcClient; use common_meta_types::protobuf::watch_request::FilterType; @@ -395,6 +396,48 @@ async fn test_watch() -> common_exception::Result<()> { Ok(()) } +#[test] +fn prefix_of_string_test() -> common_exception::Result<()> { + assert_eq!("b".to_string(), prefix_of_string("a")?); + assert_eq!("2".to_string(), prefix_of_string("1")?); + assert_eq!( + "__fd_table_by_ie".to_string(), + prefix_of_string("__fd_table_by_id")? + ); + { + let str = 127 as char; + let s = str.to_string(); + let ret = prefix_of_string(&s)?; + for byte in ret.as_bytes() { + assert_eq!(*byte, 127_u8); + } + } + { + let s = format!("ab{}", 127 as char); + let ret = prefix_of_string(&s)?; + assert_eq!(ret, format!("ac{}", 127 as char)); + } + { + let s = "我".to_string(); + let ret = prefix_of_string(&s); + match ret { + Err(e) => { + assert_eq!( + e.to_string(), + common_exception::ErrorCode::OnlySupportAsciiChars(format!( + "Only support ASCII characters: {}", + "我" + )) + .to_string() + ); + } + Ok(_) => panic!("MUST return error "), + } + } + + Ok(()) +} + #[test] fn test_get_start_and_end_of_prefix() -> common_exception::Result<()> { assert_eq!(