From e673f0a735fb17f3c4e71ac6190d4a389364a4e9 Mon Sep 17 00:00:00 2001 From: greg Date: Wed, 10 Apr 2024 22:47:33 +0000 Subject: [PATCH 01/10] wip. tpuclient for transfer_with_client and setup_vote_and_stake_accounts --- local-cluster/src/local_cluster.rs | 115 +++++++++++++++++------------ 1 file changed, 67 insertions(+), 48 deletions(-) diff --git a/local-cluster/src/local_cluster.rs b/local-cluster/src/local_cluster.rs index fc511f83d5296d..43a334bd83c8c8 100644 --- a/local-cluster/src/local_cluster.rs +++ b/local-cluster/src/local_cluster.rs @@ -30,7 +30,6 @@ use { }, solana_sdk::{ account::{Account, AccountSharedData}, - client::SyncClient, clock::{Slot, DEFAULT_DEV_SLOTS_PER_EPOCH, DEFAULT_TICKS_PER_SLOT}, commitment_config::CommitmentConfig, epoch_schedule::EpochSchedule, @@ -481,11 +480,7 @@ impl LocalCluster { mut voting_keypair: Option>, socket_addr_space: SocketAddrSpace, ) -> Pubkey { - let (rpc, tpu) = cluster_tests::get_client_facing_addr( - self.connection_cache.protocol(), - &self.entry_point_info, - ); - let client = ThinClient::new(rpc, tpu, self.connection_cache.clone()); + let client = self.build_tpu_quic_client().expect("tpu_client"); // Must have enough tokens to fund vote account and set delegate let should_create_vote_pubkey = voting_keypair.is_none(); @@ -508,10 +503,12 @@ impl LocalCluster { &validator_pubkey, stake * 2 + 2, ); + info!( "validator {} balance {}", validator_pubkey, validator_balance ); + Self::setup_vote_and_stake_accounts( &client, voting_keypair.as_ref().unwrap(), @@ -577,11 +574,7 @@ impl LocalCluster { } pub fn transfer(&self, source_keypair: &Keypair, dest_pubkey: &Pubkey, lamports: u64) -> u64 { - let (rpc, tpu) = cluster_tests::get_client_facing_addr( - self.connection_cache.protocol(), - &self.entry_point_info, - ); - let client = ThinClient::new(rpc, tpu, self.connection_cache.clone()); + let client = self.build_tpu_quic_client().expect("new tpu quic client"); Self::transfer_with_client(&client, source_keypair, dest_pubkey, lamports) } @@ -675,16 +668,18 @@ impl LocalCluster { } fn transfer_with_client( - client: &ThinClient, + client: &QuicTpuClient, source_keypair: &Keypair, dest_pubkey: &Pubkey, lamports: u64, ) -> u64 { trace!("getting leader blockhash"); + let (blockhash, _) = client + .rpc_client() .get_latest_blockhash_with_commitment(CommitmentConfig::processed()) .unwrap(); - let mut tx = system_transaction::transfer(source_keypair, dest_pubkey, lamports, blockhash); + let tx = system_transaction::transfer(source_keypair, dest_pubkey, lamports, blockhash); info!( "executing transfer of {} from {} to {}", lamports, @@ -692,19 +687,20 @@ impl LocalCluster { *dest_pubkey ); client - .retry_transfer(source_keypair, &mut tx, 10) - .expect("client transfer"); + .try_send_transaction(&tx) + .expect("client transfer should succeed"); client + .rpc_client() .wait_for_balance_with_commitment( dest_pubkey, Some(lamports), CommitmentConfig::processed(), ) - .expect("get balance") + .expect("get balance should succeed") } fn setup_vote_and_stake_accounts( - client: &ThinClient, + client: &QuicTpuClient, vote_account: &Keypair, from_account: &Arc, amount: u64, @@ -720,6 +716,7 @@ impl LocalCluster { // Create the vote account if necessary if client + .rpc_client() .poll_get_balance_with_commitment(&vote_account_pubkey, CommitmentConfig::processed()) .unwrap_or(0) == 0 @@ -729,6 +726,7 @@ impl LocalCluster { // as the cluster is already running, and using the wrong account size will cause the // InitializeAccount tx to fail let use_current_vote_state = client + .rpc_client() .poll_get_balance_with_commitment( &feature_set::vote_state_add_vote_latency::id(), CommitmentConfig::processed(), @@ -753,18 +751,20 @@ impl LocalCluster { }, ); let message = Message::new(&instructions, Some(&from_account.pubkey())); - let mut transaction = Transaction::new( + let transaction = Transaction::new( &[from_account.as_ref(), vote_account], message, client + .rpc_client() .get_latest_blockhash_with_commitment(CommitmentConfig::processed()) .unwrap() .0, ); client - .retry_transfer(from_account, &mut transaction, 10) - .expect("fund vote"); + .try_send_transaction(&transaction) + .expect("should fund vote"); client + .rpc_client() .wait_for_balance_with_commitment( &vote_account_pubkey, Some(amount), @@ -781,24 +781,21 @@ impl LocalCluster { amount, ); let message = Message::new(&instructions, Some(&from_account.pubkey())); - let mut transaction = Transaction::new( + let transaction = Transaction::new( &[from_account.as_ref(), &stake_account_keypair], message, client + .rpc_client() .get_latest_blockhash_with_commitment(CommitmentConfig::processed()) .unwrap() .0, ); client - .send_and_confirm_transaction( - &[from_account.as_ref(), &stake_account_keypair], - &mut transaction, - 5, - 0, - ) + .try_send_transaction(&transaction) .expect("delegate stake"); client + .rpc_client() .wait_for_balance_with_commitment( &stake_account_pubkey, Some(amount), @@ -814,36 +811,58 @@ impl LocalCluster { info!("Checking for vote account registration of {}", node_pubkey); match ( client + .rpc_client() .get_account_with_commitment(&stake_account_pubkey, CommitmentConfig::processed()), - client.get_account_with_commitment(&vote_account_pubkey, CommitmentConfig::processed()), + client + .rpc_client() + .get_account_with_commitment(&vote_account_pubkey, CommitmentConfig::processed()), ) { - (Ok(Some(stake_account)), Ok(Some(vote_account))) => { - match ( - stake_state::stake_from(&stake_account), - vote_state::from(&vote_account), - ) { - (Some(stake_state), Some(vote_state)) => { - if stake_state.delegation.voter_pubkey != vote_account_pubkey - || stake_state.delegation.stake != amount - { - Err(Error::new(ErrorKind::Other, "invalid stake account state")) - } else if vote_state.node_pubkey != node_pubkey { - Err(Error::new(ErrorKind::Other, "invalid vote account state")) - } else { - info!("node {} {:?} {:?}", node_pubkey, stake_state, vote_state); - - Ok(()) + (Ok(stake_account), Ok(vote_account)) => { + match (stake_account.value, vote_account.value) { + (Some(stake_account), Some(vote_account)) => { + match ( + stake_state::stake_from(&stake_account), + vote_state::from(&vote_account), + ) { + (Some(stake_state), Some(vote_state)) => { + if stake_state.delegation.voter_pubkey != vote_account_pubkey + || stake_state.delegation.stake != amount + { + Err(Error::new(ErrorKind::Other, "invalid stake account state")) + } else if vote_state.node_pubkey != node_pubkey { + Err(Error::new(ErrorKind::Other, "invalid vote account state")) + } else { + info!( + "node {} {:?} {:?}", + node_pubkey, stake_state, vote_state + ); + + return Ok(()); + } + } + (None, _) => { + Err(Error::new(ErrorKind::Other, "invalid stake account data")) + } + (_, None) => { + Err(Error::new(ErrorKind::Other, "invalid vote account data")) + } } } - (None, _) => Err(Error::new(ErrorKind::Other, "invalid stake account data")), - (_, None) => Err(Error::new(ErrorKind::Other, "invalid vote account data")), + (None, _) => Err(Error::new( + ErrorKind::Other, + "unable to retrieve stake account data", + )), + (_, None) => Err(Error::new( + ErrorKind::Other, + "unable to retrieve vote account data", + )), } } - (Ok(None), _) | (Err(_), _) => Err(Error::new( + (Err(_), _) => Err(Error::new( ErrorKind::Other, "unable to retrieve stake account data", )), - (_, Ok(None)) | (_, Err(_)) => Err(Error::new( + (_, Err(_)) => Err(Error::new( ErrorKind::Other, "unable to retrieve vote account data", )), From a592fc4d377ae40ac4f9b608c660a1544ce2475a Mon Sep 17 00:00:00 2001 From: greg Date: Thu, 11 Apr 2024 01:13:25 +0000 Subject: [PATCH 02/10] add send_and_confirm_transaction for tpu client --- local-cluster/src/local_cluster.rs | 2 +- tpu-client/src/nonblocking/tpu_client.rs | 27 +++++++++++++++++++++++- tpu-client/src/tpu_client.rs | 12 ++++++++++- 3 files changed, 38 insertions(+), 3 deletions(-) diff --git a/local-cluster/src/local_cluster.rs b/local-cluster/src/local_cluster.rs index 43a334bd83c8c8..555b50ab77e66e 100644 --- a/local-cluster/src/local_cluster.rs +++ b/local-cluster/src/local_cluster.rs @@ -792,7 +792,7 @@ impl LocalCluster { ); client - .try_send_transaction(&transaction) + .send_and_confirm_transaction(&transaction) .expect("delegate stake"); client .rpc_client() diff --git a/tpu-client/src/nonblocking/tpu_client.rs b/tpu-client/src/nonblocking/tpu_client.rs index 57a9b0b4033c61..a2d7bd9e3892ac 100644 --- a/tpu-client/src/nonblocking/tpu_client.rs +++ b/tpu-client/src/nonblocking/tpu_client.rs @@ -26,7 +26,7 @@ use { epoch_info::EpochInfo, pubkey::Pubkey, quic::QUIC_PORT_OFFSET, - signature::SignerError, + signature::{Signature, SignerError}, transaction::Transaction, transport::{Result as TransportResult, TransportError}, }, @@ -489,6 +489,31 @@ where } } + pub async fn send_and_confirm_transaction( + &self, + transaction: &Transaction, + ) -> TransportResult { + let pending_confirmations: usize = 0; + let wire_transaction = + bincode::serialize(&transaction).expect("transaction serialization failed"); + let _ = self.send_wire_transaction(wire_transaction).await; + + if let Ok(confirmed_blocks) = self + .rpc_client() + .poll_for_signature_confirmation(&transaction.signatures[0], pending_confirmations) + .await + { + if confirmed_blocks >= pending_confirmations { + return Ok(transaction.signatures[0]); + } + } + Err(std::io::Error::new( + std::io::ErrorKind::Other, + "failed to confirm transaction".to_string(), + ) + .into()) + } + /// Create a new client that disconnects when dropped pub async fn new( name: &'static str, diff --git a/tpu-client/src/tpu_client.rs b/tpu-client/src/tpu_client.rs index 6b1e7dbe44f7ac..bdcb888ed17a84 100644 --- a/tpu-client/src/tpu_client.rs +++ b/tpu-client/src/tpu_client.rs @@ -6,7 +6,10 @@ use { ConnectionCache, ConnectionManager, ConnectionPool, NewConnectionConfig, }, solana_rpc_client::rpc_client::RpcClient, - solana_sdk::{clock::Slot, transaction::Transaction, transport::Result as TransportResult}, + solana_sdk::{ + clock::Slot, signature::Signature, transaction::Transaction, + transport::Result as TransportResult, + }, std::{ collections::VecDeque, net::UdpSocket, @@ -115,6 +118,13 @@ where self.invoke(self.tpu_client.try_send_wire_transaction(wire_transaction)) } + pub fn send_and_confirm_transaction( + &self, + transaction: &Transaction, + ) -> TransportResult { + self.invoke(self.tpu_client.send_and_confirm_transaction(transaction)) + } + /// Create a new client that disconnects when dropped pub fn new( name: &'static str, From 67a23ed359d9bb2181114e499a7b9e773c73527b Mon Sep 17 00:00:00 2001 From: greg Date: Thu, 11 Apr 2024 02:00:39 +0000 Subject: [PATCH 03/10] change cluster trait to use QuicTpuClient and update upstream dependencies --- local-cluster/src/cluster.rs | 2 +- local-cluster/src/local_cluster.rs | 15 +++---- .../src/local_cluster_snapshot_utils.rs | 1 + local-cluster/tests/local_cluster.rs | 37 ++++++++++++---- tpu-client/src/tpu_client.rs | 44 ++++++++++++++++++- 5 files changed, 79 insertions(+), 20 deletions(-) diff --git a/local-cluster/src/cluster.rs b/local-cluster/src/cluster.rs index 99f31bf93504f7..b99a08c6034ee3 100644 --- a/local-cluster/src/cluster.rs +++ b/local-cluster/src/cluster.rs @@ -41,7 +41,7 @@ impl ClusterValidatorInfo { pub trait Cluster { fn get_node_pubkeys(&self) -> Vec; - fn get_validator_client(&self, pubkey: &Pubkey) -> Option; + fn get_validator_client(&self, pubkey: &Pubkey) -> Option; fn build_tpu_quic_client(&self) -> Result; fn build_tpu_quic_client_with_commitment( &self, diff --git a/local-cluster/src/local_cluster.rs b/local-cluster/src/local_cluster.rs index 555b50ab77e66e..2c4ef79b95976e 100644 --- a/local-cluster/src/local_cluster.rs +++ b/local-cluster/src/local_cluster.rs @@ -8,9 +8,7 @@ use { itertools::izip, log::*, solana_accounts_db::utils::create_accounts_run_and_snapshot_dirs, - solana_client::{ - connection_cache::ConnectionCache, rpc_client::RpcClient, thin_client::ThinClient, - }, + solana_client::{connection_cache::ConnectionCache, rpc_client::RpcClient}, solana_core::{ consensus::tower_storage::FileTowerStorage, validator::{Validator, ValidatorConfig, ValidatorStartProgress}, @@ -914,13 +912,10 @@ impl Cluster for LocalCluster { self.validators.keys().cloned().collect() } - fn get_validator_client(&self, pubkey: &Pubkey) -> Option { - self.validators.get(pubkey).map(|f| { - let (rpc, tpu) = cluster_tests::get_client_facing_addr( - self.connection_cache.protocol(), - &f.info.contact_info, - ); - ThinClient::new(rpc, tpu, self.connection_cache.clone()) + fn get_validator_client(&self, pubkey: &Pubkey) -> Option { + self.validators.get(pubkey).map(|_| { + self.build_tpu_quic_client() + .expect("should build tpu quic client") }) } diff --git a/local-cluster/src/local_cluster_snapshot_utils.rs b/local-cluster/src/local_cluster_snapshot_utils.rs index 259b9e1559ab69..206728ab45444c 100644 --- a/local-cluster/src/local_cluster_snapshot_utils.rs +++ b/local-cluster/src/local_cluster_snapshot_utils.rs @@ -76,6 +76,7 @@ impl LocalCluster { .get_validator_client(self.entry_point_info.pubkey()) .unwrap(); let last_slot = client + .rpc_client() .get_slot_with_commitment(CommitmentConfig::processed()) .expect("Couldn't get slot"); diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs index 5336f7c7151278..0cf71ae0c3b809 100644 --- a/local-cluster/tests/local_cluster.rs +++ b/local-cluster/tests/local_cluster.rs @@ -31,7 +31,7 @@ use { use_snapshot_archives_at_startup::UseSnapshotArchivesAtStartup, }, solana_local_cluster::{ - cluster::{Cluster, ClusterValidatorInfo}, + cluster::{Cluster, ClusterValidatorInfo, QuicTpuClient}, cluster_tests, integration_tests::{ copy_blocks, create_custom_leader_schedule, @@ -995,6 +995,7 @@ fn test_incremental_snapshot_download_with_crossing_full_snapshot_interval_at_st let validator_current_slot = cluster .get_validator_client(&validator_identity.pubkey()) .unwrap() + .rpc_client() .get_slot_with_commitment(CommitmentConfig::finalized()) .unwrap(); trace!("validator current slot: {validator_current_slot}"); @@ -1373,7 +1374,10 @@ fn test_snapshots_blockstore_floor() { let target_slot = slot_floor + 40; while current_slot <= target_slot { trace!("current_slot: {}", current_slot); - if let Ok(slot) = validator_client.get_slot_with_commitment(CommitmentConfig::processed()) { + if let Ok(slot) = validator_client + .rpc_client() + .get_slot_with_commitment(CommitmentConfig::processed()) + { current_slot = slot; } else { continue; @@ -1565,6 +1569,7 @@ fn test_no_voting() { .unwrap(); loop { let last_slot = client + .rpc_client() .get_slot_with_commitment(CommitmentConfig::processed()) .expect("Couldn't get slot"); if last_slot > 4 * VOTE_THRESHOLD_DEPTH as u64 { @@ -1628,6 +1633,7 @@ fn test_optimistic_confirmation_violation_detection() { let mut prev_voted_slot = 0; loop { let last_voted_slot = client + .rpc_client() .get_slot_with_commitment(CommitmentConfig::processed()) .unwrap(); if last_voted_slot > 50 { @@ -1681,6 +1687,7 @@ fn test_optimistic_confirmation_violation_detection() { let client = cluster.get_validator_client(&node_to_restart).unwrap(); loop { let last_root = client + .rpc_client() .get_slot_with_commitment(CommitmentConfig::finalized()) .unwrap(); if last_root > prev_voted_slot { @@ -1758,7 +1765,10 @@ fn test_validator_saves_tower() { // Wait for some votes to be generated loop { - if let Ok(slot) = validator_client.get_slot_with_commitment(CommitmentConfig::processed()) { + if let Ok(slot) = validator_client + .rpc_client() + .get_slot_with_commitment(CommitmentConfig::processed()) + { trace!("current slot: {}", slot); if slot > 2 { break; @@ -1783,7 +1793,10 @@ fn test_validator_saves_tower() { #[allow(deprecated)] // This test depends on knowing the immediate root, without any delay from the commitment // service, so the deprecated CommitmentConfig::root() is retained - if let Ok(root) = validator_client.get_slot_with_commitment(CommitmentConfig::root()) { + if let Ok(root) = validator_client + .rpc_client() + .get_slot_with_commitment(CommitmentConfig::root()) + { trace!("current root: {}", root); if root > 0 { break root; @@ -1812,7 +1825,10 @@ fn test_validator_saves_tower() { #[allow(deprecated)] // This test depends on knowing the immediate root, without any delay from the commitment // service, so the deprecated CommitmentConfig::root() is retained - if let Ok(root) = validator_client.get_slot_with_commitment(CommitmentConfig::root()) { + if let Ok(root) = validator_client + .rpc_client() + .get_slot_with_commitment(CommitmentConfig::root()) + { trace!( "current root: {}, last_replayed_root: {}", root, @@ -1845,7 +1861,10 @@ fn test_validator_saves_tower() { #[allow(deprecated)] // This test depends on knowing the immediate root, without any delay from the commitment // service, so the deprecated CommitmentConfig::root() is retained - if let Ok(root) = validator_client.get_slot_with_commitment(CommitmentConfig::root()) { + if let Ok(root) = validator_client + .rpc_client() + .get_slot_with_commitment(CommitmentConfig::root()) + { trace!("current root: {}, last tower root: {}", root, tower3_root); if root > tower3_root { break root; @@ -2856,8 +2875,8 @@ fn setup_transfer_scan_threads( num_starting_accounts: usize, exit: Arc, scan_commitment: CommitmentConfig, - update_client_receiver: Receiver, - scan_client_receiver: Receiver, + update_client_receiver: Receiver, + scan_client_receiver: Receiver, ) -> ( JoinHandle<()>, JoinHandle<()>, @@ -2895,6 +2914,7 @@ fn setup_transfer_scan_threads( return; } let (blockhash, _) = client + .rpc_client() .get_latest_blockhash_with_commitment(CommitmentConfig::processed()) .unwrap(); for i in 0..starting_keypairs_.len() { @@ -2941,6 +2961,7 @@ fn setup_transfer_scan_threads( return; } if let Some(total_scan_balance) = client + .rpc_client() .get_program_accounts_with_config( &system_program::id(), scan_commitment_config.clone(), diff --git a/tpu-client/src/tpu_client.rs b/tpu-client/src/tpu_client.rs index bdcb888ed17a84..d886867639b0ed 100644 --- a/tpu-client/src/tpu_client.rs +++ b/tpu-client/src/tpu_client.rs @@ -7,7 +7,10 @@ use { }, solana_rpc_client::rpc_client::RpcClient, solana_sdk::{ - clock::Slot, signature::Signature, transaction::Transaction, + client::AsyncClient, + clock::Slot, + signature::Signature, + transaction::{Transaction, VersionedTransaction}, transport::Result as TransportResult, }, std::{ @@ -118,6 +121,16 @@ where self.invoke(self.tpu_client.try_send_wire_transaction(wire_transaction)) } + pub fn try_send_wire_transaction_batch( + &self, + wire_transactions: Vec>, + ) -> TransportResult<()> { + self.invoke( + self.tpu_client + .try_send_wire_transaction_batch(wire_transactions), + ) + } + pub fn send_and_confirm_transaction( &self, transaction: &Transaction, @@ -197,6 +210,35 @@ where } } +impl AsyncClient for TpuClient +where + P: ConnectionPool, + M: ConnectionManager, + C: NewConnectionConfig, +{ + fn async_send_versioned_transaction( + &self, + transaction: VersionedTransaction, + ) -> TransportResult { + let wire_transaction = + bincode::serialize(&transaction).expect("serialize Transaction in send_batch"); + self.send_wire_transaction(wire_transaction); + Ok(transaction.signatures[0]) + } + + fn async_send_versioned_transaction_batch( + &self, + batch: Vec, + ) -> TransportResult<()> { + let buffers = batch + .into_par_iter() + .map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch")) + .collect::>(); + self.try_send_wire_transaction_batch(buffers)?; + Ok(()) + } +} + // 48 chosen because it's unlikely that 12 leaders in a row will miss their slots const MAX_SLOT_SKIP_DISTANCE: u64 = 48; From fe53aefc30ae7280a8075d9d054c74e9ea774398 Mon Sep 17 00:00:00 2001 From: greg Date: Mon, 22 Apr 2024 20:51:34 +0000 Subject: [PATCH 04/10] add try_send_transaction_blocking for tpu_client and expose connection cache and leader tpu service from nonblocking tpuclient --- local-cluster/src/cluster.rs | 1 - local-cluster/src/local_cluster.rs | 2 +- .../src/local_cluster_snapshot_utils.rs | 2 +- tpu-client/src/nonblocking/tpu_client.rs | 19 +++++- tpu-client/src/tpu_client.rs | 65 ++++++++++++++++++- 5 files changed, 83 insertions(+), 6 deletions(-) diff --git a/local-cluster/src/cluster.rs b/local-cluster/src/cluster.rs index b99a08c6034ee3..c2e3acc60751a3 100644 --- a/local-cluster/src/cluster.rs +++ b/local-cluster/src/cluster.rs @@ -1,5 +1,4 @@ use { - solana_client::thin_client::ThinClient, solana_core::validator::{Validator, ValidatorConfig}, solana_gossip::{cluster_info::Node, contact_info::ContactInfo}, solana_ledger::shred::Shred, diff --git a/local-cluster/src/local_cluster.rs b/local-cluster/src/local_cluster.rs index 2c4ef79b95976e..d0d7a816aac99b 100644 --- a/local-cluster/src/local_cluster.rs +++ b/local-cluster/src/local_cluster.rs @@ -759,7 +759,7 @@ impl LocalCluster { .0, ); client - .try_send_transaction(&transaction) + .try_send_transaction_blocking(&transaction) .expect("should fund vote"); client .rpc_client() diff --git a/local-cluster/src/local_cluster_snapshot_utils.rs b/local-cluster/src/local_cluster_snapshot_utils.rs index 206728ab45444c..3ba9cda04d9489 100644 --- a/local-cluster/src/local_cluster_snapshot_utils.rs +++ b/local-cluster/src/local_cluster_snapshot_utils.rs @@ -7,7 +7,7 @@ use { }, snapshot_utils, }, - solana_sdk::{client::SyncClient, commitment_config::CommitmentConfig}, + solana_sdk::commitment_config::CommitmentConfig, std::{ path::Path, thread::sleep, diff --git a/tpu-client/src/nonblocking/tpu_client.rs b/tpu-client/src/nonblocking/tpu_client.rs index a2d7bd9e3892ac..6cb106b519c495 100644 --- a/tpu-client/src/nonblocking/tpu_client.rs +++ b/tpu-client/src/nonblocking/tpu_client.rs @@ -701,6 +701,23 @@ where self.exit.store(true, Ordering::Relaxed); self.leader_tpu_service.join().await; } + + pub fn get_connection_cache(&self) -> Arc> + where + P: ConnectionPool, + M: ConnectionManager, + C: NewConnectionConfig, + { + self.connection_cache.clone() + } + + pub fn get_leader_tpu_service(&self) -> &LeaderTpuService { + &self.leader_tpu_service + } + + pub fn get_fanout_slots(&self) -> u64 { + self.fanout_slots + } } impl Drop for TpuClient { @@ -777,7 +794,7 @@ impl LeaderTpuService { self.recent_slots.estimated_current_slot() } - fn leader_tpu_sockets(&self, fanout_slots: u64) -> Vec { + pub fn leader_tpu_sockets(&self, fanout_slots: u64) -> Vec { let current_slot = self.recent_slots.estimated_current_slot(); self.leader_tpu_cache .read() diff --git a/tpu-client/src/tpu_client.rs b/tpu-client/src/tpu_client.rs index d886867639b0ed..b2da296a3f8f13 100644 --- a/tpu-client/src/tpu_client.rs +++ b/tpu-client/src/tpu_client.rs @@ -2,8 +2,11 @@ pub use crate::nonblocking::tpu_client::TpuSenderError; use { crate::nonblocking::tpu_client::TpuClient as NonblockingTpuClient, rayon::iter::{IntoParallelIterator, ParallelIterator}, - solana_connection_cache::connection_cache::{ - ConnectionCache, ConnectionManager, ConnectionPool, NewConnectionConfig, + solana_connection_cache::{ + client_connection::ClientConnection, + connection_cache::{ + ConnectionCache, ConnectionManager, ConnectionPool, NewConnectionConfig, + }, }, solana_rpc_client::rpc_client::RpcClient, solana_sdk::{ @@ -101,6 +104,64 @@ where self.invoke(self.tpu_client.try_send_transaction(transaction)) } + pub fn try_send_transaction_blocking( + &self, + transaction: &Transaction, + ) -> TransportResult { + let pending_confirmations: usize = 0; + let wire_transaction = + bincode::serialize(&transaction).expect("transaction serialization failed"); + + let leaders = self + .tpu_client + .get_leader_tpu_service() + .leader_tpu_sockets(self.tpu_client.get_fanout_slots()); + let cc = self.tpu_client.get_connection_cache(); + for tpu_address in &leaders { + let conn = cc.get_connection(tpu_address); + match conn.send_data_async(wire_transaction.clone()) { + Ok(_) => println!("tpuclient send_data success"), + Err(err) => println!("tpuclient send_data failed: {err}"), + } + } + + // match self.rpc_client().poll_for_signature_confirmation( + // &transaction.signatures[0], + // pending_confirmations, + // ) { + // Ok(confirmed_blocks) => { + // println!("tpuclient confirmed blocks found: {confirmed_blocks}"); + // // println!("thinclient num confirmed: {num_confirmed}"); + // // num_confirmed = confirmed_blocks; + // if confirmed_blocks >= pending_confirmations { + // println!("tpuclient confirmed blocks >= pending confirmations {confirmed_blocks} > {pending_confirmations}"); + // return Ok(transaction.signatures[0]); + // } + // // Since network has seen the transaction, wait longer to receive + // // all pending confirmations. Resending the transaction could result into + // // extra transaction fees + // // wait_time = wait_time.max( + // // MAX_PROCESSING_AGE * pending_confirmations.saturating_sub(num_confirmed), + // // ); + // } + // Err(err) => println!("tpuclient error polling for signature confirmation: err: {err}"), + // } + + if let Ok(confirmed_blocks) = self + .rpc_client() + .poll_for_signature_confirmation(&transaction.signatures[0], pending_confirmations) + { + if confirmed_blocks >= pending_confirmations { + return Ok(transaction.signatures[0]); + } + } + Err(std::io::Error::new( + std::io::ErrorKind::Other, + "failed to confirm transaction".to_string(), + ) + .into()) + } + /// Serialize and send a batch of transactions to the current and upcoming leader TPUs according /// to fanout size /// Returns the last error if all sends fail From dd26a393d7987b590cc2259ab00e193f397f97d2 Mon Sep 17 00:00:00 2001 From: greg Date: Mon, 22 Apr 2024 22:38:15 +0000 Subject: [PATCH 05/10] dont clone connection cache. and use try_send_transaction_blocking in transfer_with_client --- local-cluster/src/local_cluster.rs | 4 +-- tpu-client/src/nonblocking/tpu_client.rs | 4 +-- tpu-client/src/tpu_client.rs | 31 +++--------------------- 3 files changed, 7 insertions(+), 32 deletions(-) diff --git a/local-cluster/src/local_cluster.rs b/local-cluster/src/local_cluster.rs index d0d7a816aac99b..35d1b9ffc23ce8 100644 --- a/local-cluster/src/local_cluster.rs +++ b/local-cluster/src/local_cluster.rs @@ -685,7 +685,7 @@ impl LocalCluster { *dest_pubkey ); client - .try_send_transaction(&tx) + .try_send_transaction_blocking(&tx) .expect("client transfer should succeed"); client .rpc_client() @@ -759,7 +759,7 @@ impl LocalCluster { .0, ); client - .try_send_transaction_blocking(&transaction) + .try_send_transaction(&transaction) .expect("should fund vote"); client .rpc_client() diff --git a/tpu-client/src/nonblocking/tpu_client.rs b/tpu-client/src/nonblocking/tpu_client.rs index 6cb106b519c495..97a62bdf4d135f 100644 --- a/tpu-client/src/nonblocking/tpu_client.rs +++ b/tpu-client/src/nonblocking/tpu_client.rs @@ -702,13 +702,13 @@ where self.leader_tpu_service.join().await; } - pub fn get_connection_cache(&self) -> Arc> + pub fn get_connection_cache(&self) -> &Arc> where P: ConnectionPool, M: ConnectionManager, C: NewConnectionConfig, { - self.connection_cache.clone() + &self.connection_cache } pub fn get_leader_tpu_service(&self) -> &LeaderTpuService { diff --git a/tpu-client/src/tpu_client.rs b/tpu-client/src/tpu_client.rs index b2da296a3f8f13..c2a9ccbe4a619a 100644 --- a/tpu-client/src/tpu_client.rs +++ b/tpu-client/src/tpu_client.rs @@ -116,37 +116,12 @@ where .tpu_client .get_leader_tpu_service() .leader_tpu_sockets(self.tpu_client.get_fanout_slots()); - let cc = self.tpu_client.get_connection_cache(); + let cache = self.tpu_client.get_connection_cache(); for tpu_address in &leaders { - let conn = cc.get_connection(tpu_address); - match conn.send_data_async(wire_transaction.clone()) { - Ok(_) => println!("tpuclient send_data success"), - Err(err) => println!("tpuclient send_data failed: {err}"), - } + let conn = cache.get_connection(tpu_address); + conn.send_data_async(wire_transaction.clone())?; } - // match self.rpc_client().poll_for_signature_confirmation( - // &transaction.signatures[0], - // pending_confirmations, - // ) { - // Ok(confirmed_blocks) => { - // println!("tpuclient confirmed blocks found: {confirmed_blocks}"); - // // println!("thinclient num confirmed: {num_confirmed}"); - // // num_confirmed = confirmed_blocks; - // if confirmed_blocks >= pending_confirmations { - // println!("tpuclient confirmed blocks >= pending confirmations {confirmed_blocks} > {pending_confirmations}"); - // return Ok(transaction.signatures[0]); - // } - // // Since network has seen the transaction, wait longer to receive - // // all pending confirmations. Resending the transaction could result into - // // extra transaction fees - // // wait_time = wait_time.max( - // // MAX_PROCESSING_AGE * pending_confirmations.saturating_sub(num_confirmed), - // // ); - // } - // Err(err) => println!("tpuclient error polling for signature confirmation: err: {err}"), - // } - if let Ok(confirmed_blocks) = self .rpc_client() .poll_for_signature_confirmation(&transaction.signatures[0], pending_confirmations) From c114eadf6ebe6d4444c1f18b4ef25ce8ab0fac73 Mon Sep 17 00:00:00 2001 From: greg Date: Wed, 24 Apr 2024 01:42:34 +0000 Subject: [PATCH 06/10] switch local-cluster tests to use TpuClient --- local-cluster/tests/local_cluster.rs | 88 ++++++++++++++++++++-------- 1 file changed, 64 insertions(+), 24 deletions(-) diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs index 0cf71ae0c3b809..b7d4092617d425 100644 --- a/local-cluster/tests/local_cluster.rs +++ b/local-cluster/tests/local_cluster.rs @@ -9,7 +9,7 @@ use { solana_accounts_db::{ hardened_unpack::open_genesis_config, utils::create_accounts_run_and_snapshot_dirs, }, - solana_client::thin_client::ThinClient, + solana_client::{connection_cache::ConnectionCache, tpu_client::TpuClientConfig}, solana_core::{ consensus::{ tower_storage::FileTowerStorage, Tower, SWITCH_FORK_THRESHOLD, VOTE_THRESHOLD_DEPTH, @@ -62,7 +62,7 @@ use { }, solana_sdk::{ account::AccountSharedData, - client::{AsyncClient, SyncClient}, + client::AsyncClient, clock::{self, Slot, DEFAULT_TICKS_PER_SLOT, MAX_PROCESSING_AGE}, commitment_config::CommitmentConfig, epoch_schedule::{DEFAULT_SLOTS_PER_EPOCH, MINIMUM_SLOTS_PER_EPOCH}, @@ -76,6 +76,7 @@ use { vote::state::TowerSync, }, solana_streamer::socket::SocketAddrSpace, + solana_tpu_client::tpu_client::TpuClient, solana_turbine::broadcast_stage::{ broadcast_duplicates_run::{BroadcastDuplicatesConfig, ClusterPartition}, BroadcastStageType, @@ -216,17 +217,28 @@ fn test_local_cluster_signature_subscribe() { .unwrap(); let non_bootstrap_info = cluster.get_contact_info(&non_bootstrap_id).unwrap(); - let (rpc, tpu) = cluster_tests::get_client_facing_addr( - cluster.connection_cache.protocol(), - non_bootstrap_info, - ); - let tx_client = ThinClient::new(rpc, tpu, cluster.connection_cache.clone()); + let rpc_pubsub_url = format!("ws://{}/", cluster.entry_point_info.rpc_pubsub().unwrap()); + let rpc_url = format!("http://{}", cluster.entry_point_info.rpc().unwrap()); + + let cache = match &*cluster.connection_cache { + ConnectionCache::Quic(cache) => cache, + ConnectionCache::Udp(_) => panic!("Expected a Quic ConnectionCache. Got UDP"), + }; + + let tx_client = TpuClient::new_with_connection_cache( + Arc::new(RpcClient::new(rpc_url)), + rpc_pubsub_url.as_str(), + TpuClientConfig::default(), + cache.clone(), + ) + .unwrap(); let (blockhash, _) = tx_client + .rpc_client() .get_latest_blockhash_with_commitment(CommitmentConfig::processed()) .unwrap(); - let mut transaction = system_transaction::transfer( + let transaction = system_transaction::transfer( &cluster.funding_keypair, &solana_sdk::pubkey::new_rand(), 10, @@ -244,8 +256,12 @@ fn test_local_cluster_signature_subscribe() { .unwrap(); tx_client - .retry_transfer(&cluster.funding_keypair, &mut transaction, 5) - .unwrap(); + .try_send_transaction(&transaction) + .expect("should execute transfer"); + + // tx_client + // .retry_transfer(&cluster.funding_keypair, &mut transaction, 5) + // .unwrap(); let mut got_received_notification = false; loop { @@ -422,11 +438,21 @@ fn test_mainnet_beta_cluster_type() { .unwrap(); assert_eq!(cluster_nodes.len(), 1); - let (rpc, tpu) = cluster_tests::get_client_facing_addr( - cluster.connection_cache.protocol(), - &cluster.entry_point_info, - ); - let client = ThinClient::new(rpc, tpu, cluster.connection_cache.clone()); + let rpc_pubsub_url = format!("ws://{}/", cluster.entry_point_info.rpc_pubsub().unwrap()); + let rpc_url = format!("http://{}", cluster.entry_point_info.rpc().unwrap()); + + let cache = match &*cluster.connection_cache { + ConnectionCache::Quic(cache) => cache, + ConnectionCache::Udp(_) => panic!("Expected a Quic ConnectionCache. Got UDP"), + }; + + let client = TpuClient::new_with_connection_cache( + Arc::new(RpcClient::new(rpc_url)), + rpc_pubsub_url.as_str(), + TpuClientConfig::default(), + cache.clone(), + ) + .unwrap(); // Programs that are available at epoch 0 for program_id in [ @@ -444,8 +470,10 @@ fn test_mainnet_beta_cluster_type() { ( program_id, client + .rpc_client() .get_account_with_commitment(program_id, CommitmentConfig::processed()) .unwrap() + .value ), (_program_id, Some(_)) ); @@ -457,8 +485,10 @@ fn test_mainnet_beta_cluster_type() { ( program_id, client + .rpc_client() .get_account_with_commitment(program_id, CommitmentConfig::processed()) .unwrap() + .value ), (program_id, None) ); @@ -2670,12 +2700,22 @@ fn test_oc_bad_signatures() { ); // 3) Start up a spy to listen for and push votes to leader TPU - let (rpc, tpu) = cluster_tests::get_client_facing_addr( - cluster.connection_cache.protocol(), - &cluster.entry_point_info, - ); - let client = ThinClient::new(rpc, tpu, cluster.connection_cache.clone()); - let cluster_funding_keypair = cluster.funding_keypair.insecure_clone(); + let rpc_pubsub_url = format!("ws://{}/", cluster.entry_point_info.rpc_pubsub().unwrap()); + let rpc_url = format!("http://{}", cluster.entry_point_info.rpc().unwrap()); + + let cache = match &*cluster.connection_cache { + ConnectionCache::Quic(cache) => cache, + ConnectionCache::Udp(_) => panic!("Expected a Quic ConnectionCache. Got UDP"), + }; + + let client = TpuClient::new_with_connection_cache( + Arc::new(RpcClient::new(rpc_url)), + rpc_pubsub_url.as_str(), + TpuClientConfig::default(), + cache.clone(), + ) + .unwrap(); + let voter_thread_sleep_ms: usize = 100; let num_votes_simulated = Arc::new(AtomicUsize::new(0)); let gossip_voter = cluster_tests::start_gossip_voter( @@ -2710,7 +2750,7 @@ fn test_oc_bad_signatures() { let vote_slots: Vec = vec![vote_slot]; let bad_authorized_signer_keypair = Keypair::new(); - let mut vote_tx = vote_transaction::new_vote_transaction( + let vote_tx = vote_transaction::new_vote_transaction( vote_slots, vote_hash, leader_vote_tx.message.recent_blockhash, @@ -2721,8 +2761,8 @@ fn test_oc_bad_signatures() { None, ); client - .retry_transfer(&cluster_funding_keypair, &mut vote_tx, 5) - .unwrap(); + .try_send_transaction(&vote_tx) + .expect("Should execute vote_tx"); num_votes_simulated.fetch_add(1, Ordering::Relaxed); } From 3a147ce2dd890650e2326c0230cfdf92baabea9b Mon Sep 17 00:00:00 2001 From: greg Date: Wed, 24 Apr 2024 18:45:45 +0000 Subject: [PATCH 07/10] rename try_send_transaction_blocking to send_and_confirm_transaction --- local-cluster/src/local_cluster.rs | 2 +- tpu-client/src/tpu_client.rs | 13 +++++-------- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/local-cluster/src/local_cluster.rs b/local-cluster/src/local_cluster.rs index 35d1b9ffc23ce8..d22b3c5579b332 100644 --- a/local-cluster/src/local_cluster.rs +++ b/local-cluster/src/local_cluster.rs @@ -685,7 +685,7 @@ impl LocalCluster { *dest_pubkey ); client - .try_send_transaction_blocking(&tx) + .send_and_confirm_transaction(&tx) .expect("client transfer should succeed"); client .rpc_client() diff --git a/tpu-client/src/tpu_client.rs b/tpu-client/src/tpu_client.rs index c2a9ccbe4a619a..e982ca735f10f6 100644 --- a/tpu-client/src/tpu_client.rs +++ b/tpu-client/src/tpu_client.rs @@ -104,7 +104,11 @@ where self.invoke(self.tpu_client.try_send_transaction(transaction)) } - pub fn try_send_transaction_blocking( + /// Serialize and send transaction to the current and upcoming leader TPUs according to fanout + /// size + /// Waits for signature confirmation before returning + /// Returns the transaction signature + pub fn send_and_confirm_transaction( &self, transaction: &Transaction, ) -> TransportResult { @@ -167,13 +171,6 @@ where ) } - pub fn send_and_confirm_transaction( - &self, - transaction: &Transaction, - ) -> TransportResult { - self.invoke(self.tpu_client.send_and_confirm_transaction(transaction)) - } - /// Create a new client that disconnects when dropped pub fn new( name: &'static str, From 2887b9bd36594466d7df6f514f5a4f2f51f18016 Mon Sep 17 00:00:00 2001 From: greg Date: Wed, 8 May 2024 20:36:30 +0000 Subject: [PATCH 08/10] clean up --- local-cluster/src/local_cluster.rs | 2 - local-cluster/tests/local_cluster.rs | 54 ++---------------------- tpu-client/src/nonblocking/tpu_client.rs | 27 +----------- 3 files changed, 4 insertions(+), 79 deletions(-) diff --git a/local-cluster/src/local_cluster.rs b/local-cluster/src/local_cluster.rs index d22b3c5579b332..d211dbb31413dc 100644 --- a/local-cluster/src/local_cluster.rs +++ b/local-cluster/src/local_cluster.rs @@ -501,12 +501,10 @@ impl LocalCluster { &validator_pubkey, stake * 2 + 2, ); - info!( "validator {} balance {}", validator_pubkey, validator_balance ); - Self::setup_vote_and_stake_accounts( &client, voting_keypair.as_ref().unwrap(), diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs index b7d4092617d425..88a2b7caf3ebc0 100644 --- a/local-cluster/tests/local_cluster.rs +++ b/local-cluster/tests/local_cluster.rs @@ -9,7 +9,6 @@ use { solana_accounts_db::{ hardened_unpack::open_genesis_config, utils::create_accounts_run_and_snapshot_dirs, }, - solana_client::{connection_cache::ConnectionCache, tpu_client::TpuClientConfig}, solana_core::{ consensus::{ tower_storage::FileTowerStorage, Tower, SWITCH_FORK_THRESHOLD, VOTE_THRESHOLD_DEPTH, @@ -76,7 +75,6 @@ use { vote::state::TowerSync, }, solana_streamer::socket::SocketAddrSpace, - solana_tpu_client::tpu_client::TpuClient, solana_turbine::broadcast_stage::{ broadcast_duplicates_run::{BroadcastDuplicatesConfig, ClusterPartition}, BroadcastStageType, @@ -217,21 +215,7 @@ fn test_local_cluster_signature_subscribe() { .unwrap(); let non_bootstrap_info = cluster.get_contact_info(&non_bootstrap_id).unwrap(); - let rpc_pubsub_url = format!("ws://{}/", cluster.entry_point_info.rpc_pubsub().unwrap()); - let rpc_url = format!("http://{}", cluster.entry_point_info.rpc().unwrap()); - - let cache = match &*cluster.connection_cache { - ConnectionCache::Quic(cache) => cache, - ConnectionCache::Udp(_) => panic!("Expected a Quic ConnectionCache. Got UDP"), - }; - - let tx_client = TpuClient::new_with_connection_cache( - Arc::new(RpcClient::new(rpc_url)), - rpc_pubsub_url.as_str(), - TpuClientConfig::default(), - cache.clone(), - ) - .unwrap(); + let tx_client = cluster.build_tpu_quic_client().unwrap(); let (blockhash, _) = tx_client .rpc_client() @@ -259,10 +243,6 @@ fn test_local_cluster_signature_subscribe() { .try_send_transaction(&transaction) .expect("should execute transfer"); - // tx_client - // .retry_transfer(&cluster.funding_keypair, &mut transaction, 5) - // .unwrap(); - let mut got_received_notification = false; loop { let responses: Vec<_> = receiver.try_iter().collect(); @@ -438,21 +418,7 @@ fn test_mainnet_beta_cluster_type() { .unwrap(); assert_eq!(cluster_nodes.len(), 1); - let rpc_pubsub_url = format!("ws://{}/", cluster.entry_point_info.rpc_pubsub().unwrap()); - let rpc_url = format!("http://{}", cluster.entry_point_info.rpc().unwrap()); - - let cache = match &*cluster.connection_cache { - ConnectionCache::Quic(cache) => cache, - ConnectionCache::Udp(_) => panic!("Expected a Quic ConnectionCache. Got UDP"), - }; - - let client = TpuClient::new_with_connection_cache( - Arc::new(RpcClient::new(rpc_url)), - rpc_pubsub_url.as_str(), - TpuClientConfig::default(), - cache.clone(), - ) - .unwrap(); + let client = cluster.build_tpu_quic_client().unwrap(); // Programs that are available at epoch 0 for program_id in [ @@ -2700,21 +2666,7 @@ fn test_oc_bad_signatures() { ); // 3) Start up a spy to listen for and push votes to leader TPU - let rpc_pubsub_url = format!("ws://{}/", cluster.entry_point_info.rpc_pubsub().unwrap()); - let rpc_url = format!("http://{}", cluster.entry_point_info.rpc().unwrap()); - - let cache = match &*cluster.connection_cache { - ConnectionCache::Quic(cache) => cache, - ConnectionCache::Udp(_) => panic!("Expected a Quic ConnectionCache. Got UDP"), - }; - - let client = TpuClient::new_with_connection_cache( - Arc::new(RpcClient::new(rpc_url)), - rpc_pubsub_url.as_str(), - TpuClientConfig::default(), - cache.clone(), - ) - .unwrap(); + let client = cluster.build_tpu_quic_client().unwrap(); let voter_thread_sleep_ms: usize = 100; let num_votes_simulated = Arc::new(AtomicUsize::new(0)); diff --git a/tpu-client/src/nonblocking/tpu_client.rs b/tpu-client/src/nonblocking/tpu_client.rs index 97a62bdf4d135f..ce274b8245d4d5 100644 --- a/tpu-client/src/nonblocking/tpu_client.rs +++ b/tpu-client/src/nonblocking/tpu_client.rs @@ -26,7 +26,7 @@ use { epoch_info::EpochInfo, pubkey::Pubkey, quic::QUIC_PORT_OFFSET, - signature::{Signature, SignerError}, + signature::SignerError, transaction::Transaction, transport::{Result as TransportResult, TransportError}, }, @@ -489,31 +489,6 @@ where } } - pub async fn send_and_confirm_transaction( - &self, - transaction: &Transaction, - ) -> TransportResult { - let pending_confirmations: usize = 0; - let wire_transaction = - bincode::serialize(&transaction).expect("transaction serialization failed"); - let _ = self.send_wire_transaction(wire_transaction).await; - - if let Ok(confirmed_blocks) = self - .rpc_client() - .poll_for_signature_confirmation(&transaction.signatures[0], pending_confirmations) - .await - { - if confirmed_blocks >= pending_confirmations { - return Ok(transaction.signatures[0]); - } - } - Err(std::io::Error::new( - std::io::ErrorKind::Other, - "failed to confirm transaction".to_string(), - ) - .into()) - } - /// Create a new client that disconnects when dropped pub async fn new( name: &'static str, From 0cdce8989ca18d61baa89506244882a1173cf653 Mon Sep 17 00:00:00 2001 From: greg Date: Wed, 8 May 2024 23:46:17 +0000 Subject: [PATCH 09/10] remove thinclient from localcluster/src/cluster_tests --- local-cluster/src/cluster_tests.rs | 119 ++++++++++++++++++----------- 1 file changed, 73 insertions(+), 46 deletions(-) diff --git a/local-cluster/src/cluster_tests.rs b/local-cluster/src/cluster_tests.rs index 09ee39d2e47dd8..07747f58782781 100644 --- a/local-cluster/src/cluster_tests.rs +++ b/local-cluster/src/cluster_tests.rs @@ -4,12 +4,10 @@ /// discover the rest of the network. use log::*; use { + crate::cluster::QuicTpuClient, rand::{thread_rng, Rng}, rayon::{prelude::*, ThreadPool}, - solana_client::{ - connection_cache::{ConnectionCache, Protocol}, - thin_client::ThinClient, - }, + solana_client::connection_cache::{ConnectionCache, Protocol}, solana_core::consensus::{ tower_storage::{FileTowerStorage, SavedTower, SavedTowerVersions, TowerStorage}, VOTE_THRESHOLD_DEPTH, @@ -24,8 +22,8 @@ use { gossip_service::{self, discover_cluster, GossipService}, }, solana_ledger::blockstore::Blockstore, + solana_rpc_client::rpc_client::RpcClient, solana_sdk::{ - client::SyncClient, clock::{self, Slot, NUM_CONSECUTIVE_LEADER_SLOTS}, commitment_config::CommitmentConfig, epoch_schedule::MINIMUM_SLOTS_PER_EPOCH, @@ -40,6 +38,7 @@ use { transport::TransportError, }, solana_streamer::socket::SocketAddrSpace, + solana_tpu_client::tpu_client::{TpuClient, TpuClientConfig, TpuSenderError}, solana_vote::vote_transaction::VoteTransaction, solana_vote_program::vote_transaction, std::{ @@ -89,9 +88,11 @@ pub fn spend_and_verify_all_nodes( return; } let random_keypair = Keypair::new(); - let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), ingress_node); - let client = ThinClient::new(rpc, tpu, connection_cache.clone()); + + let client = new_tpu_quic_client(ingress_node, connection_cache.clone()).unwrap(); + let bal = client + .rpc_client() .poll_get_balance_with_commitment( &funding_keypair.pubkey(), CommitmentConfig::processed(), @@ -99,21 +100,22 @@ pub fn spend_and_verify_all_nodes( .expect("balance in source"); assert!(bal > 0); let (blockhash, _) = client + .rpc_client() .get_latest_blockhash_with_commitment(CommitmentConfig::confirmed()) .unwrap(); - let mut transaction = + let transaction = system_transaction::transfer(funding_keypair, &random_keypair.pubkey(), 1, blockhash); let confs = VOTE_THRESHOLD_DEPTH + 1; - let sig = client - .retry_transfer_until_confirmed(funding_keypair, &mut transaction, 10, confs) - .unwrap(); + let sig = client.send_and_confirm_transaction(&transaction).unwrap(); for validator in &cluster_nodes { if ignore_nodes.contains(validator.pubkey()) { continue; } - let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), validator); - let client = ThinClient::new(rpc, tpu, connection_cache.clone()); - client.poll_for_signature_confirmation(&sig, confs).unwrap(); + let client = new_tpu_quic_client(validator, connection_cache.clone()).unwrap(); + client + .rpc_client() + .poll_for_signature_confirmation(&sig, confs) + .unwrap(); } }); } @@ -123,10 +125,10 @@ pub fn verify_balances( node: &ContactInfo, connection_cache: Arc, ) { - let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), node); - let client = ThinClient::new(rpc, tpu, connection_cache); + let client = new_tpu_quic_client(node, connection_cache.clone()).unwrap(); for (pk, b) in expected_balances { let bal = client + .rpc_client() .poll_get_balance_with_commitment(&pk, CommitmentConfig::processed()) .expect("balance in source"); assert_eq!(bal, b); @@ -140,12 +142,13 @@ pub fn send_many_transactions( max_tokens_per_transfer: u64, num_txs: u64, ) -> HashMap { - let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), node); - let client = ThinClient::new(rpc, tpu, connection_cache.clone()); + let client = new_tpu_quic_client(node, connection_cache.clone()).unwrap(); + let mut expected_balances = HashMap::new(); for _ in 0..num_txs { let random_keypair = Keypair::new(); let bal = client + .rpc_client() .poll_get_balance_with_commitment( &funding_keypair.pubkey(), CommitmentConfig::processed(), @@ -153,20 +156,19 @@ pub fn send_many_transactions( .expect("balance in source"); assert!(bal > 0); let (blockhash, _) = client + .rpc_client() .get_latest_blockhash_with_commitment(CommitmentConfig::processed()) .unwrap(); let transfer_amount = thread_rng().gen_range(1..max_tokens_per_transfer); - let mut transaction = system_transaction::transfer( + let transaction = system_transaction::transfer( funding_keypair, &random_keypair.pubkey(), transfer_amount, blockhash, ); - client - .retry_transfer(funding_keypair, &mut transaction, 5) - .unwrap(); + client.try_send_transaction(&transaction).unwrap(); expected_balances.insert(random_keypair.pubkey(), transfer_amount); } @@ -238,14 +240,14 @@ pub fn kill_entry_and_spend_and_verify_rest( ) .unwrap(); assert!(cluster_nodes.len() >= nodes); - let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), entry_point_info); - let client = ThinClient::new(rpc, tpu, connection_cache.clone()); + let client = new_tpu_quic_client(entry_point_info, connection_cache.clone()).unwrap(); // sleep long enough to make sure we are in epoch 3 let first_two_epoch_slots = MINIMUM_SLOTS_PER_EPOCH * (3 + 1); for ingress_node in &cluster_nodes { client + .rpc_client() .poll_get_balance_with_commitment(ingress_node.pubkey(), CommitmentConfig::processed()) .unwrap_or_else(|err| panic!("Node {} has no balance: {}", ingress_node.pubkey(), err)); } @@ -266,9 +268,10 @@ pub fn kill_entry_and_spend_and_verify_rest( continue; } - let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), ingress_node); - let client = ThinClient::new(rpc, tpu, connection_cache.clone()); + let client = new_tpu_quic_client(ingress_node, connection_cache.clone()).unwrap(); + let balance = client + .rpc_client() .poll_get_balance_with_commitment( &funding_keypair.pubkey(), CommitmentConfig::processed(), @@ -286,9 +289,10 @@ pub fn kill_entry_and_spend_and_verify_rest( let random_keypair = Keypair::new(); let (blockhash, _) = client + .rpc_client() .get_latest_blockhash_with_commitment(CommitmentConfig::processed()) .unwrap(); - let mut transaction = system_transaction::transfer( + let transaction = system_transaction::transfer( funding_keypair, &random_keypair.pubkey(), 1, @@ -297,12 +301,7 @@ pub fn kill_entry_and_spend_and_verify_rest( let confs = VOTE_THRESHOLD_DEPTH + 1; let sig = { - let sig = client.retry_transfer_until_confirmed( - funding_keypair, - &mut transaction, - 5, - confs, - ); + let sig = client.send_and_confirm_transaction(&transaction); match sig { Err(e) => { result = Err(e); @@ -353,10 +352,11 @@ pub fn check_min_slot_is_rooted( let loop_start = Instant::now(); let loop_timeout = Duration::from_secs(180); for ingress_node in contact_infos.iter() { - let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), ingress_node); - let client = ThinClient::new(rpc, tpu, connection_cache.clone()); + let client = new_tpu_quic_client(ingress_node, connection_cache.clone()).unwrap(); + loop { let root_slot = client + .rpc_client() .get_slot_with_commitment(CommitmentConfig::finalized()) .unwrap_or(0); if root_slot >= min_slot || last_print.elapsed().as_secs() > 3 { @@ -394,9 +394,10 @@ pub fn check_for_new_roots( assert!(loop_start.elapsed() < loop_timeout); for (i, ingress_node) in contact_infos.iter().enumerate() { - let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), ingress_node); - let client = ThinClient::new(rpc, tpu, connection_cache.clone()); + let client = new_tpu_quic_client(ingress_node, connection_cache.clone()).unwrap(); + let root_slot = client + .rpc_client() .get_slot_with_commitment(CommitmentConfig::finalized()) .unwrap_or(0); roots[i].insert(root_slot); @@ -427,13 +428,15 @@ pub fn check_no_new_roots( .iter() .enumerate() .map(|(i, ingress_node)| { - let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), ingress_node); - let client = ThinClient::new(rpc, tpu, connection_cache.clone()); + let client = new_tpu_quic_client(ingress_node, connection_cache.clone()).unwrap(); + let initial_root = client + .rpc_client() .get_slot() .unwrap_or_else(|_| panic!("get_slot for {} failed", ingress_node.pubkey())); roots[i] = initial_root; client + .rpc_client() .get_slot_with_commitment(CommitmentConfig::processed()) .unwrap_or_else(|_| panic!("get_slot for {} failed", ingress_node.pubkey())) }) @@ -446,9 +449,10 @@ pub fn check_no_new_roots( let mut reached_end_slot = false; loop { for contact_info in contact_infos { - let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), contact_info); - let client = ThinClient::new(rpc, tpu, connection_cache.clone()); + let client = new_tpu_quic_client(contact_info, connection_cache.clone()).unwrap(); + current_slot = client + .rpc_client() .get_slot_with_commitment(CommitmentConfig::processed()) .unwrap_or_else(|_| panic!("get_slot for {} failed", contact_infos[0].pubkey())); if current_slot > end_slot { @@ -472,10 +476,11 @@ pub fn check_no_new_roots( } for (i, ingress_node) in contact_infos.iter().enumerate() { - let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), ingress_node); - let client = ThinClient::new(rpc, tpu, connection_cache.clone()); + let client = new_tpu_quic_client(ingress_node, connection_cache.clone()).unwrap(); + assert_eq!( client + .rpc_client() .get_slot() .unwrap_or_else(|_| panic!("get_slot for {} failed", ingress_node.pubkey())), roots[i] @@ -494,9 +499,11 @@ fn poll_all_nodes_for_signature( if validator.pubkey() == entry_point_info.pubkey() { continue; } - let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), validator); - let client = ThinClient::new(rpc, tpu, connection_cache.clone()); - client.poll_for_signature_confirmation(sig, confs)?; + let client = new_tpu_quic_client(validator, connection_cache.clone()).unwrap(); + + client + .rpc_client() + .poll_for_signature_confirmation(sig, confs)?; } Ok(()) @@ -678,3 +685,23 @@ pub fn submit_vote_to_cluster_gossip( socket_addr_space, ) } + +pub fn new_tpu_quic_client( + contact_info: &ContactInfo, + connection_cache: Arc, +) -> Result { + let rpc_pubsub_url = format!("ws://{}/", contact_info.rpc_pubsub().unwrap()); + let rpc_url = format!("http://{}", contact_info.rpc().unwrap()); + + let cache = match &*connection_cache { + ConnectionCache::Quic(cache) => cache, + ConnectionCache::Udp(_) => panic!("Expected a Quic ConnectionCache. Got UDP"), + }; + + TpuClient::new_with_connection_cache( + Arc::new(RpcClient::new(rpc_url)), + rpc_pubsub_url.as_str(), + TpuClientConfig::default(), + cache.clone(), + ) +} From b86668ff63d368d37ce2b1ebe1df6027a4afebed Mon Sep 17 00:00:00 2001 From: greg Date: Thu, 9 May 2024 22:56:40 +0000 Subject: [PATCH 10/10] remove warmup epochs in some test cases build send_and_confirm_transaction_with_retries --- local-cluster/src/cluster_tests.rs | 28 +++++++---- local-cluster/src/local_cluster.rs | 8 ++-- local-cluster/tests/local_cluster.rs | 2 + tpu-client/src/tpu_client.rs | 71 +++++++++++++++++++--------- 4 files changed, 74 insertions(+), 35 deletions(-) diff --git a/local-cluster/src/cluster_tests.rs b/local-cluster/src/cluster_tests.rs index 07747f58782781..79ebcb645a63bc 100644 --- a/local-cluster/src/cluster_tests.rs +++ b/local-cluster/src/cluster_tests.rs @@ -88,7 +88,6 @@ pub fn spend_and_verify_all_nodes( return; } let random_keypair = Keypair::new(); - let client = new_tpu_quic_client(ingress_node, connection_cache.clone()).unwrap(); let bal = client @@ -103,10 +102,17 @@ pub fn spend_and_verify_all_nodes( .rpc_client() .get_latest_blockhash_with_commitment(CommitmentConfig::confirmed()) .unwrap(); - let transaction = + let mut transaction = system_transaction::transfer(funding_keypair, &random_keypair.pubkey(), 1, blockhash); let confs = VOTE_THRESHOLD_DEPTH + 1; - let sig = client.send_and_confirm_transaction(&transaction).unwrap(); + let sig = client + .send_and_confirm_transaction_with_retries( + &[funding_keypair], + &mut transaction, + 10, + confs, + ) + .unwrap(); for validator in &cluster_nodes { if ignore_nodes.contains(validator.pubkey()) { continue; @@ -161,14 +167,16 @@ pub fn send_many_transactions( .unwrap(); let transfer_amount = thread_rng().gen_range(1..max_tokens_per_transfer); - let transaction = system_transaction::transfer( + let mut transaction = system_transaction::transfer( funding_keypair, &random_keypair.pubkey(), transfer_amount, blockhash, ); - client.try_send_transaction(&transaction).unwrap(); + client + .send_and_confirm_transaction_with_retries(&[funding_keypair], &mut transaction, 5, 0) + .unwrap(); expected_balances.insert(random_keypair.pubkey(), transfer_amount); } @@ -292,7 +300,7 @@ pub fn kill_entry_and_spend_and_verify_rest( .rpc_client() .get_latest_blockhash_with_commitment(CommitmentConfig::processed()) .unwrap(); - let transaction = system_transaction::transfer( + let mut transaction = system_transaction::transfer( funding_keypair, &random_keypair.pubkey(), 1, @@ -301,7 +309,12 @@ pub fn kill_entry_and_spend_and_verify_rest( let confs = VOTE_THRESHOLD_DEPTH + 1; let sig = { - let sig = client.send_and_confirm_transaction(&transaction); + let sig = client.send_and_confirm_transaction_with_retries( + &[funding_keypair], + &mut transaction, + 5, + confs, + ); match sig { Err(e) => { result = Err(e); @@ -500,7 +513,6 @@ fn poll_all_nodes_for_signature( continue; } let client = new_tpu_quic_client(validator, connection_cache.clone()).unwrap(); - client .rpc_client() .poll_for_signature_confirmation(sig, confs)?; diff --git a/local-cluster/src/local_cluster.rs b/local-cluster/src/local_cluster.rs index d211dbb31413dc..4d516d3001b6a1 100644 --- a/local-cluster/src/local_cluster.rs +++ b/local-cluster/src/local_cluster.rs @@ -675,7 +675,7 @@ impl LocalCluster { .rpc_client() .get_latest_blockhash_with_commitment(CommitmentConfig::processed()) .unwrap(); - let tx = system_transaction::transfer(source_keypair, dest_pubkey, lamports, blockhash); + let mut tx = system_transaction::transfer(source_keypair, dest_pubkey, lamports, blockhash); info!( "executing transfer of {} from {} to {}", lamports, @@ -683,7 +683,7 @@ impl LocalCluster { *dest_pubkey ); client - .send_and_confirm_transaction(&tx) + .send_and_confirm_transaction_with_retries(&[source_keypair], &mut tx, 10, 0) .expect("client transfer should succeed"); client .rpc_client() @@ -777,7 +777,7 @@ impl LocalCluster { amount, ); let message = Message::new(&instructions, Some(&from_account.pubkey())); - let transaction = Transaction::new( + let mut transaction = Transaction::new( &[from_account.as_ref(), &stake_account_keypair], message, client @@ -788,7 +788,7 @@ impl LocalCluster { ); client - .send_and_confirm_transaction(&transaction) + .send_and_confirm_transaction_with_retries(&[from_account], &mut transaction, 5, 0) .expect("delegate stake"); client .rpc_client() diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs index 88a2b7caf3ebc0..e3da7b8f9e576d 100644 --- a/local-cluster/tests/local_cluster.rs +++ b/local-cluster/tests/local_cluster.rs @@ -367,6 +367,7 @@ fn test_restart_node() { ticks_per_slot, slots_per_epoch, stakers_slot_offset: slots_per_epoch, + skip_warmup_slots: true, ..ClusterConfig::default() }, SocketAddrSpace::Unspecified, @@ -1227,6 +1228,7 @@ fn test_snapshot_restart_tower() { safe_clone_config(&leader_snapshot_test_config.validator_config), safe_clone_config(&validator_snapshot_test_config.validator_config), ], + skip_warmup_slots: true, ..ClusterConfig::default() }; diff --git a/tpu-client/src/tpu_client.rs b/tpu-client/src/tpu_client.rs index e982ca735f10f6..1c706d04d46964 100644 --- a/tpu-client/src/tpu_client.rs +++ b/tpu-client/src/tpu_client.rs @@ -1,6 +1,7 @@ pub use crate::nonblocking::tpu_client::TpuSenderError; use { crate::nonblocking::tpu_client::TpuClient as NonblockingTpuClient, + log::*, rayon::iter::{IntoParallelIterator, ParallelIterator}, solana_connection_cache::{ client_connection::ClientConnection, @@ -11,7 +12,7 @@ use { solana_rpc_client::rpc_client::RpcClient, solana_sdk::{ client::AsyncClient, - clock::Slot, + clock::{Slot, MAX_PROCESSING_AGE}, signature::Signature, transaction::{Transaction, VersionedTransaction}, transport::Result as TransportResult, @@ -20,6 +21,7 @@ use { collections::VecDeque, net::UdpSocket, sync::{Arc, RwLock}, + time::Instant, }, }; #[cfg(feature = "spinner")] @@ -106,33 +108,56 @@ where /// Serialize and send transaction to the current and upcoming leader TPUs according to fanout /// size + /// Attempts to send and confirm tx "tries" times /// Waits for signature confirmation before returning /// Returns the transaction signature - pub fn send_and_confirm_transaction( + pub fn send_and_confirm_transaction_with_retries( &self, - transaction: &Transaction, + keypairs: &T, + transaction: &mut Transaction, + tries: usize, + pending_confirmations: usize, ) -> TransportResult { - let pending_confirmations: usize = 0; - let wire_transaction = - bincode::serialize(&transaction).expect("transaction serialization failed"); - - let leaders = self - .tpu_client - .get_leader_tpu_service() - .leader_tpu_sockets(self.tpu_client.get_fanout_slots()); - let cache = self.tpu_client.get_connection_cache(); - for tpu_address in &leaders { - let conn = cache.get_connection(tpu_address); - conn.send_data_async(wire_transaction.clone())?; - } - - if let Ok(confirmed_blocks) = self - .rpc_client() - .poll_for_signature_confirmation(&transaction.signatures[0], pending_confirmations) - { - if confirmed_blocks >= pending_confirmations { - return Ok(transaction.signatures[0]); + for x in 0..tries { + let now = Instant::now(); + let mut num_confirmed = 0; + let mut wait_time = MAX_PROCESSING_AGE; + // resend the same transaction until the transaction has no chance of succeeding + let wire_transaction = + bincode::serialize(&transaction).expect("transaction serialization failed"); + + while now.elapsed().as_secs() < wait_time as u64 { + let leaders = self + .tpu_client + .get_leader_tpu_service() + .leader_tpu_sockets(self.tpu_client.get_fanout_slots()); + if num_confirmed == 0 { + for tpu_address in &leaders { + let cache = self.tpu_client.get_connection_cache(); + let conn = cache.get_connection(tpu_address); + conn.send_data_async(wire_transaction.clone())?; + } + } + + if let Ok(confirmed_blocks) = self.rpc_client().poll_for_signature_confirmation( + &transaction.signatures[0], + pending_confirmations, + ) { + num_confirmed = confirmed_blocks; + if confirmed_blocks >= pending_confirmations { + return Ok(transaction.signatures[0]); + } + // Since network has seen the transaction, wait longer to receive + // all pending confirmations. Resending the transaction could result into + // extra transaction fees + wait_time = wait_time.max( + MAX_PROCESSING_AGE * pending_confirmations.saturating_sub(num_confirmed), + ); + } } + info!("{} tries failed transfer", x); + let blockhash = self.rpc_client().get_latest_blockhash()?; + transaction.sign(keypairs, blockhash); } Err(std::io::Error::new( std::io::ErrorKind::Other,