Skip to content

Commit

Permalink
*: Use lower version of openssl and make more functions async (#89)
Browse files Browse the repository at this point in the history
* Cargo.toml: use the same version of vendored openssl as tikv

Signed-off-by: kennytm <kennytm@gmail.com>
  • Loading branch information
kennytm authored Dec 30, 2020
1 parent 5f94d0c commit 2b82183
Showing 10 changed files with 291 additions and 193 deletions.
117 changes: 75 additions & 42 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -22,7 +22,7 @@ path = "tests/integrations/import/mod.rs"
clap = "2.33"
cmd = { git = "https://github.com/tikv/tikv.git", branch = "release-5.0-rc", features = ["prost-codec"] }
crc32fast = "1.2"
crossbeam = "0.7"
async-channel = "1.5"
engine_rocks = { git = "https://github.com/tikv/tikv.git", branch = "release-5.0-rc", default-features = false, features = ["prost-codec"] }
engine_traits = { git = "https://github.com/tikv/tikv.git", branch = "release-5.0-rc", default-features = false }
futures = { version = "0.3", features = ["thread-pool"] }
@@ -51,6 +51,9 @@ hex = "0.3"
collections = { git = "https://github.com/tikv/tikv.git", branch = "release-5.0-rc", default-features = false }
futures-timer = "3"

# make sure this is the same as TiKV's Cargo.lock
openssl-src = "=111.10.2+1.1.1g"

[dependencies.engine_rocksdb]
git = "https://github.com/tikv/rust-rocksdb.git"
package = "rocksdb"
129 changes: 80 additions & 49 deletions src/import/client.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
// Copyright 2018 TiKV Project Authors. Licensed under Apache-2.0.

use std::io::Read;
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use std::time::Duration;

use futures::future::{self, BoxFuture, FutureExt};
use futures::lock::Mutex;
use futures::future::{self, BoxFuture, FutureExt, TryFuture, TryFutureExt};
use futures::stream::{self, StreamExt};
use futures::SinkExt;
use grpcio::{CallOption, Channel, ChannelBuilder, EnvBuilder, Environment, WriteFlags};
@@ -24,11 +25,11 @@ use super::common::*;
use super::{Error, Result};

pub trait ImportClient: Send + Sync + Clone + 'static {
fn get_region(&self, _: &[u8]) -> Result<RegionInfo> {
fn get_region<'a>(&'a self, _: &'a [u8]) -> BoxFuture<'a, Result<RegionInfo>> {
unimplemented!()
}

fn split_region(&self, _: &RegionInfo, _: &[u8]) -> Result<SplitRegionResponse> {
fn split_region(&self, _: &RegionInfo, _: &[u8]) -> BoxFuture<'_, Result<SplitRegionResponse>> {
unimplemented!()
}

@@ -40,7 +41,7 @@ pub trait ImportClient: Send + Sync + Clone + 'static {
unimplemented!()
}

fn ingest_sst(&self, _: u64, _: IngestRequest) -> Result<IngestResponse> {
fn ingest_sst(&self, _: u64, _: IngestRequest) -> BoxFuture<'_, Result<IngestResponse>> {
unimplemented!()
}

@@ -52,11 +53,18 @@ pub trait ImportClient: Send + Sync + Clone + 'static {
unimplemented!()
}

fn is_space_enough(&self, _: u64, _: u64) -> Result<bool> {
fn is_space_enough(&self, _: u64, _: u64) -> BoxFuture<'_, Result<bool>> {
unimplemented!()
}
}

fn grpc_timeout(secs: u64) -> CallOption {
let write_flags = WriteFlags::default().buffer_hint(true);
CallOption::default()
.timeout(Duration::from_secs(secs))
.write_flags(write_flags)
}

pub struct Client {
pd: Arc<RpcClient>,
env: Arc<Environment>,
@@ -66,7 +74,7 @@ pub struct Client {
}

impl Client {
pub fn new(
pub async fn new(
pd_addr: &str,
cq_count: usize,
min_available_ratio: f64,
@@ -79,7 +87,8 @@ impl Client {
.cq_count(cq_count)
.build(),
);
let rpc_client = RpcClient::new(&cfg, Some(env.clone()), security_mgr.clone())?;
let rpc_client =
RpcClient::new_async(&cfg, Some(env.clone()), security_mgr.clone()).await?;
Ok(Client {
pd: Arc::new(rpc_client),
env,
@@ -89,19 +98,12 @@ impl Client {
})
}

fn option(&self, timeout: Duration) -> CallOption {
let write_flags = WriteFlags::default().buffer_hint(true);
CallOption::default()
.timeout(timeout)
.write_flags(write_flags)
}

fn resolve(&self, store_id: u64) -> Result<Channel> {
let mut channels = self.channels.lock().unwrap();
async fn resolve(&self, store_id: u64) -> Result<Channel> {
let mut channels = self.channels.lock().await;
match channels.entry(store_id) {
HashMapEntry::Occupied(e) => Ok(e.get().clone()),
HashMapEntry::Vacant(e) => {
let store = self.pd.get_store(store_id)?;
let store = self.pd.get_store_async(store_id).await?;
let builder = ChannelBuilder::new(self.env.clone());
let tar_addr = if !store.get_peer_address().is_empty() {
store.get_peer_address()
@@ -114,18 +116,25 @@ impl Client {
}
}

fn post_resolve<T>(&self, store_id: u64, res: Result<T>) -> Result<T> {
res.map_err(|e| {
self.channels.lock().unwrap().remove(&store_id);
e
})
async fn with_resolve<F, R>(&self, store_id: u64, action: F) -> Result<R::Ok>
where
F: FnOnce(Channel) -> R,
R: TryFuture,
R::Error: Into<Error>,
{
let ch = self.resolve(store_id).await?;
let res = action(ch).into_future().await;
if res.is_err() {
self.channels.lock().await.remove(&store_id);
}
res.map_err(Into::into)
}

pub async fn switch_cluster(&self, req: &SwitchModeRequest) -> Result<()> {
let mut futures = Vec::new();
// Exclude tombstone stores.
for store in self.pd.get_all_stores(true)? {
let ch = match self.resolve(store.get_id()) {
let ch = match self.resolve(store.get_id()).await {
Ok(v) => v,
Err(e) => {
error!("get store channel failed"; "store" => ?store, "err" => %e);
@@ -151,7 +160,7 @@ impl Client {
let mut futures = Vec::new();
// Exclude tombstone stores.
for store in self.pd.get_all_stores(true)? {
let ch = match self.resolve(store.get_id()) {
let ch = match self.resolve(store.get_id()).await {
Ok(v) => v,
Err(e) => {
error!("get store channel failed"; "store" => ?store, "err" => %e);
@@ -187,22 +196,36 @@ impl Clone for Client {
}

impl ImportClient for Client {
fn get_region(&self, key: &[u8]) -> Result<RegionInfo> {
self.pd.get_region_info(key).map_err(Error::from)
fn get_region<'a>(&'a self, key: &'a [u8]) -> BoxFuture<'a, Result<RegionInfo>> {
async move {
self.pd
.get_region_info_async(key)
.await
.map_err(Error::from)
}
.boxed()
}

fn split_region(&self, region: &RegionInfo, split_key: &[u8]) -> Result<SplitRegionResponse> {
fn split_region(
&self,
region: &RegionInfo,
split_key: &[u8],
) -> BoxFuture<'_, Result<SplitRegionResponse>> {
let ctx = new_context(region);
let store_id = ctx.get_peer().get_store_id();

let mut req = SplitRegionRequest::default();
req.set_context(ctx);
req.set_split_key(Key::from_encoded_slice(split_key).into_raw()?);

let ch = self.resolve(store_id)?;
let client = TikvClient::new(ch);
let res = client.split_region_opt(&req, self.option(Duration::from_secs(3)));
self.post_resolve(store_id, res.map_err(Error::from))
match Key::from_encoded_slice(split_key).into_raw() {
Ok(key) => req.set_split_key(key),
Err(e) => return future::err(e.into()).boxed(),
};

self.with_resolve(store_id, |ch| async move {
let client = TikvClient::new(ch);
client.split_region_async_opt(&req, grpc_timeout(3))?.await
})
.boxed()
}

fn scatter_region(&self, region: &RegionInfo) -> Result<()> {
@@ -214,23 +237,27 @@ impl ImportClient for Client {
store_id: u64,
req: UploadStream,
) -> BoxFuture<'_, Result<UploadResponse>> {
async move {
let ch = self.resolve(store_id)?;
self.with_resolve(store_id, |ch| async move {
let client = ImportSstClient::new(ch);
let (tx, rx) = client.upload_opt(self.option(Duration::from_secs(30)))?;
let (tx, rx) = client.upload_opt(grpc_timeout(30))?;
stream::iter(req)
.forward(tx.sink_map_err(Error::from))
.await?;
self.post_resolve(store_id, rx.await.map_err(Error::from))
}
Ok::<_, Error>(rx.await?)
})
.boxed()
}

fn ingest_sst(&self, store_id: u64, req: IngestRequest) -> Result<IngestResponse> {
let ch = self.resolve(store_id)?;
let client = ImportSstClient::new(ch);
let res = client.ingest_opt(&req, self.option(Duration::from_secs(30)));
self.post_resolve(store_id, res.map_err(Error::from))
fn ingest_sst(
&self,
store_id: u64,
req: IngestRequest,
) -> BoxFuture<'_, Result<IngestResponse>> {
self.with_resolve(store_id, |ch| async move {
let client = ImportSstClient::new(ch);
client.ingest_async_opt(&req, grpc_timeout(30))?.await
})
.boxed()
}

fn has_region_id(&self, id: u64) -> BoxFuture<'_, Result<bool>> {
@@ -252,11 +279,15 @@ impl ImportClient for Client {
}
}

fn is_space_enough(&self, store_id: u64, size: u64) -> Result<bool> {
let stats = self.pd.get_store_stats(store_id)?;
let available_ratio = stats.available.saturating_sub(size) as f64 / stats.capacity as f64;
// Ensure target store have available disk space
Ok(available_ratio > self.min_available_ratio)
fn is_space_enough(&self, store_id: u64, size: u64) -> BoxFuture<'_, Result<bool>> {
async move {
let stats = self.pd.get_store_stats_async(store_id).await?;
let available_ratio =
stats.available.saturating_sub(size) as f64 / stats.capacity as f64;
// Ensure target store have available disk space
Ok(available_ratio > self.min_available_ratio)
}
.boxed()
}
}

10 changes: 6 additions & 4 deletions src/import/common.rs
Original file line number Diff line number Diff line change
@@ -78,15 +78,15 @@ impl<Client: ImportClient> RangeContext<Client> {
}

/// Reset size and region for the next key.
pub fn reset(&mut self, key: &[u8]) {
pub async fn reset(&mut self, key: &[u8]) {
self.raw_size = 0;
if let Some(ref region) = self.region {
if before_end(key, region.get_end_key()) {
// Still belongs in this region, no need to update.
return;
}
}
self.region = match self.client.get_region(key) {
self.region = match self.client.get_region(key).await {
Ok(region) => Some(region),
Err(e) => {
error!("get region failed"; "err" => %e);
@@ -184,6 +184,8 @@ mod tests {
use super::*;
use crate::import::test_helpers::*;

use futures::executor::block_on;

#[test]
fn test_before_end() {
assert!(before_end(b"ab", b"bc"));
@@ -227,14 +229,14 @@ mod tests {
// Reach size limit.
assert!(ctx.should_stop_before(b"k3"));

ctx.reset(b"k3");
block_on(ctx.reset(b"k3"));
assert_eq!(ctx.raw_size(), 0);
ctx.add(4);
assert_eq!(ctx.raw_size(), 4);
// Reach region end.
assert!(ctx.should_stop_before(b"k4"));

ctx.reset(b"k4");
block_on(ctx.reset(b"k4"));
assert_eq!(ctx.raw_size(), 0);
ctx.add(4);
assert!(!ctx.should_stop_before(b"k5"));
Loading

0 comments on commit 2b82183

Please sign in to comment.