From f614727e3f615cb17e5a46f449c5d585360bf616 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Fri, 22 Mar 2024 21:17:42 +0100 Subject: [PATCH] v1.18: client: Timeout resends during `send_and_confirm_in_parallel` (backport of #358) (#384) client: Timeout resends during `send_and_confirm_in_parallel` (#358) * client: Timeout resends during `send_and_confirm_in_parallel` * Clarify constant (cherry picked from commit 36c66f5111044187bbfcb952e642b08c151db278) Co-authored-by: Jon C --- ...nd_and_confirm_transactions_in_parallel.rs | 54 +++++++++++-------- 1 file changed, 32 insertions(+), 22 deletions(-) diff --git a/client/src/send_and_confirm_transactions_in_parallel.rs b/client/src/send_and_confirm_transactions_in_parallel.rs index 43196d05a8a519..976539c4a48b5d 100644 --- a/client/src/send_and_confirm_transactions_in_parallel.rs +++ b/client/src/send_and_confirm_transactions_in_parallel.rs @@ -31,7 +31,7 @@ use { tokio::{sync::RwLock, task::JoinHandle, time::Instant}, }; -const BLOCKHASH_REFRESH_RATE: Duration = Duration::from_secs(10); +const BLOCKHASH_REFRESH_RATE: Duration = Duration::from_secs(5); const TPU_RESEND_REFRESH_RATE: Duration = Duration::from_secs(2); const SEND_INTERVAL: Duration = Duration::from_millis(10); type QuicTpuClient = TpuClient; @@ -326,21 +326,20 @@ async fn confirm_transactions_till_block_height_and_resend_unexpired_transaction ); } + if let Some(progress_bar) = progress_bar { + let progress = progress_from_context_and_block_height(context, max_valid_block_height); + progress.set_message_for_confirmed_transactions( + progress_bar, + "Checking transaction status...", + ); + } + // wait till all transactions are confirmed or we have surpassed max processing age for the last sent transaction while !unconfirmed_transaction_map.is_empty() && current_block_height.load(Ordering::Relaxed) <= max_valid_block_height { let block_height = current_block_height.load(Ordering::Relaxed); - if let Some(progress_bar) = progress_bar { - let progress = - progress_from_context_and_block_height(context, max_valid_block_height); - progress.set_message_for_confirmed_transactions( - progress_bar, - "Checking transaction status...", - ); - } - if let Some(tpu_client) = tpu_client { let instant = Instant::now(); // retry sending transaction only over TPU port @@ -349,10 +348,29 @@ async fn confirm_transactions_till_block_height_and_resend_unexpired_transaction .iter() .filter(|x| block_height < x.last_valid_block_height) .map(|x| x.serialized_transaction.clone()) - .collect(); - let _ = tpu_client - .try_send_wire_transaction_batch(txs_to_resend_over_tpu) - .await; + .collect::>(); + let num_txs_to_resend = txs_to_resend_over_tpu.len(); + // This is a "reasonable" constant for how long it should + // take to fan the transactions out, taken from + // `solana_tpu_client::nonblocking::tpu_client::send_wire_transaction_futures` + const SEND_TIMEOUT_INTERVAL: Duration = Duration::from_secs(5); + let message = if tokio::time::timeout( + SEND_TIMEOUT_INTERVAL, + tpu_client.try_send_wire_transaction_batch(txs_to_resend_over_tpu), + ) + .await + .is_err() + { + format!("Timed out resending {num_txs_to_resend} transactions...") + } else { + format!("Resent {num_txs_to_resend} transactions...") + }; + + if let Some(progress_bar) = progress_bar { + let progress = + progress_from_context_and_block_height(context, max_valid_block_height); + progress.set_message_for_confirmed_transactions(progress_bar, &message); + } let elapsed = instant.elapsed(); if elapsed < TPU_RESEND_REFRESH_RATE { @@ -370,14 +388,6 @@ async fn confirm_transactions_till_block_height_and_resend_unexpired_transaction max_valid_block_height = max_valid_block_height_in_remaining_transaction; } } - - if let Some(progress_bar) = progress_bar { - let progress = progress_from_context_and_block_height(context, max_valid_block_height); - progress.set_message_for_confirmed_transactions( - progress_bar, - "Checking transaction status...", - ); - } } }