Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v1.18: rpc-sts: add config options for stake-weighted qos (backport of #197) #341

Merged
merged 1 commit into from
Mar 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions send-transaction-service/src/send_transaction_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ pub struct Config {
pub batch_size: usize,
/// How frequently batches are sent
pub batch_send_rate_ms: u64,
pub tpu_peers: Option<Vec<SocketAddr>>,
}

impl Default for Config {
Expand All @@ -125,6 +126,7 @@ impl Default for Config {
service_max_retries: DEFAULT_SERVICE_MAX_RETRIES,
batch_size: DEFAULT_TRANSACTION_BATCH_SIZE,
batch_send_rate_ms: DEFAULT_BATCH_SEND_RATE_MS,
tpu_peers: None,
}
}
}
Expand Down Expand Up @@ -565,12 +567,18 @@ impl SendTransactionService {
stats: &SendTransactionServiceStats,
) {
// Processing the transactions in batch
let addresses = Self::get_tpu_addresses_with_slots(
let mut addresses = config
.tpu_peers
.as_ref()
.map(|addrs| addrs.iter().map(|a| (a, 0)).collect::<Vec<_>>())
.unwrap_or_default();
let leader_addresses = Self::get_tpu_addresses_with_slots(
tpu_address,
leader_info,
config,
connection_cache.protocol(),
);
addresses.extend(leader_addresses);

let wire_transactions = transactions
.iter()
Expand All @@ -583,8 +591,8 @@ impl SendTransactionService {
})
.collect::<Vec<&[u8]>>();

for address in &addresses {
Self::send_transactions(address.0, &wire_transactions, connection_cache, stats);
for (address, _) in &addresses {
Self::send_transactions(address, &wire_transactions, connection_cache, stats);
}
}

Expand Down Expand Up @@ -701,14 +709,20 @@ impl SendTransactionService {

let iter = wire_transactions.chunks(config.batch_size);
for chunk in iter {
let mut addresses = config
.tpu_peers
.as_ref()
.map(|addrs| addrs.iter().collect::<Vec<_>>())
.unwrap_or_default();
let mut leader_info_provider = leader_info_provider.lock().unwrap();
let leader_info = leader_info_provider.get_leader_info();
let addresses = Self::get_tpu_addresses(
let leader_addresses = Self::get_tpu_addresses(
tpu_address,
leader_info,
config,
connection_cache.protocol(),
);
addresses.extend(leader_addresses);

for address in &addresses {
Self::send_transactions(address, chunk, connection_cache, stats);
Expand Down
16 changes: 16 additions & 0 deletions validator/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1045,6 +1045,22 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> {
.default_value(&default_args.rpc_send_transaction_batch_size)
.help("The size of transactions to be sent in batch."),
)
.arg(
Arg::with_name("rpc_send_transaction_tpu_peer")
.long("rpc-send-transaction-tpu-peer")
.takes_value(true)
.number_of_values(1)
.multiple(true)
.value_name("HOST:PORT")
.validator(solana_net_utils::is_host_port)
.help("Peer(s) to broadcast transactions to instead of the current leader")
)
.arg(
Arg::with_name("rpc_send_transaction_also_leader")
.long("rpc-send-transaction-also-leader")
.requires("rpc_send_transaction_tpu_peer")
.help("With `--rpc-send-transaction-tpu-peer HOST:PORT`, also send to the current leader")
)
.arg(
Arg::with_name("rpc_scan_and_fix_roots")
.long("rpc-scan-and-fix-roots")
Expand Down
28 changes: 23 additions & 5 deletions validator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1281,6 +1281,27 @@ pub fn main() {
);
exit(1);
}
let rpc_send_transaction_tpu_peers = matches
.values_of("rpc_send_transaction_tpu_peer")
.map(|values| {
values
.map(solana_net_utils::parse_host_port)
.collect::<Result<Vec<SocketAddr>, String>>()
})
.transpose()
.unwrap_or_else(|e| {
eprintln!("failed to parse rpc send-transaction-service tpu peer address: {e}");
exit(1);
});
let rpc_send_transaction_also_leader = matches.is_present("rpc_send_transaction_also_leader");
let leader_forward_count =
if rpc_send_transaction_tpu_peers.is_some() && !rpc_send_transaction_also_leader {
// rpc-sts is configured to send only to specific tpu peers. disable leader forwards
0
} else {
value_t_or_exit!(matches, "rpc_send_transaction_leader_forward_count", u64)
};

let full_api = matches.is_present("full_rpc_api");

let mut validator_config = ValidatorConfig {
Expand Down Expand Up @@ -1374,11 +1395,7 @@ pub fn main() {
contact_debug_interval,
send_transaction_service_config: send_transaction_service::Config {
retry_rate_ms: rpc_send_retry_rate_ms,
leader_forward_count: value_t_or_exit!(
matches,
"rpc_send_transaction_leader_forward_count",
u64
),
leader_forward_count,
default_max_retries: value_t!(
matches,
"rpc_send_transaction_default_max_retries",
Expand All @@ -1392,6 +1409,7 @@ pub fn main() {
),
batch_send_rate_ms: rpc_send_batch_send_rate_ms,
batch_size: rpc_send_batch_size,
tpu_peers: rpc_send_transaction_tpu_peers,
},
no_poh_speed_test: matches.is_present("no_poh_speed_test"),
no_os_memory_stats_reporting: matches.is_present("no_os_memory_stats_reporting"),
Expand Down
Loading