Skip to content

Commit

Permalink
Changes after groovies review
Browse files Browse the repository at this point in the history
  • Loading branch information
godmodegalactus committed Sep 15, 2023
1 parent 8c79534 commit 5f48f21
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 10 deletions.
14 changes: 7 additions & 7 deletions services/src/transaction_replayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,22 @@ pub struct TransactionReplay {
/// Transaction Replayer
/// It will replay transaction sent to the cluster if they are not confirmed
/// They will be replayed max_replay times
/// The replay time will be exponentialy increasing by after count * replay after
/// The replay time will be linearly increasing by after count * replay after
/// So the transasctions will be replayed like retry_after, retry_after*2, retry_after*3 ...
#[derive(Clone)]
pub struct TransactionReplayer {
pub tpu_service: TpuService,
pub tx_store: TxStore,
pub retry_after: Duration,
pub retry_offset: Duration,
}

impl TransactionReplayer {
pub fn new(tpu_service: TpuService, tx_store: TxStore, retry_after: Duration) -> Self {
pub fn new(tpu_service: TpuService, tx_store: TxStore, retry_offset: Duration) -> Self {
Self {
tpu_service,
tx_store,
retry_after,
retry_offset,
}
}

Expand All @@ -52,14 +52,14 @@ impl TransactionReplayer {
) -> AnyhowJoinHandle {
let tpu_service = self.tpu_service.clone();
let tx_store = self.tx_store.clone();
let retry_after = self.retry_after;
let retry_offset = self.retry_offset;

tokio::spawn(async move {
while let Some(mut tx_replay) = reciever.recv().await {
MESSAGES_IN_REPLAY_QUEUE.dec();
let now = Instant::now();
if now < tx_replay.replay_at {
if tx_replay.replay_at > now + retry_after {
if tx_replay.replay_at > now + retry_offset {
// requeue the transactions will be replayed after retry_after duration
sender.send(tx_replay).context("replay channel closed")?;
MESSAGES_IN_REPLAY_QUEUE.inc();
Expand All @@ -83,7 +83,7 @@ impl TransactionReplayer {
if tx_replay.replay_count < tx_replay.max_replay {
tx_replay.replay_count += 1;
tx_replay.replay_at =
Instant::now() + retry_after.mul_f32(tx_replay.replay_count as f32);
Instant::now() + retry_offset.mul_f32(tx_replay.replay_count as f32);
sender.send(tx_replay).context("replay channel closed")?;
MESSAGES_IN_REPLAY_QUEUE.inc();
}
Expand Down
6 changes: 3 additions & 3 deletions services/src/transaction_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ impl TransactionServiceBuilder {
replay_channel,
block_store,
max_retries,
replay_after: self.tx_replayer.retry_after,
replay_offset: self.tx_replayer.retry_offset,
},
jh_services,
)
Expand All @@ -101,7 +101,7 @@ pub struct TransactionService {
pub replay_channel: UnboundedSender<TransactionReplay>,
pub block_store: BlockInformationStore,
pub max_retries: usize,
pub replay_after: Duration,
pub replay_offset: Duration,
}

impl TransactionService {
Expand Down Expand Up @@ -146,7 +146,7 @@ impl TransactionService {
e
);
}
let replay_at = Instant::now() + self.replay_after;
let replay_at = Instant::now() + self.replay_offset;
// ignore error for replay service
if self
.replay_channel
Expand Down

0 comments on commit 5f48f21

Please sign in to comment.