
Commit

Only return retry-after duration that corresponds to the quorum threshold of validators
halfprice committed May 7, 2024
1 parent 362aa13 commit 182f901
Showing 2 changed files with 115 additions and 21 deletions.
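In short, instead of returning the largest retry-after duration reported by any overloaded validator, the aggregator now scans the reported durations in ascending order and returns the smallest duration at which the good stake plus the overloaded stake reaches the quorum threshold, falling back to the largest reported duration otherwise. Below is a minimal standalone sketch of that selection, assuming a BTreeMap keyed by duration as in the change that follows; the quorum_retry_after helper name and the stake figures are illustrative, not the actual API.

use std::collections::BTreeMap;
use std::time::Duration;

type StakeUnit = u64;

// Returns the smallest requested retry duration at which good stake plus
// overloaded stake reaches the quorum threshold; falls back to the largest
// requested duration (or zero when nothing was requested).
fn quorum_retry_after(
    requested: &BTreeMap<Duration, StakeUnit>,
    good_stake: StakeUnit,
    quorum_threshold: StakeUnit,
) -> Duration {
    let mut accumulated = good_stake;
    for (retry_after, stake) in requested {
        accumulated += stake;
        if accumulated >= quorum_threshold {
            return *retry_after;
        }
    }
    requested.keys().next_back().copied().unwrap_or(Duration::ZERO)
}

fn main() {
    let mut requested = BTreeMap::new();
    requested.insert(Duration::from_secs(1), 3000u64);
    requested.insert(Duration::from_secs(10), 3000u64);
    // 4000 good stake plus the 3000 stake requesting a 1s delay already reaches a 7000 quorum.
    assert_eq!(quorum_retry_after(&requested, 4000, 7000), Duration::from_secs(1));
}

Keying the map by Duration keeps entries sorted, so the first bucket that pushes the accumulated stake past the quorum threshold is also the smallest usable retry delay.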
58 changes: 46 additions & 12 deletions crates/sui-core/src/authority_aggregator.rs
@@ -296,12 +296,46 @@ pub fn group_errors(errors: Vec<(SuiError, Vec<AuthorityName>, StakeUnit)>) -> G
}

#[derive(Debug, Default)]
-struct RetryableOverloadInfo {
+pub struct RetryableOverloadInfo {
// Total stake of validators that are overloaded and request client to retry.
-stake: StakeUnit,
+pub total_stake: StakeUnit,

-// The maximum retry_after_secs requested by overloaded validators.
-requested_retry_after: Duration,
+// Records requested retry duration by stakes.
+pub stake_requested_retry_after: BTreeMap<Duration, StakeUnit>,
}

+impl RetryableOverloadInfo {
+    pub fn add_stake_retryable_overload(&mut self, stake: StakeUnit, retry_after: Duration) {
+        self.total_stake += stake;
+        self.stake_requested_retry_after
+            .entry(retry_after)
+            .and_modify(|s| *s += stake)
+            .or_insert(stake);
+    }
+
+    // Gets the duration of retry requested by a quorum of validators with smallest retry durations.
+    pub fn get_quorum_retry_after(
+        &self,
+        good_stake: StakeUnit,
+        quorum_threshold: StakeUnit,
+    ) -> Duration {
+        if self.stake_requested_retry_after.is_empty() {
+            return Duration::from_secs(0);
+        }
+
+        let mut quorum_stake = good_stake;
+        for (retry_after, stake) in self.stake_requested_retry_after.iter() {
+            quorum_stake += *stake;
+            if quorum_stake >= quorum_threshold {
+                return *retry_after;
+            }
+        }
+        self.stake_requested_retry_after
+            .last_key_value()
+            .unwrap()
+            .0
+            .clone()
+    }
+}

#[derive(Debug)]
@@ -1128,8 +1162,7 @@ where
//
// TODO: currently retryable overload and above overload error look redundant. We want to have a unified
// code path to handle both overload scenarios.
-state.retryable_overload_info.stake += weight;
-state.retryable_overload_info.requested_retry_after = state.retryable_overload_info.requested_retry_after.max(Duration::from_secs(err.retry_after_secs()));
+state.retryable_overload_info.add_stake_retryable_overload(weight, Duration::from_secs(err.retry_after_secs()));
}
else if !retryable && !state.record_conflicting_transaction_if_any(name, weight, &err) {
// We don't count conflicting transactions as non-retryable errors here
@@ -1274,16 +1307,17 @@ where
// we have heard from *all* validators. Check if any SystemOverloadRetryAfter error caused the txn
// to fail. If so, return explicit SystemOverloadRetryAfter error for continuous retry (since objects
// are locked in validators). If not, retry regular RetryableTransaction error.
-if state.tx_signatures.total_votes() + state.retryable_overload_info.stake
+if state.tx_signatures.total_votes() + state.retryable_overload_info.total_stake
>= quorum_threshold
{
+let retry_after_secs = state
+.retryable_overload_info
+.get_quorum_retry_after(state.tx_signatures.total_votes(), quorum_threshold)
+.as_secs();
return AggregatorProcessTransactionError::SystemOverloadRetryAfter {
-overload_stake: state.retryable_overload_info.stake,
+overload_stake: state.retryable_overload_info.total_stake,
errors: group_errors(state.errors),
-retry_after_secs: state
-.retryable_overload_info
-.requested_retry_after
-.as_secs(),
+retry_after_secs,
};
}

78 changes: 69 additions & 9 deletions crates/sui-core/src/unit_tests/authority_aggregator_tests.rs
@@ -1960,20 +1960,18 @@ async fn test_handle_overload_retry_response() {

let rpc_error = SuiError::RpcError("RPC".into(), "Error".into());

-// Have 2f + 1 validators return the overload error and we should get the `SystemOverload` error.
+// Have all validators return the overload error and we should get the `SystemOverload` error.
// Uses different retry_after_secs for each validator.
set_retryable_tx_info_response_error(&mut clients, &authority_keys);
-for (index, (name, _)) in authority_keys.iter().skip(1).enumerate() {
+for (index, (name, _)) in authority_keys.iter().enumerate() {
clients.get_mut(name).unwrap().set_tx_info_response_error(
SuiError::ValidatorOverloadedRetryAfter {
retry_after_secs: index as u64,
},
);
}

let agg = get_genesis_agg(authorities.clone(), clients.clone());

-// We should get the `SystemOverloadRetryAfter` error with the highest retry_after_secs.
+// We should get the `SystemOverloadRetryAfter` error with the retry_after_secs corresponding to the quorum
+// threshold of validators.
assert_resp_err(
&agg,
txn.clone(),
@@ -1995,15 +1993,42 @@
)
.await;

-// Change one of the validators' errors to RPC error so the system is considered not overloaded now and a `RetryableTransaction`
+// Have 2f + 1 validators return the overload error (by setting one authority returning RPC error) and we
+// should still get the `SystemOverload` error. The retry_after_secs corresponding to the quorum threshold
+// now is the max of the retry_after_secs of the validators.
+clients
+    .get_mut(&authority_keys[0].0)
+    .unwrap()
+    .set_tx_info_response_error(rpc_error.clone());
+let agg = get_genesis_agg(authorities.clone(), clients.clone());
+assert_resp_err(
+    &agg,
+    txn.clone(),
+    |e| {
+        matches!(
+            e,
+            AggregatorProcessTransactionError::SystemOverloadRetryAfter {
+                retry_after_secs,
+                ..
+            } if *retry_after_secs == (authority_keys.len() as u64 - 1)
+        )
+    },
+    |e| {
+        matches!(
+            e,
+            SuiError::ValidatorOverloadedRetryAfter { .. } | SuiError::RpcError(..)
+        )
+    },
+)
+.await;
+
+// Change another validator's error to RPC error so the system is considered not overloaded now and a `RetryableTransaction`
// should be returned.
clients
.get_mut(&authority_keys[1].0)
.unwrap()
.set_tx_info_response_error(rpc_error);

let agg = get_genesis_agg(authorities.clone(), clients.clone());

assert_resp_err(
&agg,
txn.clone(),
@@ -2505,3 +2530,38 @@ fn set_tx_info_response_with_error<'a>(
.set_tx_info_response_error(error.clone());
}
}

+#[test]
+fn test_retryable_overload_info() {
+    let mut retryable_overload_info = RetryableOverloadInfo::default();
+    assert_eq!(
+        retryable_overload_info.get_quorum_retry_after(3000, 7000),
+        Duration::from_secs(0)
+    );
+
+    for _ in 0..4 {
+        retryable_overload_info.add_stake_retryable_overload(1000, Duration::from_secs(1));
+    }
+    assert_eq!(
+        retryable_overload_info.get_quorum_retry_after(3000, 7000),
+        Duration::from_secs(1)
+    );
+
+    retryable_overload_info = RetryableOverloadInfo::default();
+    retryable_overload_info.add_stake_retryable_overload(1000, Duration::from_secs(1));
+    retryable_overload_info.add_stake_retryable_overload(3000, Duration::from_secs(10));
+    retryable_overload_info.add_stake_retryable_overload(2000, Duration::from_secs(1));
+    assert_eq!(
+        retryable_overload_info.get_quorum_retry_after(4000, 7000),
+        Duration::from_secs(1)
+    );
+
+    retryable_overload_info = RetryableOverloadInfo::default();
+    for i in 0..10 {
+        retryable_overload_info.add_stake_retryable_overload(1000, Duration::from_secs(i));
+    }
+    assert_eq!(
+        retryable_overload_info.get_quorum_retry_after(0, 7000),
+        Duration::from_secs(6)
+    );
+}
