Skip to content

Commit

Permalink
finish removing thinclient from localcluster
Browse files Browse the repository at this point in the history
  • Loading branch information
gregcusack committed May 10, 2024
1 parent e95b87c commit 13c4b28
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 62 deletions.
73 changes: 38 additions & 35 deletions local-cluster/src/cluster_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@ use {
crate::cluster::QuicTpuClient,
rand::{thread_rng, Rng},
rayon::{prelude::*, ThreadPool},
solana_client::{
connection_cache::{ConnectionCache, Protocol},
thin_client::ThinClient,
},
solana_client::connection_cache::{ConnectionCache, Protocol},
solana_core::consensus::{
tower_storage::{FileTowerStorage, SavedTower, SavedTowerVersions, TowerStorage},
VOTE_THRESHOLD_DEPTH,
Expand All @@ -27,7 +24,6 @@ use {
solana_ledger::blockstore::Blockstore,
solana_rpc_client::rpc_client::RpcClient,
solana_sdk::{
client::SyncClient,
clock::{self, Slot, NUM_CONSECUTIVE_LEADER_SLOTS},
commitment_config::CommitmentConfig,
epoch_schedule::MINIMUM_SLOTS_PER_EPOCH,
Expand Down Expand Up @@ -92,31 +88,34 @@ pub fn spend_and_verify_all_nodes<S: ::std::hash::BuildHasher + Sync + Send>(
return;
}
let random_keypair = Keypair::new();
let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), ingress_node);
let client = ThinClient::new(rpc, tpu, connection_cache.clone());
let client = new_tpu_quic_client(ingress_node, connection_cache.clone()).unwrap();
let bal = client
.rpc_client()
.poll_get_balance_with_commitment(
&funding_keypair.pubkey(),
CommitmentConfig::processed(),
)
.expect("balance in source");
assert!(bal > 0);
let (blockhash, _) = client
.rpc_client()
.get_latest_blockhash_with_commitment(CommitmentConfig::confirmed())
.unwrap();
let mut transaction =
system_transaction::transfer(funding_keypair, &random_keypair.pubkey(), 1, blockhash);
let confs = VOTE_THRESHOLD_DEPTH + 1;
let sig = client
.retry_transfer_until_confirmed(funding_keypair, &mut transaction, 10, confs)
.send_and_confirm_transaction_with_retries(&[funding_keypair], &mut transaction, 10, confs)
.unwrap();
for validator in &cluster_nodes {
if ignore_nodes.contains(validator.pubkey()) {
continue;
}
let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), validator);
let client = ThinClient::new(rpc, tpu, connection_cache.clone());
client.poll_for_signature_confirmation(&sig, confs).unwrap();
let client = new_tpu_quic_client(ingress_node, connection_cache.clone()).unwrap();
client
.rpc_client()
.poll_for_signature_confirmation(&sig, confs)
.unwrap();
}
});
}
Expand All @@ -126,10 +125,10 @@ pub fn verify_balances<S: ::std::hash::BuildHasher>(
node: &ContactInfo,
connection_cache: Arc<ConnectionCache>,
) {
let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), node);
let client = ThinClient::new(rpc, tpu, connection_cache);
let client = new_tpu_quic_client(node, connection_cache.clone()).unwrap();
for (pk, b) in expected_balances {
let bal = client
.rpc_client()
.poll_get_balance_with_commitment(&pk, CommitmentConfig::processed())
.expect("balance in source");
assert_eq!(bal, b);
Expand All @@ -143,19 +142,20 @@ pub fn send_many_transactions(
max_tokens_per_transfer: u64,
num_txs: u64,
) -> HashMap<Pubkey, u64> {
let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), node);
let client = ThinClient::new(rpc, tpu, connection_cache.clone());
let client = new_tpu_quic_client(node, connection_cache.clone()).unwrap();
let mut expected_balances = HashMap::new();
for _ in 0..num_txs {
let random_keypair = Keypair::new();
let bal = client
.rpc_client()
.poll_get_balance_with_commitment(
&funding_keypair.pubkey(),
CommitmentConfig::processed(),
)
.expect("balance in source");
assert!(bal > 0);
let (blockhash, _) = client
.rpc_client()
.get_latest_blockhash_with_commitment(CommitmentConfig::processed())
.unwrap();
let transfer_amount = thread_rng().gen_range(1..max_tokens_per_transfer);
Expand All @@ -168,7 +168,7 @@ pub fn send_many_transactions(
);

client
.retry_transfer(funding_keypair, &mut transaction, 5)
.send_and_confirm_transaction_with_retries(&[funding_keypair], &mut transaction, 5, 0)
.unwrap();

expected_balances.insert(random_keypair.pubkey(), transfer_amount);
Expand Down Expand Up @@ -241,14 +241,14 @@ pub fn kill_entry_and_spend_and_verify_rest(
)
.unwrap();
assert!(cluster_nodes.len() >= nodes);
let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), entry_point_info);
let client = ThinClient::new(rpc, tpu, connection_cache.clone());
let client = new_tpu_quic_client(entry_point_info, connection_cache.clone()).unwrap();

// sleep long enough to make sure we are in epoch 3
let first_two_epoch_slots = MINIMUM_SLOTS_PER_EPOCH * (3 + 1);

for ingress_node in &cluster_nodes {
client
.rpc_client()
.poll_get_balance_with_commitment(ingress_node.pubkey(), CommitmentConfig::processed())
.unwrap_or_else(|err| panic!("Node {} has no balance: {}", ingress_node.pubkey(), err));
}
Expand All @@ -269,9 +269,9 @@ pub fn kill_entry_and_spend_and_verify_rest(
continue;
}

let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), ingress_node);
let client = ThinClient::new(rpc, tpu, connection_cache.clone());
let client = new_tpu_quic_client(ingress_node, connection_cache.clone()).unwrap();
let balance = client
.rpc_client()
.poll_get_balance_with_commitment(
&funding_keypair.pubkey(),
CommitmentConfig::processed(),
Expand All @@ -289,6 +289,7 @@ pub fn kill_entry_and_spend_and_verify_rest(

let random_keypair = Keypair::new();
let (blockhash, _) = client
.rpc_client()
.get_latest_blockhash_with_commitment(CommitmentConfig::processed())
.unwrap();
let mut transaction = system_transaction::transfer(
Expand All @@ -300,8 +301,8 @@ pub fn kill_entry_and_spend_and_verify_rest(

let confs = VOTE_THRESHOLD_DEPTH + 1;
let sig = {
let sig = client.retry_transfer_until_confirmed(
funding_keypair,
let sig = client.send_and_confirm_transaction_with_retries(
&[funding_keypair],
&mut transaction,
5,
confs,
Expand Down Expand Up @@ -356,10 +357,10 @@ pub fn check_min_slot_is_rooted(
let loop_start = Instant::now();
let loop_timeout = Duration::from_secs(180);
for ingress_node in contact_infos.iter() {
let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), ingress_node);
let client = ThinClient::new(rpc, tpu, connection_cache.clone());
let client = new_tpu_quic_client(ingress_node, connection_cache.clone()).unwrap();
loop {
let root_slot = client
.rpc_client()
.get_slot_with_commitment(CommitmentConfig::finalized())
.unwrap_or(0);
if root_slot >= min_slot || last_print.elapsed().as_secs() > 3 {
Expand Down Expand Up @@ -397,9 +398,9 @@ pub fn check_for_new_roots(
assert!(loop_start.elapsed() < loop_timeout);

for (i, ingress_node) in contact_infos.iter().enumerate() {
let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), ingress_node);
let client = ThinClient::new(rpc, tpu, connection_cache.clone());
let client = new_tpu_quic_client(ingress_node, connection_cache.clone()).unwrap();
let root_slot = client
.rpc_client()
.get_slot_with_commitment(CommitmentConfig::finalized())
.unwrap_or(0);
roots[i].insert(root_slot);
Expand Down Expand Up @@ -430,13 +431,14 @@ pub fn check_no_new_roots(
.iter()
.enumerate()
.map(|(i, ingress_node)| {
let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), ingress_node);
let client = ThinClient::new(rpc, tpu, connection_cache.clone());
let client = new_tpu_quic_client(ingress_node, connection_cache.clone()).unwrap();
let initial_root = client
.rpc_client()
.get_slot()
.unwrap_or_else(|_| panic!("get_slot for {} failed", ingress_node.pubkey()));
roots[i] = initial_root;
client
.rpc_client()
.get_slot_with_commitment(CommitmentConfig::processed())
.unwrap_or_else(|_| panic!("get_slot for {} failed", ingress_node.pubkey()))
})
Expand All @@ -449,9 +451,9 @@ pub fn check_no_new_roots(
let mut reached_end_slot = false;
loop {
for contact_info in contact_infos {
let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), contact_info);
let client = ThinClient::new(rpc, tpu, connection_cache.clone());
let client = new_tpu_quic_client(contact_info, connection_cache.clone()).unwrap();
current_slot = client
.rpc_client()
.get_slot_with_commitment(CommitmentConfig::processed())
.unwrap_or_else(|_| panic!("get_slot for {} failed", contact_infos[0].pubkey()));
if current_slot > end_slot {
Expand All @@ -475,10 +477,10 @@ pub fn check_no_new_roots(
}

for (i, ingress_node) in contact_infos.iter().enumerate() {
let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), ingress_node);
let client = ThinClient::new(rpc, tpu, connection_cache.clone());
let client = new_tpu_quic_client(ingress_node, connection_cache.clone()).unwrap();
assert_eq!(
client
.rpc_client()
.get_slot()
.unwrap_or_else(|_| panic!("get_slot for {} failed", ingress_node.pubkey())),
roots[i]
Expand All @@ -497,9 +499,10 @@ fn poll_all_nodes_for_signature(
if validator.pubkey() == entry_point_info.pubkey() {
continue;
}
let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), validator);
let client = ThinClient::new(rpc, tpu, connection_cache.clone());
client.poll_for_signature_confirmation(sig, confs)?;
let client = new_tpu_quic_client(validator, connection_cache.clone()).unwrap();
client
.rpc_client()
.poll_for_signature_confirmation(sig, confs)?;
}

Ok(())
Expand Down
Loading

0 comments on commit 13c4b28

Please sign in to comment.