Skip to content

Commit

Permalink
Remove ThinClient from dos/ (anza-xyz#117)
Browse files Browse the repository at this point in the history
* remove `ThinClient` from `dos/` and replace `ThinClient` with `TpuClient`

* remove test for valid_client_facing_addr since it is no longer used
  • Loading branch information
gregcusack authored and willhickey committed Mar 16, 2024
1 parent 096a1f4 commit d5c5f06
Show file tree
Hide file tree
Showing 8 changed files with 92 additions and 100 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions client/src/tpu_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,19 @@ use {
transport::Result as TransportResult,
},
solana_tpu_client::tpu_client::{Result, TpuClient as BackendTpuClient},
solana_udp_client::{UdpConfig, UdpConnectionManager, UdpPool},
std::sync::Arc,
};
pub use {
crate::nonblocking::tpu_client::TpuSenderError,
solana_tpu_client::tpu_client::{TpuClientConfig, DEFAULT_FANOUT_SLOTS, MAX_FANOUT_SLOTS},
};

pub enum TpuClientWrapper {
Quic(TpuClient<QuicPool, QuicConnectionManager, QuicConfig>),
Udp(TpuClient<UdpPool, UdpConnectionManager, UdpConfig>),
}

/// Client which sends transactions directly to the current leader's TPU port over UDP.
/// The client uses RPC to determine the current leader and fetch node contact info
/// This is just a thin wrapper over the "BackendTpuClient", use that directly for more efficiency.
Expand Down
2 changes: 1 addition & 1 deletion dos/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ solana-logger = { workspace = true }
solana-measure = { workspace = true }
solana-net-utils = { workspace = true }
solana-perf = { workspace = true }
solana-quic-client = { workspace = true }
solana-rpc = { workspace = true }
solana-rpc-client = { workspace = true }
solana-sdk = { workspace = true }
Expand All @@ -38,4 +39,3 @@ targets = ["x86_64-unknown-linux-gnu"]

[dev-dependencies]
solana-local-cluster = { workspace = true }
solana-thin-client = { workspace = true }
83 changes: 48 additions & 35 deletions dos/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,15 @@ use {
log::*,
rand::{thread_rng, Rng},
solana_bench_tps::{bench::generate_and_fund_keypairs, bench_tps_client::BenchTpsClient},
solana_client::{connection_cache::ConnectionCache, tpu_connection::TpuConnection},
solana_client::{
connection_cache::ConnectionCache, tpu_client::TpuClientWrapper,
tpu_connection::TpuConnection,
},
solana_core::repair::serve_repair::{RepairProtocol, RepairRequestHeader, ServeRepair},
solana_dos::cli::*,
solana_gossip::{
contact_info::Protocol,
gossip_service::{discover, get_multi_client},
gossip_service::{discover, get_client},
legacy_contact_info::LegacyContactInfo as ContactInfo,
},
solana_measure::measure::Measure,
Expand Down Expand Up @@ -791,33 +794,30 @@ fn main() {
DEFAULT_TPU_CONNECTION_POOL_SIZE,
),
};
let (client, num_clients) = get_multi_client(
&validators,
&SocketAddrSpace::Unspecified,
Arc::new(connection_cache),
);
if validators.len() < num_clients {
eprintln!(
"Error: Insufficient nodes discovered. Expecting {} or more",
validators.len()
);
exit(1);
}
(gossip_nodes, Some(Arc::new(client)))
let client = get_client(&validators, Arc::new(connection_cache));
(gossip_nodes, Some(client))
} else {
(vec![], None)
};

info!("done found {} nodes", nodes.len());

run_dos(&nodes, 0, client, cmd_params);
if let Some(tpu_client) = client {
match tpu_client {
TpuClientWrapper::Quic(quic_client) => {
run_dos(&nodes, 0, Some(Arc::new(quic_client)), cmd_params);
}
TpuClientWrapper::Udp(udp_client) => {
run_dos(&nodes, 0, Some(Arc::new(udp_client)), cmd_params);
}
};
}
}

#[cfg(test)]
pub mod test {
use {
super::*,
solana_client::thin_client::ThinClient,
solana_client::tpu_client::TpuClient,
solana_core::validator::ValidatorConfig,
solana_faucet::faucet::run_local_faucet,
solana_gossip::contact_info::LegacyContactInfo,
Expand All @@ -826,16 +826,43 @@ pub mod test {
local_cluster::{ClusterConfig, LocalCluster},
validator_configs::make_identical_validator_configs,
},
solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool},
solana_rpc::rpc::JsonRpcConfig,
solana_sdk::timing::timestamp,
solana_tpu_client::tpu_client::TpuClientConfig,
};

const TEST_SEND_BATCH_SIZE: usize = 1;

// thin wrapper for the run_dos function
// to avoid specifying everywhere generic parameters
fn run_dos_no_client(nodes: &[ContactInfo], iterations: usize, params: DosClientParameters) {
run_dos::<ThinClient>(nodes, iterations, None, params);
run_dos::<TpuClient<QuicPool, QuicConnectionManager, QuicConfig>>(
nodes, iterations, None, params,
);
}

fn build_tpu_quic_client(
cluster: &LocalCluster,
) -> Arc<TpuClient<QuicPool, QuicConnectionManager, QuicConfig>> {
let rpc_pubsub_url = format!("ws://{}/", cluster.entry_point_info.rpc_pubsub().unwrap());
let rpc_url = format!("http://{}", cluster.entry_point_info.rpc().unwrap());

let ConnectionCache::Quic(cache) = &*cluster.connection_cache else {
panic!("Expected a Quic ConnectionCache.");
};

Arc::new(
TpuClient::new_with_connection_cache(
Arc::new(RpcClient::new(rpc_url)),
rpc_pubsub_url.as_str(),
TpuClientConfig::default(),
cache.clone(),
)
.unwrap_or_else(|err| {
panic!("Could not create TpuClient with Quic Cache {err:?}");
}),
)
}

#[test]
Expand Down Expand Up @@ -975,14 +1002,7 @@ pub mod test {
.unwrap();
let nodes_slice = [node];

let client = Arc::new(ThinClient::new(
cluster.entry_point_info.rpc().unwrap(),
cluster
.entry_point_info
.tpu(cluster.connection_cache.protocol())
.unwrap(),
cluster.connection_cache.clone(),
));
let client = build_tpu_quic_client(&cluster);

// creates one transaction with 8 valid signatures and sends it 10 times
run_dos(
Expand Down Expand Up @@ -1114,14 +1134,7 @@ pub mod test {
.unwrap();
let nodes_slice = [node];

let client = Arc::new(ThinClient::new(
cluster.entry_point_info.rpc().unwrap(),
cluster
.entry_point_info
.tpu(cluster.connection_cache.protocol())
.unwrap(),
cluster.connection_cache.clone(),
));
let client = build_tpu_quic_client(&cluster);

// creates one transaction and sends it 10 times
// this is done in single thread
Expand Down
2 changes: 1 addition & 1 deletion gossip/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ serde_derive = { workspace = true }
solana-bloom = { workspace = true }
solana-clap-utils = { workspace = true }
solana-client = { workspace = true }
solana-connection-cache = { workspace = true }
solana-entry = { workspace = true }
solana-frozen-abi = { workspace = true }
solana-frozen-abi-macro = { workspace = true }
Expand All @@ -44,7 +45,6 @@ solana-rayon-threadlimit = { workspace = true }
solana-runtime = { workspace = true }
solana-sdk = { workspace = true }
solana-streamer = { workspace = true }
solana-thin-client = { workspace = true }
solana-tpu-client = { workspace = true }
solana-version = { workspace = true }
solana-vote = { workspace = true }
Expand Down
60 changes: 33 additions & 27 deletions gossip/src/gossip_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@ use {
crate::{cluster_info::ClusterInfo, legacy_contact_info::LegacyContactInfo as ContactInfo},
crossbeam_channel::{unbounded, Sender},
rand::{thread_rng, Rng},
solana_client::{connection_cache::ConnectionCache, thin_client::ThinClient},
solana_client::{
connection_cache::ConnectionCache,
rpc_client::RpcClient,
tpu_client::{TpuClient, TpuClientConfig, TpuClientWrapper},
},
solana_perf::recycler::Recycler,
solana_runtime::bank_forks::BankForks,
solana_sdk::{
Expand Down Expand Up @@ -197,35 +201,37 @@ pub fn discover(
#[deprecated(since = "1.18.6", note = "Interface will change")]
pub fn get_client(
nodes: &[ContactInfo],
socket_addr_space: &SocketAddrSpace,
connection_cache: Arc<ConnectionCache>,
) -> ThinClient {
let protocol = connection_cache.protocol();
let nodes: Vec<_> = nodes
.iter()
.filter_map(|node| node.valid_client_facing_addr(protocol, socket_addr_space))
.collect();
) -> TpuClientWrapper {
let select = thread_rng().gen_range(0..nodes.len());
let (rpc, tpu) = nodes[select];
ThinClient::new(rpc, tpu, connection_cache)
}

#[deprecated(since = "1.18.6", note = "Will be removed in favor of get_client")]
pub fn get_multi_client(
nodes: &[ContactInfo],
socket_addr_space: &SocketAddrSpace,
connection_cache: Arc<ConnectionCache>,
) -> (ThinClient, usize) {
let protocol = connection_cache.protocol();
let (rpc_addrs, tpu_addrs): (Vec<_>, Vec<_>) = nodes
.iter()
.filter_map(|node| node.valid_client_facing_addr(protocol, socket_addr_space))
.unzip();
let num_nodes = tpu_addrs.len();
(
ThinClient::new_from_addrs(rpc_addrs, tpu_addrs, connection_cache),
num_nodes,
)
let rpc_pubsub_url = format!("ws://{}/", nodes[select].rpc_pubsub().unwrap());
let rpc_url = format!("http://{}", nodes[select].rpc().unwrap());

match &*connection_cache {
ConnectionCache::Quic(cache) => TpuClientWrapper::Quic(
TpuClient::new_with_connection_cache(
Arc::new(RpcClient::new(rpc_url)),
rpc_pubsub_url.as_str(),
TpuClientConfig::default(),
cache.clone(),
)
.unwrap_or_else(|err| {
panic!("Could not create TpuClient with Quic Cache {err:?}");
}),
),
ConnectionCache::Udp(cache) => TpuClientWrapper::Udp(
TpuClient::new_with_connection_cache(
Arc::new(RpcClient::new(rpc_url)),
rpc_pubsub_url.as_str(),
TpuClientConfig::default(),
cache.clone(),
)
.unwrap_or_else(|err| {
panic!("Could not create TpuClient with Udp Cache {err:?}");
}),
),
}
}

fn spy(
Expand Down
33 changes: 0 additions & 33 deletions gossip/src/legacy_contact_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,21 +229,6 @@ impl LegacyContactInfo {
pub fn is_valid_address(addr: &SocketAddr, socket_addr_space: &SocketAddrSpace) -> bool {
addr.port() != 0u16 && Self::is_valid_ip(addr.ip()) && socket_addr_space.check(addr)
}

pub(crate) fn valid_client_facing_addr(
&self,
protocol: Protocol,
socket_addr_space: &SocketAddrSpace,
) -> Option<(SocketAddr, SocketAddr)> {
Some((
self.rpc()
.ok()
.filter(|addr| socket_addr_space.check(addr))?,
self.tpu(protocol)
.ok()
.filter(|addr| socket_addr_space.check(addr))?,
))
}
}

impl TryFrom<&ContactInfo> for LegacyContactInfo {
Expand Down Expand Up @@ -342,24 +327,6 @@ mod tests {
assert!(ci.serve_repair.ip().is_unspecified());
}

#[test]
fn test_valid_client_facing() {
let mut ci = LegacyContactInfo::default();
assert_eq!(
ci.valid_client_facing_addr(Protocol::QUIC, &SocketAddrSpace::Unspecified),
None
);
ci.tpu = socketaddr!(Ipv4Addr::LOCALHOST, 123);
assert_eq!(
ci.valid_client_facing_addr(Protocol::QUIC, &SocketAddrSpace::Unspecified),
None
);
ci.rpc = socketaddr!(Ipv4Addr::LOCALHOST, 234);
assert!(ci
.valid_client_facing_addr(Protocol::QUIC, &SocketAddrSpace::Unspecified)
.is_some());
}

#[test]
fn test_sanitize() {
let mut ci = LegacyContactInfo::default();
Expand Down
2 changes: 1 addition & 1 deletion programs/sbf/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit d5c5f06

Please sign in to comment.