Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Quorum driver take server requested retry interval into consideration #17520

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 52 additions & 9 deletions crates/sui-core/src/authority_aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,45 @@ pub fn group_errors(errors: Vec<(SuiError, Vec<AuthorityName>, StakeUnit)>) -> G
.collect()
}

#[derive(Debug, Default)]
pub struct RetryableOverloadInfo {
// Total stake of validators that are overloaded and request client to retry.
pub total_stake: StakeUnit,

// Records requested retry duration by stakes.
pub stake_requested_retry_after: BTreeMap<Duration, StakeUnit>,
}

impl RetryableOverloadInfo {
pub fn add_stake_retryable_overload(&mut self, stake: StakeUnit, retry_after: Duration) {
self.total_stake += stake;
self.stake_requested_retry_after
.entry(retry_after)
.and_modify(|s| *s += stake)
.or_insert(stake);
}

// Gets the duration of retry requested by a quorum of validators with smallest retry durations.
pub fn get_quorum_retry_after(
&self,
good_stake: StakeUnit,
quorum_threshold: StakeUnit,
) -> Duration {
if self.stake_requested_retry_after.is_empty() {
return Duration::from_secs(0);
}

let mut quorum_stake = good_stake;
for (retry_after, stake) in self.stake_requested_retry_after.iter() {
quorum_stake += *stake;
if quorum_stake >= quorum_threshold {
return *retry_after;
}
}
*self.stake_requested_retry_after.last_key_value().unwrap().0
}
}

#[derive(Debug)]
struct ProcessTransactionState {
// The list of signatures gathered at any point
Expand All @@ -309,7 +348,7 @@ struct ProcessTransactionState {
// Validators that are overloaded with txns pending execution.
overloaded_stake: StakeUnit,
// Validators that are overloaded and request client to retry.
retryable_overloaded_stake: StakeUnit,
retryable_overload_info: RetryableOverloadInfo,
// If there are conflicting transactions, we note them down and may attempt to retry
conflicting_tx_digests:
BTreeMap<TransactionDigest, (Vec<(AuthorityName, ObjectRef)>, StakeUnit)>,
Expand Down Expand Up @@ -1046,7 +1085,7 @@ where
object_or_package_not_found_stake: 0,
non_retryable_stake: 0,
overloaded_stake: 0,
retryable_overloaded_stake: 0,
retryable_overload_info: Default::default(),
retryable: true,
conflicting_tx_digests: Default::default(),
conflicting_tx_total_stake: 0,
Expand Down Expand Up @@ -1119,7 +1158,7 @@ where
//
// TODO: currently retryable overload and above overload error look redundant. We want to have a unified
// code path to handle both overload scenarios.
state.retryable_overloaded_stake += weight;
state.retryable_overload_info.add_stake_retryable_overload(weight, Duration::from_secs(err.retry_after_secs()));
}
else if !retryable && !state.record_conflicting_transaction_if_any(name, weight, &err) {
// We don't count conflicting transactions as non-retryable errors here
Expand Down Expand Up @@ -1262,15 +1301,19 @@ where

// When state is in a retryable state and process transaction was not successful, it indicates that
// we have heard from *all* validators. Check if any SystemOverloadRetryAfter error caused the txn
// to fail. If so, return explicit SystemOverloadRetryAfter error for continuous retry (since objects)
// are locked in validators. If not, retry regular RetryableTransaction error.
if state.tx_signatures.total_votes() + state.retryable_overloaded_stake >= quorum_threshold
// to fail. If so, return explicit SystemOverloadRetryAfter error for continuous retry (since objects
// are locked in validators). If not, retry regular RetryableTransaction error.
if state.tx_signatures.total_votes() + state.retryable_overload_info.total_stake
>= quorum_threshold
{
// TODO: make use of retry_after_secs, which is currently not used.
let retry_after_secs = state
.retryable_overload_info
.get_quorum_retry_after(state.tx_signatures.total_votes(), quorum_threshold)
.as_secs();
return AggregatorProcessTransactionError::SystemOverloadRetryAfter {
overload_stake: state.retryable_overloaded_stake,
overload_stake: state.retryable_overload_info.total_stake,
errors: group_errors(state.errors),
retry_after_secs: 0,
retry_after_secs,
};
}

Expand Down
23 changes: 18 additions & 5 deletions crates/sui-core/src/quorum_driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use mysten_metrics::{
spawn_monitored_task, GaugeGuard, TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX,
};
use std::fmt::Write;
use sui_macros::fail_point;
use sui_types::error::{SuiError, SuiResult};
use sui_types::messages_safe_client::PlainTransactionInfoResponse;
use sui_types::transaction::{CertifiedTransaction, Transaction};
Expand Down Expand Up @@ -157,22 +158,27 @@ impl<A: Clone> QuorumDriver<A> {
);
return Ok(());
}
self.backoff_and_enqueue(request, tx_cert, old_retry_times, client_addr)
self.backoff_and_enqueue(request, tx_cert, old_retry_times, client_addr, None)
.await
}

/// Performs exponential backoff and enqueue the `transaction` to the execution queue.
/// When `min_backoff_duration` is provided, the backoff duration will be at least `min_backoff_duration`.
async fn backoff_and_enqueue(
&self,
request: ExecuteTransactionRequestV3,
tx_cert: Option<CertifiedTransaction>,
old_retry_times: u32,
client_addr: Option<SocketAddr>,
min_backoff_duration: Option<Duration>,
) -> SuiResult<()> {
let next_retry_after =
Instant::now() + Duration::from_millis(200 * u64::pow(2, old_retry_times));
let next_retry_after = Instant::now()
+ Duration::from_millis(200 * u64::pow(2, old_retry_times))
.max(min_backoff_duration.unwrap_or(Duration::from_secs(0)));
sleep_until(next_retry_after).await;

fail_point!("count_retry_times");

let tx_cert = match tx_cert {
// TxCert is only valid when its epoch matches current epoch.
// Note, it's impossible that TxCert's epoch is larger than current epoch
Expand Down Expand Up @@ -373,7 +379,11 @@ where
retry_after_secs,
}) => {
self.metrics.total_retryable_overload_errors.inc();
debug!(?tx_digest, ?errors, "System overload and retry");
debug!(
?tx_digest,
?errors,
"System overload and retry after secs {retry_after_secs}",
);
Err(Some(QuorumDriverError::SystemOverloadRetryAfter {
overload_stake,
errors,
Expand Down Expand Up @@ -875,7 +885,9 @@ where
client_addr,
));
}
Some(QuorumDriverError::SystemOverloadRetryAfter { .. }) => {
Some(QuorumDriverError::SystemOverloadRetryAfter {
retry_after_secs, ..
}) => {
// Special case for SystemOverloadRetryAfter error. In this case, due to that objects are already
// locked inside validators, we need to perform continuous retry and ignore `max_retry_times`.
// TODO: the txn can potentially be retried unlimited times, therefore, we need to bound the number
Expand All @@ -887,6 +899,7 @@ where
tx_cert,
old_retry_times,
client_addr,
Some(Duration::from_secs(retry_after_secs)),
));
}
Some(qd_error) => {
Expand Down
22 changes: 18 additions & 4 deletions crates/sui-core/src/quorum_driver/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ use crate::test_utils::make_transfer_sui_transaction;
use crate::{quorum_driver::QuorumDriverMetrics, test_utils::init_local_authorities};
use mysten_common::sync::notify_read::{NotifyRead, Registration};
use std::net::SocketAddr;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use sui_macros::{register_fail_point, sim_test};
use sui_types::base_types::SuiAddress;
use sui_types::base_types::TransactionDigest;
use sui_types::crypto::{deterministic_random_account_key, get_key_pair, AccountKeyPair};
Expand Down Expand Up @@ -535,7 +537,7 @@ async fn test_quorum_driver_object_locked() -> Result<(), anyhow::Error> {
}

// Tests that quorum driver can continuously retry txn with SystemOverloadedRetryAfter error.
#[tokio::test(flavor = "current_thread", start_paused = true)]
#[sim_test]
async fn test_quorum_driver_handling_overload_and_retry() {
telemetry_subscribers::init_for_testing();

Expand All @@ -547,7 +549,7 @@ async fn test_quorum_driver_handling_overload_and_retry() {

// Make local authority client to always return SystemOverloadedRetryAfter error.
let fault_config = LocalAuthorityClientFaultConfig {
overload_retry_after_handle_transaction: true,
overload_retry_after_handle_transaction: Some(Duration::from_secs(30)),
..Default::default()
};
let mut clients = aggregator.clone_inner_clients_test_only();
Expand All @@ -570,6 +572,14 @@ async fn test_quorum_driver_handling_overload_and_retry() {
.unwrap();
let tx = make_tx(gas_object, sender, &keypair, rgp);

// Use a fail point to count the number of retries to test that the quorum backoff logic
// respects above `overload_retry_after_handle_transaction`.
let retry_count: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
let retry_count_clone = retry_count.clone();
register_fail_point("count_retry_times", move || {
retry_count_clone.fetch_add(1, Ordering::SeqCst);
});

// Create a quorum driver with max_retry_times = 0.
let arc_aggregator = Arc::new(aggregator.clone());
let quorum_driver_handler = Arc::new(
Expand All @@ -582,13 +592,17 @@ async fn test_quorum_driver_handling_overload_and_retry() {
.start(),
);

// Submit the transaction, and check that it shouldn't return.
// Submit the transaction, and check that it shouldn't return, and the number of retries is within
// 300s timeout / 30s retry after duration = 10 times.
let ticket = quorum_driver_handler
.submit_transaction(ExecuteTransactionRequestV3::new_v2(tx))
.await
.unwrap();
match timeout(Duration::from_secs(300), ticket).await {
Ok(result) => panic!("Process transaction should timeout! {:?}", result),
Err(_) => eprintln!("Waiting for txn timed out! This is desired behavior."),
Err(_) => {
assert!(retry_count.load(Ordering::SeqCst) <= 10);
println!("Waiting for txn timed out! This is desired behavior.")
}
}
}
6 changes: 3 additions & 3 deletions crates/sui-core/src/test_authority_clients.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub struct LocalAuthorityClientFaultConfig {
pub fail_after_handle_transaction: bool,
pub fail_before_handle_confirmation: bool,
pub fail_after_handle_confirmation: bool,
pub overload_retry_after_handle_transaction: bool,
pub overload_retry_after_handle_transaction: Option<Duration>,
}

impl LocalAuthorityClientFaultConfig {
Expand Down Expand Up @@ -76,9 +76,9 @@ impl AuthorityAPI for LocalAuthorityClient {
error: "Mock error after handle_transaction".to_owned(),
});
}
if self.fault_config.overload_retry_after_handle_transaction {
if let Some(duration) = self.fault_config.overload_retry_after_handle_transaction {
return Err(SuiError::ValidatorOverloadedRetryAfter {
retry_after_secs: 0,
retry_after_secs: duration.as_secs(),
});
}
result
Expand Down
89 changes: 79 additions & 10 deletions crates/sui-core/src/unit_tests/authority_aggregator_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1958,23 +1958,59 @@ async fn test_handle_overload_retry_response() {
666, // this is a dummy value which does not matter
);

let overload_error = SuiError::ValidatorOverloadedRetryAfter {
retry_after_secs: 0,
};
let rpc_error = SuiError::RpcError("RPC".into(), "Error".into());

// Have 2f + 1 validators return the overload error and we should get the `SystemOverload` error.
set_retryable_tx_info_response_error(&mut clients, &authority_keys);
set_tx_info_response_with_error(&mut clients, authority_keys.iter().skip(1), overload_error);
// Have all validators return the overload error and we should get the `SystemOverload` error.
// Uses different retry_after_secs for each validator.
for (index, (name, _)) in authority_keys.iter().enumerate() {
clients.get_mut(name).unwrap().set_tx_info_response_error(
SuiError::ValidatorOverloadedRetryAfter {
retry_after_secs: index as u64,
},
);
}
let agg = get_genesis_agg(authorities.clone(), clients.clone());
// We should get the `SystemOverloadRetryAfter` error with the retry_after_secs corresponding to the quorum
// threshold of validators.
assert_resp_err(
&agg,
txn.clone(),
|e| {
matches!(
e,
AggregatorProcessTransactionError::SystemOverloadRetryAfter {
retry_after_secs,
..
} if *retry_after_secs == (authority_keys.len() as u64 - 2)
)
},
|e| {
matches!(
e,
SuiError::ValidatorOverloadedRetryAfter { .. } | SuiError::RpcError(..)
)
},
)
.await;

// Have 2f + 1 validators return the overload error (by setting one authority returning RPC error) and we
// should still get the `SystemOverload` error. The retry_after_secs corresponding to the quorum threshold
// now is the max of the retry_after_secs of the validators.
clients
.get_mut(&authority_keys[0].0)
.unwrap()
.set_tx_info_response_error(rpc_error.clone());
let agg = get_genesis_agg(authorities.clone(), clients.clone());
assert_resp_err(
&agg,
txn.clone(),
|e| {
matches!(
e,
AggregatorProcessTransactionError::SystemOverloadRetryAfter { .. }
AggregatorProcessTransactionError::SystemOverloadRetryAfter {
retry_after_secs,
..
} if *retry_after_secs == (authority_keys.len() as u64 - 1)
)
},
|e| {
Expand All @@ -1986,15 +2022,13 @@ async fn test_handle_overload_retry_response() {
)
.await;

// Change one of the valdiators' errors to RPC error so the system is considered not overloaded now and a `RetryableTransaction`
// Change another valdiators' errors to RPC error so the system is considered not overloaded now and a `RetryableTransaction`
// should be returned.
clients
.get_mut(&authority_keys[1].0)
.unwrap()
.set_tx_info_response_error(rpc_error);

let agg = get_genesis_agg(authorities.clone(), clients.clone());

assert_resp_err(
&agg,
txn.clone(),
Expand Down Expand Up @@ -2496,3 +2530,38 @@ fn set_tx_info_response_with_error<'a>(
.set_tx_info_response_error(error.clone());
}
}

#[test]
fn test_retryable_overload_info() {
let mut retryable_overload_info = RetryableOverloadInfo::default();
assert_eq!(
retryable_overload_info.get_quorum_retry_after(3000, 7000),
Duration::from_secs(0)
);

for _ in 0..4 {
retryable_overload_info.add_stake_retryable_overload(1000, Duration::from_secs(1));
}
assert_eq!(
retryable_overload_info.get_quorum_retry_after(3000, 7000),
Duration::from_secs(1)
);

retryable_overload_info = RetryableOverloadInfo::default();
retryable_overload_info.add_stake_retryable_overload(1000, Duration::from_secs(1));
retryable_overload_info.add_stake_retryable_overload(3000, Duration::from_secs(10));
retryable_overload_info.add_stake_retryable_overload(2000, Duration::from_secs(1));
assert_eq!(
retryable_overload_info.get_quorum_retry_after(4000, 7000),
Duration::from_secs(1)
);

retryable_overload_info = RetryableOverloadInfo::default();
for i in 0..10 {
retryable_overload_info.add_stake_retryable_overload(1000, Duration::from_secs(i));
}
assert_eq!(
retryable_overload_info.get_quorum_retry_after(0, 7000),
Duration::from_secs(6)
);
}
7 changes: 7 additions & 0 deletions crates/sui-types/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -820,6 +820,13 @@ impl SuiError {
pub fn is_retryable_overload(&self) -> bool {
matches!(self, SuiError::ValidatorOverloadedRetryAfter { .. })
}

pub fn retry_after_secs(&self) -> u64 {
match self {
SuiError::ValidatorOverloadedRetryAfter { retry_after_secs } => *retry_after_secs,
_ => 0,
}
}
}

impl Ord for SuiError {
Expand Down
Loading