diff --git a/.github/workflows/solidity-foundry.yml b/.github/workflows/solidity-foundry.yml index 45f1d8e9d79..abdb64d1763 100644 --- a/.github/workflows/solidity-foundry.yml +++ b/.github/workflows/solidity-foundry.yml @@ -23,6 +23,9 @@ jobs: - 'contracts/src/v0.8/**/*' - '.github/workflows/solidity-foundry.yml' - 'contracts/foundry.toml' + - 'contracts/gas-snapshots/*.gas-snapshot' + - '.gitmodules' + - 'contracts/foundry-lib' tests: strategy: diff --git a/common/txmgr/txmgr.go b/common/txmgr/txmgr.go index 422e1a364b5..0bcf3e482ee 100644 --- a/common/txmgr/txmgr.go +++ b/common/txmgr/txmgr.go @@ -2,6 +2,7 @@ package txmgr import ( "context" + "database/sql" "fmt" "math/big" "sync" @@ -435,6 +436,20 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Trigger(ad // CreateTransaction inserts a new transaction func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CreateTransaction(txRequest txmgrtypes.TxRequest[ADDR, TX_HASH], qs ...pg.QOpt) (tx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) { + // Check for existing Tx with IdempotencyKey. If found, return the Tx and do nothing + // Skipping CreateTransaction to avoid double send + if txRequest.IdempotencyKey != nil { + var existingTx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + existingTx, err = b.txStore.FindTxWithIdempotencyKey(*txRequest.IdempotencyKey, b.chainID) + if err != nil && !errors.Is(err, sql.ErrNoRows) { + return tx, errors.Wrap(err, "Failed to search for transaction with IdempotencyKey") + } + if existingTx != nil { + b.logger.Infow("Found a Tx with IdempotencyKey. Returning existing Tx without creating a new one.", "IdempotencyKey", *txRequest.IdempotencyKey) + return *existingTx, nil + } + } + if err = b.checkEnabled(txRequest.FromAddress); err != nil { return tx, err } diff --git a/common/txmgr/types/mocks/tx_store.go b/common/txmgr/types/mocks/tx_store.go index 055d7852bfd..9c812788bf6 100644 --- a/common/txmgr/types/mocks/tx_store.go +++ b/common/txmgr/types/mocks/tx_store.go @@ -322,6 +322,32 @@ func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxAttem return r0, r1 } +// FindTxWithIdempotencyKey provides a mock function with given fields: idempotencyKey, chainID +func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxWithIdempotencyKey(idempotencyKey string, chainID CHAIN_ID) (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { + ret := _m.Called(idempotencyKey, chainID) + + var r0 *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + var r1 error + if rf, ok := ret.Get(0).(func(string, CHAIN_ID) (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error)); ok { + return rf(idempotencyKey, chainID) + } + if rf, ok := ret.Get(0).(func(string, CHAIN_ID) *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]); ok { + r0 = rf(idempotencyKey, chainID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) + } + } + + if rf, ok := ret.Get(1).(func(string, CHAIN_ID) error); ok { + r1 = rf(idempotencyKey, chainID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // FindTxWithSequence provides a mock function with given fields: fromAddress, seq func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxWithSequence(fromAddress ADDR, seq SEQ) (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { ret := _m.Called(fromAddress, seq) diff --git a/common/txmgr/types/tx.go b/common/txmgr/types/tx.go index a28c72fd363..a0faa81c3aa 100644 --- a/common/txmgr/types/tx.go +++ b/common/txmgr/types/tx.go @@ -66,6 +66,14 @@ func (s TxAttemptState) String() (str string) { } type TxRequest[ADDR types.Hashable, TX_HASH types.Hashable] struct { + // IdempotencyKey is a globally unique ID set by the caller, to prevent accidental creation of duplicated Txs during retries or crash recovery. + // If this field is set, the TXM will first search existing Txs with this field. + // If found, it will return the existing Tx, without creating a new one. TXM will not validate or ensure that existing Tx is same as the incoming TxRequest. + // If not found, TXM will create a new Tx. + // If IdempotencyKey is set to null, TXM will always create a new Tx. + // Since IdempotencyKey has to be globally unique, consider prepending the service or component's name it is being used by + // Such as {service}-{ID}. E.g vrf-12345 + IdempotencyKey *string FromAddress ADDR ToAddress ADDR EncodedPayload []byte @@ -178,6 +186,7 @@ type Tx[ FEE feetypes.Fee, ] struct { ID int64 + IdempotencyKey *string Sequence *SEQ FromAddress ADDR ToAddress ADDR diff --git a/common/txmgr/types/tx_store.go b/common/txmgr/types/tx_store.go index 41a989fc769..8c56b8a6ed6 100644 --- a/common/txmgr/types/tx_store.go +++ b/common/txmgr/types/tx_store.go @@ -64,6 +64,9 @@ type TransactionStore[ FindTxAttemptsConfirmedMissingReceipt(chainID CHAIN_ID) (attempts []TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) FindTxAttemptsRequiringReceiptFetch(chainID CHAIN_ID) (attempts []TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) FindTxAttemptsRequiringResend(olderThan time.Time, maxInFlightTransactions uint32, chainID CHAIN_ID, address ADDR) (attempts []TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) + // Search for Tx using the idempotencyKey and chainID + FindTxWithIdempotencyKey(idempotencyKey string, chainID CHAIN_ID) (tx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) + // Search for Tx using the fromAddress and sequence FindTxWithSequence(fromAddress ADDR, seq SEQ) (etx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) FindNextUnstartedTransactionFromAddress(etx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], fromAddress ADDR, chainID CHAIN_ID, qopts ...pg.QOpt) error FindTransactionsConfirmedInBlockRange(highBlockNumber, lowBlockNumber int64, chainID CHAIN_ID) (etxs []*Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) diff --git a/contracts/gas-snapshots/automation-dev.gas-snapshot b/contracts/gas-snapshots/automation-dev.gas-snapshot index 1a939d69e19..badbfba604c 100644 --- a/contracts/gas-snapshots/automation-dev.gas-snapshot +++ b/contracts/gas-snapshots/automation-dev.gas-snapshot @@ -1,5 +1,5 @@ AutomationForwarder_forward:testBasicSuccess() (gas: 87630) -AutomationForwarder_forward:testNotAuthorizedReverts() (gas: 21681) +AutomationForwarder_forward:testNotAuthorizedReverts() (gas: 25427) AutomationForwarder_forward:testWrongFunctionSelectorSuccess() (gas: 17958) AutomationForwarder_updateRegistry:testBasicSuccess() (gas: 14577) -AutomationForwarder_updateRegistry:testNotFromRegistryNotAuthorizedReverts() (gas: 13893) \ No newline at end of file +AutomationForwarder_updateRegistry:testNotFromRegistryNotAuthorizedReverts() (gas: 17665) \ No newline at end of file diff --git a/contracts/gas-snapshots/functions.gas-snapshot b/contracts/gas-snapshots/functions.gas-snapshot index fa2c9dba497..7ee4f779d6a 100644 --- a/contracts/gas-snapshots/functions.gas-snapshot +++ b/contracts/gas-snapshots/functions.gas-snapshot @@ -1,17 +1,17 @@ -FunctionsOracle_sendRequest:testEmptyRequestDataReverts() (gas: 13430) +FunctionsOracle_sendRequest:testEmptyRequestDataReverts() (gas: 13452) FunctionsOracle_setDONPublicKey:testEmptyPublicKeyReverts() (gas: 10974) FunctionsOracle_setDONPublicKey:testOnlyOwnerReverts() (gas: 11255) FunctionsOracle_setDONPublicKey:testSetDONPublicKeySuccess() (gas: 126453) -FunctionsOracle_setDONPublicKey:testSetDONPublicKey_gas() (gas: 97558) +FunctionsOracle_setDONPublicKey:testSetDONPublicKey_gas() (gas: 97580) FunctionsOracle_setRegistry:testEmptyPublicKeyReverts() (gas: 10635) FunctionsOracle_setRegistry:testOnlyOwnerReverts() (gas: 10927) FunctionsOracle_setRegistry:testSetRegistrySuccess() (gas: 35791) FunctionsOracle_setRegistry:testSetRegistry_gas() (gas: 31987) FunctionsOracle_typeAndVersion:testTypeAndVersionSuccess() (gas: 6905) -FunctionsRouter_Pause:test_Pause_RevertIfNotOwner() (gas: 13293) -FunctionsRouter_Pause:test_Pause_Success() (gas: 20205) -FunctionsRouter_Unpause:test_Unpause_RevertIfNotOwner() (gas: 13338) -FunctionsRouter_Unpause:test_Unpause_Success() (gas: 77279) -FunctionsSubscriptions_createSubscription:test_CreateSubscription_RevertIfNotAllowedSender() (gas: 26347) -FunctionsSubscriptions_createSubscription:test_CreateSubscription_RevertIfPaused() (gas: 15699) +FunctionsRouter_Pause:test_Pause_RevertIfNotOwner() (gas: 13315) +FunctionsRouter_Pause:test_Pause_Success() (gas: 20254) +FunctionsRouter_Unpause:test_Unpause_RevertIfNotOwner() (gas: 13294) +FunctionsRouter_Unpause:test_Unpause_Success() (gas: 77334) +FunctionsSubscriptions_createSubscription:test_CreateSubscription_RevertIfNotAllowedSender() (gas: 26368) +FunctionsSubscriptions_createSubscription:test_CreateSubscription_RevertIfPaused() (gas: 15714) FunctionsSubscriptions_createSubscription:test_CreateSubscription_Success() (gas: 152436) \ No newline at end of file diff --git a/contracts/gas-snapshots/shared.gas-snapshot b/contracts/gas-snapshots/shared.gas-snapshot index cb2e8defe4a..cf003c5a26d 100644 --- a/contracts/gas-snapshots/shared.gas-snapshot +++ b/contracts/gas-snapshots/shared.gas-snapshot @@ -1,32 +1,32 @@ -BurnMintERC677_approve:testApproveSuccess() (gas: 52254) -BurnMintERC677_approve:testInvalidAddressReverts() (gas: 10641) -BurnMintERC677_burn:testBasicBurnSuccess() (gas: 164344) -BurnMintERC677_burn:testBurnFromZeroAddressReverts() (gas: 43462) -BurnMintERC677_burn:testExceedsBalanceReverts() (gas: 18035) +BurnMintERC677_approve:testApproveSuccess() (gas: 55248) +BurnMintERC677_approve:testInvalidAddressReverts() (gas: 10663) +BurnMintERC677_burn:testBasicBurnSuccess() (gas: 164342) +BurnMintERC677_burn:testBurnFromZeroAddressReverts() (gas: 47201) +BurnMintERC677_burn:testExceedsBalanceReverts() (gas: 21841) BurnMintERC677_burn:testSenderNotBurnerReverts() (gas: 13359) -BurnMintERC677_burnFrom:testBurnFromSuccess() (gas: 54662) -BurnMintERC677_burnFrom:testExceedsBalanceReverts() (gas: 32873) -BurnMintERC677_burnFrom:testInsufficientAllowanceReverts() (gas: 18107) +BurnMintERC677_burnFrom:testBurnFromSuccess() (gas: 57658) +BurnMintERC677_burnFrom:testExceedsBalanceReverts() (gas: 35864) +BurnMintERC677_burnFrom:testInsufficientAllowanceReverts() (gas: 21849) BurnMintERC677_burnFrom:testSenderNotBurnerReverts() (gas: 13359) -BurnMintERC677_burnFromAlias:testBurnFromSuccess() (gas: 54688) -BurnMintERC677_burnFromAlias:testExceedsBalanceReverts() (gas: 32889) -BurnMintERC677_burnFromAlias:testInsufficientAllowanceReverts() (gas: 18127) +BurnMintERC677_burnFromAlias:testBurnFromSuccess() (gas: 57684) +BurnMintERC677_burnFromAlias:testExceedsBalanceReverts() (gas: 35880) +BurnMintERC677_burnFromAlias:testInsufficientAllowanceReverts() (gas: 21869) BurnMintERC677_burnFromAlias:testSenderNotBurnerReverts() (gas: 13379) BurnMintERC677_constructor:testConstructorSuccess() (gas: 1669109) -BurnMintERC677_decreaseApproval:testDecreaseApprovalSuccess() (gas: 28520) -BurnMintERC677_grantMintAndBurnRoles:testGrantMintAndBurnRolesSuccess() (gas: 120049) -BurnMintERC677_grantRole:testGrantBurnAccessSuccess() (gas: 52707) +BurnMintERC677_decreaseApproval:testDecreaseApprovalSuccess() (gas: 28537) +BurnMintERC677_grantMintAndBurnRoles:testGrantMintAndBurnRolesSuccess() (gas: 120071) +BurnMintERC677_grantRole:testGrantBurnAccessSuccess() (gas: 52724) BurnMintERC677_grantRole:testGrantManySuccess() (gas: 935521) -BurnMintERC677_grantRole:testGrantMintAccessSuccess() (gas: 93588) +BurnMintERC677_grantRole:testGrantMintAccessSuccess() (gas: 93605) BurnMintERC677_increaseApproval:testIncreaseApprovalSuccess() (gas: 40911) BurnMintERC677_mint:testBasicMintSuccess() (gas: 149365) -BurnMintERC677_mint:testMaxSupplyExceededReverts() (gas: 46627) +BurnMintERC677_mint:testMaxSupplyExceededReverts() (gas: 50385) BurnMintERC677_mint:testSenderNotMinterReverts() (gas: 11195) BurnMintERC677_supportsInterface:testConstructorSuccess() (gas: 8685) -BurnMintERC677_transfer:testInvalidAddressReverts() (gas: 10617) +BurnMintERC677_transfer:testInvalidAddressReverts() (gas: 10639) BurnMintERC677_transfer:testTransferSuccess() (gas: 39462) OpStackBurnMintERC677_constructor:testConstructorSuccess() (gas: 1739317) -OpStackBurnMintERC677_interfaceCompatibility:testBurnCompatibility() (gas: 259440) -OpStackBurnMintERC677_interfaceCompatibility:testMintCompatibility() (gas: 137935) +OpStackBurnMintERC677_interfaceCompatibility:testBurnCompatibility() (gas: 263373) +OpStackBurnMintERC677_interfaceCompatibility:testMintCompatibility() (gas: 137957) OpStackBurnMintERC677_interfaceCompatibility:testStaticFunctionsCompatibility() (gas: 10622) OpStackBurnMintERC677_supportsInterface:testConstructorSuccess() (gas: 8961) \ No newline at end of file diff --git a/contracts/gas-snapshots/vrf.gas-snapshot b/contracts/gas-snapshots/vrf.gas-snapshot index 29053204765..44145caadaa 100644 --- a/contracts/gas-snapshots/vrf.gas-snapshot +++ b/contracts/gas-snapshots/vrf.gas-snapshot @@ -1,18 +1,18 @@ TrustedBlockhashStoreTest:testGenericBHSFunctions() (gas: 53507) -TrustedBlockhashStoreTest:testTrustedBHSFunctions() (gas: 49536) +TrustedBlockhashStoreTest:testTrustedBHSFunctions() (gas: 54617) VRFCoordinatorV2Plus_Migration:testDeregister() (gas: 101083) -VRFCoordinatorV2Plus_Migration:testMigrateRevertsWhenInvalidCaller() (gas: 29421) -VRFCoordinatorV2Plus_Migration:testMigrateRevertsWhenInvalidCoordinator() (gas: 19855) -VRFCoordinatorV2Plus_Migration:testMigrateRevertsWhenPendingFulfillment() (gas: 237702) -VRFCoordinatorV2Plus_Migration:testMigrateRevertsWhenReentrant() (gas: 354749) -VRFCoordinatorV2Plus_Migration:testMigration() (gas: 471631) -VRFCoordinatorV2Plus_Migration:testMigrationNoLink() (gas: 431052) -VRFV2Plus:testCancelSubWithNoLink() (gas: 160261) +VRFCoordinatorV2Plus_Migration:testMigrateRevertsWhenInvalidCaller() (gas: 33190) +VRFCoordinatorV2Plus_Migration:testMigrateRevertsWhenInvalidCoordinator() (gas: 19877) +VRFCoordinatorV2Plus_Migration:testMigrateRevertsWhenPendingFulfillment() (gas: 237679) +VRFCoordinatorV2Plus_Migration:testMigrateRevertsWhenReentrant() (gas: 357765) +VRFCoordinatorV2Plus_Migration:testMigration() (gas: 471605) +VRFCoordinatorV2Plus_Migration:testMigrationNoLink() (gas: 431075) +VRFV2Plus:testCancelSubWithNoLink() (gas: 160279) VRFV2Plus:testCreateSubscription() (gas: 181127) -VRFV2Plus:testGetActiveSubscriptionIds() (gas: 3453659) -VRFV2Plus:testRegisterProvingKey() (gas: 101025) -VRFV2Plus:testRequestAndFulfillRandomWordsLINK() (gas: 755178) -VRFV2Plus:testRequestAndFulfillRandomWordsNative() (gas: 705581) +VRFV2Plus:testGetActiveSubscriptionIds() (gas: 3453681) +VRFV2Plus:testRegisterProvingKey() (gas: 101085) +VRFV2Plus:testRequestAndFulfillRandomWordsLINK() (gas: 755144) +VRFV2Plus:testRequestAndFulfillRandomWordsNative() (gas: 705591) VRFV2Plus:testSetConfig() (gas: 73032) -VRFV2PlusWrapperTest:testRequestAndFulfillRandomWordsLINKWrapper() (gas: 390063) -VRFV2PlusWrapperTest:testRequestAndFulfillRandomWordsNativeWrapper() (gas: 290612) \ No newline at end of file +VRFV2PlusWrapperTest:testRequestAndFulfillRandomWordsLINKWrapper() (gas: 393885) +VRFV2PlusWrapperTest:testRequestAndFulfillRandomWordsNativeWrapper() (gas: 294434) \ No newline at end of file diff --git a/core/chains/evm/txmgr/evm_tx_store.go b/core/chains/evm/txmgr/evm_tx_store.go index 1df4073e528..dbd3437aa82 100644 --- a/core/chains/evm/txmgr/evm_tx_store.go +++ b/core/chains/evm/txmgr/evm_tx_store.go @@ -152,6 +152,7 @@ func toOnchainReceipt(rs []*evmtypes.Receipt) []rawOnchainReceipt { // This is exported, as tests and other external code still directly reads DB using this schema. type DbEthTx struct { ID int64 + IdempotencyKey *string Nonce *int64 FromAddress common.Address ToAddress common.Address @@ -218,6 +219,7 @@ func DbEthTxToEthTx(dbEthTx DbEthTx, evmEthTx *Tx) { n := evmtypes.Nonce(*dbEthTx.Nonce) evmEthTx.Sequence = &n } + evmEthTx.IdempotencyKey = dbEthTx.IdempotencyKey evmEthTx.FromAddress = dbEthTx.FromAddress evmEthTx.ToAddress = dbEthTx.ToAddress evmEthTx.EncodedPayload = dbEthTx.EncodedPayload @@ -906,6 +908,21 @@ func (o *evmTxStore) FindReceiptsPendingConfirmation(ctx context.Context, blockN return } +// FindTxWithIdempotencyKey returns any broadcast ethtx with the given idempotencyKey and chainID +func (o *evmTxStore) FindTxWithIdempotencyKey(idempotencyKey string, chainID *big.Int) (etx *Tx, err error) { + var dbEtx DbEthTx + err = o.q.Get(&dbEtx, `SELECT * FROM eth_txes WHERE idempotency_key = $1 and evm_chain_id = $2`, idempotencyKey, chainID.String()) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, nil + } + return nil, pkgerrors.Wrap(err, "FindTxWithIdempotencyKey failed to load eth_txes") + } + etx = new(Tx) + DbEthTxToEthTx(dbEtx, etx) + return +} + // FindTxWithSequence returns any broadcast ethtx with the given nonce func (o *evmTxStore) FindTxWithSequence(fromAddress common.Address, nonce evmtypes.Nonce) (etx *Tx, err error) { etx = new(Tx) @@ -1501,12 +1518,12 @@ func (o *evmTxStore) CreateTransaction(txRequest TxRequest, chainID *big.Int, qo } } err = tx.Get(&dbEtx, ` -INSERT INTO eth_txes (from_address, to_address, encoded_payload, value, gas_limit, state, created_at, meta, subject, evm_chain_id, min_confirmations, pipeline_task_run_id, transmit_checker) +INSERT INTO eth_txes (from_address, to_address, encoded_payload, value, gas_limit, state, created_at, meta, subject, evm_chain_id, min_confirmations, pipeline_task_run_id, transmit_checker, idempotency_key) VALUES ( -$1,$2,$3,$4,$5,'unstarted',NOW(),$6,$7,$8,$9,$10,$11 +$1,$2,$3,$4,$5,'unstarted',NOW(),$6,$7,$8,$9,$10,$11,$12 ) RETURNING "eth_txes".* -`, txRequest.FromAddress, txRequest.ToAddress, txRequest.EncodedPayload, assets.Eth(txRequest.Value), txRequest.FeeLimit, txRequest.Meta, txRequest.Strategy.Subject(), chainID.String(), txRequest.MinConfirmations, txRequest.PipelineTaskRunID, txRequest.Checker) +`, txRequest.FromAddress, txRequest.ToAddress, txRequest.EncodedPayload, assets.Eth(txRequest.Value), txRequest.FeeLimit, txRequest.Meta, txRequest.Strategy.Subject(), chainID.String(), txRequest.MinConfirmations, txRequest.PipelineTaskRunID, txRequest.Checker, txRequest.IdempotencyKey) if err != nil { return pkgerrors.Wrap(err, "CreateEthTransaction failed to insert eth_tx") } diff --git a/core/chains/evm/txmgr/evm_tx_store_test.go b/core/chains/evm/txmgr/evm_tx_store_test.go index b4be30063b5..777a179be45 100644 --- a/core/chains/evm/txmgr/evm_tx_store_test.go +++ b/core/chains/evm/txmgr/evm_tx_store_test.go @@ -666,6 +666,35 @@ func TestORM_FindReceiptsPendingConfirmation(t *testing.T) { assert.Equal(t, tr.ID, receiptsPlus[0].ID) } +func Test_FindTxWithIdempotencyKey(t *testing.T) { + t.Parallel() + db := pgtest.NewSqlxDB(t) + cfg := newTestChainScopedConfig(t) + txStore := cltest.NewTestTxStore(t, db, cfg.Database()) + ethKeyStore := cltest.NewKeyStore(t, db, cfg.Database()).Eth() + _, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore, 0) + + t.Run("returns nil if no results", func(t *testing.T) { + idempotencyKey := "777" + etx, err := txStore.FindTxWithIdempotencyKey(idempotencyKey, big.NewInt(0)) + require.NoError(t, err) + assert.Nil(t, etx) + }) + + t.Run("returns transaction if it exists", func(t *testing.T) { + idempotencyKey := "777" + cfg.EVM().ChainID() + etx := cltest.MustCreateUnstartedGeneratedTx(t, txStore, fromAddress, big.NewInt(0), + cltest.EvmTxRequestWithIdempotencyKey(idempotencyKey)) + require.Equal(t, idempotencyKey, *etx.IdempotencyKey) + + res, err := txStore.FindTxWithIdempotencyKey(idempotencyKey, big.NewInt(0)) + require.NoError(t, err) + assert.Equal(t, etx.Sequence, res.Sequence) + require.Equal(t, idempotencyKey, *res.IdempotencyKey) + }) +} + func TestORM_FindTxWithSequence(t *testing.T) { t.Parallel() diff --git a/core/chains/evm/txmgr/mocks/evm_tx_store.go b/core/chains/evm/txmgr/mocks/evm_tx_store.go index ef481df3b70..4a06741a2f3 100644 --- a/core/chains/evm/txmgr/mocks/evm_tx_store.go +++ b/core/chains/evm/txmgr/mocks/evm_tx_store.go @@ -428,6 +428,32 @@ func (_m *EvmTxStore) FindTxWithAttempts(etxID int64) (types.Tx[*big.Int, common return r0, r1 } +// FindTxWithIdempotencyKey provides a mock function with given fields: idempotencyKey, chainID +func (_m *EvmTxStore) FindTxWithIdempotencyKey(idempotencyKey string, chainID *big.Int) (*types.Tx[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee], error) { + ret := _m.Called(idempotencyKey, chainID) + + var r0 *types.Tx[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee] + var r1 error + if rf, ok := ret.Get(0).(func(string, *big.Int) (*types.Tx[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee], error)); ok { + return rf(idempotencyKey, chainID) + } + if rf, ok := ret.Get(0).(func(string, *big.Int) *types.Tx[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee]); ok { + r0 = rf(idempotencyKey, chainID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*types.Tx[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee]) + } + } + + if rf, ok := ret.Get(1).(func(string, *big.Int) error); ok { + r1 = rf(idempotencyKey, chainID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // FindTxWithSequence provides a mock function with given fields: fromAddress, seq func (_m *EvmTxStore) FindTxWithSequence(fromAddress common.Address, seq evmtypes.Nonce) (*types.Tx[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee], error) { ret := _m.Called(fromAddress, seq) diff --git a/core/chains/evm/txmgr/txmgr_test.go b/core/chains/evm/txmgr/txmgr_test.go index 00333d7df87..854ccd6a4af 100644 --- a/core/chains/evm/txmgr/txmgr_test.go +++ b/core/chains/evm/txmgr/txmgr_test.go @@ -323,6 +323,51 @@ func TestTxm_CreateTransaction(t *testing.T) { require.NotNil(t, m.FwdrDestAddress) require.Equal(t, etx.ToAddress.String(), fwdrAddr.String()) }) + + t.Run("insert Tx successfully with a IdempotencyKey", func(t *testing.T) { + evmConfig.maxQueued = uint64(3) + id := uuid.New() + idempotencyKey := "1" + _, err := txm.CreateTransaction(txmgr.TxRequest{ + IdempotencyKey: &idempotencyKey, + FromAddress: fromAddress, + ToAddress: testutils.NewAddress(), + EncodedPayload: []byte{1, 2, 3}, + FeeLimit: 21000, + PipelineTaskRunID: &id, + Strategy: txmgrcommon.NewSendEveryStrategy(), + }) + assert.NoError(t, err) + }) + + t.Run("doesn't insert eth_tx if a matching tx already exists for that IdempotencyKey", func(t *testing.T) { + evmConfig.maxQueued = uint64(3) + id := uuid.New() + idempotencyKey := "2" + tx1, err := txm.CreateTransaction(txmgr.TxRequest{ + IdempotencyKey: &idempotencyKey, + FromAddress: fromAddress, + ToAddress: testutils.NewAddress(), + EncodedPayload: []byte{1, 2, 3}, + FeeLimit: 21000, + PipelineTaskRunID: &id, + Strategy: txmgrcommon.NewSendEveryStrategy(), + }) + assert.NoError(t, err) + + tx2, err := txm.CreateTransaction(txmgr.TxRequest{ + IdempotencyKey: &idempotencyKey, + FromAddress: fromAddress, + ToAddress: testutils.NewAddress(), + EncodedPayload: []byte{1, 2, 3}, + FeeLimit: 21000, + PipelineTaskRunID: &id, + Strategy: txmgrcommon.NewSendEveryStrategy(), + }) + assert.NoError(t, err) + + assert.Equal(t, tx1.GetID(), tx2.GetID()) + }) } func newMockTxStrategy(t *testing.T) *commontxmmocks.TxStrategy { diff --git a/core/internal/cltest/factories.go b/core/internal/cltest/factories.go index 800731f74a4..bfa479af624 100644 --- a/core/internal/cltest/factories.go +++ b/core/internal/cltest/factories.go @@ -348,6 +348,12 @@ func EvmTxRequestWithValue(value big.Int) func(*txmgr.TxRequest) { } } +func EvmTxRequestWithIdempotencyKey(idempotencyKey string) func(*txmgr.TxRequest) { + return func(tx *txmgr.TxRequest) { + tx.IdempotencyKey = &idempotencyKey + } +} + func MustCreateUnstartedTx(t testing.TB, txStore txmgr.EvmTxStore, fromAddress common.Address, toAddress common.Address, encodedPayload []byte, gasLimit uint32, value big.Int, chainID *big.Int, opts ...interface{}) (tx txmgr.Tx) { txRequest := txmgr.TxRequest{ FromAddress: fromAddress, diff --git a/core/services/job/job_orm_test.go b/core/services/job/job_orm_test.go index 0f70e74b247..de2ee44c2ce 100644 --- a/core/services/job/job_orm_test.go +++ b/core/services/job/job_orm_test.go @@ -1248,6 +1248,7 @@ func Test_FindJobsByPipelineSpecIDs(t *testing.T) { jb, err := directrequest.ValidatedDirectRequestSpec(testspecs.DirectRequestSpec) require.NoError(t, err) + jb.DirectRequestSpec.EVMChainID = utils.NewBigI(0) err = orm.CreateJob(&jb) require.NoError(t, err) @@ -1270,6 +1271,28 @@ func Test_FindJobsByPipelineSpecIDs(t *testing.T) { require.NoError(t, err) assert.Len(t, jbs, 0) }) + + t.Run("with chainID disabled", func(t *testing.T) { + newCfg := configtest.NewGeneralConfig(t, func(c *chainlink.Config, s *chainlink.Secrets) { + c.EVM[0] = &evmcfg.EVMConfig{ + ChainID: utils.NewBigI(0), + Enabled: ptr(false), + } + c.EVM = append(c.EVM, &evmcfg.EVMConfig{ + ChainID: utils.NewBigI(123123123), + Enabled: ptr(true), + }) + }) + relayExtenders2 := evmtest.NewChainRelayExtenders(t, evmtest.TestChainOpts{DB: db, GeneralConfig: newCfg, KeyStore: keyStore.Eth()}) + legacyChains2, err := evmrelay.NewLegacyChainsFromRelayerExtenders(relayExtenders2) + + require.NoError(t, err) + orm2 := NewTestORM(t, db, legacyChains2, pipelineORM, bridgesORM, keyStore, config.Database()) + + jbs, err := orm2.FindJobsByPipelineSpecIDs([]int32{jb.PipelineSpecID}) + require.NoError(t, err) + assert.Len(t, jbs, 1) + }) } func Test_FindPipelineRuns(t *testing.T) { diff --git a/core/services/job/orm.go b/core/services/job/orm.go index d7cfbd0fc4a..277b03b24dd 100644 --- a/core/services/job/orm.go +++ b/core/services/job/orm.go @@ -20,6 +20,7 @@ import ( "github.com/smartcontractkit/sqlx" "github.com/smartcontractkit/chainlink/v2/core/bridges" + "github.com/smartcontractkit/chainlink/v2/core/chains" "github.com/smartcontractkit/chainlink/v2/core/chains/evm" evmconfig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/config" "github.com/smartcontractkit/chainlink/v2/core/config" @@ -1185,7 +1186,8 @@ func (o *orm) FindJobsByPipelineSpecIDs(ids []int32) ([]Job, error) { } for i := range jbs { err = o.LoadEnvConfigVars(&jbs[i]) - if err != nil { + //We must return the jobs even if the chainID is disabled + if err != nil && !errors.Is(err, chains.ErrNoSuchChainID) { return err } } diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/block_time.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/block_time.go index 1d8500bcc61..3af93f873c1 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/block_time.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/block_time.go @@ -11,7 +11,7 @@ import ( ) var ( - defaultSampleSize = int64(10) + defaultSampleSize = int64(200) defaultBlockTime = time.Second * 1 ) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/transmit/event_provider.go b/core/services/ocr2/plugins/ocr2keeper/evm21/transmit/event_provider.go index eccc9304d99..a21cdd25eb0 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/transmit/event_provider.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/transmit/event_provider.go @@ -170,52 +170,54 @@ func (c *TransmitEventProvider) processLogs(latestBlock int64, logs ...logpoller // ensure we don't have duplicates continue } - if _, ok := c.cache.get(ocr2keepers.BlockNumber(log.BlockNumber), k); ok { - // ensure we return only unseen logs - continue - } - l, err := c.parseLog(c.registry, log) - if err != nil { - c.logger.Debugw("failed to parse log", "err", err) - continue - } - id := l.Id() - upkeepId := &ocr2keepers.UpkeepIdentifier{} - ok := upkeepId.FromBigInt(id) + + transmitEvent, ok := c.cache.get(ocr2keepers.BlockNumber(log.BlockNumber), k) if !ok { - return nil, core.ErrInvalidUpkeepID + l, err := c.parseLog(c.registry, log) + if err != nil { + c.logger.Debugw("failed to parse log", "err", err) + continue + } + id := l.Id() + upkeepId := &ocr2keepers.UpkeepIdentifier{} + ok := upkeepId.FromBigInt(id) + if !ok { + return nil, core.ErrInvalidUpkeepID + } + triggerW, err := core.UnpackTrigger(id, l.Trigger()) + if err != nil { + return nil, fmt.Errorf("%w: failed to unpack trigger", err) + } + trigger := ocr2keepers.NewTrigger( + ocr2keepers.BlockNumber(triggerW.BlockNum), + triggerW.BlockHash, + ) + switch core.GetUpkeepType(*upkeepId) { + case ocr2keepers.LogTrigger: + trigger.LogTriggerExtension = &ocr2keepers.LogTriggerExtension{} + trigger.LogTriggerExtension.TxHash = triggerW.TxHash + trigger.LogTriggerExtension.Index = triggerW.LogIndex + default: + } + workID := core.UpkeepWorkID(*upkeepId, trigger) + transmitEvent = ocr2keepers.TransmitEvent{ + Type: l.TransmitEventType(), + TransmitBlock: ocr2keepers.BlockNumber(l.BlockNumber), + TransactionHash: l.TxHash, + WorkID: workID, + UpkeepID: *upkeepId, + CheckBlock: trigger.BlockNumber, + } } - triggerW, err := core.UnpackTrigger(id, l.Trigger()) - if err != nil { - return nil, fmt.Errorf("%w: failed to unpack trigger", err) - } - trigger := ocr2keepers.NewTrigger( - ocr2keepers.BlockNumber(triggerW.BlockNum), - triggerW.BlockHash, - ) - switch core.GetUpkeepType(*upkeepId) { - case ocr2keepers.LogTrigger: - trigger.LogTriggerExtension = &ocr2keepers.LogTriggerExtension{} - trigger.LogTriggerExtension.TxHash = triggerW.TxHash - trigger.LogTriggerExtension.Index = triggerW.LogIndex - default: - } - workID := core.UpkeepWorkID(*upkeepId, trigger) - e := ocr2keepers.TransmitEvent{ - Type: l.TransmitEventType(), - TransmitBlock: ocr2keepers.BlockNumber(l.BlockNumber), - Confirmations: latestBlock - l.BlockNumber, - TransactionHash: l.TxHash, - WorkID: workID, - UpkeepID: *upkeepId, - CheckBlock: trigger.BlockNumber, - } - vals = append(vals, e) - visited[k] = e + + transmitEvent.Confirmations = latestBlock - int64(transmitEvent.TransmitBlock) + + vals = append(vals, transmitEvent) + visited[k] = transmitEvent } // adding to the cache only after we've processed all the logs - // the next time we call processLogs we don't want to return these logs + // the next time we call processLogs we don't want to process these logs for k, e := range visited { c.cache.add(k, e) } diff --git a/core/store/migrate/migrations/0192_add_request_id_column_eth_txes_table.sql b/core/store/migrate/migrations/0192_add_request_id_column_eth_txes_table.sql new file mode 100644 index 00000000000..6a9ab7846d1 --- /dev/null +++ b/core/store/migrate/migrations/0192_add_request_id_column_eth_txes_table.sql @@ -0,0 +1,7 @@ +-- +goose Up + +ALTER TABLE eth_txes ADD COLUMN idempotency_key varchar(2000) UNIQUE; + +-- +goose Down + +ALTER TABLE eth_txes DROP COLUMN idempotency_key; diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 90d619cd19e..93ae8b611f5 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -9,7 +9,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [dev] -... +### Fixed + - Fixed a bug that was preventing job runs to be displayed when the job `chainID` was disabled. + ## 2.5.0 - UNRELEASED