Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use async/await in the transaction APIs #91

Merged
merged 3 commits into from
Jul 31, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ os:
# - windows # TODO: https://github.com/pingcap/kvproto/issues/355
- osx
rust:
# Requires nightly for now, stable can be re-enabled when 1.36 is stable.
# Requires nightly for now, stable can be re-enabled when async/await is stable.
# - stable
- nightly
env:
Expand Down
6 changes: 2 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ This is an open source (Apache 2) project hosted by the Cloud Native Computing F

## Using the client

The TiKV client is a Rust library (crate). It requires version 1.36 of the compiler and standard libraries (which will be stable from the 4th July 2019, see below for ensuring compatibility).
The TiKV client is a Rust library (crate). It requires a nightly Rust compiler with async/await support.

To use this crate in your project, add it as a dependency in your `Cargo.toml`:

Expand All @@ -28,8 +28,6 @@ The client requires a Git dependency until we can [publish it](https://github.co

There are [examples](examples) which show how to use the client in a Rust program.

The examples and documentation use async/await syntax. This is a new feature in Rust and is currently unstable. To use async/await you'll need to add the feature flag `#![async_await]` to your crate and use a nightly compiler (see below).
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you alter this comment rather than delete it please? I think it is important for users to know


## Access the documentation

We recommend using the cargo-generated documentation to browse and understand the API. We've done
Expand All @@ -52,7 +50,7 @@ To check what version of Rust you are using, run
rustc --version
```

You'll see something like `rustc 1.36.0-nightly (a784a8022 2019-05-09)` where the `1.36.0` is the toolchain version, and `nightly` is the channel (stable/beta/nightly). To install another toolchain use
You'll see something like `rustc 1.38.0-nightly (4b65a86eb 2019-07-15)` where the `1.38.0` is the toolchain version, and `nightly` is the channel (stable/beta/nightly). To install another toolchain use

```bash
rustup toolchain install nightly
Expand Down
8 changes: 5 additions & 3 deletions examples/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use tikv_client::{
};

async fn puts(client: &Client, pairs: impl IntoIterator<Item = impl Into<KvPair>>) {
let mut txn = client.begin();
let mut txn = client.begin().await.expect("Could not begin a transaction");
future::join_all(
pairs
.into_iter()
Expand All @@ -28,7 +28,7 @@ async fn puts(client: &Client, pairs: impl IntoIterator<Item = impl Into<KvPair>
}

async fn get(client: &Client, key: Key) -> Value {
let txn = client.begin();
let txn = client.begin().await.expect("Could not begin a transaction");
txn.get(key).await.expect("Could not get value")
}

Expand All @@ -37,6 +37,8 @@ async fn get(client: &Client, key: Key) -> Value {
async fn scan(client: &Client, range: impl RangeBounds<Key>, mut limit: usize) {
client
.begin()
.await
.expect("Could not begin a transaction")
.scan(range)
.into_stream()
.take_while(move |r| {
Expand All @@ -53,7 +55,7 @@ async fn scan(client: &Client, range: impl RangeBounds<Key>, mut limit: usize) {
}

async fn dels(client: &Client, keys: impl IntoIterator<Item = Key>) {
let mut txn = client.begin();
let mut txn = client.begin().await.expect("Could not begin a transaction");
txn.set_isolation_level(IsolationLevel::ReadCommitted);
let _: Vec<()> = stream::iter(keys.into_iter())
.then(|p| {
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// Long and nested future chains can quickly result in large generic types.
#![type_length_limit = "16777216"]
#![allow(clippy::redundant_closure)]
#![feature(async_await)]

//! This crate provides a clean, ready to use client for [TiKV](https://github.com/tikv/tikv), a
//! distributed transactional Key-Value database written in Rust.
Expand Down
6 changes: 5 additions & 1 deletion src/rpc/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::{
kv::BoundRange,
raw::ColumnFamily,
rpc::{
pd::{PdClient, Region, RegionId, RetryClient, StoreId},
pd::{PdClient, Region, RegionId, RetryClient, StoreId, Timestamp},
security::SecurityManager,
tikv::KvClient,
Address, RawContext, Store, TxnContext,
Expand Down Expand Up @@ -225,6 +225,10 @@ impl<PdC: PdClient> RpcClient<PdC> {
future::err(Error::unimplemented())
}

pub fn get_timestamp(self: Arc<Self>) -> impl Future<Output = Result<Timestamp>> {
Arc::clone(&self.pd).get_timestamp()
}

// Returns a Steam which iterates over the contexts for each region covered by range.
fn regions_for_range(
self: Arc<Self>,
Expand Down
26 changes: 15 additions & 11 deletions src/transaction/client.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

use super::{Snapshot, Timestamp, Transaction};
use crate::{Config, Error};
use crate::rpc::RpcClient;
use crate::{Config, Result};

use derive_new::new;
use futures::prelude::*;
use futures::task::{Context, Poll};
use std::pin::Pin;
use std::sync::Arc;

/// The TiKV transactional `Client` is used to issue requests to the TiKV server and PD cluster.
pub struct Client;
pub struct Client {
rpc: Arc<RpcClient>,
}

impl Client {
/// Creates a new [`Client`](Client) once the [`Connect`](Connect) resolves.
Expand Down Expand Up @@ -38,13 +42,13 @@ impl Client {
/// # futures::executor::block_on(async {
/// let connect = Client::connect(Config::default());
/// let client = connect.await.unwrap();
/// let transaction = client.begin();
/// let transaction = client.begin().await.unwrap();
/// // ... Issue some commands.
/// let commit = transaction.commit();
/// let result: () = commit.await.unwrap();
/// # });
/// ```
pub fn begin(&self) -> Transaction {
pub async fn begin(&self) -> Result<Transaction> {
unimplemented!()
}

Expand All @@ -57,11 +61,11 @@ impl Client {
/// # futures::executor::block_on(async {
/// let connect = Client::connect(Config::default());
/// let client = connect.await.unwrap();
/// let snapshot = client.snapshot();
/// let snapshot = client.snapshot().await.unwrap();
/// // ... Issue some commands.
/// # });
/// ```
pub fn snapshot(&self) -> Snapshot {
pub async fn snapshot(&self) -> Result<Snapshot> {
unimplemented!()
}

Expand All @@ -79,7 +83,7 @@ impl Client {
/// // ... Issue some commands.
/// # });
/// ```
pub fn snapshot_at(&self, _timestamp: Timestamp) -> Snapshot {
pub async fn snapshot_at(&self, _timestamp: Timestamp) -> Result<Snapshot> {
unimplemented!()
}

Expand All @@ -92,11 +96,11 @@ impl Client {
/// # futures::executor::block_on(async {
/// let connect = Client::connect(Config::default());
/// let client = connect.await.unwrap();
/// let timestamp = client.current_timestamp();
/// let timestamp = client.current_timestamp().await.unwrap();
/// # });
/// ```
pub fn current_timestamp(&self) -> Timestamp {
unimplemented!()
pub async fn current_timestamp(&self) -> Result<Timestamp> {
Arc::clone(&self.rpc).get_timestamp().await
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

prefer self.rpc.clone()

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm OK with both, but the official doc says "The Arc::clone(&from) syntax is the most idiomatic": https://doc.rust-lang.org/stable/std/sync/struct.Arc.html#cloning-references

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In practice, I don't see the explicit form unless it is necessary for disambiguation (I had a quick search though the Rust repo and although it is hard to tell where .clone is used on an Arc, it does seem to be used more often than the explicit version).

}
}

Expand All @@ -120,7 +124,7 @@ pub struct Connect {
}

impl Future for Connect {
type Output = Result<Client, Error>;
type Output = Result<Client>;

fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
let _config = &self.config;
Expand Down
22 changes: 11 additions & 11 deletions src/transaction/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::ops::RangeBounds;
/// # futures::executor::block_on(async {
/// let connect = Client::connect(Config::default());
/// let client = connect.await.unwrap();
/// let txn = client.begin();
/// let txn = client.begin().await.unwrap();
/// # });
/// ```
#[derive(new)]
Expand All @@ -42,7 +42,7 @@ impl Transaction {
/// # futures::executor::block_on(async {
/// # let connect = Client::connect(Config::default());
/// # let connected_client = connect.await.unwrap();
/// let txn = connected_client.begin();
/// let txn = connected_client.begin().await.unwrap();
/// // ... Do some actions.
/// let req = txn.commit();
/// let result: () = req.await.unwrap();
Expand All @@ -61,7 +61,7 @@ impl Transaction {
/// # futures::executor::block_on(async {
/// # let connect = Client::connect(Config::default());
/// # let connected_client = connect.await.unwrap();
/// let txn = connected_client.begin();
/// let txn = connected_client.begin().await.unwrap();
/// // ... Do some actions.
/// let req = txn.rollback();
/// let result: () = req.await.unwrap();
Expand All @@ -80,7 +80,7 @@ impl Transaction {
/// # futures::executor::block_on(async {
/// # let connect = Client::connect(Config::default());
/// # let connected_client = connect.await.unwrap();
/// let mut txn = connected_client.begin();
/// let mut txn = connected_client.begin().await.unwrap();
/// // ... Do some actions.
/// let req = txn.lock_keys(vec!["TiKV".to_owned(), "Rust".to_owned()]);
/// let result: () = req.await.unwrap();
Expand All @@ -103,7 +103,7 @@ impl Transaction {
/// # futures::executor::block_on(async {
/// # let connect = Client::connect(Config::default());
/// # let connected_client = connect.await.unwrap();
/// let txn = connected_client.begin();
/// let txn = connected_client.begin().await.unwrap();
/// // ... Do some actions.
/// let ts: Timestamp = txn.start_ts();
/// # });
Expand All @@ -121,7 +121,7 @@ impl Transaction {
/// # futures::executor::block_on(async {
/// # let connect = Client::connect(Config::default());
/// # let connected_client = connect.await.unwrap();
/// let txn = connected_client.begin();
/// let txn = connected_client.begin().await.unwrap();
/// // ... Do some actions.
/// let snap: Snapshot = txn.snapshot();
/// # });
Expand All @@ -139,7 +139,7 @@ impl Transaction {
/// # futures::executor::block_on(async {
/// # let connect = Client::connect(Config::default());
/// # let connected_client = connect.await.unwrap();
/// let mut txn = connected_client.begin();
/// let mut txn = connected_client.begin().await.unwrap();
/// txn.set_isolation_level(IsolationLevel::SnapshotIsolation);
/// # });
/// ```
Expand All @@ -159,7 +159,7 @@ impl Transaction {
/// # futures::executor::block_on(async {
/// # let connecting_client = Client::connect(Config::new(vec!["192.168.0.100", "192.168.0.101"]));
/// # let connected_client = connecting_client.await.unwrap();
/// let mut txn = connected_client.begin();
/// let mut txn = connected_client.begin().await.unwrap();
/// let key = "TiKV".to_owned();
/// let req = txn.get(key);
/// let result: Value = req.await.unwrap();
Expand All @@ -183,7 +183,7 @@ impl Transaction {
/// # futures::executor::block_on(async {
/// # let connecting_client = Client::connect(Config::new(vec!["192.168.0.100", "192.168.0.101"]));
/// # let connected_client = connecting_client.await.unwrap();
/// let mut txn = connected_client.begin();
/// let mut txn = connected_client.begin().await.unwrap();
/// let keys = vec!["TiKV".to_owned(), "TiDB".to_owned()];
/// let req = txn.batch_get(keys);
/// let result: Vec<KvPair> = req.await.unwrap();
Expand Down Expand Up @@ -214,7 +214,7 @@ impl Transaction {
/// # futures::executor::block_on(async {
/// # let connecting_client = Client::connect(Config::new(vec!["192.168.0.100", "192.168.0.101"]));
/// # let connected_client = connecting_client.await.unwrap();
/// let mut txn = connected_client.begin();
/// let mut txn = connected_client.begin().await.unwrap();
/// let key = "TiKV".to_owned();
/// let val = "TiKV".to_owned();
/// let req = txn.set(key, val);
Expand All @@ -238,7 +238,7 @@ impl Transaction {
/// # futures::executor::block_on(async {
/// # let connecting_client = Client::connect(Config::new(vec!["192.168.0.100", "192.168.0.101"]));
/// # let connected_client = connecting_client.await.unwrap();
/// let mut txn = connected_client.begin();
/// let mut txn = connected_client.begin().await.unwrap();
/// let key = "TiKV".to_owned();
/// let req = txn.delete(key);
/// let result: () = req.await.unwrap();
Expand Down