Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
gregcusack committed May 14, 2024
1 parent dd7ad73 commit 57cede4
Showing 1 changed file with 9 additions and 38 deletions.
47 changes: 9 additions & 38 deletions tpu-client/src/tpu_client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
pub use crate::nonblocking::tpu_client::TpuSenderError;
use {
crate::nonblocking::tpu_client::TpuClient as NonblockingTpuClient,
log::*,
rayon::iter::{IntoParallelIterator, ParallelIterator},
solana_connection_cache::{
client_connection::ClientConnection,
Expand All @@ -10,10 +11,9 @@ use {
},
solana_rpc_client::rpc_client::RpcClient,
solana_sdk::{
client::AsyncClient,
clock::{Slot, MAX_PROCESSING_AGE},
signature::Signature,
transaction::{Transaction, VersionedTransaction},
transaction::Transaction,
transport::Result as TransportResult,
},
std::{
Expand Down Expand Up @@ -107,23 +107,23 @@ where

/// Serialize and send transaction to the current and upcoming leader TPUs according to fanout
/// size
/// Attempts to send and confirm tx "tries" times
/// Waits for signature confirmation before returning
/// Returns the transaction signature
/// Attempt to send and confirm tx "attempts" times
/// Wait for signature confirmation before returning
/// Return the transaction signature
pub fn send_and_confirm_transaction_with_retries<T: Signers + ?Sized>(
&self,
keypairs: &T,
transaction: &mut Transaction,
tries: usize,
attempts: usize,
pending_confirmations: usize,
) -> TransportResult<Signature> {
for x in 0..tries {
for attempt in 0..attempts {
let now = Instant::now();
let mut num_confirmed = 0;
let mut wait_time = MAX_PROCESSING_AGE;
// resend the same transaction until the transaction has no chance of succeeding
let wire_transaction =
bincode::serialize(&transaction).expect("transaction serialization failed");
bincode::serialize(&transaction).expect("should serialize transaction");

while now.elapsed().as_secs() < wait_time as u64 {
let leaders = self
Expand Down Expand Up @@ -154,7 +154,7 @@ where
);
}
}
log::info!("{x} tries failed transfer");
info!("{attempt} tries failed transfer");
let blockhash = self.rpc_client().get_latest_blockhash()?;
transaction.sign(keypairs, blockhash);
}
Expand Down Expand Up @@ -267,35 +267,6 @@ where
}
}

impl<P, M, C> AsyncClient for TpuClient<P, M, C>
where
P: ConnectionPool<NewConnectionConfig = C>,
M: ConnectionManager<ConnectionPool = P, NewConnectionConfig = C>,
C: NewConnectionConfig,
{
fn async_send_versioned_transaction(
&self,
transaction: VersionedTransaction,
) -> TransportResult<Signature> {
let wire_transaction =
bincode::serialize(&transaction).expect("serialize Transaction in send_batch");
self.send_wire_transaction(wire_transaction);
Ok(transaction.signatures[0])
}

fn async_send_versioned_transaction_batch(
&self,
batch: Vec<VersionedTransaction>,
) -> TransportResult<()> {
let buffers = batch
.into_par_iter()
.map(|tx| bincode::serialize(&tx).expect("serialize Transaction in send_batch"))
.collect::<Vec<_>>();
self.try_send_wire_transaction_batch(buffers)?;
Ok(())
}
}

// 48 chosen because it's unlikely that 12 leaders in a row will miss their slots
const MAX_SLOT_SKIP_DISTANCE: u64 = 48;

Expand Down

0 comments on commit 57cede4

Please sign in to comment.