diff --git a/send-transaction-service/src/send_transaction_service.rs b/send-transaction-service/src/send_transaction_service.rs index 27aa1bea400ac4..3aec459a73ce0e 100644 --- a/send-transaction-service/src/send_transaction_service.rs +++ b/send-transaction-service/src/send_transaction_service.rs @@ -114,6 +114,7 @@ pub struct Config { pub batch_size: usize, /// How frequently batches are sent pub batch_send_rate_ms: u64, + pub tpu_peers: Option>, } impl Default for Config { @@ -125,6 +126,7 @@ impl Default for Config { service_max_retries: DEFAULT_SERVICE_MAX_RETRIES, batch_size: DEFAULT_TRANSACTION_BATCH_SIZE, batch_send_rate_ms: DEFAULT_BATCH_SEND_RATE_MS, + tpu_peers: None, } } } @@ -565,12 +567,18 @@ impl SendTransactionService { stats: &SendTransactionServiceStats, ) { // Processing the transactions in batch - let addresses = Self::get_tpu_addresses_with_slots( + let mut addresses = config + .tpu_peers + .as_ref() + .map(|addrs| addrs.iter().map(|a| (a, 0)).collect::>()) + .unwrap_or_default(); + let leader_addresses = Self::get_tpu_addresses_with_slots( tpu_address, leader_info, config, connection_cache.protocol(), ); + addresses.extend(leader_addresses); let wire_transactions = transactions .iter() @@ -583,8 +591,8 @@ impl SendTransactionService { }) .collect::>(); - for address in &addresses { - Self::send_transactions(address.0, &wire_transactions, connection_cache, stats); + for (address, _) in &addresses { + Self::send_transactions(address, &wire_transactions, connection_cache, stats); } } @@ -701,14 +709,20 @@ impl SendTransactionService { let iter = wire_transactions.chunks(config.batch_size); for chunk in iter { + let mut addresses = config + .tpu_peers + .as_ref() + .map(|addrs| addrs.iter().collect::>()) + .unwrap_or_default(); let mut leader_info_provider = leader_info_provider.lock().unwrap(); let leader_info = leader_info_provider.get_leader_info(); - let addresses = Self::get_tpu_addresses( + let leader_addresses = Self::get_tpu_addresses( tpu_address, leader_info, config, connection_cache.protocol(), ); + addresses.extend(leader_addresses); for address in &addresses { Self::send_transactions(address, chunk, connection_cache, stats); diff --git a/validator/src/cli.rs b/validator/src/cli.rs index 2e40751840edf8..5a79463bd81e48 100644 --- a/validator/src/cli.rs +++ b/validator/src/cli.rs @@ -1045,6 +1045,22 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> { .default_value(&default_args.rpc_send_transaction_batch_size) .help("The size of transactions to be sent in batch."), ) + .arg( + Arg::with_name("rpc_send_transaction_tpu_peer") + .long("rpc-send-transaction-tpu-peer") + .takes_value(true) + .number_of_values(1) + .multiple(true) + .value_name("HOST:PORT") + .validator(solana_net_utils::is_host_port) + .help("Peer(s) to broadcast transactions to instead of the current leader") + ) + .arg( + Arg::with_name("rpc_send_transaction_also_leader") + .long("rpc-send-transaction-also-leader") + .requires("rpc_send_transaction_tpu_peer") + .help("With `--rpc-send-transaction-tpu-peer HOST:PORT`, also send to the current leader") + ) .arg( Arg::with_name("rpc_scan_and_fix_roots") .long("rpc-scan-and-fix-roots") diff --git a/validator/src/main.rs b/validator/src/main.rs index 680397a9d3c73c..f101f103bbc958 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -1266,6 +1266,27 @@ pub fn main() { ); exit(1); } + let rpc_send_transaction_tpu_peers = matches + .values_of("rpc_send_transaction_tpu_peer") + .map(|values| { + values + .map(solana_net_utils::parse_host_port) + .collect::, String>>() + }) + .transpose() + .unwrap_or_else(|e| { + eprintln!("failed to parse rpc send-transaction-service tpu peer address: {e}"); + exit(1); + }); + let rpc_send_transaction_also_leader = matches.is_present("rpc_send_transaction_also_leader"); + let leader_forward_count = + if rpc_send_transaction_tpu_peers.is_some() && !rpc_send_transaction_also_leader { + // rpc-sts is configured to send only to specific tpu peers. disable leader forwards + 0 + } else { + value_t_or_exit!(matches, "rpc_send_transaction_leader_forward_count", u64) + }; + let full_api = matches.is_present("full_rpc_api"); let mut validator_config = ValidatorConfig { @@ -1359,11 +1380,7 @@ pub fn main() { contact_debug_interval, send_transaction_service_config: send_transaction_service::Config { retry_rate_ms: rpc_send_retry_rate_ms, - leader_forward_count: value_t_or_exit!( - matches, - "rpc_send_transaction_leader_forward_count", - u64 - ), + leader_forward_count, default_max_retries: value_t!( matches, "rpc_send_transaction_default_max_retries", @@ -1377,6 +1394,7 @@ pub fn main() { ), batch_send_rate_ms: rpc_send_batch_send_rate_ms, batch_size: rpc_send_batch_size, + tpu_peers: rpc_send_transaction_tpu_peers, }, no_poh_speed_test: matches.is_present("no_poh_speed_test"), no_os_memory_stats_reporting: matches.is_present("no_os_memory_stats_reporting"),