Skip to content

Commit

Permalink
Quorum driver take server requested retry interval into consideration
Browse files Browse the repository at this point in the history
  • Loading branch information
halfprice committed May 6, 2024
1 parent c1dfdcd commit 24fbd0a
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 24 deletions.
27 changes: 20 additions & 7 deletions crates/sui-core/src/authority_aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,15 @@ pub fn group_errors(errors: Vec<(SuiError, Vec<AuthorityName>, StakeUnit)>) -> G
.collect()
}

#[derive(Debug, Default)]
struct RetryableOverloadInfo {
// Total stake of validators that are overloaded and request client to retry.
stake: StakeUnit,

// The maximum retry_after_secs requested by overloaded validators.
requested_retry_after: Duration,
}

#[derive(Debug)]
struct ProcessTransactionState {
// The list of signatures gathered at any point
Expand All @@ -309,7 +318,7 @@ struct ProcessTransactionState {
// Validators that are overloaded with txns pending execution.
overloaded_stake: StakeUnit,
// Validators that are overloaded and request client to retry.
retryable_overloaded_stake: StakeUnit,
retryable_overload_info: RetryableOverloadInfo,
// If there are conflicting transactions, we note them down and may attempt to retry
conflicting_tx_digests:
BTreeMap<TransactionDigest, (Vec<(AuthorityName, ObjectRef)>, StakeUnit)>,
Expand Down Expand Up @@ -1046,7 +1055,7 @@ where
object_or_package_not_found_stake: 0,
non_retryable_stake: 0,
overloaded_stake: 0,
retryable_overloaded_stake: 0,
retryable_overload_info: Default::default(),
retryable: true,
conflicting_tx_digests: Default::default(),
conflicting_tx_total_stake: 0,
Expand Down Expand Up @@ -1119,7 +1128,8 @@ where
//
// TODO: currently retryable overload and above overload error look redundant. We want to have a unified
// code path to handle both overload scenarios.
state.retryable_overloaded_stake += weight;
state.retryable_overload_info.stake += weight;
state.retryable_overload_info.requested_retry_after = state.retryable_overload_info.requested_retry_after.max(Duration::from_secs(err.retry_after_secs()));
}
else if !retryable && !state.record_conflicting_transaction_if_any(name, weight, &err) {
// We don't count conflicting transactions as non-retryable errors here
Expand Down Expand Up @@ -1264,13 +1274,16 @@ where
// we have heard from *all* validators. Check if any SystemOverloadRetryAfter error caused the txn
// to fail. If so, return explicit SystemOverloadRetryAfter error for continuous retry (since objects)
// are locked in validators. If not, retry regular RetryableTransaction error.
if state.tx_signatures.total_votes() + state.retryable_overloaded_stake >= quorum_threshold
if state.tx_signatures.total_votes() + state.retryable_overload_info.stake
>= quorum_threshold
{
// TODO: make use of retry_after_secs, which is currently not used.
return AggregatorProcessTransactionError::SystemOverloadRetryAfter {
overload_stake: state.retryable_overloaded_stake,
overload_stake: state.retryable_overload_info.stake,
errors: group_errors(state.errors),
retry_after_secs: 0,
retry_after_secs: state
.retryable_overload_info
.requested_retry_after
.as_secs(),
};
}

Expand Down
22 changes: 17 additions & 5 deletions crates/sui-core/src/quorum_driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use mysten_metrics::{
spawn_monitored_task, GaugeGuard, TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX,
};
use std::fmt::Write;
use sui_macros::fail_point;
use sui_types::error::{SuiError, SuiResult};
use sui_types::messages_safe_client::PlainTransactionInfoResponse;
use sui_types::transaction::{CertifiedTransaction, Transaction};
Expand Down Expand Up @@ -157,21 +158,25 @@ impl<A: Clone> QuorumDriver<A> {
);
return Ok(());
}
self.backoff_and_enqueue(request, tx_cert, old_retry_times, client_addr)
self.backoff_and_enqueue(request, tx_cert, old_retry_times, client_addr, None)
.await
}

/// Performs exponential backoff and enqueue the `transaction` to the execution queue.
/// When `min_backoff_duration` is provided, the backoff duration will be at least `min_backoff_duration`.
async fn backoff_and_enqueue(
&self,
request: ExecuteTransactionRequestV3,
tx_cert: Option<CertifiedTransaction>,
old_retry_times: u32,
client_addr: Option<SocketAddr>,
min_backoff_duration: Option<Duration>,
) -> SuiResult<()> {
let next_retry_after =
Instant::now() + Duration::from_millis(200 * u64::pow(2, old_retry_times));
let next_retry_after = Instant::now()
+ Duration::from_millis(200 * u64::pow(2, old_retry_times))
.max(min_backoff_duration.unwrap_or(Duration::from_secs(0)));
sleep_until(next_retry_after).await;
fail_point!("count_retry_times");

let tx_cert = match tx_cert {
// TxCert is only valid when its epoch matches current epoch.
Expand Down Expand Up @@ -373,7 +378,11 @@ where
retry_after_secs,
}) => {
self.metrics.total_retryable_overload_errors.inc();
debug!(?tx_digest, ?errors, "System overload and retry");
debug!(
?tx_digest,
?errors,
"System overload and retry after secs {retry_after_secs}",
);
Err(Some(QuorumDriverError::SystemOverloadRetryAfter {
overload_stake,
errors,
Expand Down Expand Up @@ -875,7 +884,9 @@ where
client_addr,
));
}
Some(QuorumDriverError::SystemOverloadRetryAfter { .. }) => {
Some(QuorumDriverError::SystemOverloadRetryAfter {
retry_after_secs, ..
}) => {
// Special case for SystemOverloadRetryAfter error. In this case, due to that objects are already
// locked inside validators, we need to perform continuous retry and ignore `max_retry_times`.
// TODO: the txn can potentially be retried unlimited times, therefore, we need to bound the number
Expand All @@ -887,6 +898,7 @@ where
tx_cert,
old_retry_times,
client_addr,
Some(Duration::from_secs(retry_after_secs)),
));
}
Some(qd_error) => {
Expand Down
22 changes: 18 additions & 4 deletions crates/sui-core/src/quorum_driver/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ use crate::test_utils::make_transfer_sui_transaction;
use crate::{quorum_driver::QuorumDriverMetrics, test_utils::init_local_authorities};
use mysten_common::sync::notify_read::{NotifyRead, Registration};
use std::net::SocketAddr;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use sui_macros::{register_fail_point, sim_test};
use sui_types::base_types::SuiAddress;
use sui_types::base_types::TransactionDigest;
use sui_types::crypto::{deterministic_random_account_key, get_key_pair, AccountKeyPair};
Expand Down Expand Up @@ -535,7 +537,7 @@ async fn test_quorum_driver_object_locked() -> Result<(), anyhow::Error> {
}

// Tests that quorum driver can continuously retry txn with SystemOverloadedRetryAfter error.
#[tokio::test(flavor = "current_thread", start_paused = true)]
#[sim_test]
async fn test_quorum_driver_handling_overload_and_retry() {
telemetry_subscribers::init_for_testing();

Expand All @@ -547,7 +549,7 @@ async fn test_quorum_driver_handling_overload_and_retry() {

// Make local authority client to always return SystemOverloadedRetryAfter error.
let fault_config = LocalAuthorityClientFaultConfig {
overload_retry_after_handle_transaction: true,
overload_retry_after_handle_transaction: Some(Duration::from_secs(30)),
..Default::default()
};
let mut clients = aggregator.clone_inner_clients_test_only();
Expand All @@ -570,6 +572,14 @@ async fn test_quorum_driver_handling_overload_and_retry() {
.unwrap();
let tx = make_tx(gas_object, sender, &keypair, rgp);

// Use a fail point to count the number of retries to test that the quorum backoff logic
// respects above `overload_retry_after_handle_transaction`.
let retry_count: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
let retry_count_clone = retry_count.clone();
register_fail_point("count_retry_times", move || {
retry_count_clone.fetch_add(1, Ordering::SeqCst);
});

// Create a quorum driver with max_retry_times = 0.
let arc_aggregator = Arc::new(aggregator.clone());
let quorum_driver_handler = Arc::new(
Expand All @@ -582,13 +592,17 @@ async fn test_quorum_driver_handling_overload_and_retry() {
.start(),
);

// Submit the transaction, and check that it shouldn't return.
// Submit the transaction, and check that it shouldn't return, and the number of retries is within
// 300s timeout / 30s retry after duration = 10 times.
let ticket = quorum_driver_handler
.submit_transaction(ExecuteTransactionRequestV3::new_v2(tx))
.await
.unwrap();
match timeout(Duration::from_secs(300), ticket).await {
Ok(result) => panic!("Process transaction should timeout! {:?}", result),
Err(_) => eprintln!("Waiting for txn timed out! This is desired behavior."),
Err(_) => {
assert!(retry_count.load(Ordering::SeqCst) <= 10);
println!("Waiting for txn timed out! This is desired behavior.")
}
}
}
6 changes: 3 additions & 3 deletions crates/sui-core/src/test_authority_clients.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub struct LocalAuthorityClientFaultConfig {
pub fail_after_handle_transaction: bool,
pub fail_before_handle_confirmation: bool,
pub fail_after_handle_confirmation: bool,
pub overload_retry_after_handle_transaction: bool,
pub overload_retry_after_handle_transaction: Option<Duration>,
}

impl LocalAuthorityClientFaultConfig {
Expand Down Expand Up @@ -76,9 +76,9 @@ impl AuthorityAPI for LocalAuthorityClient {
error: "Mock error after handle_transaction".to_owned(),
});
}
if self.fault_config.overload_retry_after_handle_transaction {
if let Some(duration) = self.fault_config.overload_retry_after_handle_transaction {
return Err(SuiError::ValidatorOverloadedRetryAfter {
retry_after_secs: 0,
retry_after_secs: duration.as_secs(),
});
}
result
Expand Down
19 changes: 14 additions & 5 deletions crates/sui-core/src/unit_tests/authority_aggregator_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1958,23 +1958,32 @@ async fn test_handle_overload_retry_response() {
666, // this is a dummy value which does not matter
);

let overload_error = SuiError::ValidatorOverloadedRetryAfter {
retry_after_secs: 0,
};
let rpc_error = SuiError::RpcError("RPC".into(), "Error".into());

// Have 2f + 1 validators return the overload error and we should get the `SystemOverload` error.
// Uses different retry_after_secs for each validator.
set_retryable_tx_info_response_error(&mut clients, &authority_keys);
set_tx_info_response_with_error(&mut clients, authority_keys.iter().skip(1), overload_error);
for (index, (name, _)) in authority_keys.iter().skip(1).enumerate() {
clients.get_mut(name).unwrap().set_tx_info_response_error(
SuiError::ValidatorOverloadedRetryAfter {
retry_after_secs: index as u64,
},
);
}

let agg = get_genesis_agg(authorities.clone(), clients.clone());

// We should get the `SystemOverloadRetryAfter` error with the highest retry_after_secs.
assert_resp_err(
&agg,
txn.clone(),
|e| {
matches!(
e,
AggregatorProcessTransactionError::SystemOverloadRetryAfter { .. }
AggregatorProcessTransactionError::SystemOverloadRetryAfter {
retry_after_secs,
..
} if *retry_after_secs == (authority_keys.len() as u64 - 2)
)
},
|e| {
Expand Down
7 changes: 7 additions & 0 deletions crates/sui-types/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -820,6 +820,13 @@ impl SuiError {
pub fn is_retryable_overload(&self) -> bool {
matches!(self, SuiError::ValidatorOverloadedRetryAfter { .. })
}

pub fn retry_after_secs(&self) -> u64 {
match self {
SuiError::ValidatorOverloadedRetryAfter { retry_after_secs } => *retry_after_secs,
_ => 0,
}
}
}

impl Ord for SuiError {
Expand Down

0 comments on commit 24fbd0a

Please sign in to comment.