From 3a2060fb8ae378b80112daca00ba81f0da735651 Mon Sep 17 00:00:00 2001 From: andylokandy Date: Wed, 8 Dec 2021 00:16:22 +0800 Subject: [PATCH] add ttl for raw client Signed-off-by: andylokandy --- src/raw/client.rs | 44 +++++++++++++++++++++++--- src/raw/lowering.rs | 15 +++++++-- src/raw/requests.rs | 53 +++++++++++++++++++++++++++++--- src/request/plan.rs | 2 +- src/transaction/requests.rs | 8 ++--- tikv-client-store/src/errors.rs | 2 ++ tikv-client-store/src/request.rs | 5 +++ 7 files changed, 112 insertions(+), 17 deletions(-) diff --git a/src/raw/client.rs b/src/raw/client.rs index 523e955b..546f33e1 100644 --- a/src/raw/client.rs +++ b/src/raw/client.rs @@ -219,6 +219,17 @@ impl Client { .map(|r| r.into_iter().map(Into::into).collect()) } + pub async fn get_key_ttl_secs(&self, key: impl Into) -> Result> { + debug!(self.logger, "invoking raw get_key_ttl_secs request"); + let request = new_raw_get_key_ttl_request(key.into(), self.cf.clone()); + let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request) + .retry_multi_region(DEFAULT_REGION_BACKOFF) + .merge(CollectSingle) + .post_process_default() + .plan(); + plan.execute().await + } + /// Create a new 'put' request. /// /// Once resolved this request will result in the setting of the value associated with the given key. @@ -236,8 +247,23 @@ impl Client { /// # }); /// ``` pub async fn put(&self, key: impl Into, value: impl Into) -> Result<()> { - debug!(self.logger, "invoking raw put request"); - let request = new_raw_put_request(key.into(), value.into(), self.cf.clone(), self.atomic); + self.put_with_ttl(key, value, 0).await + } + + pub async fn put_with_ttl( + &self, + key: impl Into, + value: impl Into, + ttl_secs: u64, + ) -> Result<()> { + debug!(self.logger, "invoking raw put_with_ttl request"); + let request = new_raw_put_request( + key.into(), + value.into(), + self.cf.clone(), + ttl_secs, + self.atomic, + ); let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request) .retry_multi_region(DEFAULT_REGION_BACKOFF) .merge(CollectSingle) @@ -268,9 +294,19 @@ impl Client { &self, pairs: impl IntoIterator>, ) -> Result<()> { - debug!(self.logger, "invoking raw batch_put request"); + self.batch_put_with_ttl(pairs.into_iter().map(|pair| (pair, 0))) + .await + } + + pub async fn batch_put_with_ttl( + &self, + pairs_with_ttls_secs: impl IntoIterator, u64)>, + ) -> Result<()> { + debug!(self.logger, "invoking raw batch_put_with_ttl request"); let request = new_raw_batch_put_request( - pairs.into_iter().map(Into::into), + pairs_with_ttls_secs + .into_iter() + .map(|(pair, ttl)| (pair.into(), ttl)), self.cf.clone(), self.atomic, ); diff --git a/src/raw/lowering.rs b/src/raw/lowering.rs index 32327a80..18590b1e 100644 --- a/src/raw/lowering.rs +++ b/src/raw/lowering.rs @@ -21,21 +21,30 @@ pub fn new_raw_batch_get_request( requests::new_raw_batch_get_request(keys.map(Into::into).collect(), cf) } +pub fn new_raw_get_key_ttl_request( + key: Key, + cf: Option, +) -> kvrpcpb::RawGetKeyTtlRequest { + requests::new_raw_get_key_ttl_request(key.into(), cf) +} + pub fn new_raw_put_request( key: Key, value: Value, cf: Option, + ttl: u64, atomic: bool, ) -> kvrpcpb::RawPutRequest { - requests::new_raw_put_request(key.into(), value, cf, atomic) + requests::new_raw_put_request(key.into(), value, cf, ttl, atomic) } pub fn new_raw_batch_put_request( - pairs: impl Iterator, + pairs_with_ttl: impl Iterator, cf: Option, atomic: bool, ) -> kvrpcpb::RawBatchPutRequest { - requests::new_raw_batch_put_request(pairs.map(Into::into).collect(), cf, atomic) + let (pairs, ttls) = pairs_with_ttl.map(|(pair, ttl)| (pair.into(), ttl)).unzip(); + requests::new_raw_batch_put_request(pairs, cf, ttls, atomic) } pub fn new_raw_delete_request( diff --git a/src/raw/requests.rs b/src/raw/requests.rs index bd678aaf..111e70d7 100644 --- a/src/raw/requests.rs +++ b/src/raw/requests.rs @@ -10,7 +10,7 @@ use tikv_client_store::Request; use super::RawRpcRequest; use crate::{ - collect_first, + collect_single, pd::PdClient, request::{ plan::ResponseWithShard, Collect, CollectSingle, DefaultProcessor, KvRequest, Merge, @@ -35,7 +35,7 @@ impl KvRequest for kvrpcpb::RawGetRequest { } shardable_key!(kvrpcpb::RawGetRequest); -collect_first!(kvrpcpb::RawGetResponse); +collect_single!(kvrpcpb::RawGetResponse); impl SingleKey for kvrpcpb::RawGetRequest { fn key(&self) -> &Vec { @@ -84,16 +84,55 @@ impl Merge for Collect { } } +pub fn new_raw_get_key_ttl_request( + key: Vec, + cf: Option, +) -> kvrpcpb::RawGetKeyTtlRequest { + let mut req = kvrpcpb::RawGetKeyTtlRequest::default(); + req.set_key(key); + req.maybe_set_cf(cf); + + req +} + +impl KvRequest for kvrpcpb::RawGetKeyTtlRequest { + type Response = kvrpcpb::RawGetKeyTtlResponse; +} + +shardable_key!(kvrpcpb::RawGetKeyTtlRequest); +collect_single!(kvrpcpb::RawGetKeyTtlResponse); + +impl SingleKey for kvrpcpb::RawGetKeyTtlRequest { + fn key(&self) -> &Vec { + &self.key + } +} + +impl Process for DefaultProcessor { + type Out = Option; + + fn process(&self, input: Result) -> Result { + let input = input?; + Ok(if input.not_found { + None + } else { + Some(input.ttl) + }) + } +} + pub fn new_raw_put_request( key: Vec, value: Vec, cf: Option, + ttl: u64, atomic: bool, ) -> kvrpcpb::RawPutRequest { let mut req = kvrpcpb::RawPutRequest::default(); req.set_key(key); req.set_value(value); req.maybe_set_cf(cf); + req.set_ttl(ttl); req.set_for_cas(atomic); req @@ -104,7 +143,7 @@ impl KvRequest for kvrpcpb::RawPutRequest { } shardable_key!(kvrpcpb::RawPutRequest); -collect_first!(kvrpcpb::RawPutResponse); +collect_single!(kvrpcpb::RawPutResponse); impl SingleKey for kvrpcpb::RawPutRequest { fn key(&self) -> &Vec { &self.key @@ -114,11 +153,13 @@ impl SingleKey for kvrpcpb::RawPutRequest { pub fn new_raw_batch_put_request( pairs: Vec, cf: Option, + ttls: Vec, atomic: bool, ) -> kvrpcpb::RawBatchPutRequest { let mut req = kvrpcpb::RawBatchPutRequest::default(); req.set_pairs(pairs); req.maybe_set_cf(cf); + req.set_ttls(ttls); req.set_for_cas(atomic); req @@ -168,7 +209,7 @@ impl KvRequest for kvrpcpb::RawDeleteRequest { } shardable_key!(kvrpcpb::RawDeleteRequest); -collect_first!(kvrpcpb::RawDeleteResponse); +collect_single!(kvrpcpb::RawDeleteResponse); impl SingleKey for kvrpcpb::RawDeleteRequest { fn key(&self) -> &Vec { &self.key @@ -314,7 +355,7 @@ impl KvRequest for kvrpcpb::RawCasRequest { } shardable_key!(kvrpcpb::RawCasRequest); -collect_first!(kvrpcpb::RawCasResponse); +collect_single!(kvrpcpb::RawCasResponse); impl SingleKey for kvrpcpb::RawCasRequest { fn key(&self) -> &Vec { &self.key @@ -445,6 +486,7 @@ macro_rules! impl_raw_rpc_request { impl_raw_rpc_request!(RawGetRequest); impl_raw_rpc_request!(RawBatchGetRequest); +impl_raw_rpc_request!(RawGetKeyTtlRequest); impl_raw_rpc_request!(RawPutRequest); impl_raw_rpc_request!(RawBatchPutRequest); impl_raw_rpc_request!(RawDeleteRequest); @@ -456,6 +498,7 @@ impl_raw_rpc_request!(RawCasRequest); impl HasLocks for kvrpcpb::RawGetResponse {} impl HasLocks for kvrpcpb::RawBatchGetResponse {} +impl HasLocks for kvrpcpb::RawGetKeyTtlResponse {} impl HasLocks for kvrpcpb::RawPutResponse {} impl HasLocks for kvrpcpb::RawBatchPutResponse {} impl HasLocks for kvrpcpb::RawDeleteResponse {} diff --git a/src/request/plan.rs b/src/request/plan.rs index ed13f17f..a892bfba 100644 --- a/src/request/plan.rs +++ b/src/request/plan.rs @@ -303,7 +303,7 @@ pub struct Collect; pub struct CollectSingle; #[macro_export] -macro_rules! collect_first { +macro_rules! collect_single { ($type_: ty) => { impl Merge<$type_> for CollectSingle { type Out = $type_; diff --git a/src/transaction/requests.rs b/src/transaction/requests.rs index 8d421f5f..58903024 100644 --- a/src/transaction/requests.rs +++ b/src/transaction/requests.rs @@ -1,7 +1,7 @@ // Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0. use crate::{ - collect_first, + collect_single, pd::PdClient, request::{ Collect, CollectSingle, CollectWithShard, DefaultProcessor, KvRequest, Merge, Process, @@ -74,7 +74,7 @@ impl KvRequest for kvrpcpb::GetRequest { } shardable_key!(kvrpcpb::GetRequest); -collect_first!(kvrpcpb::GetResponse); +collect_single!(kvrpcpb::GetResponse); impl SingleKey for kvrpcpb::GetRequest { fn key(&self) -> &Vec { &self.key @@ -183,7 +183,7 @@ impl KvRequest for kvrpcpb::CleanupRequest { } shardable_key!(kvrpcpb::CleanupRequest); -collect_first!(kvrpcpb::CleanupResponse); +collect_single!(kvrpcpb::CleanupResponse); impl SingleKey for kvrpcpb::CleanupRequest { fn key(&self) -> &Vec { &self.key @@ -489,7 +489,7 @@ impl Shardable for kvrpcpb::TxnHeartBeatRequest { } } -collect_first!(TxnHeartBeatResponse); +collect_single!(TxnHeartBeatResponse); impl SingleKey for kvrpcpb::TxnHeartBeatRequest { fn key(&self) -> &Vec { diff --git a/tikv-client-store/src/errors.rs b/tikv-client-store/src/errors.rs index c1e915ac..bc3da0d8 100644 --- a/tikv-client-store/src/errors.rs +++ b/tikv-client-store/src/errors.rs @@ -57,6 +57,7 @@ has_region_error!(kvrpcpb::DeleteRangeResponse); has_region_error!(kvrpcpb::GcResponse); has_region_error!(kvrpcpb::RawGetResponse); has_region_error!(kvrpcpb::RawBatchGetResponse); +has_region_error!(kvrpcpb::RawGetKeyTtlResponse); has_region_error!(kvrpcpb::RawPutResponse); has_region_error!(kvrpcpb::RawBatchPutResponse); has_region_error!(kvrpcpb::RawDeleteResponse); @@ -109,6 +110,7 @@ macro_rules! has_str_error { } has_str_error!(kvrpcpb::RawGetResponse); +has_str_error!(kvrpcpb::RawGetKeyTtlResponse); has_str_error!(kvrpcpb::RawPutResponse); has_str_error!(kvrpcpb::RawBatchPutResponse); has_str_error!(kvrpcpb::RawDeleteResponse); diff --git a/tikv-client-store/src/request.rs b/tikv-client-store/src/request.rs index ab3a4357..86849d54 100644 --- a/tikv-client-store/src/request.rs +++ b/tikv-client-store/src/request.rs @@ -47,6 +47,11 @@ macro_rules! impl_request { impl_request!(RawGetRequest, raw_get_async_opt, "raw_get"); impl_request!(RawBatchGetRequest, raw_batch_get_async_opt, "raw_batch_get"); +impl_request!( + RawGetKeyTtlRequest, + raw_get_key_ttl_async_opt, + "raw_get_key_ttl" +); impl_request!(RawPutRequest, raw_put_async_opt, "raw_put"); impl_request!(RawBatchPutRequest, raw_batch_put_async_opt, "raw_batch_put"); impl_request!(RawDeleteRequest, raw_delete_async_opt, "raw_delete");