Skip to content

Commit

Permalink
BCI-2508: TXM duplicate nonces caused by trasmitchecker (#11546)
Browse files Browse the repository at this point in the history
* Broadcaster: run checker only for unstarted txs

* attempt to fix sigscanner

* fix txstore invariant check

* add invariant to ensure we are processing unstarted tx

* add explanation why we run check only on unstarted
  • Loading branch information
dhaidashenko committed Dec 19, 2023
1 parent 17420af commit 233445a
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 27 deletions.
61 changes: 36 additions & 25 deletions common/txmgr/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,22 +485,9 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) proc
return false, nil
}
n++
var a txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
var retryable bool
a, _, _, retryable, err = eb.NewTxAttempt(ctx, *etx, eb.lggr)
if err != nil {
return retryable, fmt.Errorf("processUnstartedTxs failed on NewAttempt: %w", err)
}

if err := eb.txStore.UpdateTxUnstartedToInProgress(ctx, etx, &a); errors.Is(err, ErrTxRemoved) {
eb.lggr.Debugw("tx removed", "txID", etx.ID, "subject", etx.Subject)
continue
} else if err != nil {
return true, fmt.Errorf("processUnstartedTxs failed on UpdateTxUnstartedToInProgress: %w", err)
}

if err, retryable := eb.handleInProgressTx(ctx, *etx, a, time.Now()); err != nil {
return retryable, fmt.Errorf("processUnstartedTxs failed on handleInProgressTx: %w", err)
if err, retryable := eb.handleUnstartedTx(ctx, etx); err != nil {
return retryable, fmt.Errorf("processUnstartedTxs failed on handleUnstartedTx: %w", err)
}
}
}
Expand All @@ -520,11 +507,14 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand
return nil, false
}

// There can be at most one in_progress transaction per address.
// Here we complete the job that we didn't finish last time.
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) handleInProgressTx(ctx context.Context, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], initialBroadcastAt time.Time) (error, bool) {
if etx.State != TxInProgress {
return fmt.Errorf("invariant violation: expected transaction %v to be in_progress, it was %s", etx.ID, etx.State), false
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) handleUnstartedTx(ctx context.Context, etx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) (error, bool) {
if etx.State != TxUnstarted {
return fmt.Errorf("invariant violation: expected transaction %v to be unstarted, it was %s", etx.ID, etx.State), false
}

attempt, _, _, retryable, err := eb.NewTxAttempt(ctx, *etx, eb.lggr)
if err != nil {
return fmt.Errorf("processUnstartedTxs failed on NewAttempt: %w", err), retryable
}

checkerSpec, err := etx.GetChecker()
Expand All @@ -541,19 +531,40 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand

// If the transmit check does not complete within the timeout, the transaction will be sent
// anyway.
// It's intentional that we only run `Check` for unstarted transactions.
// Running it on other states might lead to nonce duplication, as we might mark applied transactions as fatally errored.

checkCtx, cancel := context.WithTimeout(ctx, TransmitCheckTimeout)
defer cancel()
err = checker.Check(checkCtx, lgr, etx, attempt)
err = checker.Check(checkCtx, lgr, *etx, attempt)
if errors.Is(err, context.Canceled) {
lgr.Warn("Transmission checker timed out, sending anyway")
} else if err != nil {
etx.Error = null.StringFrom(err.Error())
lgr.Warnw("Transmission checker failed, fatally erroring transaction.", "err", err)
return eb.saveFatallyErroredTransaction(lgr, &etx), true
return eb.saveFatallyErroredTransaction(lgr, etx), true
}
cancel()

lgr.Infow("Sending transaction", "txAttemptID", attempt.ID, "txHash", attempt.Hash, "err", err, "meta", etx.Meta, "feeLimit", etx.FeeLimit, "attempt", attempt, "etx", etx)
if err = eb.txStore.UpdateTxUnstartedToInProgress(ctx, etx, &attempt); errors.Is(err, ErrTxRemoved) {
eb.lggr.Debugw("tx removed", "txID", etx.ID, "subject", etx.Subject)
return nil, false
} else if err != nil {
return fmt.Errorf("processUnstartedTxs failed on UpdateTxUnstartedToInProgress: %w", err), true
}

return eb.handleInProgressTx(ctx, *etx, attempt, time.Now())
}

// There can be at most one in_progress transaction per address.
// Here we complete the job that we didn't finish last time.
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) handleInProgressTx(ctx context.Context, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], initialBroadcastAt time.Time) (error, bool) {
if etx.State != TxInProgress {
return fmt.Errorf("invariant violation: expected transaction %v to be in_progress, it was %s", etx.ID, etx.State), false
}

lgr := etx.GetLogger(logger.With(eb.lggr, "fee", attempt.TxFee))
lgr.Infow("Sending transaction", "txAttemptID", attempt.ID, "txHash", attempt.Hash, "meta", etx.Meta, "feeLimit", etx.FeeLimit, "attempt", attempt, "etx", etx)
errType, err := eb.client.SendTransactionReturnCode(ctx, etx, attempt, lgr)

if errType != client.Fatal {
Expand Down Expand Up @@ -760,8 +771,8 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) save
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) saveFatallyErroredTransaction(lgr logger.Logger, etx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error {
ctx, cancel := eb.chStop.NewCtx()
defer cancel()
if etx.State != TxInProgress {
return fmt.Errorf("can only transition to fatal_error from in_progress, transaction is currently %s", etx.State)
if etx.State != TxInProgress && etx.State != TxUnstarted {
return fmt.Errorf("can only transition to fatal_error from in_progress or unstarted, transaction is currently %s", etx.State)
}
if !etx.Error.Valid {
return errors.New("expected error field to be set")
Expand Down
4 changes: 2 additions & 2 deletions core/chains/evm/txmgr/evm_tx_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1561,8 +1561,8 @@ func (o *evmTxStore) UpdateTxFatalError(ctx context.Context, etx *Tx) error {
ctx, cancel = o.mergeContexts(ctx)
defer cancel()
qq := o.q.WithOpts(pg.WithParentCtx(ctx))
if etx.State != txmgr.TxInProgress {
return pkgerrors.Errorf("can only transition to fatal_error from in_progress, transaction is currently %s", etx.State)
if etx.State != txmgr.TxInProgress && etx.State != txmgr.TxUnstarted {
return pkgerrors.Errorf("can only transition to fatal_error from in_progress or unstarted, transaction is currently %s", etx.State)
}
if !etx.Error.Valid {
return errors.New("expected error field to be set")
Expand Down

0 comments on commit 233445a

Please sign in to comment.