From 36504abff77a4524e8f4b33516a0cde2633926a2 Mon Sep 17 00:00:00 2001
From: greg
Date: Fri, 10 May 2024 19:38:08 +0000
Subject: [PATCH 1/7] set up tpu client methods required for localcluster to
 use TpuClient

---
 tpu-client/src/nonblocking/tpu_client.rs |  19 +++-
 tpu-client/src/tpu_client.rs             | 115 ++++++++++++++++++++++-
 2 files changed, 130 insertions(+), 4 deletions(-)

diff --git a/tpu-client/src/nonblocking/tpu_client.rs b/tpu-client/src/nonblocking/tpu_client.rs
index 57a9b0b4033c61..ce274b8245d4d5 100644
--- a/tpu-client/src/nonblocking/tpu_client.rs
+++ b/tpu-client/src/nonblocking/tpu_client.rs
@@ -676,6 +676,23 @@ where
         self.exit.store(true, Ordering::Relaxed);
         self.leader_tpu_service.join().await;
     }
+
+    pub fn get_connection_cache(&self) -> &Arc<ConnectionCache<P, M, C>>
+    where
+        P: ConnectionPool,
+        M: ConnectionManager,
+        C: NewConnectionConfig,
+    {
+        &self.connection_cache
+    }
+
+    pub fn get_leader_tpu_service(&self) -> &LeaderTpuService {
+        &self.leader_tpu_service
+    }
+
+    pub fn get_fanout_slots(&self) -> u64 {
+        self.fanout_slots
+    }
 }
 
 impl<P, M, C> Drop for TpuClient<P, M, C> {
@@ -752,7 +769,7 @@ impl LeaderTpuService {
         self.recent_slots.estimated_current_slot()
     }
 
-    fn leader_tpu_sockets(&self, fanout_slots: u64) -> Vec<SocketAddr> {
+    pub fn leader_tpu_sockets(&self, fanout_slots: u64) -> Vec<SocketAddr> {
         let current_slot = self.recent_slots.estimated_current_slot();
         self.leader_tpu_cache
             .read()
diff --git a/tpu-client/src/tpu_client.rs b/tpu-client/src/tpu_client.rs
index 6b1e7dbe44f7ac..9283af67ecdf09 100644
--- a/tpu-client/src/tpu_client.rs
+++ b/tpu-client/src/tpu_client.rs
@@ -2,15 +2,25 @@ pub use crate::nonblocking::tpu_client::TpuSenderError;
 use {
     crate::nonblocking::tpu_client::TpuClient as NonblockingTpuClient,
     rayon::iter::{IntoParallelIterator, ParallelIterator},
-    solana_connection_cache::connection_cache::{
-        ConnectionCache, ConnectionManager, ConnectionPool, NewConnectionConfig,
+    solana_connection_cache::{
+        client_connection::ClientConnection,
+        connection_cache::{
+            ConnectionCache, ConnectionManager, ConnectionPool, NewConnectionConfig,
+        },
     },
     solana_rpc_client::rpc_client::RpcClient,
-    solana_sdk::{clock::Slot, transaction::Transaction, transport::Result as TransportResult},
+    solana_sdk::{
+        client::AsyncClient,
+        clock::{Slot, MAX_PROCESSING_AGE},
+        signature::Signature,
+        signers::Signers,
+        transaction::{Transaction, VersionedTransaction},
+        transport::Result as TransportResult,
+    },
     std::{
         collections::VecDeque,
         net::UdpSocket,
         sync::{Arc, RwLock},
+        time::Instant,
     },
 };
 #[cfg(feature = "spinner")]
@@ -95,6 +105,66 @@ where
         self.invoke(self.tpu_client.try_send_transaction(transaction))
     }
 
+    /// Serialize and send transaction to the current and upcoming leader TPUs according to fanout
+    /// size
+    /// Attempts to send and confirm tx "tries" times
+    /// Waits for signature confirmation before returning
+    /// Returns the transaction signature
+    pub fn send_and_confirm_transaction_with_retries<T: Signers + ?Sized>(
+        &self,
+        keypairs: &T,
+        transaction: &mut Transaction,
+        tries: usize,
+        pending_confirmations: usize,
+    ) -> TransportResult<Signature> {
+        for x in 0..tries {
+            let now = Instant::now();
+            let mut num_confirmed = 0;
+            let mut wait_time = MAX_PROCESSING_AGE;
+            // resend the same transaction until the transaction has no chance of succeeding
+            let wire_transaction =
+                bincode::serialize(&transaction).expect("transaction serialization failed");
+
+            while now.elapsed().as_secs() < wait_time as u64 {
+                let leaders = self
+                    .tpu_client
+                    .get_leader_tpu_service()
+                    .leader_tpu_sockets(self.tpu_client.get_fanout_slots());
+                if num_confirmed == 0 {
+                    for 
tpu_address in &leaders {
+                        let cache = self.tpu_client.get_connection_cache();
+                        let conn = cache.get_connection(tpu_address);
+                        conn.send_data_async(wire_transaction.clone())?;
+                    }
+                }
+
+                if let Ok(confirmed_blocks) = self.rpc_client().poll_for_signature_confirmation(
+                    &transaction.signatures[0],
+                    pending_confirmations,
+                ) {
+                    num_confirmed = confirmed_blocks;
+                    if confirmed_blocks >= pending_confirmations {
+                        return Ok(transaction.signatures[0]);
+                    }
+                    // Since network has seen the transaction, wait longer to receive
+                    // all pending confirmations. Resending the transaction could result in
+                    // extra transaction fees
+                    wait_time = wait_time.max(
+                        MAX_PROCESSING_AGE * pending_confirmations.saturating_sub(num_confirmed),
+                    );
+                }
+            }
+            log::info!("{x} tries failed transfer");
+            let blockhash = self.rpc_client().get_latest_blockhash()?;
+            transaction.sign(keypairs, blockhash);
+        }
+        Err(std::io::Error::new(
+            std::io::ErrorKind::Other,
+            "failed to confirm transaction".to_string(),
+        )
+        .into())
+    }
+
     /// Serialize and send a batch of transactions to the current and upcoming leader TPUs according
     /// to fanout size
     /// Returns the last error if all sends fail
@@ -115,6 +185,16 @@
         self.invoke(self.tpu_client.try_send_wire_transaction(wire_transaction))
     }
 
+    pub fn try_send_wire_transaction_batch(
+        &self,
+        wire_transactions: Vec<Vec<u8>>,
+    ) -> TransportResult<()> {
+        self.invoke(
+            self.tpu_client
+                .try_send_wire_transaction_batch(wire_transactions),
+        )
+    }
+
     /// Create a new client that disconnects when dropped
     pub fn new(
         name: &'static str,
@@ -187,6 +267,35 @@
     }
 }
 
+impl<P, M, C> AsyncClient for TpuClient<P, M, C>
+where
+    P: ConnectionPool,
+    M: ConnectionManager,
+    C: NewConnectionConfig,
+{
+    fn async_send_versioned_transaction(
+        &self,
+        transaction: VersionedTransaction,
+    ) -> TransportResult<Signature> {
+        let wire_transaction =
+            bincode::serialize(&transaction).expect("serialize Transaction in send_batch");
+        self.send_wire_transaction(wire_transaction);
+        Ok(transaction.signatures[0])
+    }
+
+    fn async_send_versioned_transaction_batch(
+        &self,
+        batch: Vec<VersionedTransaction>,
+    ) -> TransportResult<()> {
+        let buffers = batch
+            .into_par_iter()
+            .map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch"))
+            .collect::<Vec<_>>();
+        self.try_send_wire_transaction_batch(buffers)?;
+        Ok(())
+    }
+}
+
 // 48 chosen because it's unlikely that 12 leaders in a row will miss their slots
 const MAX_SLOT_SKIP_DISTANCE: u64 = 48;
 
From f3bafafcd76c89300fe104426c3b774867202aad Mon Sep 17 00:00:00 2001
From: greg
Date: Fri, 10 May 2024 19:46:03 +0000
Subject: [PATCH 2/7] add new_tpu_quic_client() for local cluster tests

---
 local-cluster/src/cluster_tests.rs | 23 +++++++++++++++++++++++
 1 file changed, 23 insertions(+)

diff --git a/local-cluster/src/cluster_tests.rs b/local-cluster/src/cluster_tests.rs
index 09ee39d2e47dd8..68ef50dfdb420d 100644
--- a/local-cluster/src/cluster_tests.rs
+++ b/local-cluster/src/cluster_tests.rs
@@ -4,6 +4,7 @@
 /// discover the rest of the network. 
 use log::*;
 use {
+    crate::cluster::QuicTpuClient,
     rand::{thread_rng, Rng},
     rayon::{prelude::*, ThreadPool},
     solana_client::{
@@ -24,6 +25,7 @@
         gossip_service::{self, discover_cluster, GossipService},
     },
     solana_ledger::blockstore::Blockstore,
+    solana_rpc_client::rpc_client::RpcClient,
     solana_sdk::{
         client::SyncClient,
         clock::{self, Slot, NUM_CONSECUTIVE_LEADER_SLOTS},
@@ -40,6 +42,7 @@
         transport::TransportError,
     },
     solana_streamer::socket::SocketAddrSpace,
+    solana_tpu_client::tpu_client::{TpuClient, TpuClientConfig, TpuSenderError},
     solana_vote::vote_transaction::VoteTransaction,
     solana_vote_program::vote_transaction,
     std::{
@@ -678,3 +681,23 @@ pub fn submit_vote_to_cluster_gossip(
         socket_addr_space,
     )
 }
+
+pub fn new_tpu_quic_client(
+    contact_info: &ContactInfo,
+    connection_cache: Arc<ConnectionCache>,
+) -> Result<QuicTpuClient, TpuSenderError> {
+    let rpc_pubsub_url = format!("ws://{}/", contact_info.rpc_pubsub().unwrap());
+    let rpc_url = format!("http://{}", contact_info.rpc().unwrap());
+
+    let cache = match &*connection_cache {
+        ConnectionCache::Quic(cache) => cache,
+        ConnectionCache::Udp(_) => panic!("Expected a Quic ConnectionCache. Got UDP"),
+    };
+
+    TpuClient::new_with_connection_cache(
+        Arc::new(RpcClient::new(rpc_url)),
+        rpc_pubsub_url.as_str(),
+        TpuClientConfig::default(),
+        cache.clone(),
+    )
+}

From e95b87ce4bcdc969bbf120d23d9e7bd8e2d9ed2c Mon Sep 17 00:00:00 2001
From: greg
Date: Fri, 10 May 2024 19:57:20 +0000
Subject: [PATCH 3/7] update local-cluster src files to use TpuClient. tests
 next

---
 local-cluster/src/cluster.rs                  |   3 +-
 local-cluster/src/local_cluster.rs            | 114 ++++++++++--------
 .../src/local_cluster_snapshot_utils.rs       |   3 +-
 3 files changed, 68 insertions(+), 52 deletions(-)

diff --git a/local-cluster/src/cluster.rs b/local-cluster/src/cluster.rs
index 99f31bf93504f7..c2e3acc60751a3 100644
--- a/local-cluster/src/cluster.rs
+++ b/local-cluster/src/cluster.rs
@@ -1,5 +1,4 @@
 use {
-    solana_client::thin_client::ThinClient,
     solana_core::validator::{Validator, ValidatorConfig},
     solana_gossip::{cluster_info::Node, contact_info::ContactInfo},
     solana_ledger::shred::Shred,
@@ -41,7 +40,7 @@ impl ClusterValidatorInfo {
 
 pub trait Cluster {
     fn get_node_pubkeys(&self) -> Vec<Pubkey>;
-    fn get_validator_client(&self, pubkey: &Pubkey) -> Option<ThinClient>;
+    fn get_validator_client(&self, pubkey: &Pubkey) -> Option<QuicTpuClient>;
     fn build_tpu_quic_client(&self) -> Result<QuicTpuClient, TpuSenderError>;
     fn build_tpu_quic_client_with_commitment(
         &self,
diff --git a/local-cluster/src/local_cluster.rs b/local-cluster/src/local_cluster.rs
index fc511f83d5296d..553afe951ce965 100644
--- a/local-cluster/src/local_cluster.rs
+++ b/local-cluster/src/local_cluster.rs
@@ -8,9 +8,7 @@ use {
     itertools::izip,
     log::*,
     solana_accounts_db::utils::create_accounts_run_and_snapshot_dirs,
-    solana_client::{
-        connection_cache::ConnectionCache, rpc_client::RpcClient, thin_client::ThinClient,
-    },
+    solana_client::{connection_cache::ConnectionCache, rpc_client::RpcClient},
     solana_core::{
         consensus::tower_storage::FileTowerStorage,
         validator::{Validator, ValidatorConfig, ValidatorStartProgress},
@@ -30,7 +28,6 @@
     },
     solana_sdk::{
         account::{Account, AccountSharedData},
-        client::SyncClient,
         clock::{Slot, DEFAULT_DEV_SLOTS_PER_EPOCH, DEFAULT_TICKS_PER_SLOT},
         commitment_config::CommitmentConfig,
         epoch_schedule::EpochSchedule,
@@ -481,11 +478,7 @@ impl LocalCluster {
         mut voting_keypair: Option<Arc<Keypair>>,
         socket_addr_space: SocketAddrSpace,
     ) -> Pubkey {
-        let (rpc, tpu) = cluster_tests::get_client_facing_addr(
-            self.connection_cache.protocol(),
-            &self.entry_point_info,
-        );
-        let client = 
ThinClient::new(rpc, tpu, self.connection_cache.clone());
+        let client = self.build_tpu_quic_client().expect("tpu_client");
 
         // Must have enough tokens to fund vote account and set delegate
         let should_create_vote_pubkey = voting_keypair.is_none();
@@ -577,11 +570,7 @@
     }
 
     pub fn transfer(&self, source_keypair: &Keypair, dest_pubkey: &Pubkey, lamports: u64) -> u64 {
-        let (rpc, tpu) = cluster_tests::get_client_facing_addr(
-            self.connection_cache.protocol(),
-            &self.entry_point_info,
-        );
-        let client = ThinClient::new(rpc, tpu, self.connection_cache.clone());
+        let client = self.build_tpu_quic_client().expect("new tpu quic client");
         Self::transfer_with_client(&client, source_keypair, dest_pubkey, lamports)
     }
 
@@ -675,13 +664,14 @@
     }
 
     fn transfer_with_client(
-        client: &ThinClient,
+        client: &QuicTpuClient,
         source_keypair: &Keypair,
         dest_pubkey: &Pubkey,
         lamports: u64,
     ) -> u64 {
         trace!("getting leader blockhash");
         let (blockhash, _) = client
+            .rpc_client()
             .get_latest_blockhash_with_commitment(CommitmentConfig::processed())
             .unwrap();
         let mut tx = system_transaction::transfer(source_keypair, dest_pubkey, lamports, blockhash);
@@ -692,19 +682,20 @@
             *dest_pubkey
         );
         client
-            .retry_transfer(source_keypair, &mut tx, 10)
-            .expect("client transfer");
+            .send_and_confirm_transaction_with_retries(&[source_keypair], &mut tx, 10, 0)
+            .expect("client transfer should succeed");
         client
+            .rpc_client()
             .wait_for_balance_with_commitment(
                 dest_pubkey,
                 Some(lamports),
                 CommitmentConfig::processed(),
             )
-            .expect("get balance")
+            .expect("get balance should succeed")
     }
 
     fn setup_vote_and_stake_accounts(
-        client: &ThinClient,
+        client: &QuicTpuClient,
         vote_account: &Keypair,
         from_account: &Arc<Keypair>,
         amount: u64,
@@ -720,6 +711,7 @@
 
         // Create the vote account if necessary
         if client
+            .rpc_client()
            .poll_get_balance_with_commitment(&vote_account_pubkey, CommitmentConfig::processed())
            .unwrap_or(0)
            == 0
@@ -729,6 +721,7 @@
             // as the cluster is already running, and using the wrong account size will cause the
             // InitializeAccount tx to fail
             let use_current_vote_state = client
+                .rpc_client()
                 .poll_get_balance_with_commitment(
                     &feature_set::vote_state_add_vote_latency::id(),
                     CommitmentConfig::processed(),
@@ -757,14 +750,16 @@
                 &[from_account.as_ref(), vote_account],
                 message,
                 client
+                    .rpc_client()
                     .get_latest_blockhash_with_commitment(CommitmentConfig::processed())
                     .unwrap()
                     .0,
             );
             client
-                .retry_transfer(from_account, &mut transaction, 10)
+                .send_and_confirm_transaction_with_retries(&[from_account], &mut transaction, 10, 0)
                 .expect("fund vote");
             client
+                .rpc_client()
                 .wait_for_balance_with_commitment(
                     &vote_account_pubkey,
                     Some(amount),
@@ -785,13 +780,14 @@
             &[from_account.as_ref(), &stake_account_keypair],
             message,
             client
+                .rpc_client()
                 .get_latest_blockhash_with_commitment(CommitmentConfig::processed())
                 .unwrap()
                 .0,
         );
 
         client
-            .send_and_confirm_transaction(
+            .send_and_confirm_transaction_with_retries(
                 &[from_account.as_ref(), &stake_account_keypair],
                 &mut transaction,
                 5,
@@ -799,6 +795,7 @@
             )
             .expect("delegate stake");
         client
+            .rpc_client()
             .wait_for_balance_with_commitment(
                 &stake_account_pubkey,
                 Some(amount),
@@ -814,36 +811,58 @@
         info!("Checking for vote account registration of {}", node_pubkey);
         match (
             client
+                .rpc_client()
                 .get_account_with_commitment(&stake_account_pubkey, CommitmentConfig::processed()),
-            
client.get_account_with_commitment(&vote_account_pubkey, CommitmentConfig::processed()),
+            client
+                .rpc_client()
+                .get_account_with_commitment(&vote_account_pubkey, CommitmentConfig::processed()),
         ) {
-            (Ok(Some(stake_account)), Ok(Some(vote_account))) => {
-                match (
-                    stake_state::stake_from(&stake_account),
-                    vote_state::from(&vote_account),
-                ) {
-                    (Some(stake_state), Some(vote_state)) => {
-                        if stake_state.delegation.voter_pubkey != vote_account_pubkey
-                            || stake_state.delegation.stake != amount
-                        {
-                            Err(Error::new(ErrorKind::Other, "invalid stake account state"))
-                        } else if vote_state.node_pubkey != node_pubkey {
-                            Err(Error::new(ErrorKind::Other, "invalid vote account state"))
-                        } else {
-                            info!("node {} {:?} {:?}", node_pubkey, stake_state, vote_state);
-
-                            Ok(())
+            (Ok(stake_account), Ok(vote_account)) => {
+                match (stake_account.value, vote_account.value) {
+                    (Some(stake_account), Some(vote_account)) => {
+                        match (
+                            stake_state::stake_from(&stake_account),
+                            vote_state::from(&vote_account),
+                        ) {
+                            (Some(stake_state), Some(vote_state)) => {
+                                if stake_state.delegation.voter_pubkey != vote_account_pubkey
+                                    || stake_state.delegation.stake != amount
+                                {
+                                    Err(Error::new(ErrorKind::Other, "invalid stake account state"))
+                                } else if vote_state.node_pubkey != node_pubkey {
+                                    Err(Error::new(ErrorKind::Other, "invalid vote account state"))
+                                } else {
+                                    info!(
+                                        "node {} {:?} {:?}",
+                                        node_pubkey, stake_state, vote_state
+                                    );
+
+                                    return Ok(());
+                                }
+                            }
+                            (None, _) => {
+                                Err(Error::new(ErrorKind::Other, "invalid stake account data"))
+                            }
+                            (_, None) => {
+                                Err(Error::new(ErrorKind::Other, "invalid vote account data"))
+                            }
                         }
                     }
-                    (None, _) => Err(Error::new(ErrorKind::Other, "invalid stake account data")),
-                    (_, None) => Err(Error::new(ErrorKind::Other, "invalid vote account data")),
+                    (None, _) => Err(Error::new(
+                        ErrorKind::Other,
+                        "unable to retrieve stake account data",
+                    )),
+                    (_, None) => Err(Error::new(
+                        ErrorKind::Other,
+                        "unable to retrieve vote account data",
+                    )),
                 }
             }
-            (Ok(None), _) | (Err(_), _) => Err(Error::new(
+            (Err(_), _) => Err(Error::new(
                 ErrorKind::Other,
                 "unable to retrieve stake account data",
             )),
-            (_, Ok(None)) | (_, Err(_)) => Err(Error::new(
+            (_, Err(_)) => Err(Error::new(
                 ErrorKind::Other,
                 "unable to retrieve vote account data",
             )),
@@ -895,13 +914,10 @@ impl Cluster for LocalCluster {
         self.validators.keys().cloned().collect()
     }
 
-    fn get_validator_client(&self, pubkey: &Pubkey) -> Option<ThinClient> {
-        self.validators.get(pubkey).map(|f| {
-            let (rpc, tpu) = cluster_tests::get_client_facing_addr(
-                self.connection_cache.protocol(),
-                &f.info.contact_info,
-            );
-            ThinClient::new(rpc, tpu, self.connection_cache.clone())
+    fn get_validator_client(&self, pubkey: &Pubkey) -> Option<QuicTpuClient> {
+        self.validators.get(pubkey).map(|_| {
+            self.build_tpu_quic_client()
+                .expect("should build tpu quic client")
         })
     }
 
diff --git a/local-cluster/src/local_cluster_snapshot_utils.rs b/local-cluster/src/local_cluster_snapshot_utils.rs
index 259b9e1559ab69..3ba9cda04d9489 100644
--- a/local-cluster/src/local_cluster_snapshot_utils.rs
+++ b/local-cluster/src/local_cluster_snapshot_utils.rs
@@ -7,7 +7,7 @@ use {
         },
         snapshot_utils,
     },
-    solana_sdk::{client::SyncClient, commitment_config::CommitmentConfig},
+    solana_sdk::commitment_config::CommitmentConfig,
     std::{
         path::Path,
         thread::sleep,
@@ -76,6 +76,7 @@ impl LocalCluster {
         .get_validator_client(self.entry_point_info.pubkey())
         .unwrap();
     let last_slot = client
+        .rpc_client()
         .get_slot_with_commitment(CommitmentConfig::processed())
        
.expect("Couldn't get slot"); From dd7ad734635ae427916706438c3c0013165b1afe Mon Sep 17 00:00:00 2001 From: greg Date: Fri, 10 May 2024 20:15:32 +0000 Subject: [PATCH 4/7] finish removing thinclient from localcluster --- local-cluster/src/cluster_tests.rs | 78 +++++++++++++++------------ local-cluster/tests/local_cluster.rs | 79 ++++++++++++++++++---------- 2 files changed, 95 insertions(+), 62 deletions(-) diff --git a/local-cluster/src/cluster_tests.rs b/local-cluster/src/cluster_tests.rs index 68ef50dfdb420d..c8343225606464 100644 --- a/local-cluster/src/cluster_tests.rs +++ b/local-cluster/src/cluster_tests.rs @@ -7,10 +7,7 @@ use { crate::cluster::QuicTpuClient, rand::{thread_rng, Rng}, rayon::{prelude::*, ThreadPool}, - solana_client::{ - connection_cache::{ConnectionCache, Protocol}, - thin_client::ThinClient, - }, + solana_client::connection_cache::{ConnectionCache, Protocol}, solana_core::consensus::{ tower_storage::{FileTowerStorage, SavedTower, SavedTowerVersions, TowerStorage}, VOTE_THRESHOLD_DEPTH, @@ -27,7 +24,6 @@ use { solana_ledger::blockstore::Blockstore, solana_rpc_client::rpc_client::RpcClient, solana_sdk::{ - client::SyncClient, clock::{self, Slot, NUM_CONSECUTIVE_LEADER_SLOTS}, commitment_config::CommitmentConfig, epoch_schedule::MINIMUM_SLOTS_PER_EPOCH, @@ -92,9 +88,9 @@ pub fn spend_and_verify_all_nodes( return; } let random_keypair = Keypair::new(); - let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), ingress_node); - let client = ThinClient::new(rpc, tpu, connection_cache.clone()); + let client = new_tpu_quic_client(ingress_node, connection_cache.clone()).unwrap(); let bal = client + .rpc_client() .poll_get_balance_with_commitment( &funding_keypair.pubkey(), CommitmentConfig::processed(), @@ -102,21 +98,29 @@ pub fn spend_and_verify_all_nodes( .expect("balance in source"); assert!(bal > 0); let (blockhash, _) = client + .rpc_client() .get_latest_blockhash_with_commitment(CommitmentConfig::confirmed()) .unwrap(); let mut transaction = system_transaction::transfer(funding_keypair, &random_keypair.pubkey(), 1, blockhash); let confs = VOTE_THRESHOLD_DEPTH + 1; let sig = client - .retry_transfer_until_confirmed(funding_keypair, &mut transaction, 10, confs) + .send_and_confirm_transaction_with_retries( + &[funding_keypair], + &mut transaction, + 10, + confs, + ) .unwrap(); for validator in &cluster_nodes { if ignore_nodes.contains(validator.pubkey()) { continue; } - let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), validator); - let client = ThinClient::new(rpc, tpu, connection_cache.clone()); - client.poll_for_signature_confirmation(&sig, confs).unwrap(); + let client = new_tpu_quic_client(ingress_node, connection_cache.clone()).unwrap(); + client + .rpc_client() + .poll_for_signature_confirmation(&sig, confs) + .unwrap(); } }); } @@ -126,10 +130,10 @@ pub fn verify_balances( node: &ContactInfo, connection_cache: Arc, ) { - let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), node); - let client = ThinClient::new(rpc, tpu, connection_cache); + let client = new_tpu_quic_client(node, connection_cache.clone()).unwrap(); for (pk, b) in expected_balances { let bal = client + .rpc_client() .poll_get_balance_with_commitment(&pk, CommitmentConfig::processed()) .expect("balance in source"); assert_eq!(bal, b); @@ -143,12 +147,12 @@ pub fn send_many_transactions( max_tokens_per_transfer: u64, num_txs: u64, ) -> HashMap { - let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), node); - let client = 
ThinClient::new(rpc, tpu, connection_cache.clone()); + let client = new_tpu_quic_client(node, connection_cache.clone()).unwrap(); let mut expected_balances = HashMap::new(); for _ in 0..num_txs { let random_keypair = Keypair::new(); let bal = client + .rpc_client() .poll_get_balance_with_commitment( &funding_keypair.pubkey(), CommitmentConfig::processed(), @@ -156,6 +160,7 @@ pub fn send_many_transactions( .expect("balance in source"); assert!(bal > 0); let (blockhash, _) = client + .rpc_client() .get_latest_blockhash_with_commitment(CommitmentConfig::processed()) .unwrap(); let transfer_amount = thread_rng().gen_range(1..max_tokens_per_transfer); @@ -168,7 +173,7 @@ pub fn send_many_transactions( ); client - .retry_transfer(funding_keypair, &mut transaction, 5) + .send_and_confirm_transaction_with_retries(&[funding_keypair], &mut transaction, 5, 0) .unwrap(); expected_balances.insert(random_keypair.pubkey(), transfer_amount); @@ -241,14 +246,14 @@ pub fn kill_entry_and_spend_and_verify_rest( ) .unwrap(); assert!(cluster_nodes.len() >= nodes); - let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), entry_point_info); - let client = ThinClient::new(rpc, tpu, connection_cache.clone()); + let client = new_tpu_quic_client(entry_point_info, connection_cache.clone()).unwrap(); // sleep long enough to make sure we are in epoch 3 let first_two_epoch_slots = MINIMUM_SLOTS_PER_EPOCH * (3 + 1); for ingress_node in &cluster_nodes { client + .rpc_client() .poll_get_balance_with_commitment(ingress_node.pubkey(), CommitmentConfig::processed()) .unwrap_or_else(|err| panic!("Node {} has no balance: {}", ingress_node.pubkey(), err)); } @@ -269,9 +274,9 @@ pub fn kill_entry_and_spend_and_verify_rest( continue; } - let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), ingress_node); - let client = ThinClient::new(rpc, tpu, connection_cache.clone()); + let client = new_tpu_quic_client(ingress_node, connection_cache.clone()).unwrap(); let balance = client + .rpc_client() .poll_get_balance_with_commitment( &funding_keypair.pubkey(), CommitmentConfig::processed(), @@ -289,6 +294,7 @@ pub fn kill_entry_and_spend_and_verify_rest( let random_keypair = Keypair::new(); let (blockhash, _) = client + .rpc_client() .get_latest_blockhash_with_commitment(CommitmentConfig::processed()) .unwrap(); let mut transaction = system_transaction::transfer( @@ -300,8 +306,8 @@ pub fn kill_entry_and_spend_and_verify_rest( let confs = VOTE_THRESHOLD_DEPTH + 1; let sig = { - let sig = client.retry_transfer_until_confirmed( - funding_keypair, + let sig = client.send_and_confirm_transaction_with_retries( + &[funding_keypair], &mut transaction, 5, confs, @@ -356,10 +362,10 @@ pub fn check_min_slot_is_rooted( let loop_start = Instant::now(); let loop_timeout = Duration::from_secs(180); for ingress_node in contact_infos.iter() { - let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), ingress_node); - let client = ThinClient::new(rpc, tpu, connection_cache.clone()); + let client = new_tpu_quic_client(ingress_node, connection_cache.clone()).unwrap(); loop { let root_slot = client + .rpc_client() .get_slot_with_commitment(CommitmentConfig::finalized()) .unwrap_or(0); if root_slot >= min_slot || last_print.elapsed().as_secs() > 3 { @@ -397,9 +403,9 @@ pub fn check_for_new_roots( assert!(loop_start.elapsed() < loop_timeout); for (i, ingress_node) in contact_infos.iter().enumerate() { - let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), ingress_node); - let client = 
ThinClient::new(rpc, tpu, connection_cache.clone()); + let client = new_tpu_quic_client(ingress_node, connection_cache.clone()).unwrap(); let root_slot = client + .rpc_client() .get_slot_with_commitment(CommitmentConfig::finalized()) .unwrap_or(0); roots[i].insert(root_slot); @@ -430,13 +436,14 @@ pub fn check_no_new_roots( .iter() .enumerate() .map(|(i, ingress_node)| { - let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), ingress_node); - let client = ThinClient::new(rpc, tpu, connection_cache.clone()); + let client = new_tpu_quic_client(ingress_node, connection_cache.clone()).unwrap(); let initial_root = client + .rpc_client() .get_slot() .unwrap_or_else(|_| panic!("get_slot for {} failed", ingress_node.pubkey())); roots[i] = initial_root; client + .rpc_client() .get_slot_with_commitment(CommitmentConfig::processed()) .unwrap_or_else(|_| panic!("get_slot for {} failed", ingress_node.pubkey())) }) @@ -449,9 +456,9 @@ pub fn check_no_new_roots( let mut reached_end_slot = false; loop { for contact_info in contact_infos { - let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), contact_info); - let client = ThinClient::new(rpc, tpu, connection_cache.clone()); + let client = new_tpu_quic_client(contact_info, connection_cache.clone()).unwrap(); current_slot = client + .rpc_client() .get_slot_with_commitment(CommitmentConfig::processed()) .unwrap_or_else(|_| panic!("get_slot for {} failed", contact_infos[0].pubkey())); if current_slot > end_slot { @@ -475,10 +482,10 @@ pub fn check_no_new_roots( } for (i, ingress_node) in contact_infos.iter().enumerate() { - let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), ingress_node); - let client = ThinClient::new(rpc, tpu, connection_cache.clone()); + let client = new_tpu_quic_client(ingress_node, connection_cache.clone()).unwrap(); assert_eq!( client + .rpc_client() .get_slot() .unwrap_or_else(|_| panic!("get_slot for {} failed", ingress_node.pubkey())), roots[i] @@ -497,9 +504,10 @@ fn poll_all_nodes_for_signature( if validator.pubkey() == entry_point_info.pubkey() { continue; } - let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), validator); - let client = ThinClient::new(rpc, tpu, connection_cache.clone()); - client.poll_for_signature_confirmation(sig, confs)?; + let client = new_tpu_quic_client(validator, connection_cache.clone()).unwrap(); + client + .rpc_client() + .poll_for_signature_confirmation(sig, confs)?; } Ok(()) diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs index 5336f7c7151278..2fb5e58b6eb422 100644 --- a/local-cluster/tests/local_cluster.rs +++ b/local-cluster/tests/local_cluster.rs @@ -9,7 +9,6 @@ use { solana_accounts_db::{ hardened_unpack::open_genesis_config, utils::create_accounts_run_and_snapshot_dirs, }, - solana_client::thin_client::ThinClient, solana_core::{ consensus::{ tower_storage::FileTowerStorage, Tower, SWITCH_FORK_THRESHOLD, VOTE_THRESHOLD_DEPTH, @@ -31,7 +30,7 @@ use { use_snapshot_archives_at_startup::UseSnapshotArchivesAtStartup, }, solana_local_cluster::{ - cluster::{Cluster, ClusterValidatorInfo}, + cluster::{Cluster, ClusterValidatorInfo, QuicTpuClient}, cluster_tests, integration_tests::{ copy_blocks, create_custom_leader_schedule, @@ -62,7 +61,7 @@ use { }, solana_sdk::{ account::AccountSharedData, - client::{AsyncClient, SyncClient}, + client::AsyncClient, clock::{self, Slot, DEFAULT_TICKS_PER_SLOT, MAX_PROCESSING_AGE}, commitment_config::CommitmentConfig, epoch_schedule::{DEFAULT_SLOTS_PER_EPOCH, 
MINIMUM_SLOTS_PER_EPOCH}, @@ -216,13 +215,10 @@ fn test_local_cluster_signature_subscribe() { .unwrap(); let non_bootstrap_info = cluster.get_contact_info(&non_bootstrap_id).unwrap(); - let (rpc, tpu) = cluster_tests::get_client_facing_addr( - cluster.connection_cache.protocol(), - non_bootstrap_info, - ); - let tx_client = ThinClient::new(rpc, tpu, cluster.connection_cache.clone()); + let tx_client = cluster.build_tpu_quic_client().unwrap(); let (blockhash, _) = tx_client + .rpc_client() .get_latest_blockhash_with_commitment(CommitmentConfig::processed()) .unwrap(); @@ -244,7 +240,12 @@ fn test_local_cluster_signature_subscribe() { .unwrap(); tx_client - .retry_transfer(&cluster.funding_keypair, &mut transaction, 5) + .send_and_confirm_transaction_with_retries( + &[&cluster.funding_keypair], + &mut transaction, + 5, + 0, + ) .unwrap(); let mut got_received_notification = false; @@ -371,6 +372,7 @@ fn test_restart_node() { ticks_per_slot, slots_per_epoch, stakers_slot_offset: slots_per_epoch, + skip_warmup_slots: true, ..ClusterConfig::default() }, SocketAddrSpace::Unspecified, @@ -422,11 +424,7 @@ fn test_mainnet_beta_cluster_type() { .unwrap(); assert_eq!(cluster_nodes.len(), 1); - let (rpc, tpu) = cluster_tests::get_client_facing_addr( - cluster.connection_cache.protocol(), - &cluster.entry_point_info, - ); - let client = ThinClient::new(rpc, tpu, cluster.connection_cache.clone()); + let client = cluster.build_tpu_quic_client().unwrap(); // Programs that are available at epoch 0 for program_id in [ @@ -444,8 +442,10 @@ fn test_mainnet_beta_cluster_type() { ( program_id, client + .rpc_client() .get_account_with_commitment(program_id, CommitmentConfig::processed()) .unwrap() + .value ), (_program_id, Some(_)) ); @@ -457,8 +457,10 @@ fn test_mainnet_beta_cluster_type() { ( program_id, client + .rpc_client() .get_account_with_commitment(program_id, CommitmentConfig::processed()) .unwrap() + .value ), (program_id, None) ); @@ -995,6 +997,7 @@ fn test_incremental_snapshot_download_with_crossing_full_snapshot_interval_at_st let validator_current_slot = cluster .get_validator_client(&validator_identity.pubkey()) .unwrap() + .rpc_client() .get_slot_with_commitment(CommitmentConfig::finalized()) .unwrap(); trace!("validator current slot: {validator_current_slot}"); @@ -1230,6 +1233,7 @@ fn test_snapshot_restart_tower() { safe_clone_config(&leader_snapshot_test_config.validator_config), safe_clone_config(&validator_snapshot_test_config.validator_config), ], + skip_warmup_slots: true, ..ClusterConfig::default() }; @@ -1373,7 +1377,10 @@ fn test_snapshots_blockstore_floor() { let target_slot = slot_floor + 40; while current_slot <= target_slot { trace!("current_slot: {}", current_slot); - if let Ok(slot) = validator_client.get_slot_with_commitment(CommitmentConfig::processed()) { + if let Ok(slot) = validator_client + .rpc_client() + .get_slot_with_commitment(CommitmentConfig::processed()) + { current_slot = slot; } else { continue; @@ -1565,6 +1572,7 @@ fn test_no_voting() { .unwrap(); loop { let last_slot = client + .rpc_client() .get_slot_with_commitment(CommitmentConfig::processed()) .expect("Couldn't get slot"); if last_slot > 4 * VOTE_THRESHOLD_DEPTH as u64 { @@ -1628,6 +1636,7 @@ fn test_optimistic_confirmation_violation_detection() { let mut prev_voted_slot = 0; loop { let last_voted_slot = client + .rpc_client() .get_slot_with_commitment(CommitmentConfig::processed()) .unwrap(); if last_voted_slot > 50 { @@ -1681,6 +1690,7 @@ fn test_optimistic_confirmation_violation_detection() { let 
client = cluster.get_validator_client(&node_to_restart).unwrap();
     loop {
         let last_root = client
+            .rpc_client()
             .get_slot_with_commitment(CommitmentConfig::finalized())
             .unwrap();
         if last_root > prev_voted_slot {
@@ -1758,7 +1768,10 @@ fn test_validator_saves_tower() {
 
     // Wait for some votes to be generated
     loop {
-        if let Ok(slot) = validator_client.get_slot_with_commitment(CommitmentConfig::processed()) {
+        if let Ok(slot) = validator_client
+            .rpc_client()
+            .get_slot_with_commitment(CommitmentConfig::processed())
+        {
             trace!("current slot: {}", slot);
             if slot > 2 {
                 break;
@@ -1783,7 +1796,10 @@
         #[allow(deprecated)]
         // This test depends on knowing the immediate root, without any delay from the commitment
        // service, so the deprecated CommitmentConfig::root() is retained
-        if let Ok(root) = validator_client.get_slot_with_commitment(CommitmentConfig::root()) {
+        if let Ok(root) = validator_client
+            .rpc_client()
+            .get_slot_with_commitment(CommitmentConfig::root())
+        {
             trace!("current root: {}", root);
             if root > 0 {
                 break root;
@@ -1812,7 +1828,10 @@
         #[allow(deprecated)]
         // This test depends on knowing the immediate root, without any delay from the commitment
        // service, so the deprecated CommitmentConfig::root() is retained
-        if let Ok(root) = validator_client.get_slot_with_commitment(CommitmentConfig::root()) {
+        if let Ok(root) = validator_client
+            .rpc_client()
+            .get_slot_with_commitment(CommitmentConfig::root())
+        {
             trace!(
                 "current root: {}, last_replayed_root: {}",
                 root,
@@ -1845,7 +1864,10 @@
         #[allow(deprecated)]
         // This test depends on knowing the immediate root, without any delay from the commitment
        // service, so the deprecated CommitmentConfig::root() is retained
-        if let Ok(root) = validator_client.get_slot_with_commitment(CommitmentConfig::root()) {
+        if let Ok(root) = validator_client
+            .rpc_client()
+            .get_slot_with_commitment(CommitmentConfig::root())
+        {
             trace!("current root: {}, last tower root: {}", root, tower3_root);
             if root > tower3_root {
                 break root;
@@ -2651,11 +2673,7 @@ fn test_oc_bad_signatures() {
     );
 
     // 3) Start up a spy to listen for and push votes to leader TPU
-    let (rpc, tpu) = cluster_tests::get_client_facing_addr(
-        cluster.connection_cache.protocol(),
-        &cluster.entry_point_info,
-    );
-    let client = ThinClient::new(rpc, tpu, cluster.connection_cache.clone());
+    let client = cluster.build_tpu_quic_client().unwrap();
     let cluster_funding_keypair = cluster.funding_keypair.insecure_clone();
     let voter_thread_sleep_ms: usize = 100;
     let num_votes_simulated = Arc::new(AtomicUsize::new(0));
@@ -2702,7 +2720,12 @@
                 None,
             );
             client
-                .retry_transfer(&cluster_funding_keypair, &mut vote_tx, 5)
+                .send_and_confirm_transaction_with_retries(
+                    &[&cluster_funding_keypair],
+                    &mut vote_tx,
+                    5,
+                    0,
+                )
                 .unwrap();
 
             num_votes_simulated.fetch_add(1, Ordering::Relaxed);
@@ -2856,8 +2879,8 @@ fn setup_transfer_scan_threads(
     num_starting_accounts: usize,
     exit: Arc<AtomicBool>,
     scan_commitment: CommitmentConfig,
-    update_client_receiver: Receiver<ThinClient>,
-    scan_client_receiver: Receiver<ThinClient>,
+    update_client_receiver: Receiver<QuicTpuClient>,
+    scan_client_receiver: Receiver<QuicTpuClient>,
 ) -> (
     JoinHandle<()>,
     JoinHandle<()>,
@@ -2895,6 +2918,7 @@
                 return;
             }
             let (blockhash, _) = client
+                .rpc_client()
                 .get_latest_blockhash_with_commitment(CommitmentConfig::processed())
                 .unwrap();
             for i in 0..starting_keypairs_.len() {
@@ -2941,6 +2965,7 @@ fn 
setup_transfer_scan_threads(
                 return;
             }
             if let Some(total_scan_balance) = client
+                .rpc_client()
                 .get_program_accounts_with_config(
                     &system_program::id(),
                     scan_commitment_config.clone(),

From bb85f32b2ce1176930102e05e1c5bce89b917021 Mon Sep 17 00:00:00 2001
From: greg
Date: Tue, 14 May 2024 18:17:35 +0000
Subject: [PATCH 5/7] address comments

---
 tpu-client/src/tpu_client.rs | 17 ++++++++++-------
 1 file changed, 10 insertions(+), 7 deletions(-)

diff --git a/tpu-client/src/tpu_client.rs b/tpu-client/src/tpu_client.rs
index 9283af67ecdf09..a9dfafddfb0715 100644
--- a/tpu-client/src/tpu_client.rs
+++ b/tpu-client/src/tpu_client.rs
@@ -1,6 +1,7 @@
 pub use crate::nonblocking::tpu_client::TpuSenderError;
 use {
     crate::nonblocking::tpu_client::TpuClient as NonblockingTpuClient,
+    log::*,
     rayon::iter::{IntoParallelIterator, ParallelIterator},
     solana_connection_cache::{
         client_connection::ClientConnection,
@@ -107,23 +108,23 @@ where
 
     /// Serialize and send transaction to the current and upcoming leader TPUs according to fanout
     /// size
-    /// Attempts to send and confirm tx "tries" times
-    /// Waits for signature confirmation before returning
-    /// Returns the transaction signature
+    /// Attempt to send and confirm tx "attempts" times
+    /// Wait for signature confirmation before returning
+    /// Return the transaction signature
     pub fn send_and_confirm_transaction_with_retries<T: Signers + ?Sized>(
         &self,
         keypairs: &T,
         transaction: &mut Transaction,
-        tries: usize,
+        attempts: usize,
         pending_confirmations: usize,
     ) -> TransportResult<Signature> {
-        for x in 0..tries {
+        for attempt in 0..attempts {
             let now = Instant::now();
             let mut num_confirmed = 0;
             let mut wait_time = MAX_PROCESSING_AGE;
             // resend the same transaction until the transaction has no chance of succeeding
             let wire_transaction =
-                bincode::serialize(&transaction).expect("transaction serialization failed");
+                bincode::serialize(&transaction).expect("should serialize transaction");
 
             while now.elapsed().as_secs() < wait_time as u64 {
                 let leaders = self
@@ -154,7 +155,7 @@
                 );
             }
         }
-        log::info!("{x} tries failed transfer");
+        info!("{attempt} tries failed transfer");
         let blockhash = self.rpc_client().get_latest_blockhash()?;
         transaction.sign(keypairs, blockhash);
     }
@@ -267,6 +268,8 @@
     }
 }
 
+// Methods below are required for calls to client.async_transfer()
+// where client is of type TpuClient
 impl<P, M, C> AsyncClient for TpuClient<P, M, C>
 where
     P: ConnectionPool,

From 943f8ed15eff0e2bc55f6735fd4b545811deceb0 Mon Sep 17 00:00:00 2001
From: greg
Date: Tue, 28 May 2024 19:31:27 +0000
Subject: [PATCH 6/7] add note for send_and_confirm_transaction_with_retries

---
 tpu-client/src/tpu_client.rs | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/tpu-client/src/tpu_client.rs b/tpu-client/src/tpu_client.rs
index a9dfafddfb0715..c463f3f9fdcd82 100644
--- a/tpu-client/src/tpu_client.rs
+++ b/tpu-client/src/tpu_client.rs
@@ -111,6 +111,10 @@ where
     /// Attempt to send and confirm tx "attempts" times
     /// Wait for signature confirmation before returning
     /// Return the transaction signature
+    /// NOTE: send_wire_transaction() and try_send_transaction() above both fail in a specific case when used in LocalCluster
+    /// They both invoke the nonblocking TpuClient and both fail when calling "transfer_with_client()" multiple times
+    /// I do not fully understand WHY the nonblocking TpuClient fails in this specific case. 
But the method defined below
+    /// does work although it has only been tested in LocalCluster integration tests
     pub fn send_and_confirm_transaction_with_retries<T: Signers + ?Sized>(
         &self,
         keypairs: &T,

From d7a650de4de737c51f371fbe8098b439ff6ac136 Mon Sep 17 00:00:00 2001
From: greg
Date: Tue, 11 Jun 2024 00:09:16 +0000
Subject: [PATCH 7/7] remove retry logic from tpu-client. Send directly to
 upcoming leaders without retry.

---
 local-cluster/src/cluster_tests.rs   | 41 +++++----------
 local-cluster/src/local_cluster.rs   | 18 +++----
 local-cluster/tests/local_cluster.rs | 19 ++-----
 tpu-client/src/tpu_client.rs         | 76 +++++++---------------------
 4 files changed, 41 insertions(+), 113 deletions(-)

diff --git a/local-cluster/src/cluster_tests.rs b/local-cluster/src/cluster_tests.rs
index c8343225606464..b31c874b9e6db3 100644
--- a/local-cluster/src/cluster_tests.rs
+++ b/local-cluster/src/cluster_tests.rs
@@ -101,16 +101,11 @@ pub fn spend_and_verify_all_nodes(
             .rpc_client()
             .get_latest_blockhash_with_commitment(CommitmentConfig::confirmed())
             .unwrap();
-        let mut transaction =
+        let transaction =
             system_transaction::transfer(funding_keypair, &random_keypair.pubkey(), 1, blockhash);
         let confs = VOTE_THRESHOLD_DEPTH + 1;
-        let sig = client
-            .send_and_confirm_transaction_with_retries(
-                &[funding_keypair],
-                &mut transaction,
-                10,
-                confs,
-            )
+        client
+            .send_transaction_to_upcoming_leaders(&transaction)
             .unwrap();
         for validator in &cluster_nodes {
             if ignore_nodes.contains(validator.pubkey()) {
@@ -119,7 +114,7 @@
             let client = new_tpu_quic_client(ingress_node, connection_cache.clone()).unwrap();
             client
                 .rpc_client()
-                .poll_for_signature_confirmation(&sig, confs)
+                .poll_for_signature_confirmation(&transaction.signatures[0], confs)
                 .unwrap();
         }
     });
@@ -165,7 +160,7 @@
             .unwrap();
         let transfer_amount = thread_rng().gen_range(1..max_tokens_per_transfer);
 
-        let mut transaction = system_transaction::transfer(
+        let transaction = system_transaction::transfer(
             funding_keypair,
             &random_keypair.pubkey(),
             transfer_amount,
@@ -173,7 +168,7 @@
         );
 
         client
-            .send_and_confirm_transaction_with_retries(&[funding_keypair], &mut transaction, 5, 0)
+            .send_transaction_to_upcoming_leaders(&transaction)
             .unwrap();
 
         expected_balances.insert(random_keypair.pubkey(), transfer_amount);
@@ -297,7 +292,7 @@
             .rpc_client()
             .get_latest_blockhash_with_commitment(CommitmentConfig::processed())
             .unwrap();
-            let mut transaction = system_transaction::transfer(
+            let transaction = system_transaction::transfer(
                 funding_keypair,
                 &random_keypair.pubkey(),
                 1,
@@ -305,28 +300,16 @@
             );
 
             let confs = VOTE_THRESHOLD_DEPTH + 1;
-            let sig = {
-                let sig = client.send_and_confirm_transaction_with_retries(
-                    &[funding_keypair],
-                    &mut transaction,
-                    5,
-                    confs,
-                );
-                match sig {
-                    Err(e) => {
-                        result = Err(e);
-                        continue;
-                    }
-
-                    Ok(sig) => sig,
-                }
-            };
+            if let Err(e) = client.send_transaction_to_upcoming_leaders(&transaction) {
+                result = Err(e);
+                continue;
+            }
             info!("poll_all_nodes_for_signature()");
             match poll_all_nodes_for_signature(
                 entry_point_info,
                 &cluster_nodes,
                 connection_cache,
-                &sig,
+                &transaction.signatures[0],
                 confs,
             ) {
                 Err(e) => {
diff --git a/local-cluster/src/local_cluster.rs b/local-cluster/src/local_cluster.rs
index 553afe951ce965..c6f85994a7e5d8 100644
--- a/local-cluster/src/local_cluster.rs
+++ b/local-cluster/src/local_cluster.rs
@@ -674,15 +674,16 @@ 
impl LocalCluster {
         .rpc_client()
         .get_latest_blockhash_with_commitment(CommitmentConfig::processed())
         .unwrap();
-        let mut tx = system_transaction::transfer(source_keypair, dest_pubkey, lamports, blockhash);
+        let tx = system_transaction::transfer(source_keypair, dest_pubkey, lamports, blockhash);
         info!(
             "executing transfer of {} from {} to {}",
             lamports,
             source_keypair.pubkey(),
             *dest_pubkey
         );
+
         client
-            .send_and_confirm_transaction_with_retries(&[source_keypair], &mut tx, 10, 0)
+            .send_transaction_to_upcoming_leaders(&tx)
             .expect("client transfer should succeed");
         client
             .rpc_client()
@@ -746,7 +747,7 @@
             },
         );
         let message = Message::new(&instructions, Some(&from_account.pubkey()));
-        let mut transaction = Transaction::new(
+        let transaction = Transaction::new(
             &[from_account.as_ref(), vote_account],
             message,
             client
@@ -756,7 +757,7 @@
                 .0,
         );
         client
-            .send_and_confirm_transaction_with_retries(&[from_account], &mut transaction, 10, 0)
+            .send_transaction_to_upcoming_leaders(&transaction)
             .expect("fund vote");
         client
             .rpc_client()
@@ -776,7 +777,7 @@
             amount,
         );
         let message = Message::new(&instructions, Some(&from_account.pubkey()));
-        let mut transaction = Transaction::new(
+        let transaction = Transaction::new(
             &[from_account.as_ref(), &stake_account_keypair],
             message,
             client
@@ -787,12 +788,7 @@
         );
 
         client
-            .send_and_confirm_transaction_with_retries(
-                &[from_account.as_ref(), &stake_account_keypair],
-                &mut transaction,
-                5,
-                0,
-            )
+            .send_transaction_to_upcoming_leaders(&transaction)
             .expect("delegate stake");
         client
             .rpc_client()
diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs
index 2fb5e58b6eb422..6e161f474253f2 100644
--- a/local-cluster/tests/local_cluster.rs
+++ b/local-cluster/tests/local_cluster.rs
@@ -222,7 +222,7 @@ fn test_local_cluster_signature_subscribe() {
         .get_latest_blockhash_with_commitment(CommitmentConfig::processed())
         .unwrap();
 
-    let mut transaction = system_transaction::transfer(
+    let transaction = system_transaction::transfer(
         &cluster.funding_keypair,
         &solana_sdk::pubkey::new_rand(),
         10,
@@ -240,12 +240,7 @@
         .unwrap();
 
     tx_client
-        .send_and_confirm_transaction_with_retries(
-            &[&cluster.funding_keypair],
-            &mut transaction,
-            5,
-            0,
-        )
+        .send_transaction_to_upcoming_leaders(&transaction)
         .unwrap();
 
     let mut got_received_notification = false;
@@ -2674,7 +2669,6 @@ fn test_oc_bad_signatures() {
 
     // 3) Start up a spy to listen for and push votes to leader TPU
     let client = cluster.build_tpu_quic_client().unwrap();
-    let cluster_funding_keypair = cluster.funding_keypair.insecure_clone();
     let voter_thread_sleep_ms: usize = 100;
     let num_votes_simulated = Arc::new(AtomicUsize::new(0));
     let gossip_voter = cluster_tests::start_gossip_voter(
@@ -2709,7 +2703,7 @@
             let vote_slots: Vec<Slot> = vec![vote_slot];
             let bad_authorized_signer_keypair = Keypair::new();
-            let mut vote_tx = vote_transaction::new_vote_transaction(
+            let vote_tx = vote_transaction::new_vote_transaction(
                 vote_slots,
                 vote_hash,
                 leader_vote_tx.message.recent_blockhash,
@@ -2720,12 +2714,7 @@
                 None,
             );
             client
-                .send_and_confirm_transaction_with_retries(
-                    &[&cluster_funding_keypair],
-                    &mut vote_tx,
-                    5,
-                    0,
-                )
+                .send_transaction_to_upcoming_leaders(&vote_tx)
                 .unwrap();
 
             num_votes_simulated.fetch_add(1, Ordering::Relaxed);
diff --git a/tpu-client/src/tpu_client.rs 
b/tpu-client/src/tpu_client.rs
index c463f3f9fdcd82..469b4105fe27ec 100644
--- a/tpu-client/src/tpu_client.rs
+++ b/tpu-client/src/tpu_client.rs
@@ -1,7 +1,6 @@
 pub use crate::nonblocking::tpu_client::TpuSenderError;
 use {
     crate::nonblocking::tpu_client::TpuClient as NonblockingTpuClient,
-    log::*,
     rayon::iter::{IntoParallelIterator, ParallelIterator},
     solana_connection_cache::{
         client_connection::ClientConnection,
@@ -12,7 +11,7 @@
     solana_rpc_client::rpc_client::RpcClient,
     solana_sdk::{
         client::AsyncClient,
-        clock::{Slot, MAX_PROCESSING_AGE},
+        clock::Slot,
         signature::Signature,
-        signers::Signers,
         transaction::{Transaction, VersionedTransaction},
         transport::Result as TransportResult,
@@ -21,7 +20,6 @@
     std::{
         collections::VecDeque,
         net::UdpSocket,
         sync::{Arc, RwLock},
-        time::Instant,
     },
 };
 #[cfg(feature = "spinner")]
@@ -107,67 +105,29 @@ where
     }
 
     /// Serialize and send transaction to the current and upcoming leader TPUs according to fanout
-    /// size
-    /// Attempt to send and confirm tx "attempts" times
-    /// Wait for signature confirmation before returning
-    /// Return the transaction signature
     /// NOTE: send_wire_transaction() and try_send_transaction() above both fail in a specific case when used in LocalCluster
     /// They both invoke the nonblocking TpuClient and both fail when calling "transfer_with_client()" multiple times
     /// I do not fully understand WHY the nonblocking TpuClient fails in this specific case. But the method defined below
     /// does work although it has only been tested in LocalCluster integration tests
-    pub fn send_and_confirm_transaction_with_retries<T: Signers + ?Sized>(
+    pub fn send_transaction_to_upcoming_leaders(
         &self,
-        keypairs: &T,
-        transaction: &mut Transaction,
-        attempts: usize,
-        pending_confirmations: usize,
-    ) -> TransportResult<Signature> {
-        for attempt in 0..attempts {
-            let now = Instant::now();
-            let mut num_confirmed = 0;
-            let mut wait_time = MAX_PROCESSING_AGE;
-            // resend the same transaction until the transaction has no chance of succeeding
-            let wire_transaction =
-                bincode::serialize(&transaction).expect("should serialize transaction");
-
-            while now.elapsed().as_secs() < wait_time as u64 {
-                let leaders = self
-                    .tpu_client
-                    .get_leader_tpu_service()
-                    .leader_tpu_sockets(self.tpu_client.get_fanout_slots());
-                if num_confirmed == 0 {
-                    for tpu_address in &leaders {
-                        let cache = self.tpu_client.get_connection_cache();
-                        let conn = cache.get_connection(tpu_address);
-                        conn.send_data_async(wire_transaction.clone())?;
-                    }
-                }
-
-                if let Ok(confirmed_blocks) = self.rpc_client().poll_for_signature_confirmation(
-                    &transaction.signatures[0],
-                    pending_confirmations,
-                ) {
-                    num_confirmed = confirmed_blocks;
-                    if confirmed_blocks >= pending_confirmations {
-                        return Ok(transaction.signatures[0]);
-                    }
-                    // Since network has seen the transaction, wait longer to receive
-                    // all pending confirmations. 
Resending the transaction could result in
-                    // extra transaction fees
-                    wait_time = wait_time.max(
-                        MAX_PROCESSING_AGE * pending_confirmations.saturating_sub(num_confirmed),
-                    );
-                }
-            }
-            info!("{attempt} tries failed transfer");
-            let blockhash = self.rpc_client().get_latest_blockhash()?;
-            transaction.sign(keypairs, blockhash);
+        transaction: &Transaction,
+    ) -> TransportResult<()> {
+        let wire_transaction =
+            bincode::serialize(&transaction).expect("should serialize transaction");
+
+        let leaders = self
+            .tpu_client
+            .get_leader_tpu_service()
+            .leader_tpu_sockets(self.tpu_client.get_fanout_slots());
+
+        for tpu_address in &leaders {
+            let cache = self.tpu_client.get_connection_cache();
+            let conn = cache.get_connection(tpu_address);
+            conn.send_data_async(wire_transaction.clone())?;
         }
-        Err(std::io::Error::new(
-            std::io::ErrorKind::Other,
-            "failed to confirm transaction".to_string(),
-        )
-        .into())
+
+        Ok(())
     }
 
     /// Serialize and send a batch of transactions to the current and upcoming leader TPUs according
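For reference, the calling pattern the series converges on — build a QuicTpuClient, fire the transaction at the current and upcoming leaders, then confirm separately over RPC — looks roughly like this in a LocalCluster test. This is a minimal sketch assembled from the hunks above, not code from the patches themselves; `cluster`, `funding_keypair`, and `dest_pubkey` stand in for test fixtures that are assumed here rather than defined.

    // Sketch only: assumes a running LocalCluster named `cluster`, plus
    // `funding_keypair` and `dest_pubkey` fixtures from the test harness.
    let client = cluster.build_tpu_quic_client().expect("tpu client");
    let (blockhash, _) = client
        .rpc_client()
        .get_latest_blockhash_with_commitment(CommitmentConfig::processed())
        .unwrap();
    let tx = system_transaction::transfer(&funding_keypair, &dest_pubkey, 1, blockhash);
    // Fire-and-forget to the leader TPUs within the fanout; no retry loop.
    client
        .send_transaction_to_upcoming_leaders(&tx)
        .expect("send to upcoming leaders");
    // Confirmation is now the caller's responsibility, polled over RPC.
    client
        .rpc_client()
        .poll_for_signature_confirmation(&tx.signatures[0], 1)
        .expect("signature confirmation");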