Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve the algorithm to schedule transaction replays #196

Merged
merged 3 commits into from
Sep 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 23 additions & 13 deletions services/src/transaction_replayer.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::tpu_utils::tpu_service::TpuService;
use anyhow::bail;
use anyhow::{bail, Context};
use log::error;
use prometheus::{core::GenericGauge, opts, register_int_gauge};
use solana_lite_rpc_core::{tx_store::TxStore, AnyhowJoinHandle};
Expand All @@ -23,19 +23,25 @@ pub struct TransactionReplay {
pub replay_at: Instant,
}

/// Transaction Replayer
/// It replays transactions sent to the cluster if they are not confirmed.
/// Each transaction is replayed at most max_replay times.
/// The delay before each replay increases linearly: replay count * retry_offset.
/// So the transactions will be replayed after retry_offset, retry_offset*2, retry_offset*3 ...

#[derive(Clone)]
pub struct TransactionReplayer {
pub tpu_service: TpuService,
pub tx_store: TxStore,
pub retry_after: Duration,
pub retry_offset: Duration,
}

impl TransactionReplayer {
pub fn new(tpu_service: TpuService, tx_store: TxStore, retry_after: Duration) -> Self {
pub fn new(tpu_service: TpuService, tx_store: TxStore, retry_offset: Duration) -> Self {
Self {
tpu_service,
tx_store,
retry_after,
retry_offset,
}
}

Expand All @@ -46,12 +52,19 @@ impl TransactionReplayer {
) -> AnyhowJoinHandle {
let tpu_service = self.tpu_service.clone();
let tx_store = self.tx_store.clone();
let retry_after = self.retry_after;
let retry_offset = self.retry_offset;

tokio::spawn(async move {
while let Some(mut tx_replay) = reciever.recv().await {
MESSAGES_IN_REPLAY_QUEUE.dec();
if Instant::now() < tx_replay.replay_at {
let now = Instant::now();
if now < tx_replay.replay_at {
if tx_replay.replay_at > now + retry_offset {
// not due within the next retry_offset: requeue the transaction so it is checked again later
sender.send(tx_replay).context("replay channel closed")?;
MESSAGES_IN_REPLAY_QUEUE.inc();
continue;
}
tokio::time::sleep_until(tx_replay.replay_at).await;
}
if let Some(tx) = tx_store.get(&tx_replay.signature) {
Expand All @@ -69,13 +82,10 @@ impl TransactionReplayer {

if tx_replay.replay_count < tx_replay.max_replay {
tx_replay.replay_count += 1;
tx_replay.replay_at = Instant::now() + retry_after;
if let Err(e) = sender.send(tx_replay) {
error!("error while scheduling replay ({})", e);
continue;
} else {
MESSAGES_IN_REPLAY_QUEUE.inc();
}
tx_replay.replay_at =
Instant::now() + retry_offset.mul_f32(tx_replay.replay_count as f32);
sender.send(tx_replay).context("replay channel closed")?;
MESSAGES_IN_REPLAY_QUEUE.inc();
}
}
error!("transaction replay channel broken");
Expand Down
6 changes: 3 additions & 3 deletions services/src/transaction_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ impl TransactionServiceBuilder {
replay_channel,
block_store,
max_retries,
replay_after: self.tx_replayer.retry_after,
replay_offset: self.tx_replayer.retry_offset,
},
jh_services,
)
Expand All @@ -101,7 +101,7 @@ pub struct TransactionService {
pub replay_channel: UnboundedSender<TransactionReplay>,
pub block_store: BlockInformationStore,
pub max_retries: usize,
pub replay_after: Duration,
pub replay_offset: Duration,
}

impl TransactionService {
Expand Down Expand Up @@ -146,7 +146,7 @@ impl TransactionService {
e
);
}
let replay_at = Instant::now() + self.replay_after;
let replay_at = Instant::now() + self.replay_offset;
// ignore error for replay service
if self
.replay_channel
Expand Down
Loading