Remove ThinClient from LocalCluster (anza-xyz#1300)
* setup tpu client methods required for localcluster to use TpuClient

* add new_tpu_quic_client() for local cluster tests

* update local-cluster src files to use TpuClient. tests next

* finish removing thinclient from localcluster

* address comments

* add note for send_and_confirm_transaction_with_retries

* remove retry logic from tpu-client. Send directly to upcoming leaders without retry.
gregcusack authored and samkim-crypto committed Jul 31, 2024
1 parent 61ee9fc commit 2a55187
Showing 7 changed files with 281 additions and 148 deletions.
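Every updated call site below follows the same pattern: the `ThinClient` (an RPC + TPU address pair with internal retry loops) is replaced by a QUIC `TpuClient` that sends once to upcoming leaders and confirms over RPC. A minimal sketch of the migrated pattern, built only from the helper and methods this diff introduces (the wrapper function itself is illustrative, not part of the commit):

```rust
use {
    solana_client::connection_cache::ConnectionCache,
    solana_gossip::contact_info::ContactInfo,
    solana_sdk::{
        commitment_config::CommitmentConfig, signature::Keypair, signer::Signer,
        system_transaction,
    },
    std::sync::Arc,
};

// Illustrative wrapper: transfer 1 lamport and poll for confirmation.
fn transfer_and_confirm(
    node: &ContactInfo,
    connection_cache: Arc<ConnectionCache>,
    funding_keypair: &Keypair,
    confs: usize,
) {
    // Replaces get_client_facing_addr + ThinClient::new.
    let client = new_tpu_quic_client(node, connection_cache).unwrap();
    let (blockhash, _) = client
        .rpc_client()
        .get_latest_blockhash_with_commitment(CommitmentConfig::processed())
        .unwrap();
    let transaction =
        system_transaction::transfer(funding_keypair, &Keypair::new().pubkey(), 1, blockhash);
    // One-shot send; retry logic was removed from tpu-client.
    client
        .send_transaction_to_upcoming_leaders(&transaction)
        .unwrap();
    client
        .rpc_client()
        .poll_for_signature_confirmation(&transaction.signatures[0], confs)
        .unwrap();
}
```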
3 changes: 1 addition & 2 deletions local-cluster/src/cluster.rs
@@ -1,5 +1,4 @@
use {
- solana_client::thin_client::ThinClient,
solana_core::validator::{Validator, ValidatorConfig},
solana_gossip::{cluster_info::Node, contact_info::ContactInfo},
solana_ledger::shred::Shred,
@@ -41,7 +40,7 @@ impl ClusterValidatorInfo {

pub trait Cluster {
fn get_node_pubkeys(&self) -> Vec<Pubkey>;
- fn get_validator_client(&self, pubkey: &Pubkey) -> Option<ThinClient>;
+ fn get_validator_client(&self, pubkey: &Pubkey) -> Option<QuicTpuClient>;
fn build_tpu_quic_client(&self) -> Result<QuicTpuClient>;
fn build_tpu_quic_client_with_commitment(
&self,
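For reference, `QuicTpuClient` is the QUIC-specialized `TpuClient` this trait now hands out. Its definition sits outside the visible hunks; a plausible alias, assuming the standard generics from `solana_quic_client`:

```rust
use solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool};
use solana_tpu_client::tpu_client::TpuClient;

// Assumed alias: a blocking TpuClient specialized to the QUIC connection pool.
pub type QuicTpuClient = TpuClient<QuicPool, QuicConnectionManager, QuicConfig>;
```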
122 changes: 68 additions & 54 deletions local-cluster/src/cluster_tests.rs
@@ -4,12 +4,10 @@
/// discover the rest of the network.
use log::*;
use {
+ crate::cluster::QuicTpuClient,
rand::{thread_rng, Rng},
rayon::{prelude::*, ThreadPool},
- solana_client::{
-     connection_cache::{ConnectionCache, Protocol},
-     thin_client::ThinClient,
- },
+ solana_client::connection_cache::{ConnectionCache, Protocol},
solana_core::consensus::{
tower_storage::{FileTowerStorage, SavedTower, SavedTowerVersions, TowerStorage},
VOTE_THRESHOLD_DEPTH,
@@ -24,8 +22,8 @@ use {
gossip_service::{self, discover_cluster, GossipService},
},
solana_ledger::blockstore::Blockstore,
+ solana_rpc_client::rpc_client::RpcClient,
solana_sdk::{
- client::SyncClient,
clock::{self, Slot, NUM_CONSECUTIVE_LEADER_SLOTS},
commitment_config::CommitmentConfig,
epoch_schedule::MINIMUM_SLOTS_PER_EPOCH,
@@ -40,6 +38,7 @@ use {
transport::TransportError,
},
solana_streamer::socket::SocketAddrSpace,
+ solana_tpu_client::tpu_client::{TpuClient, TpuClientConfig, TpuSenderError},
solana_vote::vote_transaction::VoteTransaction,
solana_vote_program::vote_transaction,
std::{
@@ -89,31 +88,34 @@ pub fn spend_and_verify_all_nodes<S: ::std::hash::BuildHasher + Sync + Send>(
return;
}
let random_keypair = Keypair::new();
- let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), ingress_node);
- let client = ThinClient::new(rpc, tpu, connection_cache.clone());
+ let client = new_tpu_quic_client(ingress_node, connection_cache.clone()).unwrap();
let bal = client
+ .rpc_client()
.poll_get_balance_with_commitment(
&funding_keypair.pubkey(),
CommitmentConfig::processed(),
)
.expect("balance in source");
assert!(bal > 0);
let (blockhash, _) = client
+ .rpc_client()
.get_latest_blockhash_with_commitment(CommitmentConfig::confirmed())
.unwrap();
- let mut transaction =
+ let transaction =
system_transaction::transfer(funding_keypair, &random_keypair.pubkey(), 1, blockhash);
let confs = VOTE_THRESHOLD_DEPTH + 1;
- let sig = client
-     .retry_transfer_until_confirmed(funding_keypair, &mut transaction, 10, confs)
+ client
+     .send_transaction_to_upcoming_leaders(&transaction)
.unwrap();
for validator in &cluster_nodes {
if ignore_nodes.contains(validator.pubkey()) {
continue;
}
- let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), validator);
- let client = ThinClient::new(rpc, tpu, connection_cache.clone());
- client.poll_for_signature_confirmation(&sig, confs).unwrap();
+ let client = new_tpu_quic_client(ingress_node, connection_cache.clone()).unwrap();
+ client
+     .rpc_client()
+     .poll_for_signature_confirmation(&transaction.signatures[0], confs)
+     .unwrap();
}
});
}
@@ -123,10 +125,10 @@ pub fn verify_balances<S: ::std::hash::BuildHasher>(
node: &ContactInfo,
connection_cache: Arc<ConnectionCache>,
) {
- let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), node);
- let client = ThinClient::new(rpc, tpu, connection_cache);
+ let client = new_tpu_quic_client(node, connection_cache.clone()).unwrap();
for (pk, b) in expected_balances {
let bal = client
+ .rpc_client()
.poll_get_balance_with_commitment(&pk, CommitmentConfig::processed())
.expect("balance in source");
assert_eq!(bal, b);
@@ -140,32 +142,33 @@ pub fn send_many_transactions(
max_tokens_per_transfer: u64,
num_txs: u64,
) -> HashMap<Pubkey, u64> {
- let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), node);
- let client = ThinClient::new(rpc, tpu, connection_cache.clone());
+ let client = new_tpu_quic_client(node, connection_cache.clone()).unwrap();
let mut expected_balances = HashMap::new();
for _ in 0..num_txs {
let random_keypair = Keypair::new();
let bal = client
+ .rpc_client()
.poll_get_balance_with_commitment(
&funding_keypair.pubkey(),
CommitmentConfig::processed(),
)
.expect("balance in source");
assert!(bal > 0);
let (blockhash, _) = client
+ .rpc_client()
.get_latest_blockhash_with_commitment(CommitmentConfig::processed())
.unwrap();
let transfer_amount = thread_rng().gen_range(1..max_tokens_per_transfer);

- let mut transaction = system_transaction::transfer(
+ let transaction = system_transaction::transfer(
funding_keypair,
&random_keypair.pubkey(),
transfer_amount,
blockhash,
);

client
- .retry_transfer(funding_keypair, &mut transaction, 5)
+ .send_transaction_to_upcoming_leaders(&transaction)
.unwrap();

expected_balances.insert(random_keypair.pubkey(), transfer_amount);
@@ -238,14 +241,14 @@ pub fn kill_entry_and_spend_and_verify_rest(
)
.unwrap();
assert!(cluster_nodes.len() >= nodes);
- let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), entry_point_info);
- let client = ThinClient::new(rpc, tpu, connection_cache.clone());
+ let client = new_tpu_quic_client(entry_point_info, connection_cache.clone()).unwrap();

// sleep long enough to make sure we are in epoch 3
let first_two_epoch_slots = MINIMUM_SLOTS_PER_EPOCH * (3 + 1);

for ingress_node in &cluster_nodes {
client
+ .rpc_client()
.poll_get_balance_with_commitment(ingress_node.pubkey(), CommitmentConfig::processed())
.unwrap_or_else(|err| panic!("Node {} has no balance: {}", ingress_node.pubkey(), err));
}
@@ -266,9 +269,9 @@ pub fn kill_entry_and_spend_and_verify_rest(
continue;
}

- let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), ingress_node);
- let client = ThinClient::new(rpc, tpu, connection_cache.clone());
+ let client = new_tpu_quic_client(ingress_node, connection_cache.clone()).unwrap();
let balance = client
+ .rpc_client()
.poll_get_balance_with_commitment(
&funding_keypair.pubkey(),
CommitmentConfig::processed(),
@@ -286,38 +289,27 @@

let random_keypair = Keypair::new();
let (blockhash, _) = client
+ .rpc_client()
.get_latest_blockhash_with_commitment(CommitmentConfig::processed())
.unwrap();
- let mut transaction = system_transaction::transfer(
+ let transaction = system_transaction::transfer(
funding_keypair,
&random_keypair.pubkey(),
1,
blockhash,
);

let confs = VOTE_THRESHOLD_DEPTH + 1;
- let sig = {
-     let sig = client.retry_transfer_until_confirmed(
-         funding_keypair,
-         &mut transaction,
-         5,
-         confs,
-     );
-     match sig {
-         Err(e) => {
-             result = Err(e);
-             continue;
-         }
-
-         Ok(sig) => sig,
-     }
- };
+ if let Err(e) = client.send_transaction_to_upcoming_leaders(&transaction) {
+     result = Err(e);
+     continue;
+ }
info!("poll_all_nodes_for_signature()");
match poll_all_nodes_for_signature(
entry_point_info,
&cluster_nodes,
connection_cache,
- &sig,
+ &transaction.signatures[0],
confs,
) {
Err(e) => {
@@ -353,10 +345,10 @@ pub fn check_min_slot_is_rooted(
let loop_start = Instant::now();
let loop_timeout = Duration::from_secs(180);
for ingress_node in contact_infos.iter() {
- let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), ingress_node);
- let client = ThinClient::new(rpc, tpu, connection_cache.clone());
+ let client = new_tpu_quic_client(ingress_node, connection_cache.clone()).unwrap();
loop {
let root_slot = client
+ .rpc_client()
.get_slot_with_commitment(CommitmentConfig::finalized())
.unwrap_or(0);
if root_slot >= min_slot || last_print.elapsed().as_secs() > 3 {
@@ -394,9 +386,9 @@ pub fn check_for_new_roots(
assert!(loop_start.elapsed() < loop_timeout);

for (i, ingress_node) in contact_infos.iter().enumerate() {
- let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), ingress_node);
- let client = ThinClient::new(rpc, tpu, connection_cache.clone());
+ let client = new_tpu_quic_client(ingress_node, connection_cache.clone()).unwrap();
let root_slot = client
+ .rpc_client()
.get_slot_with_commitment(CommitmentConfig::finalized())
.unwrap_or(0);
roots[i].insert(root_slot);
@@ -427,13 +419,14 @@ pub fn check_no_new_roots(
.iter()
.enumerate()
.map(|(i, ingress_node)| {
- let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), ingress_node);
- let client = ThinClient::new(rpc, tpu, connection_cache.clone());
+ let client = new_tpu_quic_client(ingress_node, connection_cache.clone()).unwrap();
let initial_root = client
+ .rpc_client()
.get_slot()
.unwrap_or_else(|_| panic!("get_slot for {} failed", ingress_node.pubkey()));
roots[i] = initial_root;
client
+ .rpc_client()
.get_slot_with_commitment(CommitmentConfig::processed())
.unwrap_or_else(|_| panic!("get_slot for {} failed", ingress_node.pubkey()))
})
@@ -446,9 +439,9 @@
let mut reached_end_slot = false;
loop {
for contact_info in contact_infos {
- let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), contact_info);
- let client = ThinClient::new(rpc, tpu, connection_cache.clone());
+ let client = new_tpu_quic_client(contact_info, connection_cache.clone()).unwrap();
current_slot = client
+ .rpc_client()
.get_slot_with_commitment(CommitmentConfig::processed())
.unwrap_or_else(|_| panic!("get_slot for {} failed", contact_infos[0].pubkey()));
if current_slot > end_slot {
Expand All @@ -472,10 +465,10 @@ pub fn check_no_new_roots(
}

for (i, ingress_node) in contact_infos.iter().enumerate() {
- let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), ingress_node);
- let client = ThinClient::new(rpc, tpu, connection_cache.clone());
+ let client = new_tpu_quic_client(ingress_node, connection_cache.clone()).unwrap();
assert_eq!(
client
+ .rpc_client()
.get_slot()
.unwrap_or_else(|_| panic!("get_slot for {} failed", ingress_node.pubkey())),
roots[i]
@@ -494,9 +487,10 @@ fn poll_all_nodes_for_signature(
if validator.pubkey() == entry_point_info.pubkey() {
continue;
}
- let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), validator);
- let client = ThinClient::new(rpc, tpu, connection_cache.clone());
- client.poll_for_signature_confirmation(sig, confs)?;
+ let client = new_tpu_quic_client(validator, connection_cache.clone()).unwrap();
+ client
+     .rpc_client()
+     .poll_for_signature_confirmation(sig, confs)?;
}

Ok(())
@@ -678,3 +672,23 @@ pub fn submit_vote_to_cluster_gossip(
socket_addr_space,
)
}

+ pub fn new_tpu_quic_client(
+     contact_info: &ContactInfo,
+     connection_cache: Arc<ConnectionCache>,
+ ) -> Result<QuicTpuClient, TpuSenderError> {
+     let rpc_pubsub_url = format!("ws://{}/", contact_info.rpc_pubsub().unwrap());
+     let rpc_url = format!("http://{}", contact_info.rpc().unwrap());
+
+     let cache = match &*connection_cache {
+         ConnectionCache::Quic(cache) => cache,
+         ConnectionCache::Udp(_) => panic!("Expected a Quic ConnectionCache. Got UDP"),
+     };
+
+     TpuClient::new_with_connection_cache(
+         Arc::new(RpcClient::new(rpc_url)),
+         rpc_pubsub_url.as_str(),
+         TpuClientConfig::default(),
+         cache.clone(),
+     )
+ }
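A short usage sketch for the new helper, assuming the node's `ContactInfo` advertises both `rpc()` and `rpc_pubsub()` addresses (the `unwrap()`s inside fail otherwise); the cache name and pool size are illustrative values:

```rust
use {solana_client::connection_cache::ConnectionCache, std::sync::Arc};

// The helper panics on a UDP cache, so build a QUIC cache explicitly.
let connection_cache = Arc::new(ConnectionCache::new_quic("local-cluster-test", 1));
let client = new_tpu_quic_client(&contact_info, connection_cache).unwrap();
// RPC queries go through the embedded blocking RpcClient.
let slot = client.rpc_client().get_slot().unwrap();
```

Per the commit message, `send_transaction_to_upcoming_leaders` performs no retries; callers that need confirmation poll over RPC, as the updated tests above do.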