Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove ThinClient from dos/ #117

Merged
merged 2 commits into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions client/src/tpu_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,19 @@ use {
transport::Result as TransportResult,
},
solana_tpu_client::tpu_client::{Result, TpuClient as BackendTpuClient},
solana_udp_client::{UdpConfig, UdpConnectionManager, UdpPool},
std::sync::Arc,
};
pub use {
crate::nonblocking::tpu_client::TpuSenderError,
solana_tpu_client::tpu_client::{TpuClientConfig, DEFAULT_FANOUT_SLOTS, MAX_FANOUT_SLOTS},
};

pub enum TpuClientWrapper {
Quic(TpuClient<QuicPool, QuicConnectionManager, QuicConfig>),
Udp(TpuClient<UdpPool, UdpConnectionManager, UdpConfig>),
}

/// Client which sends transactions directly to the current leader's TPU port over UDP.
/// The client uses RPC to determine the current leader and fetch node contact info
/// This is just a thin wrapper over the "BackendTpuClient", use that directly for more efficiency.
Expand Down
2 changes: 1 addition & 1 deletion dos/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ solana-logger = { workspace = true }
solana-measure = { workspace = true }
solana-net-utils = { workspace = true }
solana-perf = { workspace = true }
solana-quic-client = { workspace = true }
solana-rpc = { workspace = true }
solana-rpc-client = { workspace = true }
solana-sdk = { workspace = true }
Expand All @@ -38,4 +39,3 @@ targets = ["x86_64-unknown-linux-gnu"]

[dev-dependencies]
solana-local-cluster = { workspace = true }
solana-thin-client = { workspace = true }
83 changes: 48 additions & 35 deletions dos/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,15 @@ use {
log::*,
rand::{thread_rng, Rng},
solana_bench_tps::{bench::generate_and_fund_keypairs, bench_tps_client::BenchTpsClient},
solana_client::{connection_cache::ConnectionCache, tpu_connection::TpuConnection},
solana_client::{
connection_cache::ConnectionCache, tpu_client::TpuClientWrapper,
tpu_connection::TpuConnection,
},
solana_core::repair::serve_repair::{RepairProtocol, RepairRequestHeader, ServeRepair},
solana_dos::cli::*,
solana_gossip::{
contact_info::Protocol,
gossip_service::{discover, get_multi_client},
gossip_service::{discover, get_client},
legacy_contact_info::LegacyContactInfo as ContactInfo,
},
solana_measure::measure::Measure,
Expand Down Expand Up @@ -791,33 +794,30 @@ fn main() {
DEFAULT_TPU_CONNECTION_POOL_SIZE,
),
};
let (client, num_clients) = get_multi_client(
&validators,
&SocketAddrSpace::Unspecified,
Arc::new(connection_cache),
);
if validators.len() < num_clients {
eprintln!(
"Error: Insufficient nodes discovered. Expecting {} or more",
validators.len()
);
exit(1);
}
(gossip_nodes, Some(Arc::new(client)))
let client = get_client(&validators, Arc::new(connection_cache));
(gossip_nodes, Some(client))
} else {
(vec![], None)
};

info!("done found {} nodes", nodes.len());

run_dos(&nodes, 0, client, cmd_params);
if let Some(tpu_client) = client {
match tpu_client {
TpuClientWrapper::Quic(quic_client) => {
run_dos(&nodes, 0, Some(Arc::new(quic_client)), cmd_params);
}
TpuClientWrapper::Udp(udp_client) => {
run_dos(&nodes, 0, Some(Arc::new(udp_client)), cmd_params);
}
};
}
CriesofCarrots marked this conversation as resolved.
Show resolved Hide resolved
}

#[cfg(test)]
pub mod test {
use {
super::*,
solana_client::thin_client::ThinClient,
solana_client::tpu_client::TpuClient,
solana_core::validator::ValidatorConfig,
solana_faucet::faucet::run_local_faucet,
solana_gossip::contact_info::LegacyContactInfo,
Expand All @@ -826,16 +826,43 @@ pub mod test {
local_cluster::{ClusterConfig, LocalCluster},
validator_configs::make_identical_validator_configs,
},
solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool},
solana_rpc::rpc::JsonRpcConfig,
solana_sdk::timing::timestamp,
solana_tpu_client::tpu_client::TpuClientConfig,
};

const TEST_SEND_BATCH_SIZE: usize = 1;

// thin wrapper for the run_dos function
// to avoid specifying everywhere generic parameters
fn run_dos_no_client(nodes: &[ContactInfo], iterations: usize, params: DosClientParameters) {
run_dos::<ThinClient>(nodes, iterations, None, params);
run_dos::<TpuClient<QuicPool, QuicConnectionManager, QuicConfig>>(
nodes, iterations, None, params,
);
}

fn build_tpu_quic_client(
cluster: &LocalCluster,
) -> Arc<TpuClient<QuicPool, QuicConnectionManager, QuicConfig>> {
let rpc_pubsub_url = format!("ws://{}/", cluster.entry_point_info.rpc_pubsub().unwrap());
let rpc_url = format!("http://{}", cluster.entry_point_info.rpc().unwrap());

let ConnectionCache::Quic(cache) = &*cluster.connection_cache else {
panic!("Expected a Quic ConnectionCache.");
};

Arc::new(
TpuClient::new_with_connection_cache(
Arc::new(RpcClient::new(rpc_url)),
rpc_pubsub_url.as_str(),
TpuClientConfig::default(),
cache.clone(),
)
.unwrap_or_else(|err| {
panic!("Could not create TpuClient with Quic Cache {err:?}");
}),
)
}

#[test]
Expand Down Expand Up @@ -975,14 +1002,7 @@ pub mod test {
.unwrap();
let nodes_slice = [node];

let client = Arc::new(ThinClient::new(
cluster.entry_point_info.rpc().unwrap(),
cluster
.entry_point_info
.tpu(cluster.connection_cache.protocol())
.unwrap(),
cluster.connection_cache.clone(),
));
let client = build_tpu_quic_client(&cluster);

// creates one transaction with 8 valid signatures and sends it 10 times
run_dos(
Expand Down Expand Up @@ -1114,14 +1134,7 @@ pub mod test {
.unwrap();
let nodes_slice = [node];

let client = Arc::new(ThinClient::new(
cluster.entry_point_info.rpc().unwrap(),
cluster
.entry_point_info
.tpu(cluster.connection_cache.protocol())
.unwrap(),
cluster.connection_cache.clone(),
));
let client = build_tpu_quic_client(&cluster);

// creates one transaction and sends it 10 times
// this is done in single thread
Expand Down
2 changes: 1 addition & 1 deletion gossip/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ serde_derive = { workspace = true }
solana-bloom = { workspace = true }
solana-clap-utils = { workspace = true }
solana-client = { workspace = true }
solana-connection-cache = { workspace = true }
solana-entry = { workspace = true }
solana-frozen-abi = { workspace = true }
solana-frozen-abi-macro = { workspace = true }
Expand All @@ -44,7 +45,6 @@ solana-rayon-threadlimit = { workspace = true }
solana-runtime = { workspace = true }
solana-sdk = { workspace = true }
solana-streamer = { workspace = true }
solana-thin-client = { workspace = true }
solana-tpu-client = { workspace = true }
solana-version = { workspace = true }
solana-vote = { workspace = true }
Expand Down
60 changes: 33 additions & 27 deletions gossip/src/gossip_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@ use {
crate::{cluster_info::ClusterInfo, legacy_contact_info::LegacyContactInfo as ContactInfo},
crossbeam_channel::{unbounded, Sender},
rand::{thread_rng, Rng},
solana_client::{connection_cache::ConnectionCache, thin_client::ThinClient},
solana_client::{
connection_cache::ConnectionCache,
rpc_client::RpcClient,
tpu_client::{TpuClient, TpuClientConfig, TpuClientWrapper},
},
solana_perf::recycler::Recycler,
solana_runtime::bank_forks::BankForks,
solana_sdk::{
Expand Down Expand Up @@ -197,35 +201,37 @@ pub fn discover(
#[deprecated(since = "1.18.6", note = "Interface will change")]
pub fn get_client(
CriesofCarrots marked this conversation as resolved.
Show resolved Hide resolved
nodes: &[ContactInfo],
socket_addr_space: &SocketAddrSpace,
connection_cache: Arc<ConnectionCache>,
) -> ThinClient {
let protocol = connection_cache.protocol();
let nodes: Vec<_> = nodes
.iter()
.filter_map(|node| node.valid_client_facing_addr(protocol, socket_addr_space))
.collect();
) -> TpuClientWrapper {
let select = thread_rng().gen_range(0..nodes.len());
let (rpc, tpu) = nodes[select];
ThinClient::new(rpc, tpu, connection_cache)
}

#[deprecated(since = "1.18.6", note = "Will be removed in favor of get_client")]
pub fn get_multi_client(
nodes: &[ContactInfo],
socket_addr_space: &SocketAddrSpace,
connection_cache: Arc<ConnectionCache>,
) -> (ThinClient, usize) {
let protocol = connection_cache.protocol();
let (rpc_addrs, tpu_addrs): (Vec<_>, Vec<_>) = nodes
.iter()
.filter_map(|node| node.valid_client_facing_addr(protocol, socket_addr_space))
.unzip();
let num_nodes = tpu_addrs.len();
(
ThinClient::new_from_addrs(rpc_addrs, tpu_addrs, connection_cache),
num_nodes,
)
Comment on lines -220 to -228
Copy link
Author

@gregcusack gregcusack Mar 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this seems to be spinning up rpc multiple clients under the wrapper of a ThinClient. I was able to get similar performance with a single TPU Quic client. This similar performance is why I changed it to a single TPU Client. However, I am not sure I have full context as to whether solana-dos needs to spin up multiple TPU clients for reasons other than maximizing rate_per_second

let rpc_pubsub_url = format!("ws://{}/", nodes[select].rpc_pubsub().unwrap());
let rpc_url = format!("http://{}", nodes[select].rpc().unwrap());

match &*connection_cache {
ConnectionCache::Quic(cache) => TpuClientWrapper::Quic(
TpuClient::new_with_connection_cache(
Arc::new(RpcClient::new(rpc_url)),
rpc_pubsub_url.as_str(),
TpuClientConfig::default(),
cache.clone(),
)
.unwrap_or_else(|err| {
panic!("Could not create TpuClient with Quic Cache {err:?}");
}),
),
ConnectionCache::Udp(cache) => TpuClientWrapper::Udp(
TpuClient::new_with_connection_cache(
Arc::new(RpcClient::new(rpc_url)),
rpc_pubsub_url.as_str(),
TpuClientConfig::default(),
cache.clone(),
)
.unwrap_or_else(|err| {
panic!("Could not create TpuClient with Udp Cache {err:?}");
}),
),
}
}

fn spy(
Expand Down
33 changes: 0 additions & 33 deletions gossip/src/legacy_contact_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,21 +229,6 @@ impl LegacyContactInfo {
pub fn is_valid_address(addr: &SocketAddr, socket_addr_space: &SocketAddrSpace) -> bool {
addr.port() != 0u16 && Self::is_valid_ip(addr.ip()) && socket_addr_space.check(addr)
}

pub(crate) fn valid_client_facing_addr(
&self,
protocol: Protocol,
socket_addr_space: &SocketAddrSpace,
) -> Option<(SocketAddr, SocketAddr)> {
Some((
self.rpc()
.ok()
.filter(|addr| socket_addr_space.check(addr))?,
self.tpu(protocol)
.ok()
.filter(|addr| socket_addr_space.check(addr))?,
))
}
}

impl TryFrom<&ContactInfo> for LegacyContactInfo {
Expand Down Expand Up @@ -342,24 +327,6 @@ mod tests {
assert!(ci.serve_repair.ip().is_unspecified());
}

#[test]
fn test_valid_client_facing() {
let mut ci = LegacyContactInfo::default();
assert_eq!(
ci.valid_client_facing_addr(Protocol::QUIC, &SocketAddrSpace::Unspecified),
None
);
ci.tpu = socketaddr!(Ipv4Addr::LOCALHOST, 123);
assert_eq!(
ci.valid_client_facing_addr(Protocol::QUIC, &SocketAddrSpace::Unspecified),
None
);
ci.rpc = socketaddr!(Ipv4Addr::LOCALHOST, 234);
assert!(ci
.valid_client_facing_addr(Protocol::QUIC, &SocketAddrSpace::Unspecified)
.is_some());
}

#[test]
fn test_sanitize() {
let mut ci = LegacyContactInfo::default();
Expand Down
2 changes: 1 addition & 1 deletion programs/sbf/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading