Skip to content

Commit

Permalink
codec type for transaction
Browse files Browse the repository at this point in the history
Signed-off-by: Ping Yu <yuping@pingcap.com>
  • Loading branch information
pingyu committed Aug 30, 2023
1 parent d244bfc commit c00889d
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 12 deletions.
10 changes: 5 additions & 5 deletions src/transaction/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ impl<Cod: Codec> Client<Cod> {
/// transaction.commit().await.unwrap();
/// # });
/// ```
pub async fn begin_optimistic(&self) -> Result<Transaction<PdRpcClient<Cod>>> {
pub async fn begin_optimistic(&self) -> Result<Transaction<Cod, PdRpcClient<Cod>>> {
debug!("creating new optimistic transaction");
let timestamp = self.current_timestamp().await?;
Ok(self.new_transaction(timestamp, TransactionOptions::new_optimistic()))
Expand All @@ -175,7 +175,7 @@ impl<Cod: Codec> Client<Cod> {
/// transaction.commit().await.unwrap();
/// # });
/// ```
pub async fn begin_pessimistic(&self) -> Result<Transaction<PdRpcClient<Cod>>> {
pub async fn begin_pessimistic(&self) -> Result<Transaction<Cod, PdRpcClient<Cod>>> {
debug!("creating new pessimistic transaction");
let timestamp = self.current_timestamp().await?;
Ok(self.new_transaction(timestamp, TransactionOptions::new_pessimistic()))
Expand All @@ -201,7 +201,7 @@ impl<Cod: Codec> Client<Cod> {
pub async fn begin_with_options(
&self,
options: TransactionOptions,
) -> Result<Transaction<PdRpcClient<Cod>>> {
) -> Result<Transaction<Cod, PdRpcClient<Cod>>> {
debug!("creating new customized transaction");
let timestamp = self.current_timestamp().await?;
Ok(self.new_transaction(timestamp, options))
Expand All @@ -212,7 +212,7 @@ impl<Cod: Codec> Client<Cod> {
&self,
timestamp: Timestamp,
options: TransactionOptions,
) -> Snapshot<PdRpcClient<Cod>> {
) -> Snapshot<Cod, PdRpcClient<Cod>> {
debug!("creating new snapshot");
Snapshot::new(self.new_transaction(timestamp, options.read_only()))
}
Expand Down Expand Up @@ -311,7 +311,7 @@ impl<Cod: Codec> Client<Cod> {
&self,
timestamp: Timestamp,
options: TransactionOptions,
) -> Transaction<PdRpcClient<Cod>> {
) -> Transaction<Cod, PdRpcClient<Cod>> {
Transaction::new(timestamp, self.pd.clone(), options)
}
}
9 changes: 6 additions & 3 deletions src/transaction/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

use derive_new::new;
use log::debug;
use std::marker::PhantomData;

use crate::codec::ApiV1TxnCodec;
use crate::pd::{PdClient, PdRpcClient};
use crate::request::codec::Codec;
use crate::BoundRange;
Expand All @@ -20,11 +22,12 @@ use crate::Value;
///
/// See the [Transaction](struct@crate::Transaction) docs for more information on the methods.
#[derive(new)]
pub struct Snapshot<PdC: PdClient = PdRpcClient> {
transaction: Transaction<PdC>,
pub struct Snapshot<Cod: Codec = ApiV1TxnCodec, PdC: PdClient = PdRpcClient<Cod>> {
transaction: Transaction<Cod, PdC>,
phantom: PhantomData<Cod>,
}

impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Snapshot<PdC> {
impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Snapshot<Cod, PdC> {
/// Get the value associated with the given key.
pub async fn get(&mut self, key: impl Into<Key>) -> Result<Option<Value>> {
debug!("invoking get request on snapshot");
Expand Down
12 changes: 8 additions & 4 deletions src/transaction/transaction.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

use std::iter;
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::Instant;

Expand All @@ -14,6 +15,7 @@ use tokio::time::Duration;

use crate::backoff::Backoff;
use crate::backoff::DEFAULT_REGION_BACKOFF;
use crate::codec::ApiV1TxnCodec;
use crate::pd::PdClient;
use crate::pd::PdRpcClient;
use crate::proto::kvrpcpb;
Expand Down Expand Up @@ -74,22 +76,23 @@ use crate::Value;
/// txn.commit().await.unwrap();
/// # });
/// ```
pub struct Transaction<PdC: PdClient = PdRpcClient> {
pub struct Transaction<Cod: Codec = ApiV1TxnCodec, PdC: PdClient = PdRpcClient<Cod>> {
status: Arc<RwLock<TransactionStatus>>,
timestamp: Timestamp,
buffer: Buffer,
rpc: Arc<PdC>,
options: TransactionOptions,
is_heartbeat_started: bool,
start_instant: Instant,
phantom: PhantomData<Cod>,
}

impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<PdC> {
impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<Cod, PdC> {
pub(crate) fn new(
timestamp: Timestamp,
rpc: Arc<PdC>,
options: TransactionOptions,
) -> Transaction<PdC> {
) -> Transaction<Cod, PdC> {
let status = if options.read_only {
TransactionStatus::ReadOnly
} else {
Expand All @@ -103,6 +106,7 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<PdC> {
options,
is_heartbeat_started: false,
start_instant: std::time::Instant::now(),
phantom: PhantomData,
}
}

Expand Down Expand Up @@ -924,7 +928,7 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<PdC> {
}
}

impl<PdC: PdClient> Drop for Transaction<PdC> {
impl<Cod: Codec, PdC: PdClient> Drop for Transaction<Cod, PdC> {
fn drop(&mut self) {
debug!("dropping transaction");
if std::thread::panicking() {
Expand Down

0 comments on commit c00889d

Please sign in to comment.