Skip to content

Commit

Permalink
add ttl for raw client
Browse files Browse the repository at this point in the history
Signed-off-by: andylokandy <andylokandy@hotmail.com>
  • Loading branch information
andylokandy committed Dec 7, 2021
1 parent c045d1e commit 3a2060f
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 17 deletions.
44 changes: 40 additions & 4 deletions src/raw/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,17 @@ impl<PdC: PdClient> Client<PdC> {
.map(|r| r.into_iter().map(Into::into).collect())
}

pub async fn get_key_ttl_secs(&self, key: impl Into<Key>) -> Result<Option<u64>> {
debug!(self.logger, "invoking raw get_key_ttl_secs request");
let request = new_raw_get_key_ttl_request(key.into(), self.cf.clone());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.retry_multi_region(DEFAULT_REGION_BACKOFF)
.merge(CollectSingle)
.post_process_default()
.plan();
plan.execute().await
}

/// Create a new 'put' request.
///
/// Once resolved this request will result in the setting of the value associated with the given key.
Expand All @@ -236,8 +247,23 @@ impl<PdC: PdClient> Client<PdC> {
/// # });
/// ```
pub async fn put(&self, key: impl Into<Key>, value: impl Into<Value>) -> Result<()> {
debug!(self.logger, "invoking raw put request");
let request = new_raw_put_request(key.into(), value.into(), self.cf.clone(), self.atomic);
self.put_with_ttl(key, value, 0).await
}

pub async fn put_with_ttl(
&self,
key: impl Into<Key>,
value: impl Into<Value>,
ttl_secs: u64,
) -> Result<()> {
debug!(self.logger, "invoking raw put_with_ttl request");
let request = new_raw_put_request(
key.into(),
value.into(),
self.cf.clone(),
ttl_secs,
self.atomic,
);
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.retry_multi_region(DEFAULT_REGION_BACKOFF)
.merge(CollectSingle)
Expand Down Expand Up @@ -268,9 +294,19 @@ impl<PdC: PdClient> Client<PdC> {
&self,
pairs: impl IntoIterator<Item = impl Into<KvPair>>,
) -> Result<()> {
debug!(self.logger, "invoking raw batch_put request");
self.batch_put_with_ttl(pairs.into_iter().map(|pair| (pair, 0)))
.await
}

pub async fn batch_put_with_ttl(
&self,
pairs_with_ttls_secs: impl IntoIterator<Item = (impl Into<KvPair>, u64)>,
) -> Result<()> {
debug!(self.logger, "invoking raw batch_put_with_ttl request");
let request = new_raw_batch_put_request(
pairs.into_iter().map(Into::into),
pairs_with_ttls_secs
.into_iter()
.map(|(pair, ttl)| (pair.into(), ttl)),
self.cf.clone(),
self.atomic,
);
Expand Down
15 changes: 12 additions & 3 deletions src/raw/lowering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,30 @@ pub fn new_raw_batch_get_request(
requests::new_raw_batch_get_request(keys.map(Into::into).collect(), cf)
}

pub fn new_raw_get_key_ttl_request(
key: Key,
cf: Option<ColumnFamily>,
) -> kvrpcpb::RawGetKeyTtlRequest {
requests::new_raw_get_key_ttl_request(key.into(), cf)
}

pub fn new_raw_put_request(
key: Key,
value: Value,
cf: Option<ColumnFamily>,
ttl: u64,
atomic: bool,
) -> kvrpcpb::RawPutRequest {
requests::new_raw_put_request(key.into(), value, cf, atomic)
requests::new_raw_put_request(key.into(), value, cf, ttl, atomic)
}

pub fn new_raw_batch_put_request(
pairs: impl Iterator<Item = KvPair>,
pairs_with_ttl: impl Iterator<Item = (KvPair, u64)>,
cf: Option<ColumnFamily>,
atomic: bool,
) -> kvrpcpb::RawBatchPutRequest {
requests::new_raw_batch_put_request(pairs.map(Into::into).collect(), cf, atomic)
let (pairs, ttls) = pairs_with_ttl.map(|(pair, ttl)| (pair.into(), ttl)).unzip();
requests::new_raw_batch_put_request(pairs, cf, ttls, atomic)
}

pub fn new_raw_delete_request(
Expand Down
53 changes: 48 additions & 5 deletions src/raw/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use tikv_client_store::Request;

use super::RawRpcRequest;
use crate::{
collect_first,
collect_single,
pd::PdClient,
request::{
plan::ResponseWithShard, Collect, CollectSingle, DefaultProcessor, KvRequest, Merge,
Expand All @@ -35,7 +35,7 @@ impl KvRequest for kvrpcpb::RawGetRequest {
}

shardable_key!(kvrpcpb::RawGetRequest);
collect_first!(kvrpcpb::RawGetResponse);
collect_single!(kvrpcpb::RawGetResponse);

impl SingleKey for kvrpcpb::RawGetRequest {
fn key(&self) -> &Vec<u8> {
Expand Down Expand Up @@ -84,16 +84,55 @@ impl Merge<kvrpcpb::RawBatchGetResponse> for Collect {
}
}

pub fn new_raw_get_key_ttl_request(
key: Vec<u8>,
cf: Option<ColumnFamily>,
) -> kvrpcpb::RawGetKeyTtlRequest {
let mut req = kvrpcpb::RawGetKeyTtlRequest::default();
req.set_key(key);
req.maybe_set_cf(cf);

req
}

impl KvRequest for kvrpcpb::RawGetKeyTtlRequest {
type Response = kvrpcpb::RawGetKeyTtlResponse;
}

shardable_key!(kvrpcpb::RawGetKeyTtlRequest);
collect_single!(kvrpcpb::RawGetKeyTtlResponse);

impl SingleKey for kvrpcpb::RawGetKeyTtlRequest {
fn key(&self) -> &Vec<u8> {
&self.key
}
}

impl Process<kvrpcpb::RawGetKeyTtlResponse> for DefaultProcessor {
type Out = Option<u64>;

fn process(&self, input: Result<kvrpcpb::RawGetKeyTtlResponse>) -> Result<Self::Out> {
let input = input?;
Ok(if input.not_found {
None
} else {
Some(input.ttl)
})
}
}

pub fn new_raw_put_request(
key: Vec<u8>,
value: Vec<u8>,
cf: Option<ColumnFamily>,
ttl: u64,
atomic: bool,
) -> kvrpcpb::RawPutRequest {
let mut req = kvrpcpb::RawPutRequest::default();
req.set_key(key);
req.set_value(value);
req.maybe_set_cf(cf);
req.set_ttl(ttl);
req.set_for_cas(atomic);

req
Expand All @@ -104,7 +143,7 @@ impl KvRequest for kvrpcpb::RawPutRequest {
}

shardable_key!(kvrpcpb::RawPutRequest);
collect_first!(kvrpcpb::RawPutResponse);
collect_single!(kvrpcpb::RawPutResponse);
impl SingleKey for kvrpcpb::RawPutRequest {
fn key(&self) -> &Vec<u8> {
&self.key
Expand All @@ -114,11 +153,13 @@ impl SingleKey for kvrpcpb::RawPutRequest {
pub fn new_raw_batch_put_request(
pairs: Vec<kvrpcpb::KvPair>,
cf: Option<ColumnFamily>,
ttls: Vec<u64>,
atomic: bool,
) -> kvrpcpb::RawBatchPutRequest {
let mut req = kvrpcpb::RawBatchPutRequest::default();
req.set_pairs(pairs);
req.maybe_set_cf(cf);
req.set_ttls(ttls);
req.set_for_cas(atomic);

req
Expand Down Expand Up @@ -168,7 +209,7 @@ impl KvRequest for kvrpcpb::RawDeleteRequest {
}

shardable_key!(kvrpcpb::RawDeleteRequest);
collect_first!(kvrpcpb::RawDeleteResponse);
collect_single!(kvrpcpb::RawDeleteResponse);
impl SingleKey for kvrpcpb::RawDeleteRequest {
fn key(&self) -> &Vec<u8> {
&self.key
Expand Down Expand Up @@ -314,7 +355,7 @@ impl KvRequest for kvrpcpb::RawCasRequest {
}

shardable_key!(kvrpcpb::RawCasRequest);
collect_first!(kvrpcpb::RawCasResponse);
collect_single!(kvrpcpb::RawCasResponse);
impl SingleKey for kvrpcpb::RawCasRequest {
fn key(&self) -> &Vec<u8> {
&self.key
Expand Down Expand Up @@ -445,6 +486,7 @@ macro_rules! impl_raw_rpc_request {

impl_raw_rpc_request!(RawGetRequest);
impl_raw_rpc_request!(RawBatchGetRequest);
impl_raw_rpc_request!(RawGetKeyTtlRequest);
impl_raw_rpc_request!(RawPutRequest);
impl_raw_rpc_request!(RawBatchPutRequest);
impl_raw_rpc_request!(RawDeleteRequest);
Expand All @@ -456,6 +498,7 @@ impl_raw_rpc_request!(RawCasRequest);

impl HasLocks for kvrpcpb::RawGetResponse {}
impl HasLocks for kvrpcpb::RawBatchGetResponse {}
impl HasLocks for kvrpcpb::RawGetKeyTtlResponse {}
impl HasLocks for kvrpcpb::RawPutResponse {}
impl HasLocks for kvrpcpb::RawBatchPutResponse {}
impl HasLocks for kvrpcpb::RawDeleteResponse {}
Expand Down
2 changes: 1 addition & 1 deletion src/request/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ pub struct Collect;
pub struct CollectSingle;

#[macro_export]
macro_rules! collect_first {
macro_rules! collect_single {
($type_: ty) => {
impl Merge<$type_> for CollectSingle {
type Out = $type_;
Expand Down
8 changes: 4 additions & 4 deletions src/transaction/requests.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.

use crate::{
collect_first,
collect_single,
pd::PdClient,
request::{
Collect, CollectSingle, CollectWithShard, DefaultProcessor, KvRequest, Merge, Process,
Expand Down Expand Up @@ -74,7 +74,7 @@ impl KvRequest for kvrpcpb::GetRequest {
}

shardable_key!(kvrpcpb::GetRequest);
collect_first!(kvrpcpb::GetResponse);
collect_single!(kvrpcpb::GetResponse);
impl SingleKey for kvrpcpb::GetRequest {
fn key(&self) -> &Vec<u8> {
&self.key
Expand Down Expand Up @@ -183,7 +183,7 @@ impl KvRequest for kvrpcpb::CleanupRequest {
}

shardable_key!(kvrpcpb::CleanupRequest);
collect_first!(kvrpcpb::CleanupResponse);
collect_single!(kvrpcpb::CleanupResponse);
impl SingleKey for kvrpcpb::CleanupRequest {
fn key(&self) -> &Vec<u8> {
&self.key
Expand Down Expand Up @@ -489,7 +489,7 @@ impl Shardable for kvrpcpb::TxnHeartBeatRequest {
}
}

collect_first!(TxnHeartBeatResponse);
collect_single!(TxnHeartBeatResponse);

impl SingleKey for kvrpcpb::TxnHeartBeatRequest {
fn key(&self) -> &Vec<u8> {
Expand Down
2 changes: 2 additions & 0 deletions tikv-client-store/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ has_region_error!(kvrpcpb::DeleteRangeResponse);
has_region_error!(kvrpcpb::GcResponse);
has_region_error!(kvrpcpb::RawGetResponse);
has_region_error!(kvrpcpb::RawBatchGetResponse);
has_region_error!(kvrpcpb::RawGetKeyTtlResponse);
has_region_error!(kvrpcpb::RawPutResponse);
has_region_error!(kvrpcpb::RawBatchPutResponse);
has_region_error!(kvrpcpb::RawDeleteResponse);
Expand Down Expand Up @@ -109,6 +110,7 @@ macro_rules! has_str_error {
}

has_str_error!(kvrpcpb::RawGetResponse);
has_str_error!(kvrpcpb::RawGetKeyTtlResponse);
has_str_error!(kvrpcpb::RawPutResponse);
has_str_error!(kvrpcpb::RawBatchPutResponse);
has_str_error!(kvrpcpb::RawDeleteResponse);
Expand Down
5 changes: 5 additions & 0 deletions tikv-client-store/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ macro_rules! impl_request {

impl_request!(RawGetRequest, raw_get_async_opt, "raw_get");
impl_request!(RawBatchGetRequest, raw_batch_get_async_opt, "raw_batch_get");
impl_request!(
RawGetKeyTtlRequest,
raw_get_key_ttl_async_opt,
"raw_get_key_ttl"
);
impl_request!(RawPutRequest, raw_put_async_opt, "raw_put");
impl_request!(RawBatchPutRequest, raw_batch_put_async_opt, "raw_batch_put");
impl_request!(RawDeleteRequest, raw_delete_async_opt, "raw_delete");
Expand Down

0 comments on commit 3a2060f

Please sign in to comment.