Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BCI-2508: TXM duplicate nonces caused by transmitchecker #11546

Merged
merged 8 commits into from
Dec 19, 2023
61 changes: 36 additions & 25 deletions common/txmgr/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,22 +485,9 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) proc
return false, nil
}
n++
var a txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
var retryable bool
a, _, _, retryable, err = eb.NewTxAttempt(ctx, *etx, eb.lggr)
if err != nil {
return retryable, fmt.Errorf("processUnstartedTxs failed on NewAttempt: %w", err)
}

if err := eb.txStore.UpdateTxUnstartedToInProgress(ctx, etx, &a); errors.Is(err, ErrTxRemoved) {
eb.lggr.Debugw("tx removed", "txID", etx.ID, "subject", etx.Subject)
continue
} else if err != nil {
return true, fmt.Errorf("processUnstartedTxs failed on UpdateTxUnstartedToInProgress: %w", err)
}

if err, retryable := eb.handleInProgressTx(ctx, *etx, a, time.Now()); err != nil {
return retryable, fmt.Errorf("processUnstartedTxs failed on handleInProgressTx: %w", err)
if err, retryable := eb.handleUnstartedTx(ctx, etx); err != nil {
return retryable, fmt.Errorf("processUnstartedTxs failed on handleUnstartedTx: %w", err)
}
}
}
Expand All @@ -520,11 +507,14 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand
return nil, false
}

// There can be at most one in_progress transaction per address.
// Here we complete the job that we didn't finish last time.
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) handleInProgressTx(ctx context.Context, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], initialBroadcastAt time.Time) (error, bool) {
if etx.State != TxInProgress {
return fmt.Errorf("invariant violation: expected transaction %v to be in_progress, it was %s", etx.ID, etx.State), false
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) handleUnstartedTx(ctx context.Context, etx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) (error, bool) {
if etx.State != TxUnstarted {
return fmt.Errorf("invariant violation: expected transaction %v to be unstarted, it was %s", etx.ID, etx.State), false
}

attempt, _, _, retryable, err := eb.NewTxAttempt(ctx, *etx, eb.lggr)
if err != nil {
return fmt.Errorf("processUnstartedTxs failed on NewAttempt: %w", err), retryable
}

checkerSpec, err := etx.GetChecker()
Expand All @@ -541,19 +531,40 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand

// If the transmit check does not complete within the timeout, the transaction will be sent
// anyway.
// It's intentional that we only run `Check` for unstarted transactions.
// Running it on other states might lead to nonce duplication, as we might mark applied transactions as fatally errored.

checkCtx, cancel := context.WithTimeout(ctx, TransmitCheckTimeout)
defer cancel()
err = checker.Check(checkCtx, lgr, etx, attempt)
err = checker.Check(checkCtx, lgr, *etx, attempt)
if errors.Is(err, context.Canceled) {
lgr.Warn("Transmission checker timed out, sending anyway")
} else if err != nil {
etx.Error = null.StringFrom(err.Error())
lgr.Warnw("Transmission checker failed, fatally erroring transaction.", "err", err)
return eb.saveFatallyErroredTransaction(lgr, &etx), true
return eb.saveFatallyErroredTransaction(lgr, etx), true
}
cancel()

lgr.Infow("Sending transaction", "txAttemptID", attempt.ID, "txHash", attempt.Hash, "err", err, "meta", etx.Meta, "feeLimit", etx.FeeLimit, "attempt", attempt, "etx", etx)
if err = eb.txStore.UpdateTxUnstartedToInProgress(ctx, etx, &attempt); errors.Is(err, ErrTxRemoved) {
eb.lggr.Debugw("tx removed", "txID", etx.ID, "subject", etx.Subject)
return nil, false
} else if err != nil {
return fmt.Errorf("processUnstartedTxs failed on UpdateTxUnstartedToInProgress: %w", err), true
}

return eb.handleInProgressTx(ctx, *etx, attempt, time.Now())
}

// There can be at most one in_progress transaction per address.
// Here we complete the job that we didn't finish last time.
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) handleInProgressTx(ctx context.Context, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], initialBroadcastAt time.Time) (error, bool) {
if etx.State != TxInProgress {
return fmt.Errorf("invariant violation: expected transaction %v to be in_progress, it was %s", etx.ID, etx.State), false
}

lgr := etx.GetLogger(logger.With(eb.lggr, "fee", attempt.TxFee))
lgr.Infow("Sending transaction", "txAttemptID", attempt.ID, "txHash", attempt.Hash, "meta", etx.Meta, "feeLimit", etx.FeeLimit, "attempt", attempt, "etx", etx)
errType, err := eb.client.SendTransactionReturnCode(ctx, etx, attempt, lgr)

if errType != client.Fatal {
Expand Down Expand Up @@ -760,8 +771,8 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) save
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) saveFatallyErroredTransaction(lgr logger.Logger, etx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error {
ctx, cancel := eb.chStop.NewCtx()
defer cancel()
if etx.State != TxInProgress {
return fmt.Errorf("can only transition to fatal_error from in_progress, transaction is currently %s", etx.State)
if etx.State != TxInProgress && etx.State != TxUnstarted {
return fmt.Errorf("can only transition to fatal_error from in_progress or unstarted, transaction is currently %s", etx.State)
}
if !etx.Error.Valid {
return errors.New("expected error field to be set")
Expand Down
4 changes: 2 additions & 2 deletions core/chains/evm/txmgr/evm_tx_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1561,8 +1561,8 @@ func (o *evmTxStore) UpdateTxFatalError(ctx context.Context, etx *Tx) error {
ctx, cancel = o.mergeContexts(ctx)
defer cancel()
qq := o.q.WithOpts(pg.WithParentCtx(ctx))
if etx.State != txmgr.TxInProgress {
return pkgerrors.Errorf("can only transition to fatal_error from in_progress, transaction is currently %s", etx.State)
if etx.State != txmgr.TxInProgress && etx.State != txmgr.TxUnstarted {
return pkgerrors.Errorf("can only transition to fatal_error from in_progress or unstarted, transaction is currently %s", etx.State)
}
if !etx.Error.Valid {
return errors.New("expected error field to be set")
Expand Down
Loading