From 5a0b6feddf7bdeb3609a47e219dbd7015cc83fcf Mon Sep 17 00:00:00 2001 From: Yilin Chen Date: Tue, 30 Jul 2019 16:49:01 +0800 Subject: [PATCH 1/3] Use async/await in the transaction APIs After this commit, the library requires a nightly Rust compiler to build. The required toolchain version is also updated in README.md and .travis.yml. Signed-off-by: Yilin Chen --- .travis.yml | 2 +- README.md | 6 ++---- examples/transaction.rs | 8 +++++--- src/lib.rs | 1 + src/rpc/client.rs | 6 +++++- src/transaction/client.rs | 26 +++++++++++++++----------- src/transaction/transaction.rs | 22 +++++++++++----------- 7 files changed, 40 insertions(+), 31 deletions(-) diff --git a/.travis.yml b/.travis.yml index 221109da..82eaad6f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -6,7 +6,7 @@ os: # - windows # TODO: https://github.com/pingcap/kvproto/issues/355 - osx rust: - # Requires nightly for now, stable can be re-enabled when 1.36 is stable. + # Requires nightly for now, stable can be re-enabled when async/await is stable. # - stable - nightly env: diff --git a/README.md b/README.md index b09948a4..b34d3672 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,7 @@ This is an open source (Apache 2) project hosted by the Cloud Native Computing F ## Using the client -The TiKV client is a Rust library (crate). It requires version 1.36 of the compiler and standard libraries (which will be stable from the 4th July 2019, see below for ensuring compatibility). +The TiKV client is a Rust library (crate). It requires a nightly Rust compiler with async/await support. To use this crate in your project, add it as a dependency in your `Cargo.toml`: @@ -28,8 +28,6 @@ The client requires a Git dependency until we can [publish it](https://github.co There are [examples](examples) which show how to use the client in a Rust program. -The examples and documentation use async/await syntax. This is a new feature in Rust and is currently unstable. To use async/await you'll need to add the feature flag `#![async_await]` to your crate and use a nightly compiler (see below). - ## Access the documentation We recommend using the cargo-generated documentation to browse and understand the API. We've done @@ -52,7 +50,7 @@ To check what version of Rust you are using, run rustc --version ``` -You'll see something like `rustc 1.36.0-nightly (a784a8022 2019-05-09)` where the `1.36.0` is the toolchain version, and `nightly` is the channel (stable/beta/nightly). To install another toolchain use +You'll see something like `rustc 1.38.0-nightly (4b65a86eb 2019-07-15)` where the `1.38.0` is the toolchain version, and `nightly` is the channel (stable/beta/nightly). To install another toolchain use ```bash rustup toolchain install nightly diff --git a/examples/transaction.rs b/examples/transaction.rs index 72aaf7a0..ab4d7076 100644 --- a/examples/transaction.rs +++ b/examples/transaction.rs @@ -13,7 +13,7 @@ use tikv_client::{ }; async fn puts(client: &Client, pairs: impl IntoIterator>) { - let mut txn = client.begin(); + let mut txn = client.begin().await.expect("Could not begin a transaction"); future::join_all( pairs .into_iter() @@ -28,7 +28,7 @@ async fn puts(client: &Client, pairs: impl IntoIterator } async fn get(client: &Client, key: Key) -> Value { - let txn = client.begin(); + let txn = client.begin().await.expect("Could not begin a transaction"); txn.get(key).await.expect("Could not get value") } @@ -37,6 +37,8 @@ async fn get(client: &Client, key: Key) -> Value { async fn scan(client: &Client, range: impl RangeBounds, mut limit: usize) { client .begin() + .await + .expect("Could not begin a transaction") .scan(range) .into_stream() .take_while(move |r| { @@ -53,7 +55,7 @@ async fn scan(client: &Client, range: impl RangeBounds, mut limit: usize) { } async fn dels(client: &Client, keys: impl IntoIterator) { - let mut txn = client.begin(); + let mut txn = client.begin().await.expect("Could not begin a transaction"); txn.set_isolation_level(IsolationLevel::ReadCommitted); let _: Vec<()> = stream::iter(keys.into_iter()) .then(|p| { diff --git a/src/lib.rs b/src/lib.rs index 192d8b6c..90168717 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,6 +3,7 @@ // Long and nested future chains can quickly result in large generic types. #![type_length_limit = "16777216"] #![allow(clippy::redundant_closure)] +#![feature(async_await)] //! This crate provides a clean, ready to use client for [TiKV](https://github.com/tikv/tikv), a //! distributed transactional Key-Value database written in Rust. diff --git a/src/rpc/client.rs b/src/rpc/client.rs index b52d883f..02984ae7 100644 --- a/src/rpc/client.rs +++ b/src/rpc/client.rs @@ -20,7 +20,7 @@ use crate::{ kv::BoundRange, raw::ColumnFamily, rpc::{ - pd::{PdClient, Region, RegionId, RetryClient, StoreId}, + pd::{PdClient, Region, RegionId, RetryClient, StoreId, Timestamp}, security::SecurityManager, tikv::KvClient, Address, RawContext, Store, TxnContext, @@ -225,6 +225,10 @@ impl RpcClient { future::err(Error::unimplemented()) } + pub fn get_timestamp(self: Arc) -> impl Future> { + Arc::clone(&self.pd).get_timestamp() + } + // Returns a Steam which iterates over the contexts for each region covered by range. fn regions_for_range( self: Arc, diff --git a/src/transaction/client.rs b/src/transaction/client.rs index 3a689e90..d59e78f6 100644 --- a/src/transaction/client.rs +++ b/src/transaction/client.rs @@ -1,15 +1,19 @@ // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. use super::{Snapshot, Timestamp, Transaction}; -use crate::{Config, Error}; +use crate::rpc::RpcClient; +use crate::{Config, Result}; use derive_new::new; use futures::prelude::*; use futures::task::{Context, Poll}; use std::pin::Pin; +use std::sync::Arc; /// The TiKV transactional `Client` is used to issue requests to the TiKV server and PD cluster. -pub struct Client; +pub struct Client { + rpc: Arc, +} impl Client { /// Creates a new [`Client`](Client) once the [`Connect`](Connect) resolves. @@ -38,13 +42,13 @@ impl Client { /// # futures::executor::block_on(async { /// let connect = Client::connect(Config::default()); /// let client = connect.await.unwrap(); - /// let transaction = client.begin(); + /// let transaction = client.begin().await.unwrap(); /// // ... Issue some commands. /// let commit = transaction.commit(); /// let result: () = commit.await.unwrap(); /// # }); /// ``` - pub fn begin(&self) -> Transaction { + pub async fn begin(&self) -> Result { unimplemented!() } @@ -57,11 +61,11 @@ impl Client { /// # futures::executor::block_on(async { /// let connect = Client::connect(Config::default()); /// let client = connect.await.unwrap(); - /// let snapshot = client.snapshot(); + /// let snapshot = client.snapshot().await.unwrap(); /// // ... Issue some commands. /// # }); /// ``` - pub fn snapshot(&self) -> Snapshot { + pub async fn snapshot(&self) -> Result { unimplemented!() } @@ -79,7 +83,7 @@ impl Client { /// // ... Issue some commands. /// # }); /// ``` - pub fn snapshot_at(&self, _timestamp: Timestamp) -> Snapshot { + pub async fn snapshot_at(&self, _timestamp: Timestamp) -> Result { unimplemented!() } @@ -92,11 +96,11 @@ impl Client { /// # futures::executor::block_on(async { /// let connect = Client::connect(Config::default()); /// let client = connect.await.unwrap(); - /// let timestamp = client.current_timestamp(); + /// let timestamp = client.current_timestamp().await.unwrap(); /// # }); /// ``` - pub fn current_timestamp(&self) -> Timestamp { - unimplemented!() + pub async fn current_timestamp(&self) -> Result { + Arc::clone(&self.rpc).get_timestamp().await } } @@ -120,7 +124,7 @@ pub struct Connect { } impl Future for Connect { - type Output = Result; + type Output = Result; fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll { let _config = &self.config; diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs index 960ab084..cd899a4c 100644 --- a/src/transaction/transaction.rs +++ b/src/transaction/transaction.rs @@ -22,7 +22,7 @@ use std::ops::RangeBounds; /// # futures::executor::block_on(async { /// let connect = Client::connect(Config::default()); /// let client = connect.await.unwrap(); -/// let txn = client.begin(); +/// let txn = client.begin().await.unwrap(); /// # }); /// ``` #[derive(new)] @@ -42,7 +42,7 @@ impl Transaction { /// # futures::executor::block_on(async { /// # let connect = Client::connect(Config::default()); /// # let connected_client = connect.await.unwrap(); - /// let txn = connected_client.begin(); + /// let txn = connected_client.begin().await.unwrap(); /// // ... Do some actions. /// let req = txn.commit(); /// let result: () = req.await.unwrap(); @@ -61,7 +61,7 @@ impl Transaction { /// # futures::executor::block_on(async { /// # let connect = Client::connect(Config::default()); /// # let connected_client = connect.await.unwrap(); - /// let txn = connected_client.begin(); + /// let txn = connected_client.begin().await.unwrap(); /// // ... Do some actions. /// let req = txn.rollback(); /// let result: () = req.await.unwrap(); @@ -80,7 +80,7 @@ impl Transaction { /// # futures::executor::block_on(async { /// # let connect = Client::connect(Config::default()); /// # let connected_client = connect.await.unwrap(); - /// let mut txn = connected_client.begin(); + /// let mut txn = connected_client.begin().await.unwrap(); /// // ... Do some actions. /// let req = txn.lock_keys(vec!["TiKV".to_owned(), "Rust".to_owned()]); /// let result: () = req.await.unwrap(); @@ -103,7 +103,7 @@ impl Transaction { /// # futures::executor::block_on(async { /// # let connect = Client::connect(Config::default()); /// # let connected_client = connect.await.unwrap(); - /// let txn = connected_client.begin(); + /// let txn = connected_client.begin().await.unwrap(); /// // ... Do some actions. /// let ts: Timestamp = txn.start_ts(); /// # }); @@ -121,7 +121,7 @@ impl Transaction { /// # futures::executor::block_on(async { /// # let connect = Client::connect(Config::default()); /// # let connected_client = connect.await.unwrap(); - /// let txn = connected_client.begin(); + /// let txn = connected_client.begin().await.unwrap(); /// // ... Do some actions. /// let snap: Snapshot = txn.snapshot(); /// # }); @@ -139,7 +139,7 @@ impl Transaction { /// # futures::executor::block_on(async { /// # let connect = Client::connect(Config::default()); /// # let connected_client = connect.await.unwrap(); - /// let mut txn = connected_client.begin(); + /// let mut txn = connected_client.begin().await.unwrap(); /// txn.set_isolation_level(IsolationLevel::SnapshotIsolation); /// # }); /// ``` @@ -159,7 +159,7 @@ impl Transaction { /// # futures::executor::block_on(async { /// # let connecting_client = Client::connect(Config::new(vec!["192.168.0.100", "192.168.0.101"])); /// # let connected_client = connecting_client.await.unwrap(); - /// let mut txn = connected_client.begin(); + /// let mut txn = connected_client.begin().await.unwrap(); /// let key = "TiKV".to_owned(); /// let req = txn.get(key); /// let result: Value = req.await.unwrap(); @@ -183,7 +183,7 @@ impl Transaction { /// # futures::executor::block_on(async { /// # let connecting_client = Client::connect(Config::new(vec!["192.168.0.100", "192.168.0.101"])); /// # let connected_client = connecting_client.await.unwrap(); - /// let mut txn = connected_client.begin(); + /// let mut txn = connected_client.begin().await.unwrap(); /// let keys = vec!["TiKV".to_owned(), "TiDB".to_owned()]; /// let req = txn.batch_get(keys); /// let result: Vec = req.await.unwrap(); @@ -214,7 +214,7 @@ impl Transaction { /// # futures::executor::block_on(async { /// # let connecting_client = Client::connect(Config::new(vec!["192.168.0.100", "192.168.0.101"])); /// # let connected_client = connecting_client.await.unwrap(); - /// let mut txn = connected_client.begin(); + /// let mut txn = connected_client.begin().await.unwrap(); /// let key = "TiKV".to_owned(); /// let val = "TiKV".to_owned(); /// let req = txn.set(key, val); @@ -238,7 +238,7 @@ impl Transaction { /// # futures::executor::block_on(async { /// # let connecting_client = Client::connect(Config::new(vec!["192.168.0.100", "192.168.0.101"])); /// # let connected_client = connecting_client.await.unwrap(); - /// let mut txn = connected_client.begin(); + /// let mut txn = connected_client.begin().await.unwrap(); /// let key = "TiKV".to_owned(); /// let req = txn.delete(key); /// let result: () = req.await.unwrap(); From 94f080e269922e8c72c016e98937c4e325df9327 Mon Sep 17 00:00:00 2001 From: Yilin Chen Date: Tue, 30 Jul 2019 18:54:10 +0800 Subject: [PATCH 2/3] Add integration tests for get TS Signed-off-by: Yilin Chen --- src/transaction/client.rs | 9 ++++++--- tests/integration_tests.rs | 41 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 47 insertions(+), 3 deletions(-) create mode 100644 tests/integration_tests.rs diff --git a/src/transaction/client.rs b/src/transaction/client.rs index d59e78f6..d82da836 100644 --- a/src/transaction/client.rs +++ b/src/transaction/client.rs @@ -78,7 +78,7 @@ impl Client { /// # futures::executor::block_on(async { /// let connect = Client::connect(Config::default()); /// let client = connect.await.unwrap(); - /// let timestamp = Timestamp { physical: 1564474902, logical: 1 }; + /// let timestamp = Timestamp { physical: 1564481750172, logical: 1 }; /// let snapshot = client.snapshot_at(timestamp); /// // ... Issue some commands. /// # }); @@ -127,7 +127,10 @@ impl Future for Connect { type Output = Result; fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll { - let _config = &self.config; - unimplemented!() + let config = &self.config; + // TODO: RpcClient::connect currently uses a blocking implementation. + // Make it asynchronous later. + let rpc = Arc::new(RpcClient::connect(config)?); + Poll::Ready(Ok(Client { rpc })) } } diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs new file mode 100644 index 00000000..ac8c0327 --- /dev/null +++ b/tests/integration_tests.rs @@ -0,0 +1,41 @@ +#![cfg(feature = "integration-tests")] +#![feature(async_await)] + +use failure::Fallible; +use futures::executor::ThreadPool; +use futures::prelude::*; +use std::env; +use tikv_client::{transaction::*, Config, Result}; + +#[test] +fn get_timestamp() -> Fallible<()> { + const COUNT: usize = 1 << 12; + let mut pool = ThreadPool::new()?; + let config = Config::new(pd_addrs()); + let fut = async { + let client = Client::connect(config).await?; + Result::Ok(future::join_all((0..COUNT).map(|_| client.current_timestamp())).await) + }; + // Calculate each version of retrieved timestamp + let mut versions = pool + .run(fut)? + .into_iter() + .map(|res| res.map(|ts| ts.physical << 18 + ts.logical)) + .collect::>>()?; + + // Each version should be unique + versions.sort_unstable(); + versions.dedup(); + assert_eq!(versions.len(), COUNT); + Ok(()) +} + +const ENV_PD_ADDRS: &str = "PD_ADDRS"; + +fn pd_addrs() -> Vec { + env::var(ENV_PD_ADDRS) + .expect(&format!("Expected {}:", ENV_PD_ADDRS)) + .split(",") + .map(From::from) + .collect() +} From 78fa34fd3777564571737f2458071c1d579a6071 Mon Sep 17 00:00:00 2001 From: Yilin Chen Date: Wed, 31 Jul 2019 13:53:07 +0800 Subject: [PATCH 3/3] Remove explicit Futures for transaction Signed-off-by: Yilin Chen --- examples/transaction.rs | 30 +--- src/rpc/pd/mod.rs | 2 +- src/transaction/client.rs | 12 +- src/transaction/mod.rs | 4 +- src/transaction/requests.rs | 128 +-------------- src/transaction/transaction.rs | 276 ++++++++++++++------------------- 6 files changed, 134 insertions(+), 318 deletions(-) diff --git a/examples/transaction.rs b/examples/transaction.rs index ab4d7076..efc5d7ce 100644 --- a/examples/transaction.rs +++ b/examples/transaction.rs @@ -7,23 +7,14 @@ mod common; use crate::common::parse_args; use futures::prelude::*; use std::ops::RangeBounds; -use tikv_client::{ - transaction::{Client, IsolationLevel}, - Config, Key, KvPair, Value, -}; +use tikv_client::{transaction::Client, Config, Key, KvPair, Value}; async fn puts(client: &Client, pairs: impl IntoIterator>) { let mut txn = client.begin().await.expect("Could not begin a transaction"); - future::join_all( - pairs - .into_iter() - .map(Into::into) - .map(|p| txn.set(p.key().clone(), p.value().clone())), - ) - .await - .into_iter() - .collect::, _>>() - .expect("Could not set key value pairs"); + for pair in pairs { + let (key, value) = pair.into().into(); + txn.set(key, value); + } txn.commit().await.expect("Could not commit transaction"); } @@ -56,14 +47,9 @@ async fn scan(client: &Client, range: impl RangeBounds, mut limit: usize) { async fn dels(client: &Client, keys: impl IntoIterator) { let mut txn = client.begin().await.expect("Could not begin a transaction"); - txn.set_isolation_level(IsolationLevel::ReadCommitted); - let _: Vec<()> = stream::iter(keys.into_iter()) - .then(|p| { - txn.delete(p) - .unwrap_or_else(|e| panic!("error in delete: {:?}", e)) - }) - .collect() - .await; + for key in keys { + txn.delete(key); + } txn.commit().await.expect("Could not commit transaction"); } diff --git a/src/rpc/pd/mod.rs b/src/rpc/pd/mod.rs index 79d2cbfd..0a05f8d5 100644 --- a/src/rpc/pd/mod.rs +++ b/src/rpc/pd/mod.rs @@ -87,7 +87,7 @@ impl Region { } } -#[derive(Eq, PartialEq, Debug)] +#[derive(Eq, PartialEq, Debug, Clone, Copy)] pub struct Timestamp { pub physical: i64, pub logical: i64, diff --git a/src/transaction/client.rs b/src/transaction/client.rs index d82da836..99ddfb76 100644 --- a/src/transaction/client.rs +++ b/src/transaction/client.rs @@ -42,14 +42,15 @@ impl Client { /// # futures::executor::block_on(async { /// let connect = Client::connect(Config::default()); /// let client = connect.await.unwrap(); - /// let transaction = client.begin().await.unwrap(); + /// let mut transaction = client.begin().await.unwrap(); /// // ... Issue some commands. /// let commit = transaction.commit(); /// let result: () = commit.await.unwrap(); /// # }); /// ``` pub async fn begin(&self) -> Result { - unimplemented!() + let snapshot = self.snapshot().await?; + Ok(Transaction::new(snapshot)) } /// Gets the latest [`Snapshot`](Snapshot). @@ -66,7 +67,8 @@ impl Client { /// # }); /// ``` pub async fn snapshot(&self) -> Result { - unimplemented!() + let timestamp = self.current_timestamp().await?; + self.snapshot_at(timestamp).await } /// Gets a [`Snapshot`](Snapshot) at the given point in time. @@ -83,8 +85,8 @@ impl Client { /// // ... Issue some commands. /// # }); /// ``` - pub async fn snapshot_at(&self, _timestamp: Timestamp) -> Result { - unimplemented!() + pub async fn snapshot_at(&self, timestamp: Timestamp) -> Result { + Ok(Snapshot::new(timestamp)) } /// Retrieves the current [`Timestamp`](Timestamp). diff --git a/src/transaction/mod.rs b/src/transaction/mod.rs index 923f08f7..3de8b29c 100644 --- a/src/transaction/mod.rs +++ b/src/transaction/mod.rs @@ -10,8 +10,8 @@ //! pub use self::client::{Client, Connect}; -pub use self::requests::{BatchGet, Commit, Delete, Get, LockKeys, Rollback, Scanner, Set}; -pub use self::transaction::{IsolationLevel, Snapshot, Transaction, TxnInfo}; +pub use self::requests::Scanner; +pub use self::transaction::{Snapshot, Transaction, TxnInfo}; pub use super::rpc::Timestamp; use crate::{Key, Value}; diff --git a/src/transaction/requests.rs b/src/transaction/requests.rs index 55cc97c7..c3d70ebe 100644 --- a/src/transaction/requests.rs +++ b/src/transaction/requests.rs @@ -1,9 +1,7 @@ // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. -use super::Transaction; -use crate::{Error, Key, KvPair, Value}; +use crate::{Error, KvPair}; -use derive_new::new; use futures::prelude::*; use futures::task::{Context, Poll}; use std::pin::Pin; @@ -20,127 +18,3 @@ impl Stream for Scanner { unimplemented!() } } - -/// An unresolved [`Transaction::get`](Transaction::get) request. -/// -/// Once resolved this request will result in the fetching of the value associated with the given -/// key. -#[derive(new)] -pub struct Get { - key: Key, -} - -impl Future for Get { - type Output = Result; - - fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll { - let _key = &self.key; - unimplemented!() - } -} - -/// An unresolved [`Transaction::batch_get`](Transaction::batch_get) request. -/// -/// Once resolved this request will result in the fetching of the values associated with the given -/// keys. -#[derive(new)] -pub struct BatchGet { - keys: Vec, -} - -impl Future for BatchGet { - type Output = Result, Error>; - - fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll { - let _keys = &self.keys; - unimplemented!() - } -} - -/// An unresolved [`Transaction::commit`](Transaction::commit) request. -/// -/// Once resolved this request will result in the committing of the transaction. -#[derive(new)] -pub struct Commit { - txn: Transaction, -} - -impl Future for Commit { - type Output = Result<(), Error>; - - fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll { - let _txn = &self.txn; - unimplemented!() - } -} - -/// An unresolved [`Transaction::rollback`](Transaction::rollback) request. -/// -/// Once resolved this request will result in the rolling back of the transaction. -#[derive(new)] -pub struct Rollback { - txn: Transaction, -} - -impl Future for Rollback { - type Output = Result<(), Error>; - - fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll { - let _txn = &self.txn; - unimplemented!() - } -} - -/// An unresolved [`Transaction::lock_keys`](Transaction::lock_keys) request. -/// -/// Once resolved this request will result in the locking of the given keys. -#[derive(new)] -pub struct LockKeys { - keys: Vec, -} - -impl Future for LockKeys { - type Output = Result<(), Error>; - - fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll { - let _keys = &self.keys; - unimplemented!() - } -} - -/// An unresolved [`Transaction::set`](Transaction::set) request. -/// -/// Once resolved this request will result in the setting of the value associated with the given -/// key. -#[derive(new)] -pub struct Set { - key: Key, - value: Value, -} - -impl Future for Set { - type Output = Result<(), Error>; - - fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll { - let _key = &self.key; - let _value = &self.value; - unimplemented!() - } -} - -/// An unresolved [`Transaction::delete`](Transaction::delete) request. -/// -/// Once resolved this request will result in the deletion of the given key. -#[derive(new)] -pub struct Delete { - key: Key, -} - -impl Future for Delete { - type Output = Result<(), Error>; - - fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll { - let _key = &self.key; - unimplemented!() - } -} diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs index cd899a4c..a285fd49 100644 --- a/src/transaction/transaction.rs +++ b/src/transaction/transaction.rs @@ -1,7 +1,7 @@ // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. -use super::{BatchGet, Commit, Delete, Get, LockKeys, Rollback, Scanner, Set, Timestamp}; -use crate::{Key, Value}; +use super::{Scanner, Timestamp}; +use crate::{Key, KvPair, Result, Value}; use derive_new::new; use std::ops::RangeBounds; @@ -31,243 +31,228 @@ pub struct Transaction { } impl Transaction { - /// Commit the actions of the transaction. - /// - /// Once committed, it is no longer possible to `rollback` the actions in the transaction. + /// Gets the value associated with the given key. /// /// ```rust,no_run /// # #![feature(async_await)] - /// # use tikv_client::{Config, transaction::Client}; + /// # use tikv_client::{Value, Config, transaction::Client}; /// # use futures::prelude::*; /// # futures::executor::block_on(async { - /// # let connect = Client::connect(Config::default()); - /// # let connected_client = connect.await.unwrap(); - /// let txn = connected_client.begin().await.unwrap(); - /// // ... Do some actions. - /// let req = txn.commit(); - /// let result: () = req.await.unwrap(); + /// # let connecting_client = Client::connect(Config::new(vec!["192.168.0.100", "192.168.0.101"])); + /// # let connected_client = connecting_client.await.unwrap(); + /// let mut txn = connected_client.begin().await.unwrap(); + /// let key = "TiKV".to_owned(); + /// let req = txn.get(key); + /// let result: Value = req.await.unwrap(); + /// // Finish the transaction... + /// txn.commit().await.unwrap(); /// # }); /// ``` - pub fn commit(self) -> Commit { - Commit::new(self) + pub async fn get(&self, _key: impl Into) -> Result { + unimplemented!() } - /// Rollback the actions of the transaction. + /// Gets the values associated with the given keys. /// /// ```rust,no_run /// # #![feature(async_await)] - /// # use tikv_client::{Config, transaction::Client}; + /// # use tikv_client::{KvPair, Config, transaction::Client}; /// # use futures::prelude::*; /// # futures::executor::block_on(async { - /// # let connect = Client::connect(Config::default()); - /// # let connected_client = connect.await.unwrap(); - /// let txn = connected_client.begin().await.unwrap(); - /// // ... Do some actions. - /// let req = txn.rollback(); - /// let result: () = req.await.unwrap(); + /// # let connecting_client = Client::connect(Config::new(vec!["192.168.0.100", "192.168.0.101"])); + /// # let connected_client = connecting_client.await.unwrap(); + /// let mut txn = connected_client.begin().await.unwrap(); + /// let keys = vec!["TiKV".to_owned(), "TiDB".to_owned()]; + /// let req = txn.batch_get(keys); + /// let result: Vec = req.await.unwrap(); + /// // Finish the transaction... + /// txn.commit().await.unwrap(); /// # }); /// ``` - pub fn rollback(self) -> Rollback { - Rollback::new(self) + pub async fn batch_get( + &self, + _keys: impl IntoIterator>, + ) -> Result> { + unimplemented!() + } + + pub fn scan(&self, _range: impl RangeBounds) -> Scanner { + unimplemented!() } - /// Lock the given keys. + pub fn scan_reverse(&self, _range: impl RangeBounds) -> Scanner { + unimplemented!() + } + + /// Sets the value associated with the given key. /// /// ```rust,no_run /// # #![feature(async_await)] - /// # use tikv_client::{Config, transaction::Client}; + /// # use tikv_client::{Key, Value, Config, transaction::Client}; /// # use futures::prelude::*; /// # futures::executor::block_on(async { - /// # let connect = Client::connect(Config::default()); - /// # let connected_client = connect.await.unwrap(); + /// # let connecting_client = Client::connect(Config::new(vec!["192.168.0.100", "192.168.0.101"])); + /// # let connected_client = connecting_client.await.unwrap(); /// let mut txn = connected_client.begin().await.unwrap(); - /// // ... Do some actions. - /// let req = txn.lock_keys(vec!["TiKV".to_owned(), "Rust".to_owned()]); - /// let result: () = req.await.unwrap(); + /// let key = "TiKV".to_owned(); + /// let val = "TiKV".to_owned(); + /// txn.set(key, val); + /// // Finish the transaction... + /// txn.commit().await.unwrap(); /// # }); /// ``` - pub fn lock_keys(&mut self, keys: impl IntoIterator>) -> LockKeys { - LockKeys::new(keys.into_iter().map(|v| v.into()).collect()) - } - - pub fn is_readonly(&self) -> bool { + pub fn set(&mut self, _key: impl Into, _value: impl Into) { unimplemented!() } - /// Returns the timestamp which the transaction started at. + /// Deletes the given key. /// /// ```rust,no_run /// # #![feature(async_await)] - /// # use tikv_client::{Config, transaction::{Client, Timestamp}}; + /// # use tikv_client::{Key, Config, transaction::Client}; /// # use futures::prelude::*; /// # futures::executor::block_on(async { - /// # let connect = Client::connect(Config::default()); - /// # let connected_client = connect.await.unwrap(); - /// let txn = connected_client.begin().await.unwrap(); - /// // ... Do some actions. - /// let ts: Timestamp = txn.start_ts(); + /// # let connecting_client = Client::connect(Config::new(vec!["192.168.0.100", "192.168.0.101"])); + /// # let connected_client = connecting_client.await.unwrap(); + /// let mut txn = connected_client.begin().await.unwrap(); + /// let key = "TiKV".to_owned(); + /// txn.delete(key); + /// // Finish the transaction... + /// txn.commit().await.unwrap(); /// # }); /// ``` - pub fn start_ts(&self) -> Timestamp { + pub fn delete(&mut self, _key: impl Into) { unimplemented!() } - /// Get the `Snapshot` the transaction is operating on. + /// Locks the given keys. /// /// ```rust,no_run /// # #![feature(async_await)] - /// # use tikv_client::{Config, transaction::{Client, Snapshot}}; + /// # use tikv_client::{Config, transaction::Client}; /// # use futures::prelude::*; /// # futures::executor::block_on(async { /// # let connect = Client::connect(Config::default()); /// # let connected_client = connect.await.unwrap(); - /// let txn = connected_client.begin().await.unwrap(); + /// let mut txn = connected_client.begin().await.unwrap(); + /// txn.lock_keys(vec!["TiKV".to_owned(), "Rust".to_owned()]); /// // ... Do some actions. - /// let snap: Snapshot = txn.snapshot(); + /// txn.commit().await.unwrap(); /// # }); /// ``` - pub fn snapshot(&self) -> Snapshot { + pub fn lock_keys(&mut self, _keys: impl IntoIterator>) { unimplemented!() } - /// Set the isolation level of the transaction. + /// Commits the actions of the transaction. /// /// ```rust,no_run /// # #![feature(async_await)] - /// # use tikv_client::{Config, transaction::{Client, IsolationLevel}}; + /// # use tikv_client::{Config, transaction::Client}; /// # use futures::prelude::*; /// # futures::executor::block_on(async { /// # let connect = Client::connect(Config::default()); /// # let connected_client = connect.await.unwrap(); /// let mut txn = connected_client.begin().await.unwrap(); - /// txn.set_isolation_level(IsolationLevel::SnapshotIsolation); + /// // ... Do some actions. + /// let req = txn.commit(); + /// let result: () = req.await.unwrap(); /// # }); /// ``` - pub fn set_isolation_level(&mut self, _level: IsolationLevel) { + pub async fn commit(&mut self) -> Result<()> { unimplemented!() } - /// Create a new [`Get`](Get) request. - /// - /// Once resolved this request will result in the fetching of the value associated with the - /// given key. + /// Returns the timestamp which the transaction started at. /// /// ```rust,no_run /// # #![feature(async_await)] - /// # use tikv_client::{Value, Config, transaction::Client}; + /// # use tikv_client::{Config, transaction::{Client, Timestamp}}; /// # use futures::prelude::*; /// # futures::executor::block_on(async { - /// # let connecting_client = Client::connect(Config::new(vec!["192.168.0.100", "192.168.0.101"])); - /// # let connected_client = connecting_client.await.unwrap(); - /// let mut txn = connected_client.begin().await.unwrap(); - /// let key = "TiKV".to_owned(); - /// let req = txn.get(key); - /// let result: Value = req.await.unwrap(); - /// // Finish the transaction... - /// txn.commit().await.unwrap(); + /// # let connect = Client::connect(Config::default()); + /// # let connected_client = connect.await.unwrap(); + /// let txn = connected_client.begin().await.unwrap(); + /// // ... Do some actions. + /// let ts: Timestamp = txn.start_ts(); /// # }); /// ``` - pub fn get(&self, key: impl Into) -> Get { - self.snapshot.get(key.into()) + pub fn start_ts(&self) -> Timestamp { + self.snapshot().timestamp } - /// Create a new [`BatchGet`](BatchGet) request. - /// - /// Once resolved this request will result in the fetching of the values associated with the - /// given keys. + /// Gets the `Snapshot` the transaction is operating on. /// /// ```rust,no_run /// # #![feature(async_await)] - /// # use tikv_client::{KvPair, Config, transaction::Client}; + /// # use tikv_client::{Config, transaction::{Client, Snapshot}}; /// # use futures::prelude::*; /// # futures::executor::block_on(async { - /// # let connecting_client = Client::connect(Config::new(vec!["192.168.0.100", "192.168.0.101"])); - /// # let connected_client = connecting_client.await.unwrap(); - /// let mut txn = connected_client.begin().await.unwrap(); - /// let keys = vec!["TiKV".to_owned(), "TiDB".to_owned()]; - /// let req = txn.batch_get(keys); - /// let result: Vec = req.await.unwrap(); - /// // Finish the transaction... - /// txn.commit().await.unwrap(); + /// # let connect = Client::connect(Config::default()); + /// # let connected_client = connect.await.unwrap(); + /// let txn = connected_client.begin().await.unwrap(); + /// // ... Do some actions. + /// let snap: &Snapshot = txn.snapshot(); /// # }); /// ``` - pub fn batch_get(&self, keys: impl IntoIterator>) -> BatchGet { - self.snapshot.batch_get(keys) + pub fn snapshot(&self) -> &Snapshot { + &self.snapshot } +} - pub fn scan(&self, range: impl RangeBounds) -> Scanner { - self.snapshot.scan(range) - } +pub struct TxnInfo { + pub txn: u64, + pub status: u64, +} - pub fn scan_reverse(&self, range: impl RangeBounds) -> Scanner { - self.snapshot.scan_reverse(range) - } +/// A snapshot of dataset at a particular point in time. +#[derive(new)] +pub struct Snapshot { + timestamp: Timestamp, +} - /// Create a new [`Set`](Set) request. - /// - /// Once resolved this request will result in the setting of the value associated with the given key. +impl Snapshot { + /// Gets the value associated with the given key. /// /// ```rust,no_run /// # #![feature(async_await)] - /// # use tikv_client::{Key, Value, Config, transaction::Client}; + /// # use tikv_client::{Value, Config, transaction::Client}; /// # use futures::prelude::*; /// # futures::executor::block_on(async { /// # let connecting_client = Client::connect(Config::new(vec!["192.168.0.100", "192.168.0.101"])); /// # let connected_client = connecting_client.await.unwrap(); - /// let mut txn = connected_client.begin().await.unwrap(); + /// let snapshot = connected_client.snapshot().await.unwrap(); /// let key = "TiKV".to_owned(); - /// let val = "TiKV".to_owned(); - /// let req = txn.set(key, val); - /// let result: () = req.await.unwrap(); - /// // Finish the transaction... - /// txn.commit().await.unwrap(); + /// let req = snapshot.get(key); + /// let result: Value = req.await.unwrap(); /// # }); /// ``` - pub fn set(&mut self, key: impl Into, value: impl Into) -> Set { - Set::new(key.into(), value.into()) + pub async fn get(&self, _key: impl Into) -> Result { + unimplemented!() } - /// Create a new [`Delete`](Delete) request. - /// - /// Once resolved this request will result in the deletion of the given key. + /// Gets the values associated with the given keys. /// /// ```rust,no_run /// # #![feature(async_await)] - /// # use tikv_client::{Key, Config, transaction::Client}; + /// # use tikv_client::{KvPair, Config, transaction::Client}; /// # use futures::prelude::*; /// # futures::executor::block_on(async { /// # let connecting_client = Client::connect(Config::new(vec!["192.168.0.100", "192.168.0.101"])); /// # let connected_client = connecting_client.await.unwrap(); /// let mut txn = connected_client.begin().await.unwrap(); - /// let key = "TiKV".to_owned(); - /// let req = txn.delete(key); - /// let result: () = req.await.unwrap(); + /// let keys = vec!["TiKV".to_owned(), "TiDB".to_owned()]; + /// let req = txn.batch_get(keys); + /// let result: Vec = req.await.unwrap(); /// // Finish the transaction... /// txn.commit().await.unwrap(); /// # }); /// ``` - pub fn delete(&mut self, key: impl Into) -> Delete { - Delete::new(key.into()) - } -} - -pub struct TxnInfo { - pub txn: u64, - pub status: u64, -} - -/// A snapshot of dataset at a particular point in time. -pub struct Snapshot { - _timestamp: Timestamp, -} - -impl Snapshot { - pub fn get(&self, key: impl Into) -> Get { - Get::new(key.into()) - } - - pub fn batch_get(&self, keys: impl IntoIterator>) -> BatchGet { - BatchGet::new(keys.into_iter().map(|v| v.into()).collect()) + pub async fn batch_get( + &self, + _keys: impl IntoIterator>, + ) -> Result> { + unimplemented!() } pub fn scan(&self, range: impl RangeBounds) -> Scanner { @@ -280,34 +265,3 @@ impl Snapshot { unimplemented!() } } - -/// The isolation level guarantees provided by the transaction. -#[derive(Copy, Clone, Eq, PartialEq, Debug)] -pub enum IsolationLevel { - /// Consistent reads and conflict free writes. - /// - /// Snapshot isolation guarantees: - /// * All reads will see the last committed value of the data at the snapshot timestamp. - /// * The transaction will only successfully commit if no updates to the data have created a - /// conflict with concurrent updates made sine the snapshot. - /// - /// Using this level means: - /// * Lost updates don't occur. - /// * Dirty reads don't occur. - /// * Non-repeatable reads don't occur. - /// * Phantom reads don't occur. - SnapshotIsolation, - /// Reads may not be consistent, but writes are conflict free. - /// - /// Read committed guarantees: - /// * All reads are committed at the moment it is read. - /// not repeatable. - /// * Write locks are only released at the end of the transaction. - /// - /// Using this level means: - /// * Lost updates don't occur. - /// * Dirty reads don't occur. - /// * Non-repeatable reads may occur. - /// * Phantom reads may occur. - ReadCommitted, -}