Skip to content

Commit

Permalink
Quorum driver take server requested retry interval into consideration
Browse files Browse the repository at this point in the history
  • Loading branch information
halfprice committed May 6, 2024
1 parent c1dfdcd commit 30bae9d
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 20 deletions.
32 changes: 26 additions & 6 deletions crates/sui-core/src/authority_aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,21 @@ pub fn group_errors(errors: Vec<(SuiError, Vec<AuthorityName>, StakeUnit)>) -> G
.collect()
}

#[derive(Debug)]
struct RetryableOverloadInfo {
stake: StakeUnit,
requested_retry_after: Duration,
}

impl Default for RetryableOverloadInfo {
fn default() -> Self {
Self {
stake: 0,
requested_retry_after: Duration::from_secs(1),
}
}
}

#[derive(Debug)]
struct ProcessTransactionState {
// The list of signatures gathered at any point
Expand All @@ -309,7 +324,7 @@ struct ProcessTransactionState {
// Validators that are overloaded with txns pending execution.
overloaded_stake: StakeUnit,
// Validators that are overloaded and request client to retry.
retryable_overloaded_stake: StakeUnit,
retryable_overload_info: RetryableOverloadInfo,
// If there are conflicting transactions, we note them down and may attempt to retry
conflicting_tx_digests:
BTreeMap<TransactionDigest, (Vec<(AuthorityName, ObjectRef)>, StakeUnit)>,
Expand Down Expand Up @@ -1046,7 +1061,7 @@ where
object_or_package_not_found_stake: 0,
non_retryable_stake: 0,
overloaded_stake: 0,
retryable_overloaded_stake: 0,
retryable_overload_info: Default::default(),
retryable: true,
conflicting_tx_digests: Default::default(),
conflicting_tx_total_stake: 0,
Expand Down Expand Up @@ -1119,7 +1134,8 @@ where
//
// TODO: currently retryable overload and above overload error look redundant. We want to have a unified
// code path to handle both overload scenarios.
state.retryable_overloaded_stake += weight;
state.retryable_overload_info.stake += weight;
state.retryable_overload_info.requested_retry_after = state.retryable_overload_info.requested_retry_after.max(Duration::from_secs(err.retry_after_secs()));
}
else if !retryable && !state.record_conflicting_transaction_if_any(name, weight, &err) {
// We don't count conflicting transactions as non-retryable errors here
Expand Down Expand Up @@ -1264,13 +1280,17 @@ where
// we have heard from *all* validators. Check if any SystemOverloadRetryAfter error caused the txn
// to fail. If so, return explicit SystemOverloadRetryAfter error for continuous retry (since objects)
// are locked in validators. If not, retry regular RetryableTransaction error.
if state.tx_signatures.total_votes() + state.retryable_overloaded_stake >= quorum_threshold
if state.tx_signatures.total_votes() + state.retryable_overload_info.stake
>= quorum_threshold
{
// TODO: make use of retry_after_secs, which is currently not used.
return AggregatorProcessTransactionError::SystemOverloadRetryAfter {
overload_stake: state.retryable_overloaded_stake,
overload_stake: state.retryable_overload_info.stake,
errors: group_errors(state.errors),
retry_after_secs: 0,
retry_after_secs: state
.retryable_overload_info
.requested_retry_after
.as_secs(),
};
}

Expand Down
19 changes: 14 additions & 5 deletions crates/sui-core/src/quorum_driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ impl<A: Clone> QuorumDriver<A> {
);
return Ok(());
}
self.backoff_and_enqueue(request, tx_cert, old_retry_times, client_addr)
self.backoff_and_enqueue(request, tx_cert, old_retry_times, client_addr, None)
.await
}

Expand All @@ -168,9 +168,11 @@ impl<A: Clone> QuorumDriver<A> {
tx_cert: Option<CertifiedTransaction>,
old_retry_times: u32,
client_addr: Option<SocketAddr>,
retry_after: Option<Duration>,
) -> SuiResult<()> {
let next_retry_after =
Instant::now() + Duration::from_millis(200 * u64::pow(2, old_retry_times));
let next_retry_after = Instant::now()
+ Duration::from_millis(200 * u64::pow(2, old_retry_times))
.max(retry_after.unwrap_or(Duration::from_secs(0)));
sleep_until(next_retry_after).await;

let tx_cert = match tx_cert {
Expand Down Expand Up @@ -373,7 +375,11 @@ where
retry_after_secs,
}) => {
self.metrics.total_retryable_overload_errors.inc();
debug!(?tx_digest, ?errors, "System overload and retry");
debug!(
?tx_digest,
?errors,
"System overload and retry after secs {retry_after_secs}",
);
Err(Some(QuorumDriverError::SystemOverloadRetryAfter {
overload_stake,
errors,
Expand Down Expand Up @@ -875,7 +881,9 @@ where
client_addr,
));
}
Some(QuorumDriverError::SystemOverloadRetryAfter { .. }) => {
Some(QuorumDriverError::SystemOverloadRetryAfter {
retry_after_secs, ..
}) => {
// Special case for SystemOverloadRetryAfter error. In this case, due to that objects are already
// locked inside validators, we need to perform continuous retry and ignore `max_retry_times`.
// TODO: the txn can potentially be retried unlimited times, therefore, we need to bound the number
Expand All @@ -887,6 +895,7 @@ where
tx_cert,
old_retry_times,
client_addr,
Some(Duration::from_secs(retry_after_secs)),
));
}
Some(qd_error) => {
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-core/src/quorum_driver/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@ async fn test_quorum_driver_handling_overload_and_retry() {

// Make local authority client to always return SystemOverloadedRetryAfter error.
let fault_config = LocalAuthorityClientFaultConfig {
overload_retry_after_handle_transaction: true,
overload_retry_after_handle_transaction: Some(Duration::from_secs(30)),
..Default::default()
};
let mut clients = aggregator.clone_inner_clients_test_only();
Expand Down
6 changes: 3 additions & 3 deletions crates/sui-core/src/test_authority_clients.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub struct LocalAuthorityClientFaultConfig {
pub fail_after_handle_transaction: bool,
pub fail_before_handle_confirmation: bool,
pub fail_after_handle_confirmation: bool,
pub overload_retry_after_handle_transaction: bool,
pub overload_retry_after_handle_transaction: Option<Duration>,
}

impl LocalAuthorityClientFaultConfig {
Expand Down Expand Up @@ -76,9 +76,9 @@ impl AuthorityAPI for LocalAuthorityClient {
error: "Mock error after handle_transaction".to_owned(),
});
}
if self.fault_config.overload_retry_after_handle_transaction {
if let Some(duration) = self.fault_config.overload_retry_after_handle_transaction {
return Err(SuiError::ValidatorOverloadedRetryAfter {
retry_after_secs: 0,
retry_after_secs: duration.as_secs(),
});
}
result
Expand Down
16 changes: 11 additions & 5 deletions crates/sui-core/src/unit_tests/authority_aggregator_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1958,14 +1958,17 @@ async fn test_handle_overload_retry_response() {
666, // this is a dummy value which does not matter
);

let overload_error = SuiError::ValidatorOverloadedRetryAfter {
retry_after_secs: 0,
};
let rpc_error = SuiError::RpcError("RPC".into(), "Error".into());

// Have 2f + 1 validators return the overload error and we should get the `SystemOverload` error.
set_retryable_tx_info_response_error(&mut clients, &authority_keys);
set_tx_info_response_with_error(&mut clients, authority_keys.iter().skip(1), overload_error);
for (index, (name, _)) in authority_keys.iter().skip(1).enumerate() {
clients.get_mut(name).unwrap().set_tx_info_response_error(
SuiError::ValidatorOverloadedRetryAfter {
retry_after_secs: index as u64,
},
);
}

let agg = get_genesis_agg(authorities.clone(), clients.clone());
assert_resp_err(
Expand All @@ -1974,7 +1977,10 @@ async fn test_handle_overload_retry_response() {
|e| {
matches!(
e,
AggregatorProcessTransactionError::SystemOverloadRetryAfter { .. }
AggregatorProcessTransactionError::SystemOverloadRetryAfter {
retry_after_secs,
..
} if *retry_after_secs == (authority_keys.len() as u64 - 2)
)
},
|e| {
Expand Down
7 changes: 7 additions & 0 deletions crates/sui-types/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -820,6 +820,13 @@ impl SuiError {
pub fn is_retryable_overload(&self) -> bool {
matches!(self, SuiError::ValidatorOverloadedRetryAfter { .. })
}

pub fn retry_after_secs(&self) -> u64 {
match self {
SuiError::ValidatorOverloadedRetryAfter { retry_after_secs } => *retry_after_secs,
_ => 0,
}
}
}

impl Ord for SuiError {
Expand Down

0 comments on commit 30bae9d

Please sign in to comment.