diff --git a/src/raw/client.rs b/src/raw/client.rs index 6cd5028a..71d40b2a 100644 --- a/src/raw/client.rs +++ b/src/raw/client.rs @@ -1,35 +1,30 @@ // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. -use async_recursion::async_recursion; use core::ops::Range; use std::str::FromStr; use std::sync::Arc; use std::u32; +use futures::StreamExt; use log::debug; -use tokio::sync::Semaphore; -use tokio::time::sleep; -use crate::backoff::{DEFAULT_REGION_BACKOFF, DEFAULT_STORE_BACKOFF}; +use crate::backoff::DEFAULT_REGION_BACKOFF; use crate::common::Error; use crate::config::Config; use crate::pd::PdClient; use crate::pd::PdRpcClient; -use crate::proto::kvrpcpb::{RawScanRequest, RawScanResponse}; use crate::proto::metapb; use crate::raw::lowering::*; +use crate::request::Collect; use crate::request::CollectSingle; use crate::request::EncodeKeyspace; use crate::request::KeyMode; use crate::request::Keyspace; use crate::request::Plan; use crate::request::TruncateKeyspace; -use crate::request::{plan, Collect}; -use crate::store::{HasRegionError, RegionStore}; use crate::Backoff; use crate::BoundRange; use crate::ColumnFamily; -use crate::Error::RegionError; use crate::Key; use crate::KvPair; use crate::Result; @@ -761,42 +756,57 @@ impl Client { max_limit: MAX_RAW_KV_SCAN_LIMIT, }); } - let backoff = DEFAULT_STORE_BACKOFF; - let permits = Arc::new(Semaphore::new(16)); - let range = range.into().encode_keyspace(self.keyspace, KeyMode::Raw); + + let mut cur_range = range.into().encode_keyspace(self.keyspace, KeyMode::Raw); let mut result = Vec::new(); - let mut current_limit = limit; - let (start_key, end_key) = range.clone().into_keys(); - let mut current_key: Option = Some(start_key); - while current_limit > 0 { - let scan_args = ScanInnerArgs { - start_key: current_key.clone(), - range: range.clone(), - limit, + let mut scan_regions = self.rpc.clone().stores_for_range(cur_range.clone()).boxed(); + let mut region_store = + scan_regions + .next() + .await + .ok_or(Error::RegionForRangeNotFound { + range: (cur_range.clone()), + })??; + let mut cur_limit = limit; + + while cur_limit > 0 { + let request = new_raw_scan_request( + cur_range.clone(), + cur_limit, key_only, reverse, - permits: permits.clone(), - backoff: backoff.clone(), - }; - let (res, next_key) = self.retryable_scan(scan_args).await?; - - let mut kvs = res - .map(|r| r.kvs.into_iter().map(Into::into).collect::>()) - .unwrap_or(Vec::new()); - - if !kvs.is_empty() { - current_limit -= kvs.len() as u32; - result.append(&mut kvs); - } - if end_key - .as_ref() - .map(|ek| ek <= next_key.as_ref() && !ek.is_empty()) - .unwrap_or(false) - || next_key.is_empty() - { - break; + self.cf.clone(), + ); + let resp = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, request) + .single_region_with_store(region_store.clone()) + .await? + .plan() + .execute() + .await?; + let mut region_scan_res = resp + .kvs + .into_iter() + .map(Into::into) + .collect::>(); + let res_len = region_scan_res.len(); + result.append(&mut region_scan_res); + + // if the number of results is less than cur_limit, it means this scan range contains more than one region, so we need to scan next region + if res_len < cur_limit as usize { + region_store = match scan_regions.next().await { + Some(Ok(rs)) => { + cur_range = BoundRange::new( + std::ops::Bound::Included(region_store.region_with_leader.range().1), + cur_range.to, + ); + rs + } + Some(Err(e)) => return Err(e), + None => break, + }; + cur_limit -= res_len as u32; } else { - current_key = Some(next_key); + break; } } @@ -809,61 +819,6 @@ impl Client { Ok(result) } - #[async_recursion] - async fn retryable_scan( - &self, - mut scan_args: ScanInnerArgs, - ) -> Result<(Option, Key)> { - let start_key = match scan_args.start_key { - None => return Ok((None, Key::EMPTY)), - Some(ref sk) => sk, - }; - let permit = scan_args.permits.acquire().await.unwrap(); - let region = self.rpc.clone().region_for_key(start_key).await?; - let store = self.rpc.clone().store_for_id(region.id()).await?; - let request = new_raw_scan_request( - scan_args.range.clone(), - scan_args.limit, - scan_args.key_only, - scan_args.reverse, - self.cf.clone(), - ); - let resp = self.do_store_scan(store.clone(), request).await; - drop(permit); - match resp { - Ok(mut r) => { - if let Some(err) = r.region_error() { - let status = - plan::handle_region_error(self.rpc.clone(), err.clone(), store.clone()) - .await?; - return if status { - self.retryable_scan(scan_args.clone()).await - } else if let Some(duration) = scan_args.backoff.next_delay_duration() { - sleep(duration).await; - self.retryable_scan(scan_args.clone()).await - } else { - Err(RegionError(Box::new(err))) - }; - } - Ok((Some(r), region.end_key())) - } - Err(err) => Err(err), - } - } - - async fn do_store_scan( - &self, - store: RegionStore, - scan_request: RawScanRequest, - ) -> Result { - crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, scan_request) - .single_region_with_store(store.clone()) - .await? - .plan() - .execute() - .await - } - async fn batch_scan_inner( &self, ranges: impl IntoIterator>, @@ -910,17 +865,6 @@ impl Client { } } -#[derive(Clone)] -struct ScanInnerArgs { - start_key: Option, - range: BoundRange, - limit: u32, - key_only: bool, - reverse: bool, - permits: Arc, - backoff: Backoff, -} - #[cfg(test)] mod tests { use std::any::Any; diff --git a/src/request/plan.rs b/src/request/plan.rs index ffff6c24..369a2ff1 100644 --- a/src/request/plan.rs +++ b/src/request/plan.rs @@ -187,7 +187,7 @@ where match backoff.next_delay_duration() { Some(duration) => { let region_error_resolved = - handle_region_error(pd_client.clone(), e, region_store).await?; + Self::handle_region_error(pd_client.clone(), e, region_store).await?; // don't sleep if we have resolved the region error if !region_error_resolved { sleep(duration).await; @@ -208,6 +208,102 @@ where } } + // Returns + // 1. Ok(true): error has been resolved, retry immediately + // 2. Ok(false): backoff, and then retry + // 3. Err(Error): can't be resolved, return the error to upper level + async fn handle_region_error( + pd_client: Arc, + e: errorpb::Error, + region_store: RegionStore, + ) -> Result { + let ver_id = region_store.region_with_leader.ver_id(); + if let Some(not_leader) = e.not_leader { + if let Some(leader) = not_leader.leader { + match pd_client + .update_leader(region_store.region_with_leader.ver_id(), leader) + .await + { + Ok(_) => Ok(true), + Err(e) => { + pd_client.invalidate_region_cache(ver_id).await; + Err(e) + } + } + } else { + // The peer doesn't know who is the current leader. Generally it's because + // the Raft group is in an election, but it's possible that the peer is + // isolated and removed from the Raft group. So it's necessary to reload + // the region from PD. + pd_client.invalidate_region_cache(ver_id).await; + Ok(false) + } + } else if e.store_not_match.is_some() { + pd_client.invalidate_region_cache(ver_id).await; + Ok(false) + } else if e.epoch_not_match.is_some() { + Self::on_region_epoch_not_match( + pd_client.clone(), + region_store, + e.epoch_not_match.unwrap(), + ) + .await + } else if e.stale_command.is_some() || e.region_not_found.is_some() { + pd_client.invalidate_region_cache(ver_id).await; + Ok(false) + } else if e.server_is_busy.is_some() + || e.raft_entry_too_large.is_some() + || e.max_timestamp_not_synced.is_some() + { + Err(Error::RegionError(Box::new(e))) + } else { + // TODO: pass the logger around + // info!("unknwon region error: {:?}", e); + pd_client.invalidate_region_cache(ver_id).await; + Ok(false) + } + } + + // Returns + // 1. Ok(true): error has been resolved, retry immediately + // 2. Ok(false): backoff, and then retry + // 3. Err(Error): can't be resolved, return the error to upper level + async fn on_region_epoch_not_match( + pd_client: Arc, + region_store: RegionStore, + error: EpochNotMatch, + ) -> Result { + let ver_id = region_store.region_with_leader.ver_id(); + if error.current_regions.is_empty() { + pd_client.invalidate_region_cache(ver_id).await; + return Ok(true); + } + + for r in error.current_regions { + if r.id == region_store.region_with_leader.id() { + let region_epoch = r.region_epoch.unwrap(); + let returned_conf_ver = region_epoch.conf_ver; + let returned_version = region_epoch.version; + let current_region_epoch = region_store + .region_with_leader + .region + .region_epoch + .clone() + .unwrap(); + let current_conf_ver = current_region_epoch.conf_ver; + let current_version = current_region_epoch.version; + + // Find whether the current region is ahead of TiKV's. If so, backoff. + if returned_conf_ver < current_conf_ver || returned_version < current_version { + return Ok(false); + } + } + } + // TODO: finer grained processing + pd_client.invalidate_region_cache(ver_id).await; + Ok(false) + } + async fn handle_grpc_error( pd_client: Arc, plan: P, @@ -237,97 +333,6 @@ where } } -// Returns -// 1. Ok(true): error has been resolved, retry immediately -// 2. Ok(false): backoff, and then retry -// 3. Err(Error): can't be resolved, return the error to upper level -pub(crate) async fn handle_region_error( - pd_client: Arc, - e: errorpb::Error, - region_store: RegionStore, -) -> Result { - let ver_id = region_store.region_with_leader.ver_id(); - if let Some(not_leader) = e.not_leader { - if let Some(leader) = not_leader.leader { - match pd_client - .update_leader(region_store.region_with_leader.ver_id(), leader) - .await - { - Ok(_) => Ok(true), - Err(e) => { - pd_client.invalidate_region_cache(ver_id).await; - Err(e) - } - } - } else { - // The peer doesn't know who is the current leader. Generally it's because - // the Raft group is in an election, but it's possible that the peer is - // isolated and removed from the Raft group. So it's necessary to reload - // the region from PD. - pd_client.invalidate_region_cache(ver_id).await; - Ok(false) - } - } else if e.store_not_match.is_some() { - pd_client.invalidate_region_cache(ver_id).await; - Ok(false) - } else if e.epoch_not_match.is_some() { - on_region_epoch_not_match(pd_client.clone(), region_store, e.epoch_not_match.unwrap()).await - } else if e.stale_command.is_some() || e.region_not_found.is_some() { - pd_client.invalidate_region_cache(ver_id).await; - Ok(false) - } else if e.server_is_busy.is_some() - || e.raft_entry_too_large.is_some() - || e.max_timestamp_not_synced.is_some() - { - Err(Error::RegionError(Box::new(e))) - } else { - // TODO: pass the logger around - // info!("unknwon region error: {:?}", e); - pd_client.invalidate_region_cache(ver_id).await; - Ok(false) - } -} - -// Returns -// 1. Ok(true): error has been resolved, retry immediately -// 2. Ok(false): backoff, and then retry -// 3. Err(Error): can't be resolved, return the error to upper level -pub(crate) async fn on_region_epoch_not_match( - pd_client: Arc, - region_store: RegionStore, - error: EpochNotMatch, -) -> Result { - let ver_id = region_store.region_with_leader.ver_id(); - if error.current_regions.is_empty() { - pd_client.invalidate_region_cache(ver_id).await; - return Ok(true); - } - - for r in error.current_regions { - if r.id == region_store.region_with_leader.id() { - let region_epoch = r.region_epoch.unwrap(); - let returned_conf_ver = region_epoch.conf_ver; - let returned_version = region_epoch.version; - let current_region_epoch = region_store - .region_with_leader - .region - .region_epoch - .clone() - .unwrap(); - let current_conf_ver = current_region_epoch.conf_ver; - let current_version = current_region_epoch.version; - - // Find whether the current region is ahead of TiKV's. If so, backoff. - if returned_conf_ver < current_conf_ver || returned_version < current_version { - return Ok(false); - } - } - } - // TODO: finer grained processing - pd_client.invalidate_region_cache(ver_id).await; - Ok(false) -} - impl Clone for RetryableMultiRegion { fn clone(&self) -> Self { RetryableMultiRegion {