diff --git a/src/request/plan.rs b/src/request/plan.rs index fc1cb1d5..f092b7c8 100644 --- a/src/request/plan.rs +++ b/src/request/plan.rs @@ -148,9 +148,27 @@ where ) -> Result<::Result> { // limit concurrent requests let permit = permits.acquire().await.unwrap(); - let mut resp = plan.execute().await?; + let res = plan.execute().await; drop(permit); + let is_grpc_error = |e: &Error| matches!(e, Error::GrpcAPI(_) | Error::Grpc(_)); + let mut resp = match res { + Ok(resp) => resp, + Err(e) if is_grpc_error(&e) => { + return Self::handle_grpc_error( + pd_client, + plan, + region_store, + backoff, + permits, + preserve_region_results, + e, + ) + .await; + } + Err(e) => return Err(e), + }; + if let Some(e) = resp.key_errors() { Ok(vec![Err(Error::MultipleKeyErrors(e))]) } else if let Some(e) = resp.region_error() { @@ -273,6 +291,34 @@ where pd_client.invalidate_region_cache(ver_id).await; Ok(false) } + + async fn handle_grpc_error( + pd_client: Arc, + plan: P, + region_store: RegionStore, + mut backoff: Backoff, + permits: Arc, + preserve_region_results: bool, + e: Error, + ) -> Result<::Result> { + debug!("handle grpc error: {:?}", e); + let ver_id = region_store.region_with_leader.ver_id(); + pd_client.invalidate_region_cache(ver_id).await; + match backoff.next_delay_duration() { + Some(duration) => { + sleep(duration).await; + Self::single_plan_handler( + pd_client, + plan, + backoff, + permits, + preserve_region_results, + ) + .await + } + None => Err(e), + } + } } impl Clone for RetryableMultiRegion {