Skip to content

Commit

Permalink
import/client: fix block_on error
Browse files Browse the repository at this point in the history
Signed-off-by: kennytm <kennytm@gmail.com>
  • Loading branch information
kennytm committed Dec 28, 2020
1 parent 7b757dc commit 60da433
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 5 deletions.
5 changes: 3 additions & 2 deletions src/import/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ pub struct Client {
}

impl Client {
pub fn new(
pub async fn new(
pd_addr: &str,
cq_count: usize,
min_available_ratio: f64,
Expand All @@ -79,7 +79,8 @@ impl Client {
.cq_count(cq_count)
.build(),
);
let rpc_client = RpcClient::new(&cfg, Some(env.clone()), security_mgr.clone())?;
let rpc_client =
RpcClient::new_async(&cfg, Some(env.clone()), security_mgr.clone()).await?;
Ok(Client {
pd: Arc::new(rpc_client),
env,
Expand Down
3 changes: 2 additions & 1 deletion src/import/kv_importer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ impl KVImporter {
self.cfg.num_import_jobs,
self.cfg.min_available_ratio,
self.security_mgr.clone(),
)?;
)
.await?;
let job = {
let mut inner = self.inner.lock().unwrap();
// One engine only related to one ImportJob
Expand Down
5 changes: 3 additions & 2 deletions src/import/kv_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ impl ImportKv for ImportKVService {
ctx.spawn(
self.threads
.spawn_with_handle(async move {
let client = Client::new(req.get_pd_addr(), 1, min_available_ratio, security_mgr)?;
let client = Client::new(req.get_pd_addr(), 1, min_available_ratio, security_mgr).await?;
match client.switch_cluster(req.get_request()).await {
Ok(_) => {
info!("switch cluster"; "req" => ?req.get_request());
Expand Down Expand Up @@ -311,7 +311,8 @@ impl ImportKv for ImportKVService {
.spawn_with_handle(
async move {
let client =
Client::new(req.get_pd_addr(), 1, min_available_ratio, security_mgr)?;
Client::new(req.get_pd_addr(), 1, min_available_ratio, security_mgr)
.await?;
match client.compact_cluster(&compact).await {
Ok(_) => {
info!("compact cluster"; "req" => ?compact);
Expand Down

0 comments on commit 60da433

Please sign in to comment.