Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Quorum driver take server requested retry interval into consideration #17520

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 22 additions & 9 deletions crates/sui-core/src/authority_aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,15 @@ pub fn group_errors(errors: Vec<(SuiError, Vec<AuthorityName>, StakeUnit)>) -> G
.collect()
}

// Aggregated information about validators that answered with a retryable
// overload error while a transaction was being processed. Held in
// `ProcessTransactionState::retryable_overload_info` and initialised through
// `Default::default()`.
#[derive(Debug, Default)]
struct RetryableOverloadInfo {
// Total stake of validators that are overloaded and request client to retry.
// Incremented by the reporting validator's weight for each such error.
stake: StakeUnit,

// The maximum retry_after_secs requested by overloaded validators.
// NOTE(review): because this is a running maximum, a single validator that
// reports a very large value determines the result — confirm this is acceptable.
requested_retry_after: Duration,
}

#[derive(Debug)]
struct ProcessTransactionState {
// The list of signatures gathered at any point
Expand All @@ -309,7 +318,7 @@ struct ProcessTransactionState {
// Validators that are overloaded with txns pending execution.
overloaded_stake: StakeUnit,
// Validators that are overloaded and request client to retry.
retryable_overloaded_stake: StakeUnit,
retryable_overload_info: RetryableOverloadInfo,
// If there are conflicting transactions, we note them down and may attempt to retry
conflicting_tx_digests:
BTreeMap<TransactionDigest, (Vec<(AuthorityName, ObjectRef)>, StakeUnit)>,
Expand Down Expand Up @@ -1046,7 +1055,7 @@ where
object_or_package_not_found_stake: 0,
non_retryable_stake: 0,
overloaded_stake: 0,
retryable_overloaded_stake: 0,
retryable_overload_info: Default::default(),
retryable: true,
conflicting_tx_digests: Default::default(),
conflicting_tx_total_stake: 0,
Expand Down Expand Up @@ -1119,7 +1128,8 @@ where
//
// TODO: currently retryable overload and above overload error look redundant. We want to have a unified
// code path to handle both overload scenarios.
state.retryable_overloaded_stake += weight;
state.retryable_overload_info.stake += weight;
state.retryable_overload_info.requested_retry_after = state.retryable_overload_info.requested_retry_after.max(Duration::from_secs(err.retry_after_secs()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just thinking out loud: should we use the 67th percentile of all suggested values instead of the highest one? The highest value could be set to MAX by a Byzantine validator.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very good point! For a second, I was thinking that a malicious validator could go both ways: by sending a duration that is too big or too small. But I guess the too-big case is more damaging, since we already perform exponential backoff in the client.

I changed the code so that retry_after corresponds to a good quorum threshold of validators with the smallest retry-after durations.

}
else if !retryable && !state.record_conflicting_transaction_if_any(name, weight, &err) {
// We don't count conflicting transactions as non-retryable errors here
Expand Down Expand Up @@ -1262,15 +1272,18 @@ where

// When state is in a retryable state and process transaction was not successful, it indicates that
// we have heard from *all* validators. Check if any SystemOverloadRetryAfter error caused the txn
// to fail. If so, return explicit SystemOverloadRetryAfter error for continuous retry (since objects)
// are locked in validators. If not, retry regular RetryableTransaction error.
if state.tx_signatures.total_votes() + state.retryable_overloaded_stake >= quorum_threshold
// to fail. If so, return explicit SystemOverloadRetryAfter error for continuous retry (since objects
// are locked in validators). If not, retry regular RetryableTransaction error.
if state.tx_signatures.total_votes() + state.retryable_overload_info.stake
>= quorum_threshold
{
// TODO: make use of retry_after_secs, which is currently not used.
return AggregatorProcessTransactionError::SystemOverloadRetryAfter {
overload_stake: state.retryable_overloaded_stake,
overload_stake: state.retryable_overload_info.stake,
errors: group_errors(state.errors),
retry_after_secs: 0,
retry_after_secs: state
.retryable_overload_info
.requested_retry_after
.as_secs(),
};
}

Expand Down
23 changes: 18 additions & 5 deletions crates/sui-core/src/quorum_driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use mysten_metrics::{
spawn_monitored_task, GaugeGuard, TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX,
};
use std::fmt::Write;
use sui_macros::fail_point;
use sui_types::error::{SuiError, SuiResult};
use sui_types::messages_safe_client::PlainTransactionInfoResponse;
use sui_types::transaction::{CertifiedTransaction, Transaction};
Expand Down Expand Up @@ -157,22 +158,27 @@ impl<A: Clone> QuorumDriver<A> {
);
return Ok(());
}
self.backoff_and_enqueue(request, tx_cert, old_retry_times, client_addr)
self.backoff_and_enqueue(request, tx_cert, old_retry_times, client_addr, None)
.await
}

/// Performs exponential backoff and enqueue the `transaction` to the execution queue.
/// When `min_backoff_duration` is provided, the backoff duration will be at least `min_backoff_duration`.
async fn backoff_and_enqueue(
&self,
request: ExecuteTransactionRequestV3,
tx_cert: Option<CertifiedTransaction>,
old_retry_times: u32,
client_addr: Option<SocketAddr>,
min_backoff_duration: Option<Duration>,
) -> SuiResult<()> {
let next_retry_after =
Instant::now() + Duration::from_millis(200 * u64::pow(2, old_retry_times));
let next_retry_after = Instant::now()
+ Duration::from_millis(200 * u64::pow(2, old_retry_times))
.max(min_backoff_duration.unwrap_or(Duration::from_secs(0)));
sleep_until(next_retry_after).await;

fail_point!("count_retry_times");

let tx_cert = match tx_cert {
// TxCert is only valid when its epoch matches current epoch.
// Note, it's impossible that TxCert's epoch is larger than current epoch
Expand Down Expand Up @@ -373,7 +379,11 @@ where
retry_after_secs,
}) => {
self.metrics.total_retryable_overload_errors.inc();
debug!(?tx_digest, ?errors, "System overload and retry");
debug!(
?tx_digest,
?errors,
"System overload and retry after secs {retry_after_secs}",
);
Err(Some(QuorumDriverError::SystemOverloadRetryAfter {
overload_stake,
errors,
Expand Down Expand Up @@ -875,7 +885,9 @@ where
client_addr,
));
}
Some(QuorumDriverError::SystemOverloadRetryAfter { .. }) => {
Some(QuorumDriverError::SystemOverloadRetryAfter {
retry_after_secs, ..
}) => {
// Special case for SystemOverloadRetryAfter error. In this case, due to that objects are already
// locked inside validators, we need to perform continuous retry and ignore `max_retry_times`.
// TODO: the txn can potentially be retried unlimited times, therefore, we need to bound the number
Expand All @@ -887,6 +899,7 @@ where
tx_cert,
old_retry_times,
client_addr,
Some(Duration::from_secs(retry_after_secs)),
));
}
Some(qd_error) => {
Expand Down
22 changes: 18 additions & 4 deletions crates/sui-core/src/quorum_driver/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ use crate::test_utils::make_transfer_sui_transaction;
use crate::{quorum_driver::QuorumDriverMetrics, test_utils::init_local_authorities};
use mysten_common::sync::notify_read::{NotifyRead, Registration};
use std::net::SocketAddr;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use sui_macros::{register_fail_point, sim_test};
use sui_types::base_types::SuiAddress;
use sui_types::base_types::TransactionDigest;
use sui_types::crypto::{deterministic_random_account_key, get_key_pair, AccountKeyPair};
Expand Down Expand Up @@ -535,7 +537,7 @@ async fn test_quorum_driver_object_locked() -> Result<(), anyhow::Error> {
}

// Tests that quorum driver can continuously retry txn with SystemOverloadedRetryAfter error.
#[tokio::test(flavor = "current_thread", start_paused = true)]
#[sim_test]
async fn test_quorum_driver_handling_overload_and_retry() {
telemetry_subscribers::init_for_testing();

Expand All @@ -547,7 +549,7 @@ async fn test_quorum_driver_handling_overload_and_retry() {

// Make local authority client to always return SystemOverloadedRetryAfter error.
let fault_config = LocalAuthorityClientFaultConfig {
overload_retry_after_handle_transaction: true,
overload_retry_after_handle_transaction: Some(Duration::from_secs(30)),
..Default::default()
};
let mut clients = aggregator.clone_inner_clients_test_only();
Expand All @@ -570,6 +572,14 @@ async fn test_quorum_driver_handling_overload_and_retry() {
.unwrap();
let tx = make_tx(gas_object, sender, &keypair, rgp);

// Use a fail point to count the number of retries to test that the quorum backoff logic
// respects above `overload_retry_after_handle_transaction`.
let retry_count: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
let retry_count_clone = retry_count.clone();
register_fail_point("count_retry_times", move || {
retry_count_clone.fetch_add(1, Ordering::SeqCst);
});

// Create a quorum driver with max_retry_times = 0.
let arc_aggregator = Arc::new(aggregator.clone());
let quorum_driver_handler = Arc::new(
Expand All @@ -582,13 +592,17 @@ async fn test_quorum_driver_handling_overload_and_retry() {
.start(),
);

// Submit the transaction, and check that it shouldn't return.
// Submit the transaction, and check that it shouldn't return, and the number of retries is within
// 300s timeout / 30s retry after duration = 10 times.
let ticket = quorum_driver_handler
.submit_transaction(ExecuteTransactionRequestV3::new_v2(tx))
.await
.unwrap();
match timeout(Duration::from_secs(300), ticket).await {
Ok(result) => panic!("Process transaction should timeout! {:?}", result),
Err(_) => eprintln!("Waiting for txn timed out! This is desired behavior."),
Err(_) => {
assert!(retry_count.load(Ordering::SeqCst) <= 10);
println!("Waiting for txn timed out! This is desired behavior.")
}
}
}
6 changes: 3 additions & 3 deletions crates/sui-core/src/test_authority_clients.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub struct LocalAuthorityClientFaultConfig {
pub fail_after_handle_transaction: bool,
pub fail_before_handle_confirmation: bool,
pub fail_after_handle_confirmation: bool,
pub overload_retry_after_handle_transaction: bool,
pub overload_retry_after_handle_transaction: Option<Duration>,
}

impl LocalAuthorityClientFaultConfig {
Expand Down Expand Up @@ -76,9 +76,9 @@ impl AuthorityAPI for LocalAuthorityClient {
error: "Mock error after handle_transaction".to_owned(),
});
}
if self.fault_config.overload_retry_after_handle_transaction {
if let Some(duration) = self.fault_config.overload_retry_after_handle_transaction {
return Err(SuiError::ValidatorOverloadedRetryAfter {
retry_after_secs: 0,
retry_after_secs: duration.as_secs(),
});
}
result
Expand Down
19 changes: 14 additions & 5 deletions crates/sui-core/src/unit_tests/authority_aggregator_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1958,23 +1958,32 @@ async fn test_handle_overload_retry_response() {
666, // this is a dummy value which does not matter
);

let overload_error = SuiError::ValidatorOverloadedRetryAfter {
retry_after_secs: 0,
};
let rpc_error = SuiError::RpcError("RPC".into(), "Error".into());

// Have 2f + 1 validators return the overload error and we should get the `SystemOverload` error.
// Uses different retry_after_secs for each validator.
set_retryable_tx_info_response_error(&mut clients, &authority_keys);
set_tx_info_response_with_error(&mut clients, authority_keys.iter().skip(1), overload_error);
for (index, (name, _)) in authority_keys.iter().skip(1).enumerate() {
clients.get_mut(name).unwrap().set_tx_info_response_error(
SuiError::ValidatorOverloadedRetryAfter {
retry_after_secs: index as u64,
},
);
}

let agg = get_genesis_agg(authorities.clone(), clients.clone());

// We should get the `SystemOverloadRetryAfter` error with the highest retry_after_secs.
assert_resp_err(
&agg,
txn.clone(),
|e| {
matches!(
e,
AggregatorProcessTransactionError::SystemOverloadRetryAfter { .. }
AggregatorProcessTransactionError::SystemOverloadRetryAfter {
retry_after_secs,
..
} if *retry_after_secs == (authority_keys.len() as u64 - 2)
)
},
|e| {
Expand Down
7 changes: 7 additions & 0 deletions crates/sui-types/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -820,6 +820,13 @@ impl SuiError {
pub fn is_retryable_overload(&self) -> bool {
matches!(self, SuiError::ValidatorOverloadedRetryAfter { .. })
}

pub fn retry_after_secs(&self) -> u64 {
match self {
SuiError::ValidatorOverloadedRetryAfter { retry_after_secs } => *retry_after_secs,
_ => 0,
}
}
}

impl Ord for SuiError {
Expand Down
Loading