Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add in method for building a TpuClient for LocalCluster tests #258

Merged
merged 2 commits into from
Mar 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 5 additions & 22 deletions bench-tps/tests/bench_tps.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,11 @@ use {
cli::{Config, InstructionPaddingConfig},
send_batch::generate_durable_nonce_accounts,
},
solana_client::{
connection_cache::ConnectionCache,
tpu_client::{TpuClient, TpuClientConfig},
},
solana_client::tpu_client::{TpuClient, TpuClientConfig},

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's import solana_tpu_client directly, since the dependency on ConnectionCache is going away. We may be able to remove solana_client entirely, eventually.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ohhh this is good to know. i was actually going to ask about this, since solana_client seems to be just a wrapper around solana_tpu_client. sounds good!

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am wondering if this needs to be separate PRs. Add in with dependency on solana_client and then another PR that switches over the dependencies to solana_tpu_client. A lot of the code including bench-tps relies on solana_client. For example, bench-tps implements BenchTpsClient for solana_client/tpu_client not solana_tpu_client/tpu_client. so switching this up in this PR as well may be large.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you want to do dependency updates in a separate PR, that's fine with me. I can definitely see the argument for that. Just let me know whether you think it makes more sense to do it before or after this PR.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it makes more sense to do it after this PR. This code will be following same usage as previous code (aka using solana_client::tpu_client. I have another PR in drafts that will then switch over to using solana-tpu-client. Then I will make another PR that finally switches over ThinClient in LocalCluster to solana-tpu-client.

solana_core::validator::ValidatorConfig,
solana_faucet::faucet::run_local_faucet,
solana_local_cluster::{
cluster::Cluster,
local_cluster::{ClusterConfig, LocalCluster},
validator_configs::make_identical_validator_configs,
},
Expand Down Expand Up @@ -78,24 +76,9 @@ fn test_bench_tps_local_cluster(config: Config) {

cluster.transfer(&cluster.funding_keypair, &faucet_pubkey, 100_000_000);

let ConnectionCache::Quic(cache) = &*cluster.connection_cache else {
panic!("Expected a Quic ConnectionCache.");
};

let rpc_pubsub_url = format!("ws://{}/", cluster.entry_point_info.rpc_pubsub().unwrap());
let rpc_url = format!("http://{}", cluster.entry_point_info.rpc().unwrap());

let client = Arc::new(
TpuClient::new_with_connection_cache(
Arc::new(RpcClient::new(rpc_url)),
rpc_pubsub_url.as_str(),
TpuClientConfig::default(),
cache.clone(),
)
.unwrap_or_else(|err| {
panic!("Could not create TpuClient {err:?}");
}),
);
let client = Arc::new(cluster.build_tpu_quic_client().unwrap_or_else(|err| {
panic!("Could not create TpuClient with Quic Cache {err:?}");
}));

let lamports_per_account = 100;

Expand Down
2 changes: 2 additions & 0 deletions client/src/tpu_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ pub use {
solana_tpu_client::tpu_client::{TpuClientConfig, DEFAULT_FANOUT_SLOTS, MAX_FANOUT_SLOTS},
};

pub type QuicTpuClient = TpuClient<QuicPool, QuicConnectionManager, QuicConfig>;

pub enum TpuClientWrapper {
Quic(TpuClient<QuicPool, QuicConnectionManager, QuicConfig>),
Udp(TpuClient<UdpPool, UdpConnectionManager, UdpConfig>),
Expand Down
39 changes: 8 additions & 31 deletions dos/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -818,7 +818,7 @@ fn main() {
pub mod test {
use {
super::*,
solana_client::tpu_client::TpuClient,
solana_client::tpu_client::QuicTpuClient,
solana_core::validator::ValidatorConfig,
solana_faucet::faucet::run_local_faucet,
solana_gossip::contact_info::LegacyContactInfo,
Expand All @@ -827,43 +827,16 @@ pub mod test {
local_cluster::{ClusterConfig, LocalCluster},
validator_configs::make_identical_validator_configs,
},
solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool},
solana_rpc::rpc::JsonRpcConfig,
solana_sdk::timing::timestamp,
solana_tpu_client::tpu_client::TpuClientConfig,
};

const TEST_SEND_BATCH_SIZE: usize = 1;

// thin wrapper for the run_dos function
// to avoid specifying everywhere generic parameters
fn run_dos_no_client(nodes: &[ContactInfo], iterations: usize, params: DosClientParameters) {
run_dos::<TpuClient<QuicPool, QuicConnectionManager, QuicConfig>>(
nodes, iterations, None, params,
);
}

fn build_tpu_quic_client(
cluster: &LocalCluster,
) -> Arc<TpuClient<QuicPool, QuicConnectionManager, QuicConfig>> {
let rpc_pubsub_url = format!("ws://{}/", cluster.entry_point_info.rpc_pubsub().unwrap());
let rpc_url = format!("http://{}", cluster.entry_point_info.rpc().unwrap());

let ConnectionCache::Quic(cache) = &*cluster.connection_cache else {
panic!("Expected a Quic ConnectionCache.");
};

Arc::new(
TpuClient::new_with_connection_cache(
Arc::new(RpcClient::new(rpc_url)),
rpc_pubsub_url.as_str(),
TpuClientConfig::default(),
cache.clone(),
)
.unwrap_or_else(|err| {
panic!("Could not create TpuClient with Quic Cache {err:?}");
}),
)
run_dos::<QuicTpuClient>(nodes, iterations, None, params);
}

#[test]
Expand Down Expand Up @@ -1003,7 +976,9 @@ pub mod test {
.unwrap();
let nodes_slice = [node];

let client = build_tpu_quic_client(&cluster);
let client = Arc::new(cluster.build_tpu_quic_client().unwrap_or_else(|err| {
panic!("Could not create TpuClient with Quic Cache {err:?}");
}));

// creates one transaction with 8 valid signatures and sends it 10 times
run_dos(
Expand Down Expand Up @@ -1135,7 +1110,9 @@ pub mod test {
.unwrap();
let nodes_slice = [node];

let client = build_tpu_quic_client(&cluster);
let client = Arc::new(cluster.build_tpu_quic_client().unwrap_or_else(|err| {
panic!("Could not create TpuClient with Quic Cache {err:?}");
}));

// creates one transaction and sends it 10 times
// this is done in single thread
Expand Down
1 change: 1 addition & 0 deletions local-cluster/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ solana-gossip = { workspace = true }
solana-ledger = { workspace = true }
solana-logger = { workspace = true }
solana-pubsub-client = { workspace = true }
solana-quic-client = { workspace = true }
solana-rpc-client = { workspace = true }
solana-rpc-client-api = { workspace = true }
solana-runtime = { workspace = true }
Expand Down
11 changes: 8 additions & 3 deletions local-cluster/src/cluster.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use {
solana_client::thin_client::ThinClient,
solana_client::{thin_client::ThinClient, tpu_client::QuicTpuClient},
solana_core::validator::{Validator, ValidatorConfig},
solana_gossip::{cluster_info::Node, contact_info::ContactInfo},
solana_ledger::shred::Shred,
solana_sdk::{pubkey::Pubkey, signature::Keypair},
solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey, signature::Keypair},
solana_streamer::socket::SocketAddrSpace,
std::{path::PathBuf, sync::Arc},
std::{io::Result, path::PathBuf, sync::Arc},
};

pub struct ValidatorInfo {
Expand Down Expand Up @@ -38,6 +38,11 @@ impl ClusterValidatorInfo {
pub trait Cluster {
fn get_node_pubkeys(&self) -> Vec<Pubkey>;
fn get_validator_client(&self, pubkey: &Pubkey) -> Option<ThinClient>;
fn build_tpu_quic_client(&self) -> Result<QuicTpuClient>;
fn build_tpu_quic_client_with_commitment(
&self,
commitment_config: CommitmentConfig,
) -> Result<QuicTpuClient>;
fn get_contact_info(&self, pubkey: &Pubkey) -> Option<&ContactInfo>;
fn exit_node(&mut self, pubkey: &Pubkey) -> ClusterValidatorInfo;
fn restart_node(
Expand Down
48 changes: 47 additions & 1 deletion local-cluster/src/local_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@ use {
itertools::izip,
log::*,
solana_accounts_db::utils::create_accounts_run_and_snapshot_dirs,
solana_client::{connection_cache::ConnectionCache, thin_client::ThinClient},
solana_client::{
connection_cache::ConnectionCache,
rpc_client::RpcClient,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's make this dependency on solana_rpc_client directly, so that it's more obvious when it's time to remove solana_client

thin_client::ThinClient,
tpu_client::{QuicTpuClient, TpuClient, TpuClientConfig},
},
solana_core::{
consensus::tower_storage::FileTowerStorage,
validator::{Validator, ValidatorConfig, ValidatorStartProgress},
Expand Down Expand Up @@ -802,6 +807,34 @@ impl LocalCluster {
..SnapshotConfig::new_load_only()
}
}

fn build_tpu_client<F>(&self, rpc_client_builder: F) -> Result<QuicTpuClient>
where
F: FnOnce(String) -> Arc<RpcClient>,
{
let rpc_pubsub_url = format!("ws://{}/", self.entry_point_info.rpc_pubsub().unwrap());
let rpc_url = format!("http://{}", self.entry_point_info.rpc().unwrap());

let cache = match &*self.connection_cache {
ConnectionCache::Quic(cache) => cache,
ConnectionCache::Udp(_) => {
return Err(Error::new(
ErrorKind::Other,
"Expected a Quic ConnectionCache. Got UDP",
))
}
};

let tpu_client = TpuClient::new_with_connection_cache(
rpc_client_builder(rpc_url),
rpc_pubsub_url.as_str(),
TpuClientConfig::default(),
cache.clone(),
)
.map_err(|err| Error::new(ErrorKind::Other, format!("TpuSenderError: {}", err)))?;

Ok(tpu_client)
}
}

impl Cluster for LocalCluster {
Expand All @@ -820,6 +853,19 @@ impl Cluster for LocalCluster {
})
}

fn build_tpu_quic_client(&self) -> Result<QuicTpuClient> {
self.build_tpu_client(|rpc_url| Arc::new(RpcClient::new(rpc_url)))
}

fn build_tpu_quic_client_with_commitment(
&self,
commitment_config: CommitmentConfig,
) -> Result<QuicTpuClient> {
self.build_tpu_client(|rpc_url| {
Arc::new(RpcClient::new_with_commitment(rpc_url, commitment_config))
})
}

fn exit_node(&mut self, pubkey: &Pubkey) -> ClusterValidatorInfo {
let mut node = self.validators.remove(pubkey).unwrap();

Expand Down
Loading