Skip to content

Commit

Permalink
adding retryable to scan (tikv#456)
Browse files Browse the repository at this point in the history
Co-authored-by: Ping Yu <yuping@pingcap.cn>
  • Loading branch information
limbooverlambda and pingyu authored Aug 22, 2024
1 parent 29745fc commit 5a8a3c4
Show file tree
Hide file tree
Showing 2 changed files with 188 additions and 148 deletions.
147 changes: 96 additions & 51 deletions src/raw/client.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,32 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

use core::ops::Range;

use std::str::FromStr;
use std::sync::Arc;

use futures::StreamExt;
use log::debug;
use tokio::time::sleep;

use crate::backoff::DEFAULT_REGION_BACKOFF;
use crate::backoff::{DEFAULT_REGION_BACKOFF, DEFAULT_STORE_BACKOFF};
use crate::common::Error;
use crate::config::Config;
use crate::pd::PdClient;
use crate::pd::PdRpcClient;
use crate::proto::kvrpcpb::{RawScanRequest, RawScanResponse};
use crate::proto::metapb;
use crate::raw::lowering::*;
use crate::request::Collect;
use crate::request::CollectSingle;
use crate::request::EncodeKeyspace;
use crate::request::KeyMode;
use crate::request::Keyspace;
use crate::request::Plan;
use crate::request::TruncateKeyspace;
use crate::request::{plan, Collect};
use crate::store::{HasRegionError, RegionStore};
use crate::Backoff;
use crate::BoundRange;
use crate::ColumnFamily;
use crate::Error::RegionError;
use crate::Key;
use crate::KvPair;
use crate::Result;
Expand Down Expand Up @@ -755,57 +758,37 @@ impl<PdC: PdClient> Client<PdC> {
max_limit: MAX_RAW_KV_SCAN_LIMIT,
});
}

let mut cur_range = range.into().encode_keyspace(self.keyspace, KeyMode::Raw);
let backoff = DEFAULT_STORE_BACKOFF;
let mut range = range.into().encode_keyspace(self.keyspace, KeyMode::Raw);
let mut result = Vec::new();
let mut scan_regions = self.rpc.clone().stores_for_range(cur_range.clone()).boxed();
let mut region_store =
scan_regions
.next()
.await
.ok_or(Error::RegionForRangeNotFound {
range: (cur_range.clone()),
})??;
let mut cur_limit = limit;

while cur_limit > 0 {
let request = new_raw_scan_request(
cur_range.clone(),
cur_limit,
let mut current_limit = limit;
let (start_key, end_key) = range.clone().into_keys();
let mut current_key: Key = start_key;

while current_limit > 0 {
let scan_args = ScanInnerArgs {
start_key: current_key.clone(),
end_key: end_key.clone(),
limit: current_limit,
key_only,
reverse,
self.cf.clone(),
);
let resp = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, request)
.single_region_with_store(region_store.clone())
.await?
.plan()
.execute()
.await?;
let mut region_scan_res = resp
.kvs
.into_iter()
.map(Into::into)
.collect::<Vec<KvPair>>();
let res_len = region_scan_res.len();
result.append(&mut region_scan_res);

// if the number of results is less than cur_limit, it means this scan range contains more than one region, so we need to scan next region
if res_len < cur_limit as usize {
region_store = match scan_regions.next().await {
Some(Ok(rs)) => {
cur_range = BoundRange::new(
std::ops::Bound::Included(region_store.region_with_leader.range().1),
cur_range.to,
);
rs
}
Some(Err(e)) => return Err(e),
None => break,
};
cur_limit -= res_len as u32;
} else {
backoff: backoff.clone(),
};
let (res, next_key) = self.retryable_scan(scan_args).await?;

let mut kvs = res
.map(|r| r.kvs.into_iter().map(Into::into).collect::<Vec<KvPair>>())
.unwrap_or(Vec::new());

if !kvs.is_empty() {
current_limit -= kvs.len() as u32;
result.append(&mut kvs);
}
if end_key.clone().is_some_and(|ek| ek <= next_key) {
break;
} else {
current_key = next_key;
range = BoundRange::new(std::ops::Bound::Included(current_key.clone()), range.to);
}
}

Expand All @@ -818,6 +801,58 @@ impl<PdC: PdClient> Client<PdC> {
Ok(result)
}

async fn retryable_scan(
&self,
mut scan_args: ScanInnerArgs,
) -> Result<(Option<RawScanResponse>, Key)> {
let start_key = scan_args.start_key;
let end_key = scan_args.end_key;
loop {
let region = self.rpc.clone().region_for_key(&start_key).await?;
let store = self.rpc.clone().store_for_id(region.id()).await?;
let request = new_raw_scan_request(
(start_key.clone(), end_key.clone()).into(),
scan_args.limit,
scan_args.key_only,
scan_args.reverse,
self.cf.clone(),
);
let resp = self.do_store_scan(store.clone(), request.clone()).await;
return match resp {
Ok(mut r) => {
if let Some(err) = r.region_error() {
let status =
plan::handle_region_error(self.rpc.clone(), err.clone(), store.clone())
.await?;
if status {
continue;
} else if let Some(duration) = scan_args.backoff.next_delay_duration() {
sleep(duration).await;
continue;
} else {
return Err(RegionError(Box::new(err)));
}
}
Ok((Some(r), region.end_key()))
}
Err(err) => Err(err),
};
}
}

async fn do_store_scan(
&self,
store: RegionStore,
scan_request: RawScanRequest,
) -> Result<RawScanResponse> {
crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, scan_request)
.single_region_with_store(store.clone())
.await?
.plan()
.execute()
.await
}

async fn batch_scan_inner(
&self,
ranges: impl IntoIterator<Item = impl Into<BoundRange>>,
Expand Down Expand Up @@ -864,6 +899,16 @@ impl<PdC: PdClient> Client<PdC> {
}
}

#[derive(Clone)]
struct ScanInnerArgs {
start_key: Key,
end_key: Option<Key>,
limit: u32,
key_only: bool,
reverse: bool,
backoff: Backoff,
}

#[cfg(test)]
mod tests {
use std::any::Any;
Expand Down
Loading

0 comments on commit 5a8a3c4

Please sign in to comment.