Skip to content

Commit

Permalink
remove retry logic from tpu-client and put in LocalCluster
Browse files Browse the repository at this point in the history
  • Loading branch information
gregcusack committed Jun 11, 2024
1 parent 943f8ed commit 73dbfe3
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 103 deletions.
32 changes: 19 additions & 13 deletions local-cluster/src/cluster_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
/// discover the rest of the network.
use log::*;
use {
crate::cluster::QuicTpuClient,
crate::{cluster::QuicTpuClient, local_cluster::LocalCluster},
rand::{thread_rng, Rng},
rayon::{prelude::*, ThreadPool},
solana_client::connection_cache::{ConnectionCache, Protocol},
Expand Down Expand Up @@ -104,14 +104,14 @@ pub fn spend_and_verify_all_nodes<S: ::std::hash::BuildHasher + Sync + Send>(
let mut transaction =
system_transaction::transfer(funding_keypair, &random_keypair.pubkey(), 1, blockhash);
let confs = VOTE_THRESHOLD_DEPTH + 1;
let sig = client
.send_and_confirm_transaction_with_retries(
&[funding_keypair],
&mut transaction,
10,
confs,
)
.unwrap();
let sig = LocalCluster::send_transaction_with_retries(
&client,
&[funding_keypair],
&mut transaction,
10,
confs,
)
.unwrap();
for validator in &cluster_nodes {
if ignore_nodes.contains(validator.pubkey()) {
continue;
Expand Down Expand Up @@ -172,9 +172,14 @@ pub fn send_many_transactions(
blockhash,
);

client
.send_and_confirm_transaction_with_retries(&[funding_keypair], &mut transaction, 5, 0)
.unwrap();
LocalCluster::send_transaction_with_retries(
&client,
&[funding_keypair],
&mut transaction,
5,
0,
)
.unwrap();

expected_balances.insert(random_keypair.pubkey(), transfer_amount);
}
Expand Down Expand Up @@ -306,7 +311,8 @@ pub fn kill_entry_and_spend_and_verify_rest(

let confs = VOTE_THRESHOLD_DEPTH + 1;
let sig = {
let sig = client.send_and_confirm_transaction_with_retries(
let sig = LocalCluster::send_transaction_with_retries(
&client,
&[funding_keypair],
&mut transaction,
5,
Expand Down
90 changes: 75 additions & 15 deletions local-cluster/src/local_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,23 @@ use {
},
solana_sdk::{
account::{Account, AccountSharedData},
clock::{Slot, DEFAULT_DEV_SLOTS_PER_EPOCH, DEFAULT_TICKS_PER_SLOT},
clock::{Slot, DEFAULT_DEV_SLOTS_PER_EPOCH, DEFAULT_TICKS_PER_SLOT, MAX_PROCESSING_AGE},
commitment_config::CommitmentConfig,
epoch_schedule::EpochSchedule,
feature_set,
genesis_config::{ClusterType, GenesisConfig},
message::Message,
poh_config::PohConfig,
pubkey::Pubkey,
signature::{Keypair, Signer},
signature::{Keypair, Signature, Signer},
signers::Signers,
stake::{
instruction as stake_instruction,
state::{Authorized, Lockup},
},
system_transaction,
transaction::Transaction,
transport::TransportError,
},
solana_stake_program::stake_state,
solana_streamer::{socket::SocketAddrSpace, streamer::StakedNodes},
Expand All @@ -61,6 +63,7 @@ use {
net::{IpAddr, Ipv4Addr, UdpSocket},
path::{Path, PathBuf},
sync::{Arc, RwLock},
time::Instant,
},
};

Expand Down Expand Up @@ -663,6 +666,58 @@ impl LocalCluster {
info!("{} done waiting for roots", test_name);
}

/// Send `transaction` to the current and upcoming leader TPUs and wait for it to be confirmed.
///
/// Up to `attempts` attempts are made. Within one attempt the transaction is (re)sent via
/// `client.send_transaction_to_upcoming_leaders` until the RPC node reports at least one
/// confirmation, and the signature is polled until `pending_confirmations` confirmations are
/// observed or the attempt's wait window (initially `MAX_PROCESSING_AGE`, compared against
/// elapsed seconds) runs out. When an attempt times out, the transaction is re-signed with
/// `keypairs` against the latest blockhash before the next attempt, which changes its
/// signature.
///
/// Returns the transaction signature once enough confirmations have been observed.
///
/// # Errors
///
/// Returns an error if sending to the leaders fails, if the latest blockhash cannot be
/// fetched, or if the transaction is still unconfirmed after `attempts` attempts
/// (including when `attempts` is 0).
pub fn send_transaction_with_retries<T: Signers + ?Sized>(
    client: &QuicTpuClient,
    keypairs: &T,
    transaction: &mut Transaction,
    attempts: usize,
    pending_confirmations: usize,
) -> std::result::Result<Signature, TransportError> {
    for attempt in 0..attempts {
        let now = Instant::now();
        let mut num_confirmed = 0;
        let mut wait_time = MAX_PROCESSING_AGE;

        // Keep working with the same signed transaction until the wait window expires.
        while now.elapsed().as_secs() < wait_time as u64 {
            // Only resend while no confirmation has been observed yet.
            // Propagate send failures to the caller instead of panicking: this function
            // already reports failure through its `Result`.
            if num_confirmed == 0 {
                client.send_transaction_to_upcoming_leaders(transaction)?;
            }

            // Check for confirmation. A polling error is treated as "not confirmed yet"
            // and the loop simply tries again.
            if let Ok(confirmed_blocks) = client.rpc_client().poll_for_signature_confirmation(
                &transaction.signatures[0],
                pending_confirmations,
            ) {
                num_confirmed = confirmed_blocks;
                if confirmed_blocks >= pending_confirmations {
                    return Ok(transaction.signatures[0]);
                }
                // Since the network has seen the transaction, wait longer to receive
                // all pending confirmations. Resending the transaction could result in
                // extra transaction fees.
                wait_time = wait_time.max(
                    MAX_PROCESSING_AGE * pending_confirmations.saturating_sub(num_confirmed),
                );
            }
        }
        info!("{attempt} tries failed transfer");
        // The attempt timed out: re-sign against a fresh blockhash before trying again.
        let blockhash = client.rpc_client().get_latest_blockhash()?;
        transaction.sign(keypairs, blockhash);
    }
    Err(std::io::Error::new(
        std::io::ErrorKind::Other,
        "failed to confirm transaction".to_string(),
    )
    .into())
}

fn transfer_with_client(
client: &QuicTpuClient,
source_keypair: &Keypair,
Expand All @@ -681,8 +736,8 @@ impl LocalCluster {
source_keypair.pubkey(),
*dest_pubkey
);
client
.send_and_confirm_transaction_with_retries(&[source_keypair], &mut tx, 10, 0)

LocalCluster::send_transaction_with_retries(client, &[source_keypair], &mut tx, 10, 0)
.expect("client transfer should succeed");
client
.rpc_client()
Expand Down Expand Up @@ -755,9 +810,14 @@ impl LocalCluster {
.unwrap()
.0,
);
client
.send_and_confirm_transaction_with_retries(&[from_account], &mut transaction, 10, 0)
.expect("fund vote");
LocalCluster::send_transaction_with_retries(
client,
&[from_account],
&mut transaction,
10,
0,
)
.expect("fund vote");
client
.rpc_client()
.wait_for_balance_with_commitment(
Expand Down Expand Up @@ -786,14 +846,14 @@ impl LocalCluster {
.0,
);

client
.send_and_confirm_transaction_with_retries(
&[from_account.as_ref(), &stake_account_keypair],
&mut transaction,
5,
0,
)
.expect("delegate stake");
LocalCluster::send_transaction_with_retries(
client,
&[from_account.as_ref(), &stake_account_keypair],
&mut transaction,
5,
0,
)
.expect("delegate stake");
client
.rpc_client()
.wait_for_balance_with_commitment(
Expand Down
32 changes: 16 additions & 16 deletions local-cluster/tests/local_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,14 +239,14 @@ fn test_local_cluster_signature_subscribe() {
)
.unwrap();

tx_client
.send_and_confirm_transaction_with_retries(
&[&cluster.funding_keypair],
&mut transaction,
5,
0,
)
.unwrap();
LocalCluster::send_transaction_with_retries(
&tx_client,
&[&cluster.funding_keypair],
&mut transaction,
5,
0,
)
.unwrap();

let mut got_received_notification = false;
loop {
Expand Down Expand Up @@ -2719,14 +2719,14 @@ fn test_oc_bad_signatures() {
&bad_authorized_signer_keypair,
None,
);
client
.send_and_confirm_transaction_with_retries(
&[&cluster_funding_keypair],
&mut vote_tx,
5,
0,
)
.unwrap();
LocalCluster::send_transaction_with_retries(
&client,
&[&cluster_funding_keypair],
&mut vote_tx,
5,
0,
)
.unwrap();

num_votes_simulated.fetch_add(1, Ordering::Relaxed);
}
Expand Down
77 changes: 18 additions & 59 deletions tpu-client/src/tpu_client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
pub use crate::nonblocking::tpu_client::TpuSenderError;
use {
crate::nonblocking::tpu_client::TpuClient as NonblockingTpuClient,
log::*,
rayon::iter::{IntoParallelIterator, ParallelIterator},
solana_connection_cache::{
client_connection::ClientConnection,
Expand All @@ -12,7 +11,7 @@ use {
solana_rpc_client::rpc_client::RpcClient,
solana_sdk::{
client::AsyncClient,
clock::{Slot, MAX_PROCESSING_AGE},
clock::Slot,
signature::Signature,
transaction::{Transaction, VersionedTransaction},
transport::Result as TransportResult,
Expand All @@ -21,7 +20,6 @@ use {
collections::VecDeque,
net::UdpSocket,
sync::{Arc, RwLock},
time::Instant,
},
};
#[cfg(feature = "spinner")]
Expand Down Expand Up @@ -106,68 +104,29 @@ where
self.invoke(self.tpu_client.try_send_transaction(transaction))
}

/// Serialize and send transaction to the current and upcoming leader TPUs according to fanout
/// size
/// Attempt to send and confirm tx "attempts" times
/// Wait for signature confirmation before returning
/// Return the transaction signature
/// NOTE: send_wire_transaction() and try_send_transaction() above both fail in a specific case when used in LocalCluster
/// They both invoke the nonblocking TPUClient and both fail when calling "transfer_with_client()" multiple times
/// I do not fully understand WHY the nonblocking TPUClient fails in this specific case. But the method defined below
/// does work although it has only been tested in LocalCluster integration tests
pub fn send_and_confirm_transaction_with_retries<T: Signers + ?Sized>(
pub fn send_transaction_to_upcoming_leaders(
&self,
keypairs: &T,
transaction: &mut Transaction,
attempts: usize,
pending_confirmations: usize,
) -> TransportResult<Signature> {
for attempt in 0..attempts {
let now = Instant::now();
let mut num_confirmed = 0;
let mut wait_time = MAX_PROCESSING_AGE;
// resend the same transaction until the transaction has no chance of succeeding
let wire_transaction =
bincode::serialize(&transaction).expect("should serialize transaction");

while now.elapsed().as_secs() < wait_time as u64 {
let leaders = self
.tpu_client
.get_leader_tpu_service()
.leader_tpu_sockets(self.tpu_client.get_fanout_slots());
if num_confirmed == 0 {
for tpu_address in &leaders {
let cache = self.tpu_client.get_connection_cache();
let conn = cache.get_connection(tpu_address);
conn.send_data_async(wire_transaction.clone())?;
}
}

if let Ok(confirmed_blocks) = self.rpc_client().poll_for_signature_confirmation(
&transaction.signatures[0],
pending_confirmations,
) {
num_confirmed = confirmed_blocks;
if confirmed_blocks >= pending_confirmations {
return Ok(transaction.signatures[0]);
}
// Since network has seen the transaction, wait longer to receive
// all pending confirmations. Resending the transaction could result in
// extra transaction fees
wait_time = wait_time.max(
MAX_PROCESSING_AGE * pending_confirmations.saturating_sub(num_confirmed),
);
}
}
info!("{attempt} tries failed transfer");
let blockhash = self.rpc_client().get_latest_blockhash()?;
transaction.sign(keypairs, blockhash);
transaction: &Transaction,
) -> TransportResult<()> {
let wire_transaction =
bincode::serialize(&transaction).expect("should serialize transaction");

let leaders = self
.tpu_client
.get_leader_tpu_service()
.leader_tpu_sockets(self.tpu_client.get_fanout_slots());

for tpu_address in &leaders {
let cache = self.tpu_client.get_connection_cache();
let conn = cache.get_connection(tpu_address);
conn.send_data_async(wire_transaction.clone())?;
}
Err(std::io::Error::new(
std::io::ErrorKind::Other,
"failed to confirm transaction".to_string(),
)
.into())

Ok(())
}

/// Serialize and send a batch of transactions to the current and upcoming leader TPUs according
Expand Down

0 comments on commit 73dbfe3

Please sign in to comment.