From 93fdf039c11b062074319f210b1eec1ae41b245f Mon Sep 17 00:00:00 2001 From: Zhe Wu Date: Sun, 12 May 2024 07:39:36 -0700 Subject: [PATCH] Quorum driver take server requested retry interval into consideration (#17520) ## Description The ValidatorOverloadedRetryAfter error contains a server-suggested retry-after duration, so when the quorum driver retries after a SystemOverloadRetryAfter error, it should take the suggested retry duration into consideration. ## Test plan Unit tests added. --- ## Release notes Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required. For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates. - [ ] Protocol: - [ ] Nodes (Validators and Full nodes): - [ ] Indexer: - [ ] JSON-RPC: - [ ] GraphQL: - [ ] CLI: - [ ] Rust SDK: --- crates/sui-core/src/authority_aggregator.rs | 61 +++++++++++-- crates/sui-core/src/quorum_driver/mod.rs | 23 +++-- crates/sui-core/src/quorum_driver/tests.rs | 22 ++++- crates/sui-core/src/test_authority_clients.rs | 6 +- .../unit_tests/authority_aggregator_tests.rs | 89 ++++++++++++++++--- crates/sui-types/src/error.rs | 7 ++ 6 files changed, 177 insertions(+), 31 deletions(-) diff --git a/crates/sui-core/src/authority_aggregator.rs b/crates/sui-core/src/authority_aggregator.rs index e27ed8486598b..1423c58cc9ded 100644 --- a/crates/sui-core/src/authority_aggregator.rs +++ b/crates/sui-core/src/authority_aggregator.rs @@ -295,6 +295,45 @@ pub fn group_errors(errors: Vec<(SuiError, Vec, StakeUnit)>) -> G .collect() } +#[derive(Debug, Default)] +pub struct RetryableOverloadInfo { + // Total stake of validators that are overloaded and request client to retry. + pub total_stake: StakeUnit, + + // Records requested retry duration by stakes. 
+ pub stake_requested_retry_after: BTreeMap<Duration, StakeUnit>, +} + +impl RetryableOverloadInfo { + pub fn add_stake_retryable_overload(&mut self, stake: StakeUnit, retry_after: Duration) { + self.total_stake += stake; + self.stake_requested_retry_after + .entry(retry_after) + .and_modify(|s| *s += stake) + .or_insert(stake); + } + + // Gets the duration of retry requested by a quorum of validators with smallest retry durations. + pub fn get_quorum_retry_after( + &self, + good_stake: StakeUnit, + quorum_threshold: StakeUnit, + ) -> Duration { + if self.stake_requested_retry_after.is_empty() { + return Duration::from_secs(0); + } + + let mut quorum_stake = good_stake; + for (retry_after, stake) in self.stake_requested_retry_after.iter() { + quorum_stake += *stake; + if quorum_stake >= quorum_threshold { + return *retry_after; + } + } + *self.stake_requested_retry_after.last_key_value().unwrap().0 + } +} + #[derive(Debug)] struct ProcessTransactionState { // The list of signatures gathered at any point @@ -309,7 +348,7 @@ struct ProcessTransactionState { // Validators that are overloaded with txns pending execution. overloaded_stake: StakeUnit, // Validators that are overloaded and request client to retry. - retryable_overloaded_stake: StakeUnit, + retryable_overload_info: RetryableOverloadInfo, // If there are conflicting transactions, we note them down and may attempt to retry conflicting_tx_digests: BTreeMap, StakeUnit)>, @@ -1046,7 +1085,7 @@ where object_or_package_not_found_stake: 0, non_retryable_stake: 0, overloaded_stake: 0, - retryable_overloaded_stake: 0, + retryable_overload_info: Default::default(), retryable: true, conflicting_tx_digests: Default::default(), conflicting_tx_total_stake: 0, @@ -1119,7 +1158,7 @@ where // // TODO: currently retryable overload and above overload error look redundant. We want to have a unified // code path to handle both overload scenarios. 
- state.retryable_overloaded_stake += weight; + state.retryable_overload_info.add_stake_retryable_overload(weight, Duration::from_secs(err.retry_after_secs())); } else if !retryable && !state.record_conflicting_transaction_if_any(name, weight, &err) { // We don't count conflicting transactions as non-retryable errors here @@ -1262,15 +1301,19 @@ where // When state is in a retryable state and process transaction was not successful, it indicates that // we have heard from *all* validators. Check if any SystemOverloadRetryAfter error caused the txn - // to fail. If so, return explicit SystemOverloadRetryAfter error for continuous retry (since objects) - // are locked in validators. If not, retry regular RetryableTransaction error. - if state.tx_signatures.total_votes() + state.retryable_overloaded_stake >= quorum_threshold + // to fail. If so, return explicit SystemOverloadRetryAfter error for continuous retry (since objects + // are locked in validators). If not, retry regular RetryableTransaction error. + if state.tx_signatures.total_votes() + state.retryable_overload_info.total_stake + >= quorum_threshold { - // TODO: make use of retry_after_secs, which is currently not used. 
+ let retry_after_secs = state + .retryable_overload_info + .get_quorum_retry_after(state.tx_signatures.total_votes(), quorum_threshold) + .as_secs(); return AggregatorProcessTransactionError::SystemOverloadRetryAfter { - overload_stake: state.retryable_overloaded_stake, + overload_stake: state.retryable_overload_info.total_stake, errors: group_errors(state.errors), - retry_after_secs: 0, + retry_after_secs, }; } diff --git a/crates/sui-core/src/quorum_driver/mod.rs b/crates/sui-core/src/quorum_driver/mod.rs index 449247c6b79ec..bd641f7f9295e 100644 --- a/crates/sui-core/src/quorum_driver/mod.rs +++ b/crates/sui-core/src/quorum_driver/mod.rs @@ -38,6 +38,7 @@ use mysten_metrics::{ spawn_monitored_task, GaugeGuard, TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX, }; use std::fmt::Write; +use sui_macros::fail_point; use sui_types::error::{SuiError, SuiResult}; use sui_types::messages_safe_client::PlainTransactionInfoResponse; use sui_types::transaction::{CertifiedTransaction, Transaction}; @@ -157,22 +158,27 @@ impl QuorumDriver { ); return Ok(()); } - self.backoff_and_enqueue(request, tx_cert, old_retry_times, client_addr) + self.backoff_and_enqueue(request, tx_cert, old_retry_times, client_addr, None) .await } /// Performs exponential backoff and enqueue the `transaction` to the execution queue. + /// When `min_backoff_duration` is provided, the backoff duration will be at least `min_backoff_duration`. 
async fn backoff_and_enqueue( &self, request: ExecuteTransactionRequestV3, tx_cert: Option, old_retry_times: u32, client_addr: Option, + min_backoff_duration: Option<Duration>, ) -> SuiResult<()> { - let next_retry_after = - Instant::now() + Duration::from_millis(200 * u64::pow(2, old_retry_times)); + let next_retry_after = Instant::now() + + Duration::from_millis(200 * u64::pow(2, old_retry_times)) + .max(min_backoff_duration.unwrap_or(Duration::from_secs(0))); sleep_until(next_retry_after).await; + fail_point!("count_retry_times"); + let tx_cert = match tx_cert { // TxCert is only valid when its epoch matches current epoch. // Note, it's impossible that TxCert's epoch is larger than current epoch @@ -373,7 +379,11 @@ where retry_after_secs, }) => { self.metrics.total_retryable_overload_errors.inc(); - debug!(?tx_digest, ?errors, "System overload and retry"); + debug!( + ?tx_digest, + ?errors, + "System overload and retry after secs {retry_after_secs}", + ); Err(Some(QuorumDriverError::SystemOverloadRetryAfter { overload_stake, errors, @@ -875,7 +885,9 @@ where client_addr, )); } - Some(QuorumDriverError::SystemOverloadRetryAfter { .. }) => { + Some(QuorumDriverError::SystemOverloadRetryAfter { + retry_after_secs, .. + }) => { // Special case for SystemOverloadRetryAfter error. In this case, due to that objects are already // locked inside validators, we need to perform continuous retry and ignore `max_retry_times`. 
// TODO: the txn can potentially be retried unlimited times, therefore, we need to bound the number @@ -887,6 +899,7 @@ where tx_cert, old_retry_times, client_addr, + Some(Duration::from_secs(retry_after_secs)), )); } Some(qd_error) => { diff --git a/crates/sui-core/src/quorum_driver/tests.rs b/crates/sui-core/src/quorum_driver/tests.rs index 3dce375ce0dd3..049fc9d0a8c9e 100644 --- a/crates/sui-core/src/quorum_driver/tests.rs +++ b/crates/sui-core/src/quorum_driver/tests.rs @@ -9,8 +9,10 @@ use crate::test_utils::make_transfer_sui_transaction; use crate::{quorum_driver::QuorumDriverMetrics, test_utils::init_local_authorities}; use mysten_common::sync::notify_read::{NotifyRead, Registration}; use std::net::SocketAddr; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::time::Duration; +use sui_macros::{register_fail_point, sim_test}; use sui_types::base_types::SuiAddress; use sui_types::base_types::TransactionDigest; use sui_types::crypto::{deterministic_random_account_key, get_key_pair, AccountKeyPair}; @@ -535,7 +537,7 @@ async fn test_quorum_driver_object_locked() -> Result<(), anyhow::Error> { } // Tests that quorum driver can continuously retry txn with SystemOverloadedRetryAfter error. -#[tokio::test(flavor = "current_thread", start_paused = true)] +#[sim_test] async fn test_quorum_driver_handling_overload_and_retry() { telemetry_subscribers::init_for_testing(); @@ -547,7 +549,7 @@ async fn test_quorum_driver_handling_overload_and_retry() { // Make local authority client to always return SystemOverloadedRetryAfter error. 
let fault_config = LocalAuthorityClientFaultConfig { - overload_retry_after_handle_transaction: true, + overload_retry_after_handle_transaction: Some(Duration::from_secs(30)), ..Default::default() }; let mut clients = aggregator.clone_inner_clients_test_only(); @@ -570,6 +572,14 @@ async fn test_quorum_driver_handling_overload_and_retry() { .unwrap(); let tx = make_tx(gas_object, sender, &keypair, rgp); + // Use a fail point to count the number of retries to test that the quorum backoff logic + // respects the `overload_retry_after_handle_transaction` duration configured above. + let retry_count: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0)); + let retry_count_clone = retry_count.clone(); + register_fail_point("count_retry_times", move || { + retry_count_clone.fetch_add(1, Ordering::SeqCst); + }); + // Create a quorum driver with max_retry_times = 0. let arc_aggregator = Arc::new(aggregator.clone()); let quorum_driver_handler = Arc::new( @@ -582,13 +592,17 @@ async fn test_quorum_driver_handling_overload_and_retry() { .start(), ); - // Submit the transaction, and check that it shouldn't return. + // Submit the transaction and check that it doesn't return, and that the number of retries is at most + // 300s timeout / 30s retry-after duration = 10. let ticket = quorum_driver_handler .submit_transaction(ExecuteTransactionRequestV3::new_v2(tx)) .await .unwrap(); match timeout(Duration::from_secs(300), ticket).await { Ok(result) => panic!("Process transaction should timeout! {:?}", result), - Err(_) => eprintln!("Waiting for txn timed out! This is desired behavior."), + Err(_) => { + assert!(retry_count.load(Ordering::SeqCst) <= 10); + println!("Waiting for txn timed out! 
This is desired behavior.") + } } } diff --git a/crates/sui-core/src/test_authority_clients.rs b/crates/sui-core/src/test_authority_clients.rs index bf08091c5a8a7..5e3a6b50821d6 100644 --- a/crates/sui-core/src/test_authority_clients.rs +++ b/crates/sui-core/src/test_authority_clients.rs @@ -39,7 +39,7 @@ pub struct LocalAuthorityClientFaultConfig { pub fail_after_handle_transaction: bool, pub fail_before_handle_confirmation: bool, pub fail_after_handle_confirmation: bool, - pub overload_retry_after_handle_transaction: bool, + pub overload_retry_after_handle_transaction: Option<Duration>, } impl LocalAuthorityClientFaultConfig { @@ -76,9 +76,9 @@ impl AuthorityAPI for LocalAuthorityClient { error: "Mock error after handle_transaction".to_owned(), }); } - if self.fault_config.overload_retry_after_handle_transaction { + if let Some(duration) = self.fault_config.overload_retry_after_handle_transaction { return Err(SuiError::ValidatorOverloadedRetryAfter { - retry_after_secs: 0, + retry_after_secs: duration.as_secs(), }); } result diff --git a/crates/sui-core/src/unit_tests/authority_aggregator_tests.rs b/crates/sui-core/src/unit_tests/authority_aggregator_tests.rs index bc2e7291e4e43..580cc275257dd 100644 --- a/crates/sui-core/src/unit_tests/authority_aggregator_tests.rs +++ b/crates/sui-core/src/unit_tests/authority_aggregator_tests.rs @@ -1958,15 +1958,48 @@ async fn test_handle_overload_retry_response() { 666, // this is a dummy value which does not matter ); - let overload_error = SuiError::ValidatorOverloadedRetryAfter { - retry_after_secs: 0, - }; let rpc_error = SuiError::RpcError("RPC".into(), "Error".into()); - // Have 2f + 1 validators return the overload error and we should get the `SystemOverload` error. - set_retryable_tx_info_response_error(&mut clients, &authority_keys); - set_tx_info_response_with_error(&mut clients, authority_keys.iter().skip(1), overload_error); + // Have all validators return the overload error and we should get the `SystemOverloadRetryAfter` error. 
+ // Uses different retry_after_secs for each validator. + for (index, (name, _)) in authority_keys.iter().enumerate() { + clients.get_mut(name).unwrap().set_tx_info_response_error( + SuiError::ValidatorOverloadedRetryAfter { + retry_after_secs: index as u64, + }, + ); + } + let agg = get_genesis_agg(authorities.clone(), clients.clone()); + // We should get the `SystemOverloadRetryAfter` error with the retry_after_secs corresponding to the quorum + // threshold of validators. + assert_resp_err( + &agg, + txn.clone(), + |e| { + matches!( + e, + AggregatorProcessTransactionError::SystemOverloadRetryAfter { + retry_after_secs, + .. + } if *retry_after_secs == (authority_keys.len() as u64 - 2) + ) + }, + |e| { + matches!( + e, + SuiError::ValidatorOverloadedRetryAfter { .. } | SuiError::RpcError(..) + ) + }, + ) + .await; + // Have 2f + 1 validators return the overload error (by making one authority return an RPC error) and we + // should still get the `SystemOverloadRetryAfter` error. The retry_after_secs corresponding to the quorum threshold + // is now the max of the retry_after_secs of the validators. + clients + .get_mut(&authority_keys[0].0) + .unwrap() + .set_tx_info_response_error(rpc_error.clone()); let agg = get_genesis_agg(authorities.clone(), clients.clone()); assert_resp_err( &agg, @@ -1974,7 +2007,10 @@ async fn test_handle_overload_retry_response() { |e| { matches!( e, - AggregatorProcessTransactionError::SystemOverloadRetryAfter { .. } + AggregatorProcessTransactionError::SystemOverloadRetryAfter { + retry_after_secs, + .. + } if *retry_after_secs == (authority_keys.len() as u64 - 1) ) }, |e| { @@ -1986,15 +2022,13 @@ async fn test_handle_overload_retry_response() { ) .await; - // Change one of the valdiators' errors to RPC error so the system is considered not overloaded now and a `RetryableTransaction` + // Change another validator's error to an RPC error so the system is considered not overloaded now and a `RetryableTransaction` // should be returned. 
clients .get_mut(&authority_keys[1].0) .unwrap() .set_tx_info_response_error(rpc_error); - let agg = get_genesis_agg(authorities.clone(), clients.clone()); - assert_resp_err( &agg, txn.clone(), @@ -2496,3 +2530,38 @@ fn set_tx_info_response_with_error<'a>( .set_tx_info_response_error(error.clone()); } } + +#[test] +fn test_retryable_overload_info() { + let mut retryable_overload_info = RetryableOverloadInfo::default(); + assert_eq!( + retryable_overload_info.get_quorum_retry_after(3000, 7000), + Duration::from_secs(0) + ); + + for _ in 0..4 { + retryable_overload_info.add_stake_retryable_overload(1000, Duration::from_secs(1)); + } + assert_eq!( + retryable_overload_info.get_quorum_retry_after(3000, 7000), + Duration::from_secs(1) + ); + + retryable_overload_info = RetryableOverloadInfo::default(); + retryable_overload_info.add_stake_retryable_overload(1000, Duration::from_secs(1)); + retryable_overload_info.add_stake_retryable_overload(3000, Duration::from_secs(10)); + retryable_overload_info.add_stake_retryable_overload(2000, Duration::from_secs(1)); + assert_eq!( + retryable_overload_info.get_quorum_retry_after(4000, 7000), + Duration::from_secs(1) + ); + + retryable_overload_info = RetryableOverloadInfo::default(); + for i in 0..10 { + retryable_overload_info.add_stake_retryable_overload(1000, Duration::from_secs(i)); + } + assert_eq!( + retryable_overload_info.get_quorum_retry_after(0, 7000), + Duration::from_secs(6) + ); +} diff --git a/crates/sui-types/src/error.rs b/crates/sui-types/src/error.rs index 5fe67e0b8c2e6..17087a07d0fd2 100644 --- a/crates/sui-types/src/error.rs +++ b/crates/sui-types/src/error.rs @@ -826,6 +826,13 @@ impl SuiError { pub fn is_retryable_overload(&self) -> bool { matches!(self, SuiError::ValidatorOverloadedRetryAfter { .. }) } + + pub fn retry_after_secs(&self) -> u64 { + match self { + SuiError::ValidatorOverloadedRetryAfter { retry_after_secs } => *retry_after_secs, + _ => 0, + } + } } impl Ord for SuiError {