Skip to content

Commit

Permalink
import/client: fix block_on error
Browse files Browse the repository at this point in the history
Signed-off-by: kennytm <kennytm@gmail.com>
  • Loading branch information
kennytm committed Dec 28, 2020
1 parent 7b757dc commit d218600
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 30 deletions.
8 changes: 1 addition & 7 deletions src/import/kv_importer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,7 @@ impl KVImporter {

/// Import the engine to TiKV stores.
/// Engine can not be imported before it is closed.
pub async fn import_engine(&self, uuid: Uuid, pd_addr: &str) -> Result<()> {
let client = Client::new(
pd_addr,
self.cfg.num_import_jobs,
self.cfg.min_available_ratio,
self.security_mgr.clone(),
)?;
pub async fn import_engine(&self, uuid: Uuid, client: Client) -> Result<()> {
let job = {
let mut inner = self.inner.lock().unwrap();
// One engine only related to one ImportJob
Expand Down
53 changes: 30 additions & 23 deletions src/import/kv_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,21 +92,22 @@ impl ImportKv for ImportKVService {

ctx.spawn(
self.threads
.spawn_with_handle(async move {
let client = Client::new(req.get_pd_addr(), 1, min_available_ratio, security_mgr)?;
match client.switch_cluster(req.get_request()).await {
Ok(_) => {
info!("switch cluster"; "req" => ?req.get_request());
Ok(SwitchModeResponse::default())
}
Err(e) => {
error!("switch cluster failed"; "req" => ?req.get_request(), "err" => %e);
Err(e)
.spawn_with_handle({
let client = Client::new(req.get_pd_addr(), 1, min_available_ratio, security_mgr);
async move {
match client?.switch_cluster(req.get_request()).await {
Ok(_) => {
info!("switch cluster"; "req" => ?req.get_request());
Ok(SwitchModeResponse::default())
}
Err(e) => {
error!("switch cluster failed"; "req" => ?req.get_request(), "err" => %e);
Err(e)
}
}
}
}
.then(move |res| send_rpc_response!(res, sink, label, timer))
).unwrap(),
.then(move |res| send_rpc_response!(res, sink, label, timer))
}).unwrap(),
)
}

Expand Down Expand Up @@ -244,14 +245,20 @@ impl ImportKv for ImportKVService {

ctx.spawn(
self.threads
.spawn_with_handle(
.spawn_with_handle({
let client = Client::new(
req.get_pd_addr(),
self.cfg.num_import_jobs,
self.cfg.min_available_ratio,
import.security_mgr.clone(),
);
async move {
let uuid = Uuid::from_slice(req.get_uuid())?;
import.import_engine(uuid, req.get_pd_addr()).await?;
import.import_engine(uuid, client?).await?;
Ok(ImportEngineResponse::default())
}
.then(move |res| send_rpc_response!(res, sink, label, timer)),
)
.then(move |res| send_rpc_response!(res, sink, label, timer))
})
.unwrap(),
)
}
Expand Down Expand Up @@ -308,11 +315,11 @@ impl ImportKv for ImportKVService {

ctx.spawn(
self.threads
.spawn_with_handle(
.spawn_with_handle({
let client =
Client::new(req.get_pd_addr(), 1, min_available_ratio, security_mgr);
async move {
let client =
Client::new(req.get_pd_addr(), 1, min_available_ratio, security_mgr)?;
match client.compact_cluster(&compact).await {
match client?.compact_cluster(&compact).await {
Ok(_) => {
info!("compact cluster"; "req" => ?compact);
Ok(CompactClusterResponse::default())
Expand All @@ -323,8 +330,8 @@ impl ImportKv for ImportKVService {
}
}
}
.then(move |res| send_rpc_response!(res, sink, label, timer)),
)
.then(move |res| send_rpc_response!(res, sink, label, timer))
})
.unwrap(),
)
}
Expand Down

0 comments on commit d218600

Please sign in to comment.