From 233445a7799ef5d154310791d2c0930a37449077 Mon Sep 17 00:00:00 2001 From: Dmytro Haidashenko <34754799+dhaidashenko@users.noreply.github.com> Date: Tue, 19 Dec 2023 03:05:43 +0100 Subject: [PATCH] BCI-2508: TXM duplicate nonces caused by trasmitchecker (#11546) * Broadcaster: run checker only for unstarted txs * attempt to fix sigscanner * fix txstore invariant check * add invariant to ensure we are processing unstarted tx * add explanation why we run check only on unstarted --- common/txmgr/broadcaster.go | 61 ++++++++++++++++----------- core/chains/evm/txmgr/evm_tx_store.go | 4 +- 2 files changed, 38 insertions(+), 27 deletions(-) diff --git a/common/txmgr/broadcaster.go b/common/txmgr/broadcaster.go index 7323fba7a81..9f2204f37e2 100644 --- a/common/txmgr/broadcaster.go +++ b/common/txmgr/broadcaster.go @@ -485,22 +485,9 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) proc return false, nil } n++ - var a txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] - var retryable bool - a, _, _, retryable, err = eb.NewTxAttempt(ctx, *etx, eb.lggr) - if err != nil { - return retryable, fmt.Errorf("processUnstartedTxs failed on NewAttempt: %w", err) - } - - if err := eb.txStore.UpdateTxUnstartedToInProgress(ctx, etx, &a); errors.Is(err, ErrTxRemoved) { - eb.lggr.Debugw("tx removed", "txID", etx.ID, "subject", etx.Subject) - continue - } else if err != nil { - return true, fmt.Errorf("processUnstartedTxs failed on UpdateTxUnstartedToInProgress: %w", err) - } - if err, retryable := eb.handleInProgressTx(ctx, *etx, a, time.Now()); err != nil { - return retryable, fmt.Errorf("processUnstartedTxs failed on handleInProgressTx: %w", err) + if err, retryable := eb.handleUnstartedTx(ctx, etx); err != nil { + return retryable, fmt.Errorf("processUnstartedTxs failed on handleUnstartedTx: %w", err) } } } @@ -520,11 +507,14 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand return nil, false } -// There can be at most one in_progress transaction per address. -// Here we complete the job that we didn't finish last time. -func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) handleInProgressTx(ctx context.Context, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], initialBroadcastAt time.Time) (error, bool) { - if etx.State != TxInProgress { - return fmt.Errorf("invariant violation: expected transaction %v to be in_progress, it was %s", etx.ID, etx.State), false +func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) handleUnstartedTx(ctx context.Context, etx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) (error, bool) { + if etx.State != TxUnstarted { + return fmt.Errorf("invariant violation: expected transaction %v to be unstarted, it was %s", etx.ID, etx.State), false + } + + attempt, _, _, retryable, err := eb.NewTxAttempt(ctx, *etx, eb.lggr) + if err != nil { + return fmt.Errorf("processUnstartedTxs failed on NewAttempt: %w", err), retryable } checkerSpec, err := etx.GetChecker() @@ -541,19 +531,40 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand // If the transmit check does not complete within the timeout, the transaction will be sent // anyway. + // It's intentional that we only run `Check` for unstarted transactions. + // Running it on other states might lead to nonce duplication, as we might mark applied transactions as fatally errored. + checkCtx, cancel := context.WithTimeout(ctx, TransmitCheckTimeout) defer cancel() - err = checker.Check(checkCtx, lgr, etx, attempt) + err = checker.Check(checkCtx, lgr, *etx, attempt) if errors.Is(err, context.Canceled) { lgr.Warn("Transmission checker timed out, sending anyway") } else if err != nil { etx.Error = null.StringFrom(err.Error()) lgr.Warnw("Transmission checker failed, fatally erroring transaction.", "err", err) - return eb.saveFatallyErroredTransaction(lgr, &etx), true + return eb.saveFatallyErroredTransaction(lgr, etx), true } cancel() - lgr.Infow("Sending transaction", "txAttemptID", attempt.ID, "txHash", attempt.Hash, "err", err, "meta", etx.Meta, "feeLimit", etx.FeeLimit, "attempt", attempt, "etx", etx) + if err = eb.txStore.UpdateTxUnstartedToInProgress(ctx, etx, &attempt); errors.Is(err, ErrTxRemoved) { + eb.lggr.Debugw("tx removed", "txID", etx.ID, "subject", etx.Subject) + return nil, false + } else if err != nil { + return fmt.Errorf("processUnstartedTxs failed on UpdateTxUnstartedToInProgress: %w", err), true + } + + return eb.handleInProgressTx(ctx, *etx, attempt, time.Now()) +} + +// There can be at most one in_progress transaction per address. +// Here we complete the job that we didn't finish last time. +func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) handleInProgressTx(ctx context.Context, etx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], initialBroadcastAt time.Time) (error, bool) { + if etx.State != TxInProgress { + return fmt.Errorf("invariant violation: expected transaction %v to be in_progress, it was %s", etx.ID, etx.State), false + } + + lgr := etx.GetLogger(logger.With(eb.lggr, "fee", attempt.TxFee)) + lgr.Infow("Sending transaction", "txAttemptID", attempt.ID, "txHash", attempt.Hash, "meta", etx.Meta, "feeLimit", etx.FeeLimit, "attempt", attempt, "etx", etx) errType, err := eb.client.SendTransactionReturnCode(ctx, etx, attempt, lgr) if errType != client.Fatal { @@ -760,8 +771,8 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) save func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) saveFatallyErroredTransaction(lgr logger.Logger, etx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { ctx, cancel := eb.chStop.NewCtx() defer cancel() - if etx.State != TxInProgress { - return fmt.Errorf("can only transition to fatal_error from in_progress, transaction is currently %s", etx.State) + if etx.State != TxInProgress && etx.State != TxUnstarted { + return fmt.Errorf("can only transition to fatal_error from in_progress or unstarted, transaction is currently %s", etx.State) } if !etx.Error.Valid { return errors.New("expected error field to be set") diff --git a/core/chains/evm/txmgr/evm_tx_store.go b/core/chains/evm/txmgr/evm_tx_store.go index 9b46ab7a80c..da206c46c7d 100644 --- a/core/chains/evm/txmgr/evm_tx_store.go +++ b/core/chains/evm/txmgr/evm_tx_store.go @@ -1561,8 +1561,8 @@ func (o *evmTxStore) UpdateTxFatalError(ctx context.Context, etx *Tx) error { ctx, cancel = o.mergeContexts(ctx) defer cancel() qq := o.q.WithOpts(pg.WithParentCtx(ctx)) - if etx.State != txmgr.TxInProgress { - return pkgerrors.Errorf("can only transition to fatal_error from in_progress, transaction is currently %s", etx.State) + if etx.State != txmgr.TxInProgress && etx.State != txmgr.TxUnstarted { + return pkgerrors.Errorf("can only transition to fatal_error from in_progress or unstarted, transaction is currently %s", etx.State) } if !etx.Error.Valid { return errors.New("expected error field to be set")