Skip to content

Commit

Permalink
remove retry logic from tpu-client. Send directly to upcoming leaders…
Browse files Browse the repository at this point in the history
… without retry.
  • Loading branch information
gregcusack committed Jun 12, 2024
1 parent 943f8ed commit d7a650d
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 113 deletions.
41 changes: 12 additions & 29 deletions local-cluster/src/cluster_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,16 +101,11 @@ pub fn spend_and_verify_all_nodes<S: ::std::hash::BuildHasher + Sync + Send>(
.rpc_client()
.get_latest_blockhash_with_commitment(CommitmentConfig::confirmed())
.unwrap();
let mut transaction =
let transaction =
system_transaction::transfer(funding_keypair, &random_keypair.pubkey(), 1, blockhash);
let confs = VOTE_THRESHOLD_DEPTH + 1;
let sig = client
.send_and_confirm_transaction_with_retries(
&[funding_keypair],
&mut transaction,
10,
confs,
)
client
.send_transaction_to_upcoming_leaders(&transaction)
.unwrap();
for validator in &cluster_nodes {
if ignore_nodes.contains(validator.pubkey()) {
Expand All @@ -119,7 +114,7 @@ pub fn spend_and_verify_all_nodes<S: ::std::hash::BuildHasher + Sync + Send>(
let client = new_tpu_quic_client(ingress_node, connection_cache.clone()).unwrap();
client
.rpc_client()
.poll_for_signature_confirmation(&sig, confs)
.poll_for_signature_confirmation(&transaction.signatures[0], confs)
.unwrap();
}
});
Expand Down Expand Up @@ -165,15 +160,15 @@ pub fn send_many_transactions(
.unwrap();
let transfer_amount = thread_rng().gen_range(1..max_tokens_per_transfer);

let mut transaction = system_transaction::transfer(
let transaction = system_transaction::transfer(
funding_keypair,
&random_keypair.pubkey(),
transfer_amount,
blockhash,
);

client
.send_and_confirm_transaction_with_retries(&[funding_keypair], &mut transaction, 5, 0)
.send_transaction_to_upcoming_leaders(&transaction)
.unwrap();

expected_balances.insert(random_keypair.pubkey(), transfer_amount);
Expand Down Expand Up @@ -297,36 +292,24 @@ pub fn kill_entry_and_spend_and_verify_rest(
.rpc_client()
.get_latest_blockhash_with_commitment(CommitmentConfig::processed())
.unwrap();
let mut transaction = system_transaction::transfer(
let transaction = system_transaction::transfer(
funding_keypair,
&random_keypair.pubkey(),
1,
blockhash,
);

let confs = VOTE_THRESHOLD_DEPTH + 1;
let sig = {
let sig = client.send_and_confirm_transaction_with_retries(
&[funding_keypair],
&mut transaction,
5,
confs,
);
match sig {
Err(e) => {
result = Err(e);
continue;
}

Ok(sig) => sig,
}
};
if let Err(e) = client.send_transaction_to_upcoming_leaders(&transaction) {
result = Err(e);
continue;
}
info!("poll_all_nodes_for_signature()");
match poll_all_nodes_for_signature(
entry_point_info,
&cluster_nodes,
connection_cache,
&sig,
&transaction.signatures[0],
confs,
) {
Err(e) => {
Expand Down
18 changes: 7 additions & 11 deletions local-cluster/src/local_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -674,15 +674,16 @@ impl LocalCluster {
.rpc_client()
.get_latest_blockhash_with_commitment(CommitmentConfig::processed())
.unwrap();
let mut tx = system_transaction::transfer(source_keypair, dest_pubkey, lamports, blockhash);
let tx = system_transaction::transfer(source_keypair, dest_pubkey, lamports, blockhash);
info!(
"executing transfer of {} from {} to {}",
lamports,
source_keypair.pubkey(),
*dest_pubkey
);

client
.send_and_confirm_transaction_with_retries(&[source_keypair], &mut tx, 10, 0)
.send_transaction_to_upcoming_leaders(&tx)
.expect("client transfer should succeed");
client
.rpc_client()
Expand Down Expand Up @@ -746,7 +747,7 @@ impl LocalCluster {
},
);
let message = Message::new(&instructions, Some(&from_account.pubkey()));
let mut transaction = Transaction::new(
let transaction = Transaction::new(
&[from_account.as_ref(), vote_account],
message,
client
Expand All @@ -756,7 +757,7 @@ impl LocalCluster {
.0,
);
client
.send_and_confirm_transaction_with_retries(&[from_account], &mut transaction, 10, 0)
.send_transaction_to_upcoming_leaders(&transaction)
.expect("fund vote");
client
.rpc_client()
Expand All @@ -776,7 +777,7 @@ impl LocalCluster {
amount,
);
let message = Message::new(&instructions, Some(&from_account.pubkey()));
let mut transaction = Transaction::new(
let transaction = Transaction::new(
&[from_account.as_ref(), &stake_account_keypair],
message,
client
Expand All @@ -787,12 +788,7 @@ impl LocalCluster {
);

client
.send_and_confirm_transaction_with_retries(
&[from_account.as_ref(), &stake_account_keypair],
&mut transaction,
5,
0,
)
.send_transaction_to_upcoming_leaders(&transaction)
.expect("delegate stake");
client
.rpc_client()
Expand Down
19 changes: 4 additions & 15 deletions local-cluster/tests/local_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ fn test_local_cluster_signature_subscribe() {
.get_latest_blockhash_with_commitment(CommitmentConfig::processed())
.unwrap();

let mut transaction = system_transaction::transfer(
let transaction = system_transaction::transfer(
&cluster.funding_keypair,
&solana_sdk::pubkey::new_rand(),
10,
Expand All @@ -240,12 +240,7 @@ fn test_local_cluster_signature_subscribe() {
.unwrap();

tx_client
.send_and_confirm_transaction_with_retries(
&[&cluster.funding_keypair],
&mut transaction,
5,
0,
)
.send_transaction_to_upcoming_leaders(&transaction)
.unwrap();

let mut got_received_notification = false;
Expand Down Expand Up @@ -2674,7 +2669,6 @@ fn test_oc_bad_signatures() {

// 3) Start up a spy to listen for and push votes to leader TPU
let client = cluster.build_tpu_quic_client().unwrap();
let cluster_funding_keypair = cluster.funding_keypair.insecure_clone();
let voter_thread_sleep_ms: usize = 100;
let num_votes_simulated = Arc::new(AtomicUsize::new(0));
let gossip_voter = cluster_tests::start_gossip_voter(
Expand Down Expand Up @@ -2709,7 +2703,7 @@ fn test_oc_bad_signatures() {
let vote_slots: Vec<Slot> = vec![vote_slot];

let bad_authorized_signer_keypair = Keypair::new();
let mut vote_tx = vote_transaction::new_vote_transaction(
let vote_tx = vote_transaction::new_vote_transaction(
vote_slots,
vote_hash,
leader_vote_tx.message.recent_blockhash,
Expand All @@ -2720,12 +2714,7 @@ fn test_oc_bad_signatures() {
None,
);
client
.send_and_confirm_transaction_with_retries(
&[&cluster_funding_keypair],
&mut vote_tx,
5,
0,
)
.send_transaction_to_upcoming_leaders(&vote_tx)
.unwrap();

num_votes_simulated.fetch_add(1, Ordering::Relaxed);
Expand Down
76 changes: 18 additions & 58 deletions tpu-client/src/tpu_client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
pub use crate::nonblocking::tpu_client::TpuSenderError;
use {
crate::nonblocking::tpu_client::TpuClient as NonblockingTpuClient,
log::*,
rayon::iter::{IntoParallelIterator, ParallelIterator},
solana_connection_cache::{
client_connection::ClientConnection,
Expand All @@ -12,7 +11,7 @@ use {
solana_rpc_client::rpc_client::RpcClient,
solana_sdk::{
client::AsyncClient,
clock::{Slot, MAX_PROCESSING_AGE},
clock::Slot,
signature::Signature,
transaction::{Transaction, VersionedTransaction},
transport::Result as TransportResult,
Expand All @@ -21,7 +20,6 @@ use {
collections::VecDeque,
net::UdpSocket,
sync::{Arc, RwLock},
time::Instant,
},
};
#[cfg(feature = "spinner")]
Expand Down Expand Up @@ -107,67 +105,29 @@ where
}

/// Serialize and send transaction to the current and upcoming leader TPUs according to fanout
/// size
/// Attempt to send and confirm tx "attempts" times
/// Wait for signature confirmation before returning
/// Return the transaction signature
/// NOTE: send_wire_transaction() and try_send_transaction() above both fail in a specific case when used in LocalCluster
/// They both invoke the nonblocking TPUClient and both fail when calling "transfer_with_client()" multiple times
/// I do not full understand WHY the nonblocking TPUClient fails in this specific case. But the method defined below
/// does work although it has only been tested in LocalCluster integration tests
pub fn send_and_confirm_transaction_with_retries<T: Signers + ?Sized>(
pub fn send_transaction_to_upcoming_leaders(
&self,
keypairs: &T,
transaction: &mut Transaction,
attempts: usize,
pending_confirmations: usize,
) -> TransportResult<Signature> {
for attempt in 0..attempts {
let now = Instant::now();
let mut num_confirmed = 0;
let mut wait_time = MAX_PROCESSING_AGE;
// resend the same transaction until the transaction has no chance of succeeding
let wire_transaction =
bincode::serialize(&transaction).expect("should serialize transaction");

while now.elapsed().as_secs() < wait_time as u64 {
let leaders = self
.tpu_client
.get_leader_tpu_service()
.leader_tpu_sockets(self.tpu_client.get_fanout_slots());
if num_confirmed == 0 {
for tpu_address in &leaders {
let cache = self.tpu_client.get_connection_cache();
let conn = cache.get_connection(tpu_address);
conn.send_data_async(wire_transaction.clone())?;
}
}

if let Ok(confirmed_blocks) = self.rpc_client().poll_for_signature_confirmation(
&transaction.signatures[0],
pending_confirmations,
) {
num_confirmed = confirmed_blocks;
if confirmed_blocks >= pending_confirmations {
return Ok(transaction.signatures[0]);
}
// Since network has seen the transaction, wait longer to receive
// all pending confirmations. Resending the transaction could result into
// extra transaction fees
wait_time = wait_time.max(
MAX_PROCESSING_AGE * pending_confirmations.saturating_sub(num_confirmed),
);
}
}
info!("{attempt} tries failed transfer");
let blockhash = self.rpc_client().get_latest_blockhash()?;
transaction.sign(keypairs, blockhash);
transaction: &Transaction,
) -> TransportResult<()> {
let wire_transaction =
bincode::serialize(&transaction).expect("should serialize transaction");

let leaders = self
.tpu_client
.get_leader_tpu_service()
.leader_tpu_sockets(self.tpu_client.get_fanout_slots());

for tpu_address in &leaders {
let cache = self.tpu_client.get_connection_cache();
let conn = cache.get_connection(tpu_address);
conn.send_data_async(wire_transaction.clone())?;
}
Err(std::io::Error::new(
std::io::ErrorKind::Other,
"failed to confirm transaction".to_string(),
)
.into())

Ok(())
}

/// Serialize and send a batch of transactions to the current and upcoming leader TPUs according
Expand Down

0 comments on commit d7a650d

Please sign in to comment.