Skip to content

Commit

Permalink
add cluster trait. leave dependency on solana_client::tpu_client
Browse files Browse the repository at this point in the history
  • Loading branch information
gregcusack committed Mar 18, 2024
1 parent 54ca966 commit af87503
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 61 deletions.
5 changes: 3 additions & 2 deletions bench-tps/tests/bench_tps.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ use {
solana_core::validator::ValidatorConfig,
solana_faucet::faucet::run_local_faucet,
solana_local_cluster::{
local_cluster::{build_tpu_quic_client, ClusterConfig, LocalCluster},
cluster::Cluster,
local_cluster::{ClusterConfig, LocalCluster},
validator_configs::make_identical_validator_configs,
},
solana_rpc::rpc::JsonRpcConfig,
Expand Down Expand Up @@ -75,7 +76,7 @@ fn test_bench_tps_local_cluster(config: Config) {

cluster.transfer(&cluster.funding_keypair, &faucet_pubkey, 100_000_000);

let client = Arc::new(build_tpu_quic_client(&cluster).unwrap_or_else(|err| {
let client = Arc::new(cluster.build_tpu_quic_client().unwrap_or_else(|err| {
panic!("Could not create TpuClient with Quic Cache {err:?}");
}));

Expand Down
2 changes: 2 additions & 0 deletions client/src/tpu_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ pub use {
solana_tpu_client::tpu_client::{TpuClientConfig, DEFAULT_FANOUT_SLOTS, MAX_FANOUT_SLOTS},
};

pub type QuicTpuClient = TpuClient<QuicPool, QuicConnectionManager, QuicConfig>;

pub enum TpuClientWrapper {
Quic(TpuClient<QuicPool, QuicConnectionManager, QuicConfig>),
Udp(TpuClient<UdpPool, UdpConnectionManager, UdpConfig>),
Expand Down
13 changes: 5 additions & 8 deletions dos/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -818,16 +818,15 @@ fn main() {
pub mod test {
use {
super::*,
solana_client::tpu_client::TpuClient,
solana_client::tpu_client::QuicTpuClient,
solana_core::validator::ValidatorConfig,
solana_faucet::faucet::run_local_faucet,
solana_gossip::contact_info::LegacyContactInfo,
solana_local_cluster::{
cluster::Cluster,
local_cluster::{build_tpu_quic_client, ClusterConfig, LocalCluster},
local_cluster::{ClusterConfig, LocalCluster},
validator_configs::make_identical_validator_configs,
},
solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool},
solana_rpc::rpc::JsonRpcConfig,
solana_sdk::timing::timestamp,
};
Expand All @@ -837,9 +836,7 @@ pub mod test {
// thin wrapper for the run_dos function
// to avoid specifying everywhere generic parameters
fn run_dos_no_client(nodes: &[ContactInfo], iterations: usize, params: DosClientParameters) {
run_dos::<TpuClient<QuicPool, QuicConnectionManager, QuicConfig>>(
nodes, iterations, None, params,
);
run_dos::<QuicTpuClient>(nodes, iterations, None, params);
}

#[test]
Expand Down Expand Up @@ -979,7 +976,7 @@ pub mod test {
.unwrap();
let nodes_slice = [node];

let client = Arc::new(build_tpu_quic_client(&cluster).unwrap_or_else(|err| {
let client = Arc::new(cluster.build_tpu_quic_client().unwrap_or_else(|err| {
panic!("Could not create TpuClient with Quic Cache {err:?}");
}));

Expand Down Expand Up @@ -1113,7 +1110,7 @@ pub mod test {
.unwrap();
let nodes_slice = [node];

let client = Arc::new(build_tpu_quic_client(&cluster).unwrap_or_else(|err| {
let client = Arc::new(cluster.build_tpu_quic_client().unwrap_or_else(|err| {
panic!("Could not create TpuClient with Quic Cache {err:?}");
}));

Expand Down
11 changes: 8 additions & 3 deletions local-cluster/src/cluster.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use {
solana_client::thin_client::ThinClient,
solana_client::{thin_client::ThinClient, tpu_client::QuicTpuClient},
solana_core::validator::{Validator, ValidatorConfig},
solana_gossip::{cluster_info::Node, contact_info::ContactInfo},
solana_ledger::shred::Shred,
solana_sdk::{pubkey::Pubkey, signature::Keypair},
solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, signature::Keypair},
solana_streamer::socket::SocketAddrSpace,
std::{path::PathBuf, sync::Arc},
std::{io::Result, path::PathBuf, sync::Arc},
};

pub struct ValidatorInfo {
Expand Down Expand Up @@ -38,6 +38,11 @@ impl ClusterValidatorInfo {
pub trait Cluster {
fn get_node_pubkeys(&self) -> Vec<Pubkey>;
fn get_validator_client(&self, pubkey: &Pubkey) -> Option<ThinClient>;
fn build_tpu_quic_client(&self) -> Result<QuicTpuClient>;
fn build_tpu_quic_client_with_commitment(
&self,
commitment_config: CommitmentConfig,
) -> Result<QuicTpuClient>;
fn get_contact_info(&self, pubkey: &Pubkey) -> Option<&ContactInfo>;
fn exit_node(&mut self, pubkey: &Pubkey) -> ClusterValidatorInfo;
fn restart_node(
Expand Down
90 changes: 42 additions & 48 deletions local-cluster/src/local_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use {
connection_cache::ConnectionCache,
rpc_client::RpcClient,
thin_client::ThinClient,
tpu_client::{TpuClient, TpuClientConfig},
tpu_client::{QuicTpuClient, TpuClient, TpuClientConfig},
},
solana_core::{
consensus::tower_storage::FileTowerStorage,
Expand All @@ -23,7 +23,6 @@ use {
gossip_service::discover_cluster,
},
solana_ledger::{create_new_tmp_ledger, shred::Shred},
solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool},
solana_runtime::{
genesis_utils::{
create_genesis_config_with_vote_accounts_and_cluster_type, GenesisConfigInfo,
Expand Down Expand Up @@ -69,52 +68,6 @@ use {
},
};

pub fn build_tpu_quic_client(
cluster: &LocalCluster,
) -> Result<TpuClient<QuicPool, QuicConnectionManager, QuicConfig>> {
build_tpu_client(cluster, |rpc_url| Arc::new(RpcClient::new(rpc_url)))
}

pub fn build_tpu_quic_client_with_commitment(
cluster: &LocalCluster,
commitment_config: CommitmentConfig,
) -> Result<TpuClient<QuicPool, QuicConnectionManager, QuicConfig>> {
build_tpu_client(cluster, |rpc_url| {
Arc::new(RpcClient::new_with_commitment(rpc_url, commitment_config))
})
}

fn build_tpu_client<F>(
cluster: &LocalCluster,
rpc_client_builder: F,
) -> Result<TpuClient<QuicPool, QuicConnectionManager, QuicConfig>>
where
F: FnOnce(String) -> Arc<RpcClient>,
{
let rpc_pubsub_url = format!("ws://{}/", cluster.entry_point_info.rpc_pubsub().unwrap());
let rpc_url = format!("http://{}", cluster.entry_point_info.rpc().unwrap());

let cache = match &*cluster.connection_cache {
ConnectionCache::Quic(cache) => cache,
ConnectionCache::Udp(_) => {
return Err(Error::new(
ErrorKind::Other,
"Expected a Quic ConnectionCache. Got UDP",
))
}
};

let tpu_client = TpuClient::new_with_connection_cache(
rpc_client_builder(rpc_url),
rpc_pubsub_url.as_str(),
TpuClientConfig::default(),
cache.clone(),
)
.map_err(|err| Error::new(ErrorKind::Other, format!("TpuSenderError: {}", err)))?;

Ok(tpu_client)
}

const DUMMY_SNAPSHOT_CONFIG_PATH_MARKER: &str = "dummy";

pub struct ClusterConfig {
Expand Down Expand Up @@ -854,6 +807,34 @@ impl LocalCluster {
..SnapshotConfig::new_load_only()
}
}

fn build_tpu_client<F>(&self, rpc_client_builder: F) -> Result<QuicTpuClient>
where
F: FnOnce(String) -> Arc<RpcClient>,
{
let rpc_pubsub_url = format!("ws://{}/", self.entry_point_info.rpc_pubsub().unwrap());
let rpc_url = format!("http://{}", self.entry_point_info.rpc().unwrap());

let cache = match &*self.connection_cache {
ConnectionCache::Quic(cache) => cache,
ConnectionCache::Udp(_) => {
return Err(Error::new(
ErrorKind::Other,
"Expected a Quic ConnectionCache. Got UDP",
))
}
};

let tpu_client = TpuClient::new_with_connection_cache(
rpc_client_builder(rpc_url),
rpc_pubsub_url.as_str(),
TpuClientConfig::default(),
cache.clone(),
)
.map_err(|err| Error::new(ErrorKind::Other, format!("TpuSenderError: {}", err)))?;

Ok(tpu_client)
}
}

impl Cluster for LocalCluster {
Expand All @@ -872,6 +853,19 @@ impl Cluster for LocalCluster {
})
}

fn build_tpu_quic_client(&self) -> Result<QuicTpuClient> {
self.build_tpu_client(|rpc_url| Arc::new(RpcClient::new(rpc_url)))
}

fn build_tpu_quic_client_with_commitment(
&self,
commitment_config: CommitmentConfig,
) -> Result<QuicTpuClient> {
self.build_tpu_client(|rpc_url| {
Arc::new(RpcClient::new_with_commitment(rpc_url, commitment_config))
})
}

fn exit_node(&mut self, pubkey: &Pubkey) -> ClusterValidatorInfo {
let mut node = self.validators.remove(pubkey).unwrap();

Expand Down

0 comments on commit af87503

Please sign in to comment.