Skip to content

Commit

Permalink
Fix reverse scan for scene of multiple regions (#438)
Browse files Browse the repository at this point in the history
* reproduce issue

Signed-off-by: Ping Yu <yuping@pingcap.com>

* fix reverse range

Signed-off-by: Ping Yu <yuping@pingcap.com>

---------

Signed-off-by: Ping Yu <yuping@pingcap.com>
  • Loading branch information
pingyu authored Nov 28, 2023
1 parent bd14485 commit bbaf317
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 17 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ export RUSTFLAGS=-Dwarnings

.PHONY: default check unit-test integration-tests test doc docker-pd docker-kv docker all

PD_ADDRS ?= "127.0.0.1:2379"
MULTI_REGION ?= 1
export PD_ADDRS ?= 127.0.0.1:2379
export MULTI_REGION ?= 1

ALL_FEATURES := integration-tests

Expand Down
4 changes: 4 additions & 0 deletions src/raw/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@ use crate::proto::kvrpcpb;
use crate::proto::kvrpcpb::ApiVersion;
use crate::proto::metapb;
use crate::proto::tikvpb::tikv_client::TikvClient;
use crate::range_request;
use crate::request::plan::ResponseWithShard;
use crate::request::Collect;
use crate::request::CollectSingle;
use crate::request::DefaultProcessor;
use crate::request::KvRequest;
use crate::request::Merge;
use crate::request::Process;
use crate::request::RangeRequest;
use crate::request::Shardable;
use crate::request::SingleKey;
use crate::shardable_key;
Expand Down Expand Up @@ -227,6 +229,7 @@ impl KvRequest for kvrpcpb::RawDeleteRangeRequest {
type Response = kvrpcpb::RawDeleteRangeResponse;
}

range_request!(kvrpcpb::RawDeleteRangeRequest);
shardable_range!(kvrpcpb::RawDeleteRangeRequest);

pub fn new_raw_scan_request(
Expand All @@ -250,6 +253,7 @@ impl KvRequest for kvrpcpb::RawScanRequest {
type Response = kvrpcpb::RawScanResponse;
}

range_request!(kvrpcpb::RawScanRequest); // TODO: support reverse raw scan.
shardable_range!(kvrpcpb::RawScanRequest);

impl Merge<kvrpcpb::RawScanResponse> for Collect {
Expand Down
1 change: 1 addition & 0 deletions src/request/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub use self::plan_builder::SingleKey;
pub use self::shard::Batchable;
pub use self::shard::HasNextBatch;
pub use self::shard::NextBatch;
pub use self::shard::RangeRequest;
pub use self::shard::Shardable;
use crate::backoff::Backoff;
use crate::backoff::DEFAULT_REGION_BACKOFF;
Expand Down
41 changes: 39 additions & 2 deletions src/request/shard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::request::KvRequest;
use crate::request::Plan;
use crate::request::ResolveLock;
use crate::store::RegionStore;
use crate::store::Request;
use crate::Result;

macro_rules! impl_inner_shardable {
Expand Down Expand Up @@ -204,6 +205,32 @@ macro_rules! shardable_keys {
};
}

pub trait RangeRequest: Request {
fn is_reverse(&self) -> bool {
false
}
}

#[doc(hidden)]
#[macro_export]
macro_rules! range_request {
($type_: ty) => {
impl RangeRequest for $type_ {}
};
}

#[doc(hidden)]
#[macro_export]
macro_rules! reversible_range_request {
($type_: ty) => {
impl RangeRequest for $type_ {
fn is_reverse(&self) -> bool {
self.reverse
}
}
};
}

#[doc(hidden)]
#[macro_export]
macro_rules! shardable_range {
Expand All @@ -215,8 +242,13 @@ macro_rules! shardable_range {
&self,
pd_client: &Arc<impl $crate::pd::PdClient>,
) -> BoxStream<'static, $crate::Result<(Self::Shard, $crate::store::RegionStore)>> {
let start_key = self.start_key.clone().into();
let end_key = self.end_key.clone().into();
let mut start_key = self.start_key.clone().into();
let mut end_key = self.end_key.clone().into();
// In a reverse range request, the range is in the meaning of [end_key, start_key), i.e. end_key <= x < start_key.
// Therefore, before fetching the regions from PD, it is necessary to swap the values of start_key and end_key.
if self.is_reverse() {
std::mem::swap(&mut start_key, &mut end_key);
}
$crate::store::store_stream_for_range((start_key, end_key), pd_client.clone())
}

Expand All @@ -227,8 +259,13 @@ macro_rules! shardable_range {
) -> $crate::Result<()> {
self.set_context(store.region_with_leader.context()?);

// In a reverse range request, the range is in the meaning of [end_key, start_key), i.e. end_key <= x < start_key.
// As a result, after obtaining start_key and end_key from PD, we need to swap their values when assigning them to the request.
self.start_key = shard.0.into();
self.end_key = shard.1.into();
if self.is_reverse() {
std::mem::swap(&mut self.start_key, &mut self.end_key);
}
Ok(())
}
}
Expand Down
3 changes: 3 additions & 0 deletions src/transaction/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@ use crate::request::KvRequest;
use crate::request::Merge;
use crate::request::NextBatch;
use crate::request::Process;
use crate::request::RangeRequest;
use crate::request::ResponseWithShard;
use crate::request::Shardable;
use crate::request::SingleKey;
use crate::request::{Batchable, StoreRequest};
use crate::reversible_range_request;
use crate::shardable_key;
use crate::shardable_keys;
use crate::shardable_range;
Expand Down Expand Up @@ -170,6 +172,7 @@ impl KvRequest for kvrpcpb::ScanRequest {
type Response = kvrpcpb::ScanResponse;
}

reversible_range_request!(kvrpcpb::ScanRequest);
shardable_range!(kvrpcpb::ScanRequest);

impl Merge<kvrpcpb::ScanResponse> for Collect {
Expand Down
107 changes: 94 additions & 13 deletions tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1028,27 +1028,108 @@ async fn txn_scan_reverse() -> Result<()> {
init().await?;
let client = TransactionClient::new_with_config(pd_addrs(), Default::default()).await?;

let k1 = b"a1".to_vec();
let k2 = b"a2".to_vec();
let v1 = b"b1".to_vec();
let v2 = b"b2".to_vec();

let reverse_resp = vec![
(Key::from(k2.clone()), v2.clone()),
(Key::from(k1.clone()), v1.clone()),
];
let k1 = b"k1".to_vec();
let k2 = b"k2".to_vec();
let k3 = b"k3".to_vec();

let v1 = b"v1".to_vec();
let v2 = b"v2".to_vec();
let v3 = b"v3".to_vec();

// Pessimistic option is not stable in this case. Use optimistic options instead.
let option = TransactionOptions::new_optimistic().drop_check(tikv_client::CheckLevel::Warn);
let mut t = client.begin_with_options(option.clone()).await?;
t.put(k1.clone(), v1).await?;
t.put(k2.clone(), v2).await?;
t.put(k1.clone(), v1.clone()).await?;
t.put(k2.clone(), v2.clone()).await?;
t.put(k3.clone(), v3.clone()).await?;
t.commit().await?;

let mut t2 = client.begin_with_options(option).await?;
{
// For [k1, k3]:
let bound_range: BoundRange = (k1.clone()..=k3.clone()).into();
let resp = t2
.scan_reverse(bound_range, 3)
.await?
.map(|kv| (kv.0, kv.1))
.collect::<Vec<(Key, Vec<u8>)>>();
assert_eq!(
resp,
vec![
(Key::from(k3.clone()), v3.clone()),
(Key::from(k2.clone()), v2.clone()),
(Key::from(k1.clone()), v1.clone()),
]
);
}
{
// For [k1, k3):
let bound_range: BoundRange = (k1.clone()..k3.clone()).into();
let resp = t2
.scan_reverse(bound_range, 3)
.await?
.map(|kv| (kv.0, kv.1))
.collect::<Vec<(Key, Vec<u8>)>>();
assert_eq!(
resp,
vec![
(Key::from(k2.clone()), v2.clone()),
(Key::from(k1.clone()), v1),
]
);
}
{
// For (k1, k3):
let mut start_key = k1.clone();
start_key.push(0);
let bound_range: BoundRange = (start_key..k3).into();
let resp = t2
.scan_reverse(bound_range, 3)
.await?
.map(|kv| (kv.0, kv.1))
.collect::<Vec<(Key, Vec<u8>)>>();
assert_eq!(resp, vec![(Key::from(k2), v2),]);
}
t2.commit().await?;

Ok(())
}

#[tokio::test]
#[serial]
async fn txn_scan_reverse_multi_regions() -> Result<()> {
init().await?;
let client = TransactionClient::new_with_config(pd_addrs(), Default::default()).await?;

// Keys in `keys` should locate in different regions. See `init()` for boundary of regions.
let keys: Vec<Key> = vec![
0x00000000_u32.to_be_bytes().to_vec(),
0x40000000_u32.to_be_bytes().to_vec(),
0x80000000_u32.to_be_bytes().to_vec(),
0xC0000000_u32.to_be_bytes().to_vec(),
]
.into_iter()
.map(Into::into)
.collect();
let values: Vec<Vec<u8>> = (0..keys.len())
.map(|i| format!("v{}", i).into_bytes())
.collect();
let bound_range: BoundRange =
(keys.first().unwrap().clone()..=keys.last().unwrap().clone()).into();

// Pessimistic option is not stable in this case. Use optimistic options instead.
let option = TransactionOptions::new_optimistic().drop_check(tikv_client::CheckLevel::Warn);
let mut t = client.begin_with_options(option.clone()).await?;
let mut reverse_resp = Vec::with_capacity(keys.len());
for (k, v) in keys.into_iter().zip(values.into_iter()).rev() {
t.put(k.clone(), v.clone()).await?;
reverse_resp.push((k, v));
}
t.commit().await?;

let mut t2 = client.begin_with_options(option).await?;
let bound_range: BoundRange = (k1..=k2).into();
let resp = t2
.scan_reverse(bound_range, 2)
.scan_reverse(bound_range, 100)
.await?
.map(|kv| (kv.0, kv.1))
.collect::<Vec<(Key, Vec<u8>)>>();
Expand Down

0 comments on commit bbaf317

Please sign in to comment.