Skip to content

Commit

Permalink
add ttl to RawClient
Browse files Browse the repository at this point in the history
Signed-off-by: Andy Lok <andylokandy@hotmail.com>
  • Loading branch information
andylokandy committed Jul 11, 2023
1 parent 2c831ba commit 2e81b06
Show file tree
Hide file tree
Showing 8 changed files with 152 additions and 15 deletions.
55 changes: 53 additions & 2 deletions src/raw/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,33 @@ impl<PdC: PdClient> Client<PdC> {
.map(|r| r.into_iter().map(Into::into).collect())
}

/// Create a new 'get key ttl' request.
///
/// Once resolved this request will result in the fetching of the alive time left for the
/// given key.
///
/// Retuning `Ok(None)` indicates the key does not exist in TiKV.
///
/// # Examples
/// # use tikv_client::{Value, Config, RawClient};
/// # use futures::prelude::*;
/// # futures::executor::block_on(async {
/// # let client = RawClient::new(vec!["192.168.0.100"]).await.unwrap();
/// let key = "TiKV".to_owned();
/// let req = client.get_key_ttl_secs(key);
/// let result: Option<Value> = req.await.unwrap();
/// # });
pub async fn get_key_ttl_secs(&self, key: impl Into<Key>) -> Result<Option<u64>> {
debug!("invoking raw get_key_ttl_secs request");
let request = new_raw_get_key_ttl_request(key.into(), self.cf.clone());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.retry_multi_region(self.backoff.clone())
.merge(CollectSingle)
.post_process_default()
.plan();
plan.execute().await
}

/// Create a new 'put' request.
///
/// Once resolved this request will result in the setting of the value associated with the given key.
Expand All @@ -268,8 +295,23 @@ impl<PdC: PdClient> Client<PdC> {
/// # });
/// ```
pub async fn put(&self, key: impl Into<Key>, value: impl Into<Value>) -> Result<()> {
self.put_with_ttl(key, value, 0).await
}

pub async fn put_with_ttl(
&self,
key: impl Into<Key>,
value: impl Into<Value>,
ttl_secs: u64,
) -> Result<()> {
debug!("invoking raw put request");
let request = new_raw_put_request(key.into(), value.into(), self.cf.clone(), self.atomic);
let request = new_raw_put_request(
key.into(),
value.into(),
self.cf.clone(),
ttl_secs,
self.atomic,
);
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.retry_multi_region(self.backoff.clone())
.merge(CollectSingle)
Expand Down Expand Up @@ -299,10 +341,19 @@ impl<PdC: PdClient> Client<PdC> {
pub async fn batch_put(
&self,
pairs: impl IntoIterator<Item = impl Into<KvPair>>,
) -> Result<()> {
self.batch_put_with_ttl(pairs, std::iter::repeat(0)).await
}

pub async fn batch_put_with_ttl(
&self,
pairs: impl IntoIterator<Item = impl Into<KvPair>>,
ttls: impl IntoIterator<Item = u64>,
) -> Result<()> {
debug!("invoking raw batch_put request");
let request = new_raw_batch_put_request(
pairs.into_iter().map(Into::into),
pairs.into_iter().map(|pair| pair.into()),
ttls.into_iter(),
self.cf.clone(),
self.atomic,
);
Expand Down
16 changes: 14 additions & 2 deletions src/raw/lowering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,33 @@ pub fn new_raw_batch_get_request(
requests::new_raw_batch_get_request(keys.map(Into::into).collect(), cf)
}

pub fn new_raw_get_key_ttl_request(
key: Key,
cf: Option<ColumnFamily>,
) -> kvrpcpb::RawGetKeyTtlRequest {
requests::new_raw_get_key_ttl_request(key.into(), cf)
}

pub fn new_raw_put_request(
key: Key,
value: Value,
cf: Option<ColumnFamily>,
ttl: u64,
atomic: bool,
) -> kvrpcpb::RawPutRequest {
requests::new_raw_put_request(key.into(), value, cf, atomic)
requests::new_raw_put_request(key.into(), value, ttl, cf, atomic)
}

pub fn new_raw_batch_put_request(
pairs: impl Iterator<Item = KvPair>,
ttls: impl Iterator<Item = u64>,
cf: Option<ColumnFamily>,
atomic: bool,
) -> kvrpcpb::RawBatchPutRequest {
requests::new_raw_batch_put_request(pairs.map(Into::into).collect(), cf, atomic)
let pairs = pairs.map(Into::into).collect::<Vec<_>>();
let ttls = ttls.collect::<Vec<_>>();
assert_eq!(pairs.len(), ttls.len());
requests::new_raw_batch_put_request(pairs, ttls, cf, atomic)
}

pub fn new_raw_delete_request(
Expand Down
53 changes: 48 additions & 5 deletions src/raw/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use futures::stream::BoxStream;
use tonic::transport::Channel;

use super::RawRpcRequest;
use crate::collect_first;
use crate::collect_single;
use crate::pd::PdClient;
use crate::proto::kvrpcpb;
use crate::proto::metapb;
Expand Down Expand Up @@ -52,7 +52,7 @@ impl KvRequest for kvrpcpb::RawGetRequest {
}

shardable_key!(kvrpcpb::RawGetRequest);
collect_first!(kvrpcpb::RawGetResponse);
collect_single!(kvrpcpb::RawGetResponse);

impl SingleKey for kvrpcpb::RawGetRequest {
fn key(&self) -> &Vec<u8> {
Expand Down Expand Up @@ -101,15 +101,54 @@ impl Merge<kvrpcpb::RawBatchGetResponse> for Collect {
}
}

pub fn new_raw_get_key_ttl_request(
key: Vec<u8>,
cf: Option<ColumnFamily>,
) -> kvrpcpb::RawGetKeyTtlRequest {
let mut req = kvrpcpb::RawGetKeyTtlRequest::default();
req.key = key;
req.maybe_set_cf(cf);

req
}

impl KvRequest for kvrpcpb::RawGetKeyTtlRequest {
type Response = kvrpcpb::RawGetKeyTtlResponse;
}

shardable_key!(kvrpcpb::RawGetKeyTtlRequest);
collect_single!(kvrpcpb::RawGetKeyTtlResponse);

impl SingleKey for kvrpcpb::RawGetKeyTtlRequest {
fn key(&self) -> &Vec<u8> {
&self.key
}
}

impl Process<kvrpcpb::RawGetKeyTtlResponse> for DefaultProcessor {
type Out = Option<u64>;

fn process(&self, input: Result<kvrpcpb::RawGetKeyTtlResponse>) -> Result<Self::Out> {
let input = input?;
Ok(if input.not_found {
None
} else {
Some(input.ttl)
})
}
}

pub fn new_raw_put_request(
key: Vec<u8>,
value: Vec<u8>,
ttl: u64,
cf: Option<ColumnFamily>,
atomic: bool,
) -> kvrpcpb::RawPutRequest {
let mut req = kvrpcpb::RawPutRequest::default();
req.key = key;
req.value = value;
req.ttl = ttl;
req.maybe_set_cf(cf);
req.for_cas = atomic;

Expand All @@ -121,7 +160,7 @@ impl KvRequest for kvrpcpb::RawPutRequest {
}

shardable_key!(kvrpcpb::RawPutRequest);
collect_first!(kvrpcpb::RawPutResponse);
collect_single!(kvrpcpb::RawPutResponse);
impl SingleKey for kvrpcpb::RawPutRequest {
fn key(&self) -> &Vec<u8> {
&self.key
Expand All @@ -130,11 +169,13 @@ impl SingleKey for kvrpcpb::RawPutRequest {

pub fn new_raw_batch_put_request(
pairs: Vec<kvrpcpb::KvPair>,
ttls: Vec<u64>,
cf: Option<ColumnFamily>,
atomic: bool,
) -> kvrpcpb::RawBatchPutRequest {
let mut req = kvrpcpb::RawBatchPutRequest::default();
req.pairs = pairs;
req.ttls = ttls;
req.maybe_set_cf(cf);
req.for_cas = atomic;

Expand Down Expand Up @@ -185,7 +226,7 @@ impl KvRequest for kvrpcpb::RawDeleteRequest {
}

shardable_key!(kvrpcpb::RawDeleteRequest);
collect_first!(kvrpcpb::RawDeleteResponse);
collect_single!(kvrpcpb::RawDeleteResponse);
impl SingleKey for kvrpcpb::RawDeleteRequest {
fn key(&self) -> &Vec<u8> {
&self.key
Expand Down Expand Up @@ -331,7 +372,7 @@ impl KvRequest for kvrpcpb::RawCasRequest {
}

shardable_key!(kvrpcpb::RawCasRequest);
collect_first!(kvrpcpb::RawCasResponse);
collect_single!(kvrpcpb::RawCasResponse);
impl SingleKey for kvrpcpb::RawCasRequest {
fn key(&self) -> &Vec<u8> {
&self.key
Expand Down Expand Up @@ -463,6 +504,7 @@ macro_rules! impl_raw_rpc_request {

impl_raw_rpc_request!(RawGetRequest);
impl_raw_rpc_request!(RawBatchGetRequest);
impl_raw_rpc_request!(RawGetKeyTtlRequest);
impl_raw_rpc_request!(RawPutRequest);
impl_raw_rpc_request!(RawBatchPutRequest);
impl_raw_rpc_request!(RawDeleteRequest);
Expand All @@ -474,6 +516,7 @@ impl_raw_rpc_request!(RawCasRequest);

impl HasLocks for kvrpcpb::RawGetResponse {}
impl HasLocks for kvrpcpb::RawBatchGetResponse {}
impl HasLocks for kvrpcpb::RawGetKeyTtlResponse {}
impl HasLocks for kvrpcpb::RawPutResponse {}
impl HasLocks for kvrpcpb::RawBatchPutResponse {}
impl HasLocks for kvrpcpb::RawDeleteResponse {}
Expand Down
2 changes: 1 addition & 1 deletion src/request/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ pub struct CollectSingle;

#[doc(hidden)]
#[macro_export]
macro_rules! collect_first {
macro_rules! collect_single {
($type_: ty) => {
impl Merge<$type_> for CollectSingle {
type Out = $type_;
Expand Down
2 changes: 2 additions & 0 deletions src/store/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ has_region_error!(kvrpcpb::DeleteRangeResponse);
has_region_error!(kvrpcpb::GcResponse);
has_region_error!(kvrpcpb::RawGetResponse);
has_region_error!(kvrpcpb::RawBatchGetResponse);
has_region_error!(kvrpcpb::RawGetKeyTtlResponse);
has_region_error!(kvrpcpb::RawPutResponse);
has_region_error!(kvrpcpb::RawBatchPutResponse);
has_region_error!(kvrpcpb::RawDeleteResponse);
Expand Down Expand Up @@ -102,6 +103,7 @@ macro_rules! has_str_error {
}

has_str_error!(kvrpcpb::RawGetResponse);
has_str_error!(kvrpcpb::RawGetKeyTtlResponse);
has_str_error!(kvrpcpb::RawPutResponse);
has_str_error!(kvrpcpb::RawBatchPutResponse);
has_str_error!(kvrpcpb::RawDeleteResponse);
Expand Down
1 change: 1 addition & 0 deletions src/store/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ macro_rules! impl_request {

impl_request!(RawGetRequest, raw_get, "raw_get");
impl_request!(RawBatchGetRequest, raw_batch_get, "raw_batch_get");
impl_request!(RawGetKeyTtlRequest, raw_get_key_ttl, "raw_get_key_ttl");
impl_request!(RawPutRequest, raw_put, "raw_put");
impl_request!(RawBatchPutRequest, raw_batch_put, "raw_batch_put");
impl_request!(RawDeleteRequest, raw_delete, "raw_delete");
Expand Down
10 changes: 5 additions & 5 deletions src/transaction/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use futures::stream::{self};
use futures::StreamExt;

use super::transaction::TXN_COMMIT_BATCH_SIZE;
use crate::collect_first;
use crate::collect_single;
use crate::common::Error::PessimisticLockError;
use crate::pd::PdClient;
use crate::proto::kvrpcpb::Action;
Expand Down Expand Up @@ -98,7 +98,7 @@ impl KvRequest for kvrpcpb::GetRequest {
}

shardable_key!(kvrpcpb::GetRequest);
collect_first!(kvrpcpb::GetResponse);
collect_single!(kvrpcpb::GetResponse);
impl SingleKey for kvrpcpb::GetRequest {
fn key(&self) -> &Vec<u8> {
&self.key
Expand Down Expand Up @@ -220,7 +220,7 @@ impl KvRequest for kvrpcpb::CleanupRequest {
}

shardable_key!(kvrpcpb::CleanupRequest);
collect_first!(kvrpcpb::CleanupResponse);
collect_single!(kvrpcpb::CleanupResponse);
impl SingleKey for kvrpcpb::CleanupRequest {
fn key(&self) -> &Vec<u8> {
&self.key
Expand Down Expand Up @@ -617,7 +617,7 @@ impl Shardable for kvrpcpb::TxnHeartBeatRequest {
}
}

collect_first!(TxnHeartBeatResponse);
collect_single!(TxnHeartBeatResponse);

impl SingleKey for kvrpcpb::TxnHeartBeatRequest {
fn key(&self) -> &Vec<u8> {
Expand Down Expand Up @@ -681,7 +681,7 @@ impl SingleKey for kvrpcpb::CheckTxnStatusRequest {
}
}

collect_first!(kvrpcpb::CheckTxnStatusResponse);
collect_single!(kvrpcpb::CheckTxnStatusResponse);

impl Process<kvrpcpb::CheckTxnStatusResponse> for DefaultProcessor {
type Out = TransactionStatus;
Expand Down
28 changes: 28 additions & 0 deletions tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,34 @@ async fn raw_write_million() -> Result<()> {
Ok(())
}

/// Tests raw ttl API.
#[tokio::test]
#[serial]
async fn raw_ttl() -> Result<()> {
init().await?;
let client = RawClient::new(pd_addrs()).await?;
let key1 = vec![1];
let key2 = vec![2];
let val = vec![42];

assert_eq!(client.get_key_ttl_secs(key1.clone()).await?, None);
client.put_with_ttl(key1.clone(), val.clone(), 10).await?;
assert_eq!(client.get(key1.clone()).await?, Some(val.clone()));
assert_eq!(client.get_key_ttl_secs(key1.clone()).await?, Some(10));
client
.batch_put_with_ttl(
vec![(key1.clone(), val.clone()), (key2.clone(), val.clone())],
vec![20, 20],
)
.await?;
assert_eq!(client.get(key1.clone()).await?, Some(val.clone()));
assert_eq!(client.get(key2.clone()).await?, Some(val.clone()));
assert_eq!(client.get_key_ttl_secs(key1.clone()).await?, Some(20));
assert_eq!(client.get_key_ttl_secs(key2.clone()).await?, Some(20));

Ok(())
}

#[tokio::test]
#[serial]
async fn txn_pessimistic_rollback() -> Result<()> {
Expand Down

0 comments on commit 2e81b06

Please sign in to comment.