Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove ThinClient from LocalCluster #1300

Merged
merged 7 commits into from
Jun 13, 2024
Merged
3 changes: 1 addition & 2 deletions local-cluster/src/cluster.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use {
solana_client::thin_client::ThinClient,
solana_core::validator::{Validator, ValidatorConfig},
solana_gossip::{cluster_info::Node, contact_info::ContactInfo},
solana_ledger::shred::Shred,
Expand Down Expand Up @@ -41,7 +40,7 @@ impl ClusterValidatorInfo {

pub trait Cluster {
fn get_node_pubkeys(&self) -> Vec<Pubkey>;
fn get_validator_client(&self, pubkey: &Pubkey) -> Option<ThinClient>;
fn get_validator_client(&self, pubkey: &Pubkey) -> Option<QuicTpuClient>;
fn build_tpu_quic_client(&self) -> Result<QuicTpuClient>;
fn build_tpu_quic_client_with_commitment(
&self,
Expand Down
122 changes: 68 additions & 54 deletions local-cluster/src/cluster_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,10 @@
/// discover the rest of the network.
use log::*;
use {
crate::cluster::QuicTpuClient,
rand::{thread_rng, Rng},
rayon::{prelude::*, ThreadPool},
solana_client::{
connection_cache::{ConnectionCache, Protocol},
thin_client::ThinClient,
},
solana_client::connection_cache::{ConnectionCache, Protocol},
solana_core::consensus::{
tower_storage::{FileTowerStorage, SavedTower, SavedTowerVersions, TowerStorage},
VOTE_THRESHOLD_DEPTH,
Expand All @@ -24,8 +22,8 @@ use {
gossip_service::{self, discover_cluster, GossipService},
},
solana_ledger::blockstore::Blockstore,
solana_rpc_client::rpc_client::RpcClient,
solana_sdk::{
client::SyncClient,
clock::{self, Slot, NUM_CONSECUTIVE_LEADER_SLOTS},
commitment_config::CommitmentConfig,
epoch_schedule::MINIMUM_SLOTS_PER_EPOCH,
Expand All @@ -40,6 +38,7 @@ use {
transport::TransportError,
},
solana_streamer::socket::SocketAddrSpace,
solana_tpu_client::tpu_client::{TpuClient, TpuClientConfig, TpuSenderError},
solana_vote::vote_transaction::VoteTransaction,
solana_vote_program::vote_transaction,
std::{
Expand Down Expand Up @@ -89,31 +88,34 @@ pub fn spend_and_verify_all_nodes<S: ::std::hash::BuildHasher + Sync + Send>(
return;
}
let random_keypair = Keypair::new();
let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), ingress_node);
let client = ThinClient::new(rpc, tpu, connection_cache.clone());
let client = new_tpu_quic_client(ingress_node, connection_cache.clone()).unwrap();
let bal = client
.rpc_client()
.poll_get_balance_with_commitment(
&funding_keypair.pubkey(),
CommitmentConfig::processed(),
)
.expect("balance in source");
assert!(bal > 0);
let (blockhash, _) = client
.rpc_client()
.get_latest_blockhash_with_commitment(CommitmentConfig::confirmed())
.unwrap();
let mut transaction =
let transaction =
system_transaction::transfer(funding_keypair, &random_keypair.pubkey(), 1, blockhash);
let confs = VOTE_THRESHOLD_DEPTH + 1;
let sig = client
.retry_transfer_until_confirmed(funding_keypair, &mut transaction, 10, confs)
client
.send_transaction_to_upcoming_leaders(&transaction)
.unwrap();
for validator in &cluster_nodes {
if ignore_nodes.contains(validator.pubkey()) {
continue;
}
let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), validator);
let client = ThinClient::new(rpc, tpu, connection_cache.clone());
client.poll_for_signature_confirmation(&sig, confs).unwrap();
let client = new_tpu_quic_client(ingress_node, connection_cache.clone()).unwrap();
client
.rpc_client()
.poll_for_signature_confirmation(&transaction.signatures[0], confs)
.unwrap();
}
});
}
Expand All @@ -123,10 +125,10 @@ pub fn verify_balances<S: ::std::hash::BuildHasher>(
node: &ContactInfo,
connection_cache: Arc<ConnectionCache>,
) {
let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), node);
let client = ThinClient::new(rpc, tpu, connection_cache);
let client = new_tpu_quic_client(node, connection_cache.clone()).unwrap();
for (pk, b) in expected_balances {
let bal = client
.rpc_client()
.poll_get_balance_with_commitment(&pk, CommitmentConfig::processed())
.expect("balance in source");
assert_eq!(bal, b);
Expand All @@ -140,32 +142,33 @@ pub fn send_many_transactions(
max_tokens_per_transfer: u64,
num_txs: u64,
) -> HashMap<Pubkey, u64> {
let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), node);
let client = ThinClient::new(rpc, tpu, connection_cache.clone());
let client = new_tpu_quic_client(node, connection_cache.clone()).unwrap();
let mut expected_balances = HashMap::new();
for _ in 0..num_txs {
let random_keypair = Keypair::new();
let bal = client
.rpc_client()
.poll_get_balance_with_commitment(
&funding_keypair.pubkey(),
CommitmentConfig::processed(),
)
.expect("balance in source");
assert!(bal > 0);
let (blockhash, _) = client
.rpc_client()
.get_latest_blockhash_with_commitment(CommitmentConfig::processed())
.unwrap();
let transfer_amount = thread_rng().gen_range(1..max_tokens_per_transfer);

let mut transaction = system_transaction::transfer(
let transaction = system_transaction::transfer(
funding_keypair,
&random_keypair.pubkey(),
transfer_amount,
blockhash,
);

client
.retry_transfer(funding_keypair, &mut transaction, 5)
.send_transaction_to_upcoming_leaders(&transaction)
.unwrap();

expected_balances.insert(random_keypair.pubkey(), transfer_amount);
Expand Down Expand Up @@ -238,14 +241,14 @@ pub fn kill_entry_and_spend_and_verify_rest(
)
.unwrap();
assert!(cluster_nodes.len() >= nodes);
let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), entry_point_info);
let client = ThinClient::new(rpc, tpu, connection_cache.clone());
let client = new_tpu_quic_client(entry_point_info, connection_cache.clone()).unwrap();

// sleep long enough to make sure we are in epoch 3
let first_two_epoch_slots = MINIMUM_SLOTS_PER_EPOCH * (3 + 1);

for ingress_node in &cluster_nodes {
client
.rpc_client()
.poll_get_balance_with_commitment(ingress_node.pubkey(), CommitmentConfig::processed())
.unwrap_or_else(|err| panic!("Node {} has no balance: {}", ingress_node.pubkey(), err));
}
Expand All @@ -266,9 +269,9 @@ pub fn kill_entry_and_spend_and_verify_rest(
continue;
}

let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), ingress_node);
let client = ThinClient::new(rpc, tpu, connection_cache.clone());
let client = new_tpu_quic_client(ingress_node, connection_cache.clone()).unwrap();
let balance = client
.rpc_client()
.poll_get_balance_with_commitment(
&funding_keypair.pubkey(),
CommitmentConfig::processed(),
Expand All @@ -286,38 +289,27 @@ pub fn kill_entry_and_spend_and_verify_rest(

let random_keypair = Keypair::new();
let (blockhash, _) = client
.rpc_client()
.get_latest_blockhash_with_commitment(CommitmentConfig::processed())
.unwrap();
let mut transaction = system_transaction::transfer(
let transaction = system_transaction::transfer(
funding_keypair,
&random_keypair.pubkey(),
1,
blockhash,
);

let confs = VOTE_THRESHOLD_DEPTH + 1;
let sig = {
let sig = client.retry_transfer_until_confirmed(
funding_keypair,
&mut transaction,
5,
confs,
);
match sig {
Err(e) => {
result = Err(e);
continue;
}

Ok(sig) => sig,
}
};
if let Err(e) = client.send_transaction_to_upcoming_leaders(&transaction) {
result = Err(e);
continue;
}
info!("poll_all_nodes_for_signature()");
match poll_all_nodes_for_signature(
entry_point_info,
&cluster_nodes,
connection_cache,
&sig,
&transaction.signatures[0],
confs,
) {
Err(e) => {
Expand Down Expand Up @@ -353,10 +345,10 @@ pub fn check_min_slot_is_rooted(
let loop_start = Instant::now();
let loop_timeout = Duration::from_secs(180);
for ingress_node in contact_infos.iter() {
let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), ingress_node);
let client = ThinClient::new(rpc, tpu, connection_cache.clone());
let client = new_tpu_quic_client(ingress_node, connection_cache.clone()).unwrap();
loop {
let root_slot = client
.rpc_client()
.get_slot_with_commitment(CommitmentConfig::finalized())
.unwrap_or(0);
if root_slot >= min_slot || last_print.elapsed().as_secs() > 3 {
Expand Down Expand Up @@ -394,9 +386,9 @@ pub fn check_for_new_roots(
assert!(loop_start.elapsed() < loop_timeout);

for (i, ingress_node) in contact_infos.iter().enumerate() {
let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), ingress_node);
let client = ThinClient::new(rpc, tpu, connection_cache.clone());
let client = new_tpu_quic_client(ingress_node, connection_cache.clone()).unwrap();
let root_slot = client
.rpc_client()
.get_slot_with_commitment(CommitmentConfig::finalized())
.unwrap_or(0);
roots[i].insert(root_slot);
Expand Down Expand Up @@ -427,13 +419,14 @@ pub fn check_no_new_roots(
.iter()
.enumerate()
.map(|(i, ingress_node)| {
let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), ingress_node);
let client = ThinClient::new(rpc, tpu, connection_cache.clone());
let client = new_tpu_quic_client(ingress_node, connection_cache.clone()).unwrap();
let initial_root = client
.rpc_client()
.get_slot()
.unwrap_or_else(|_| panic!("get_slot for {} failed", ingress_node.pubkey()));
roots[i] = initial_root;
client
.rpc_client()
.get_slot_with_commitment(CommitmentConfig::processed())
.unwrap_or_else(|_| panic!("get_slot for {} failed", ingress_node.pubkey()))
})
Expand All @@ -446,9 +439,9 @@ pub fn check_no_new_roots(
let mut reached_end_slot = false;
loop {
for contact_info in contact_infos {
let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), contact_info);
let client = ThinClient::new(rpc, tpu, connection_cache.clone());
let client = new_tpu_quic_client(contact_info, connection_cache.clone()).unwrap();
current_slot = client
.rpc_client()
.get_slot_with_commitment(CommitmentConfig::processed())
.unwrap_or_else(|_| panic!("get_slot for {} failed", contact_infos[0].pubkey()));
if current_slot > end_slot {
Expand All @@ -472,10 +465,10 @@ pub fn check_no_new_roots(
}

for (i, ingress_node) in contact_infos.iter().enumerate() {
let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), ingress_node);
let client = ThinClient::new(rpc, tpu, connection_cache.clone());
let client = new_tpu_quic_client(ingress_node, connection_cache.clone()).unwrap();
assert_eq!(
client
.rpc_client()
.get_slot()
.unwrap_or_else(|_| panic!("get_slot for {} failed", ingress_node.pubkey())),
roots[i]
Expand All @@ -494,9 +487,10 @@ fn poll_all_nodes_for_signature(
if validator.pubkey() == entry_point_info.pubkey() {
continue;
}
let (rpc, tpu) = get_client_facing_addr(connection_cache.protocol(), validator);
let client = ThinClient::new(rpc, tpu, connection_cache.clone());
client.poll_for_signature_confirmation(sig, confs)?;
let client = new_tpu_quic_client(validator, connection_cache.clone()).unwrap();
client
.rpc_client()
.poll_for_signature_confirmation(sig, confs)?;
}

Ok(())
Expand Down Expand Up @@ -678,3 +672,23 @@ pub fn submit_vote_to_cluster_gossip(
socket_addr_space,
)
}

pub fn new_tpu_quic_client(
contact_info: &ContactInfo,
connection_cache: Arc<ConnectionCache>,
) -> Result<QuicTpuClient, TpuSenderError> {
let rpc_pubsub_url = format!("ws://{}/", contact_info.rpc_pubsub().unwrap());
let rpc_url = format!("http://{}", contact_info.rpc().unwrap());

let cache = match &*connection_cache {
ConnectionCache::Quic(cache) => cache,
ConnectionCache::Udp(_) => panic!("Expected a Quic ConnectionCache. Got UDP"),
};

TpuClient::new_with_connection_cache(
Arc::new(RpcClient::new(rpc_url)),
rpc_pubsub_url.as_str(),
TpuClientConfig::default(),
cache.clone(),

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it necessary to clone here?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ya it is because cache is of type &Arc<ConnectionCache> and new_with_connection_cache() requires ownership over the Arc aka Arc<ConnectionCache>. If I try to match instead on connection_cache instead of &*connection_cache, then I end up matching on the Arc instead of the underlying connection_cache

)
}
Loading