diff --git a/.travis.yml b/.travis.yml index 221109da..82eaad6f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -6,7 +6,7 @@ os: # - windows # TODO: https://github.com/pingcap/kvproto/issues/355 - osx rust: - # Requires nightly for now, stable can be re-enabled when 1.36 is stable. + # Requires nightly for now, stable can be re-enabled when async/await is stable. # - stable - nightly env: diff --git a/README.md b/README.md index b09948a4..75573847 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,9 @@ This is an open source (Apache 2) project hosted by the Cloud Native Computing F ## Using the client -The TiKV client is a Rust library (crate). It requires version 1.36 of the compiler and standard libraries (which will be stable from the 4th July 2019, see below for ensuring compatibility). +The TiKV client is a Rust library (crate). It uses async/await internally and exposes some `async fn` APIs as well. + +Async/await is a new feature in Rust and is currently unstable. To use it you'll need to add the feature flag `#![async_await]` to your crate and use a nightly compiler (see below). To use this crate in your project, add it as a dependency in your `Cargo.toml`: @@ -28,8 +30,6 @@ The client requires a Git dependency until we can [publish it](https://github.co There are [examples](examples) which show how to use the client in a Rust program. -The examples and documentation use async/await syntax. This is a new feature in Rust and is currently unstable. To use async/await you'll need to add the feature flag `#![async_await]` to your crate and use a nightly compiler (see below). - ## Access the documentation We recommend using the cargo-generated documentation to browse and understand the API. We've done @@ -52,7 +52,7 @@ To check what version of Rust you are using, run rustc --version ``` -You'll see something like `rustc 1.36.0-nightly (a784a8022 2019-05-09)` where the `1.36.0` is the toolchain version, and `nightly` is the channel (stable/beta/nightly). To install another toolchain use +You'll see something like `rustc 1.38.0-nightly (dddb7fca0 2019-07-30)` where the `1.38.0` is the toolchain version, and `nightly` is the channel (stable/beta/nightly). To install another toolchain use ```bash rustup toolchain install nightly diff --git a/examples/transaction.rs b/examples/transaction.rs index 72aaf7a0..ab4d7076 100644 --- a/examples/transaction.rs +++ b/examples/transaction.rs @@ -13,7 +13,7 @@ use tikv_client::{ }; async fn puts(client: &Client, pairs: impl IntoIterator>) { - let mut txn = client.begin(); + let mut txn = client.begin().await.expect("Could not begin a transaction"); future::join_all( pairs .into_iter() @@ -28,7 +28,7 @@ async fn puts(client: &Client, pairs: impl IntoIterator } async fn get(client: &Client, key: Key) -> Value { - let txn = client.begin(); + let txn = client.begin().await.expect("Could not begin a transaction"); txn.get(key).await.expect("Could not get value") } @@ -37,6 +37,8 @@ async fn get(client: &Client, key: Key) -> Value { async fn scan(client: &Client, range: impl RangeBounds, mut limit: usize) { client .begin() + .await + .expect("Could not begin a transaction") .scan(range) .into_stream() .take_while(move |r| { @@ -53,7 +55,7 @@ async fn scan(client: &Client, range: impl RangeBounds, mut limit: usize) { } async fn dels(client: &Client, keys: impl IntoIterator) { - let mut txn = client.begin(); + let mut txn = client.begin().await.expect("Could not begin a transaction"); txn.set_isolation_level(IsolationLevel::ReadCommitted); let _: Vec<()> = stream::iter(keys.into_iter()) .then(|p| { diff --git a/rust-toolchain b/rust-toolchain index 664d9426..ed6f0c18 100644 --- a/rust-toolchain +++ b/rust-toolchain @@ -1 +1 @@ -nightly-2019-07-16 +nightly-2019-07-31 diff --git a/src/lib.rs b/src/lib.rs index 192d8b6c..90168717 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,6 +3,7 @@ // Long and nested future chains can quickly result in large generic types. #![type_length_limit = "16777216"] #![allow(clippy::redundant_closure)] +#![feature(async_await)] //! This crate provides a clean, ready to use client for [TiKV](https://github.com/tikv/tikv), a //! distributed transactional Key-Value database written in Rust. diff --git a/src/rpc/client.rs b/src/rpc/client.rs index b52d883f..02984ae7 100644 --- a/src/rpc/client.rs +++ b/src/rpc/client.rs @@ -20,7 +20,7 @@ use crate::{ kv::BoundRange, raw::ColumnFamily, rpc::{ - pd::{PdClient, Region, RegionId, RetryClient, StoreId}, + pd::{PdClient, Region, RegionId, RetryClient, StoreId, Timestamp}, security::SecurityManager, tikv::KvClient, Address, RawContext, Store, TxnContext, @@ -225,6 +225,10 @@ impl RpcClient { future::err(Error::unimplemented()) } + pub fn get_timestamp(self: Arc) -> impl Future> { + Arc::clone(&self.pd).get_timestamp() + } + // Returns a Steam which iterates over the contexts for each region covered by range. fn regions_for_range( self: Arc, diff --git a/src/transaction/client.rs b/src/transaction/client.rs index 3a689e90..8577a478 100644 --- a/src/transaction/client.rs +++ b/src/transaction/client.rs @@ -1,15 +1,19 @@ // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. use super::{Snapshot, Timestamp, Transaction}; -use crate::{Config, Error}; +use crate::rpc::RpcClient; +use crate::{Config, Result}; use derive_new::new; use futures::prelude::*; use futures::task::{Context, Poll}; use std::pin::Pin; +use std::sync::Arc; /// The TiKV transactional `Client` is used to issue requests to the TiKV server and PD cluster. -pub struct Client; +pub struct Client { + rpc: Arc, +} impl Client { /// Creates a new [`Client`](Client) once the [`Connect`](Connect) resolves. @@ -38,13 +42,13 @@ impl Client { /// # futures::executor::block_on(async { /// let connect = Client::connect(Config::default()); /// let client = connect.await.unwrap(); - /// let transaction = client.begin(); + /// let transaction = client.begin().await.unwrap(); /// // ... Issue some commands. /// let commit = transaction.commit(); /// let result: () = commit.await.unwrap(); /// # }); /// ``` - pub fn begin(&self) -> Transaction { + pub async fn begin(&self) -> Result { unimplemented!() } @@ -57,11 +61,11 @@ impl Client { /// # futures::executor::block_on(async { /// let connect = Client::connect(Config::default()); /// let client = connect.await.unwrap(); - /// let snapshot = client.snapshot(); + /// let snapshot = client.snapshot().await.unwrap(); /// // ... Issue some commands. /// # }); /// ``` - pub fn snapshot(&self) -> Snapshot { + pub async fn snapshot(&self) -> Result { unimplemented!() } @@ -79,7 +83,7 @@ impl Client { /// // ... Issue some commands. /// # }); /// ``` - pub fn snapshot_at(&self, _timestamp: Timestamp) -> Snapshot { + pub async fn snapshot_at(&self, _timestamp: Timestamp) -> Result { unimplemented!() } @@ -92,11 +96,11 @@ impl Client { /// # futures::executor::block_on(async { /// let connect = Client::connect(Config::default()); /// let client = connect.await.unwrap(); - /// let timestamp = client.current_timestamp(); + /// let timestamp = client.current_timestamp().await.unwrap(); /// # }); /// ``` - pub fn current_timestamp(&self) -> Timestamp { - unimplemented!() + pub async fn current_timestamp(&self) -> Result { + self.rpc.clone().get_timestamp().await } } @@ -120,7 +124,7 @@ pub struct Connect { } impl Future for Connect { - type Output = Result; + type Output = Result; fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll { let _config = &self.config; diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs index 960ab084..cd899a4c 100644 --- a/src/transaction/transaction.rs +++ b/src/transaction/transaction.rs @@ -22,7 +22,7 @@ use std::ops::RangeBounds; /// # futures::executor::block_on(async { /// let connect = Client::connect(Config::default()); /// let client = connect.await.unwrap(); -/// let txn = client.begin(); +/// let txn = client.begin().await.unwrap(); /// # }); /// ``` #[derive(new)] @@ -42,7 +42,7 @@ impl Transaction { /// # futures::executor::block_on(async { /// # let connect = Client::connect(Config::default()); /// # let connected_client = connect.await.unwrap(); - /// let txn = connected_client.begin(); + /// let txn = connected_client.begin().await.unwrap(); /// // ... Do some actions. /// let req = txn.commit(); /// let result: () = req.await.unwrap(); @@ -61,7 +61,7 @@ impl Transaction { /// # futures::executor::block_on(async { /// # let connect = Client::connect(Config::default()); /// # let connected_client = connect.await.unwrap(); - /// let txn = connected_client.begin(); + /// let txn = connected_client.begin().await.unwrap(); /// // ... Do some actions. /// let req = txn.rollback(); /// let result: () = req.await.unwrap(); @@ -80,7 +80,7 @@ impl Transaction { /// # futures::executor::block_on(async { /// # let connect = Client::connect(Config::default()); /// # let connected_client = connect.await.unwrap(); - /// let mut txn = connected_client.begin(); + /// let mut txn = connected_client.begin().await.unwrap(); /// // ... Do some actions. /// let req = txn.lock_keys(vec!["TiKV".to_owned(), "Rust".to_owned()]); /// let result: () = req.await.unwrap(); @@ -103,7 +103,7 @@ impl Transaction { /// # futures::executor::block_on(async { /// # let connect = Client::connect(Config::default()); /// # let connected_client = connect.await.unwrap(); - /// let txn = connected_client.begin(); + /// let txn = connected_client.begin().await.unwrap(); /// // ... Do some actions. /// let ts: Timestamp = txn.start_ts(); /// # }); @@ -121,7 +121,7 @@ impl Transaction { /// # futures::executor::block_on(async { /// # let connect = Client::connect(Config::default()); /// # let connected_client = connect.await.unwrap(); - /// let txn = connected_client.begin(); + /// let txn = connected_client.begin().await.unwrap(); /// // ... Do some actions. /// let snap: Snapshot = txn.snapshot(); /// # }); @@ -139,7 +139,7 @@ impl Transaction { /// # futures::executor::block_on(async { /// # let connect = Client::connect(Config::default()); /// # let connected_client = connect.await.unwrap(); - /// let mut txn = connected_client.begin(); + /// let mut txn = connected_client.begin().await.unwrap(); /// txn.set_isolation_level(IsolationLevel::SnapshotIsolation); /// # }); /// ``` @@ -159,7 +159,7 @@ impl Transaction { /// # futures::executor::block_on(async { /// # let connecting_client = Client::connect(Config::new(vec!["192.168.0.100", "192.168.0.101"])); /// # let connected_client = connecting_client.await.unwrap(); - /// let mut txn = connected_client.begin(); + /// let mut txn = connected_client.begin().await.unwrap(); /// let key = "TiKV".to_owned(); /// let req = txn.get(key); /// let result: Value = req.await.unwrap(); @@ -183,7 +183,7 @@ impl Transaction { /// # futures::executor::block_on(async { /// # let connecting_client = Client::connect(Config::new(vec!["192.168.0.100", "192.168.0.101"])); /// # let connected_client = connecting_client.await.unwrap(); - /// let mut txn = connected_client.begin(); + /// let mut txn = connected_client.begin().await.unwrap(); /// let keys = vec!["TiKV".to_owned(), "TiDB".to_owned()]; /// let req = txn.batch_get(keys); /// let result: Vec = req.await.unwrap(); @@ -214,7 +214,7 @@ impl Transaction { /// # futures::executor::block_on(async { /// # let connecting_client = Client::connect(Config::new(vec!["192.168.0.100", "192.168.0.101"])); /// # let connected_client = connecting_client.await.unwrap(); - /// let mut txn = connected_client.begin(); + /// let mut txn = connected_client.begin().await.unwrap(); /// let key = "TiKV".to_owned(); /// let val = "TiKV".to_owned(); /// let req = txn.set(key, val); @@ -238,7 +238,7 @@ impl Transaction { /// # futures::executor::block_on(async { /// # let connecting_client = Client::connect(Config::new(vec!["192.168.0.100", "192.168.0.101"])); /// # let connected_client = connecting_client.await.unwrap(); - /// let mut txn = connected_client.begin(); + /// let mut txn = connected_client.begin().await.unwrap(); /// let key = "TiKV".to_owned(); /// let req = txn.delete(key); /// let result: () = req.await.unwrap();