-
Notifications
You must be signed in to change notification settings - Fork 131
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
adding retryable to scan #456
adding retryable to scan #456
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your contribution !
The PR overall looks good. And I left some minor suggestions.
Besides, If you would like to verify the correctness when region error happens, consider to use failpoint. Please refer to failpoint_test.rs
. It's not a must before the PR is accepted.
src/raw/client.rs
Outdated
plan::handle_region_error(self.rpc.clone(), err.clone(), store.clone()) | ||
.await?; | ||
return if status { | ||
self.retryable_scan(scan_args.clone()).await |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggest to eliminate the recursion and let caller do the retry, to reduce overhead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed the recursion, in the initial implementation, I followed the pattern as outlined in
client-rust/src/request/plan.rs
Line 109 in 54fd720
async fn single_plan_handler( |
Hi @pingyu, are there any more changes you need me to make before this change can be merged? Thanks for taking the time to look at this. |
src/raw/client.rs
Outdated
current_limit -= kvs.len() as u32; | ||
result.append(&mut kvs); | ||
} | ||
if end_key |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit:
if end_key | |
if end_key.is_some_and(|ek| ek <= next_key.as_ref()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. Changed. end_key
had to be cloned however.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
end_key.as_ref().is_some_and(|ek| ek <= &next_key)
should work.
src/raw/client.rs
Outdated
while current_limit > 0 { | ||
let scan_args = ScanInnerArgs { | ||
start_key: current_key.clone(), | ||
range: range.clone(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should scan from start_key
. Otherwise if there is region merge during scan, we would get duplicated kv paires, and lose some others if limit
is reached.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was trying to trace through the logic and from what I understand, we will only be looping if the limit of the scan has not been exhausted. So regardless of a split or merge, won't we be resuming the next scan from the end_key returned by the previous scan call? So if the first scan result returns an end_key "foo", doesn't the system guarantee that if we start the next scan starting from "foo", we are guaranteed to return all results that are lexicographically larger than "foo" and smaller than whatever end_key has been provided. This is regardless of whether the underlying regions are undergoing any churn(due to any splits or merges). There may be gaps in my understanding so will be more than happy to get some more feedback here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was trying to trace through the logic and from what I understand, we will only be looping if the limit of the scan has not been exhausted. So regardless of a split or merge, won't we be resuming the next scan from the end_key returned by the previous scan call? So if the first scan result returns an end_key "foo", doesn't the system guarantee that if we start the next scan starting from "foo", we are guaranteed to return all results that are lexicographically larger than "foo" and smaller than whatever end_key has been provided. This is regardless of whether the underlying regions are undergoing any churn(due to any splits or merges). There may be gaps in my understanding so will be more than happy to get some more feedback here.
You are right. But I think the issue is here in retryable_scan
:
let request = new_raw_scan_request(
scan_args.range.clone(),
scan_args.limit,
scan_args.key_only,
scan_args.reverse,
self.cf.clone(),
);
As the start key in scan_args.range
is not the end key of previous loop.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @pingyu. Apologies for not getting to this comment earlier, got caught up with some other deliverables. From the way I understand it, the retryable scan may not need to change the ranges since it is simply retrying the same ranges after the cache has been invalidated. The caller of retryable_scan should be changing the ranges as new regions are encountered. Let me know if that makes sense. I have however tried to simplify the logic using the loop and gotten rid of the pinned future.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
retryable_scan
should scan from current_key
.
Assume that we scan from key000
to key100
. And there are two regions: region1: key000
~ key050
, region2: key050
~ key100
.
First iteration, we scan kvs from region1.
Second iteration, current_key = key050
, but this time we get an error of EpochNotMatch
because region1 is just merged to region2, and now region2 is key000
~ key100
.
Then we retry, and we should scan from current_key
, which is key050
, instead of key000
. Otherwise, we will get duplicated kvs of frist iteration.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you look at the logic, I am. scanning from current_key in the retry instead of the start_key. Let me break down the logic:
The very first call is from the start key:
let mut current_key: Key = start_key;
//line 166
Then we try scanning from this start key to scan:
let scan_args = ScanInnerArgs {
start_key: **current_key.clone(),** // This is mutated for every iteration
range: range.clone(),
limit: current_limit,
key_only,
reverse,
backoff: backoff.clone(),
};
let (res, next_key) = self.retryable_scan(scan_args).await?; //This is retried with the **current state of scan_args**
In the retryable_scan method we are encapsulating the calls to the scan for the start_key (which is the current_key), now in this call if we encounter the EpochNotMatch, we will be retrying from current_key, not from the start_key. The retry is taking place within the function. So, no duplicate will be returned in this scenario.
If you scroll further down in scan_inner
, we change the current_key with the end_key returned by the retryable_scan
function.
if end_key.clone().is_some_and(|ek| ek <= next_key) {
break;
} else {
current_key = next_key;
}
I understand your concern but given the logic, I am having a hard time seeing how duplicates will be returned. Maybe a good way to address your concern will be to write a test that injects a fault and then change the region mappings in the mock pd and see if the test passes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But the start_key
(or from
) we pass to new_raw_scan_request
through range
argument is not the scan_args.start_key
, but the start_key
from the very beginning, the range
argument of scan_inner
.
let request = new_raw_scan_request(
scan_args.range.clone(),
scan_args.limit,
scan_args.key_only,
scan_args.reverse,
self.cf.clone(),
);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @pingyu. Now I see where the issue is. Made the change.
@pingyu Thanks for your feedback. It will be great if you could take another look. |
Hi @pingyu , the integration tests pass locally. Not sure why they failed on CI. Please let me know if you need me to make any other change to this PR. |
63dddbb
to
032ceca
Compare
Signed-off-by: limbooverlambda <schakra1@gmail.com>
Signed-off-by: limbooverlambda <schakra1@gmail.com>
Signed-off-by: limbooverlambda <schakra1@gmail.com>
Signed-off-by: limbooverlambda <schakra1@gmail.com>
Signed-off-by: limbooverlambda <schakra1@gmail.com>
032ceca
to
3ee7eff
Compare
@limbooverlambda Please click the "Details" item on the right side of failed CI to check why CI failed. For example:
|
Signed-off-by: limbooverlambda <schakra1@gmail.com>
Signed-off-by: limbooverlambda <schakra1@gmail.com>
Makefile
Outdated
@@ -17,7 +17,7 @@ generate: | |||
check: generate | |||
cargo check --all --all-targets --features "${ALL_FEATURES}" | |||
cargo fmt -- --check | |||
cargo clippy --all-targets --features "${ALL_FEATURES}" -- -D clippy::all | |||
cargo clippy --fix --allow-dirty --all-targets --features "${ALL_FEATURES}" -- -D clippy::all |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not a good idea to add --fix --allow-dirty
, as the fix will be done in CI only and not applied to code base. Besides, some automatic change may not be correct, we need to have a check.
I will open another PR in these few days to fix the errors introduced by the latest toolchain.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @pingyu . It seems like the toolchain upgrade started flagging the autogenerated docs. --fix --allow-dirty
is a workaround to make the build go through. I did't want to pin the toolchain to a certain version, so this was a quick way to bypass the clippy errors. I agree with you on not using --fix --allow-dirty
. Do you have any suggestion on how I can get this PR merged? Looks like all the tests are passing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not a good idea to add
--fix --allow-dirty
, as the fix will be done in CI only and not applied to code base. Besides, some automatic change may not be correct, we need to have a check.I will open another PR in these few days to fix the errors introduced by the latest toolchain.
The check errors for latest toolchain will be fixed by #460.
src/raw/client.rs
Outdated
let region = self.rpc.clone().region_for_key(&start_key).await?; | ||
let store = self.rpc.clone().store_for_id(region.id()).await?; | ||
let request = new_raw_scan_request( | ||
scan_args.range.clone(), | ||
scan_args.limit, | ||
scan_args.key_only, | ||
scan_args.reverse, | ||
self.cf.clone(), | ||
); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@pingyu The problem might be over here, we are not retrieving the new regions or stores once it is updated. This may need to be moved into the loop.
…lippy fix from Makefile Signed-off-by: limbooverlambda <schakra1@gmail.com>
Cargo.toml
Outdated
@@ -14,6 +14,7 @@ prometheus = ["prometheus/push", "prometheus/process"] | |||
# Enable integration tests with a running TiKV and PD instance. | |||
# Use $PD_ADDRS, comma separated, to set the addresses the tests use. | |||
integration-tests = [] | |||
protobuf-codec = [] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This feature can be removed. We don't have it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
src/raw/client.rs
Outdated
let region = self.rpc.clone().region_for_key(&start_key).await?; | ||
let store = self.rpc.clone().store_for_id(region.id()).await?; | ||
let request = new_raw_scan_request( | ||
scan_args.range.clone(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with @pingyu and IIUC the problem is that scan_args.range
never changes across iterations.
scan_args.start_key
is totally fine; it moves forward as the scan makes progress. But since scan_args.range
doesn't change, it's possible for retryable_scan
to have scan_args.range=(key000, key100) scan_args.start_key=key050
. In this case, if we encounter a region merge, the raw scan request constructed here will scan the new region from key000 again, getting duplicate KVs.
I think one way to fix it is to gradually shrink the current range in the loop of scan_inner
, similar to what the old code did:
cur_range = BoundRange::new(
std::ops::Bound::Included(region_store.region_with_leader.range().1),
cur_range.to,
);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, now I see it. The scan request is being constructed on the range but the range itself is not being changed for every iteration. So essentially, mutating the range with the updated start_key will fix the issue. Thanks @hbisheng and @pingyu for being so patient with the explanation. Adding the fix.
Signed-off-by: limbooverlambda <schakra1@gmail.com>
a7b4b74
to
49f5dba
Compare
Signed-off-by: limbooverlambda <schakra1@gmail.com>
src/raw/client.rs
Outdated
#[derive(Clone)] | ||
struct ScanInnerArgs { | ||
start_key: Key, | ||
range: BoundRange, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
range: BoundRange, | |
end_key: Option<Key>, |
As start_key
and range.from
are always the same, I think we can keep only one from them. And as it's easier to convert start_key
and end_key
to range
than opposite, I suggest to have start_key
and end_key
in ScanInnerArgs
.
Rest LGTM.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed! Thanks for the review.
… end_key Signed-off-by: limbooverlambda <schakra1@gmail.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM~
break; | ||
} else { | ||
current_key = next_key; | ||
range = BoundRange::new(std::ops::Bound::Included(current_key.clone()), range.to); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
range = BoundRange::new(std::ops::Bound::Included(current_key.clone()), range.to); |
/cc @iosmanthus |
PTAL~ |
@limbooverlambda Thanks for your contribution ! |
solve: #455
We were running into an issue with scans where periodically we noticed scans returning empty results for datasets that were present in a cluster. The hypothesis was that the scans were returning empties when the regions are undergoing splits. While trying to reproduce the issue (by issuing splits from pd-ctl), we found out that when there's a region error (epoch_version_mismatch et al), the scan_inner is not triggering any cache invalidations and subsequent retries. The scan simply returns an empty. This PR is fixing the issue by triggering the invalidations and retry for such issues.
@pingyu @ekexium @andylokandy.