From 608ea0a467ee36e15fdc654a88494ae579d778a6 Mon Sep 17 00:00:00 2001 From: Dmytro Haidashenko <34754799+dhaidashenko@users.noreply.github.com> Date: Wed, 13 Mar 2024 19:34:08 +0100 Subject: [PATCH] HeadTracker finalization support (#12082) * Add LatestFinalizedBlock to HeadTracker * Added LatestFinalizedHead to Head * remove unused func * fix flakey nil pointer * improve logs & address lint issue * nitpicks * fixed copy on heads on MarkFinalized * error instead of panic * return error instead of panic * nitpicks * Finalized block based history depth * simplify trimming * nit fixes * fix build issues caused by merge * regen * FIx rpc client mock generation * nit fixes * nit fixes * update comments * ensure that we trim redundant blocks both in slice and in chain in Heads handle corner case for multiple uncle blocks at the end of the slice * nit fix * Update common/headtracker/head_tracker.go Co-authored-by: Dimitris Grigoriou * HeadTracker backfill test with 0 finality depth * docs * Update docs/CHANGELOG.md Co-authored-by: Dimitris Grigoriou * ensure latest finalized block is valid on startup * changeset * switch from warn to debug level when we failed to makr block as finalized --------- Co-authored-by: Dimitris Grigoriou --- .changeset/healthy-toes-destroy.md | 5 + common/client/mock_rpc_test.go | 28 ++ common/client/multi_node.go | 9 + common/client/types.go | 1 + common/headtracker/head_tracker.go | 155 ++++--- common/headtracker/types/client.go | 2 + common/headtracker/types/config.go | 1 + common/mocks/head_tracker.go | 10 +- common/types/head_tracker.go | 11 +- core/chains/evm/client/chain_client.go | 4 + core/chains/evm/client/client.go | 5 + core/chains/evm/client/mocks/client.go | 30 ++ core/chains/evm/client/mocks/rpc_client.go | 30 ++ core/chains/evm/client/null_client.go | 5 + core/chains/evm/client/rpc_client.go | 10 +- .../evm/client/simulated_backend_client.go | 11 + core/chains/evm/headtracker/config.go | 1 + .../evm/headtracker/head_broadcaster_test.go | 5 +- core/chains/evm/headtracker/head_saver.go | 37 +- .../chains/evm/headtracker/head_saver_test.go | 79 +++- core/chains/evm/headtracker/head_tracker.go | 2 +- .../evm/headtracker/head_tracker_test.go | 410 +++++++++--------- core/chains/evm/headtracker/heads.go | 70 ++- core/chains/evm/headtracker/heads_test.go | 83 +++- core/chains/evm/headtracker/mocks/config.go | 18 + core/chains/evm/headtracker/orm.go | 25 +- core/chains/evm/headtracker/orm_test.go | 9 +- core/chains/evm/types/head_test.go | 51 +++ core/chains/evm/types/models.go | 10 + core/config/docs/chains-evm.toml | 4 +- core/internal/cltest/cltest.go | 3 +- docs/CHANGELOG.md | 1 + docs/CONFIG.md | 4 +- 33 files changed, 782 insertions(+), 347 deletions(-) create mode 100644 .changeset/healthy-toes-destroy.md create mode 100644 core/chains/evm/types/head_test.go diff --git a/.changeset/healthy-toes-destroy.md b/.changeset/healthy-toes-destroy.md new file mode 100644 index 00000000000..1c027fdcd01 --- /dev/null +++ b/.changeset/healthy-toes-destroy.md @@ -0,0 +1,5 @@ +--- +"chainlink": minor +--- + +HeadTracker now respects the `FinalityTagEnabled` config option. If the flag is enabled, HeadTracker backfills blocks up to the latest finalized block provided by the corresponding RPC call. To address potential misconfigurations, `HistoryDepth` is now calculated from the latest finalized block instead of the head. NOTE: Consumers (e.g. TXM and LogPoller) do not fully utilize Finality Tag yet. diff --git a/common/client/mock_rpc_test.go b/common/client/mock_rpc_test.go index 60e0cb4b421..731d0f94cf2 100644 --- a/common/client/mock_rpc_test.go +++ b/common/client/mock_rpc_test.go @@ -454,6 +454,34 @@ func (_m *mockRPC[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS return r0, r1 } +// LatestFinalizedBlock provides a mock function with given fields: ctx +func (_m *mockRPC[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, BATCH_ELEM]) LatestFinalizedBlock(ctx context.Context) (HEAD, error) { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for LatestFinalizedBlock") + } + + var r0 HEAD + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) (HEAD, error)); ok { + return rf(ctx) + } + if rf, ok := ret.Get(0).(func(context.Context) HEAD); ok { + r0 = rf(ctx) + } else { + r0 = ret.Get(0).(HEAD) + } + + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // PendingCallContract provides a mock function with given fields: ctx, msg func (_m *mockRPC[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, BATCH_ELEM]) PendingCallContract(ctx context.Context, msg interface{}) ([]byte, error) { ret := _m.Called(ctx, msg) diff --git a/common/client/multi_node.go b/common/client/multi_node.go index e86a7631982..cc8daed599c 100644 --- a/common/client/multi_node.go +++ b/common/client/multi_node.go @@ -819,3 +819,12 @@ func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OP } return n.RPC().TransactionReceipt(ctx, txHash) } + +func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OPS, TX_RECEIPT, FEE, HEAD, RPC_CLIENT, BATCH_ELEM]) LatestFinalizedBlock(ctx context.Context) (head HEAD, err error) { + n, err := c.selectNode() + if err != nil { + return head, err + } + + return n.RPC().LatestFinalizedBlock(ctx) +} diff --git a/common/client/types.go b/common/client/types.go index 485a0b2671a..8d7b5b71b83 100644 --- a/common/client/types.go +++ b/common/client/types.go @@ -117,6 +117,7 @@ type clientAPI[ BlockByNumber(ctx context.Context, number *big.Int) (HEAD, error) BlockByHash(ctx context.Context, hash BLOCK_HASH) (HEAD, error) LatestBlockHeight(context.Context) (*big.Int, error) + LatestFinalizedBlock(ctx context.Context) (HEAD, error) // Events FilterEvents(ctx context.Context, query EVENT_OPS) ([]EVENT, error) diff --git a/common/headtracker/head_tracker.go b/common/headtracker/head_tracker.go index 4cc152fb9fe..eb9d72f123c 100644 --- a/common/headtracker/head_tracker.go +++ b/common/headtracker/head_tracker.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "math/big" "sync" "time" @@ -96,18 +97,6 @@ func NewHeadTracker[ func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) Start(ctx context.Context) error { return ht.StartOnce("HeadTracker", func() error { ht.log.Debugw("Starting HeadTracker", "chainID", ht.chainID) - latestChain, err := ht.headSaver.Load(ctx) - if err != nil { - return err - } - if latestChain.IsValid() { - ht.log.Debugw( - fmt.Sprintf("HeadTracker: Tracking logs from last block %v with hash %s", latestChain.BlockNumber(), latestChain.BlockHash()), - "blockNumber", latestChain.BlockNumber(), - "blockHash", latestChain.BlockHash(), - ) - } - // NOTE: Always try to start the head tracker off with whatever the // latest head is, without waiting for the subscription to send us one. // @@ -115,18 +104,12 @@ func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) Start(ctx context.Context) error // anyway when we connect (but we should not rely on this because it is // not specced). If it happens this is fine, and the head will be // ignored as a duplicate. - initialHead, err := ht.getInitialHead(ctx) + err := ht.handleInitialHead(ctx) if err != nil { - if errors.Is(err, ctx.Err()) { - return nil + if ctx.Err() != nil { + return ctx.Err() } - ht.log.Errorw("Error getting initial head", "err", err) - } else if initialHead.IsValid() { - if err := ht.handleNewHead(ctx, initialHead); err != nil { - return fmt.Errorf("error handling initial head: %w", err) - } - } else { - ht.log.Debug("Got nil initial head") + ht.log.Errorw("Error handling initial head", "err", err) } ht.wgDone.Add(3) @@ -140,6 +123,49 @@ func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) Start(ctx context.Context) error }) } +func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) handleInitialHead(ctx context.Context) error { + initialHead, err := ht.client.HeadByNumber(ctx, nil) + if err != nil { + return fmt.Errorf("failed to fetch initial head: %w", err) + } + + if !initialHead.IsValid() { + ht.log.Warnw("Got nil initial head", "head", initialHead) + return nil + } + ht.log.Debugw("Got initial head", "head", initialHead, "blockNumber", initialHead.BlockNumber(), "blockHash", initialHead.BlockHash()) + + latestFinalized, err := ht.calculateLatestFinalized(ctx, initialHead) + if err != nil { + return fmt.Errorf("failed to calculate latest finalized head: %w", err) + } + + if !latestFinalized.IsValid() { + return fmt.Errorf("latest finalized block is not valid") + } + + latestChain, err := ht.headSaver.Load(ctx, latestFinalized.BlockNumber()) + if err != nil { + return fmt.Errorf("failed to initialized headSaver: %w", err) + } + + if latestChain.IsValid() { + earliest := latestChain.EarliestHeadInChain() + ht.log.Debugw( + "Loaded chain from DB", + "latest_blockNumber", latestChain.BlockNumber(), + "latest_blockHash", latestChain.BlockHash(), + "earliest_blockNumber", earliest.BlockNumber(), + "earliest_blockHash", earliest.BlockHash(), + ) + } + if err := ht.handleNewHead(ctx, initialHead); err != nil { + return fmt.Errorf("error handling initial head: %w", err) + } + + return nil +} + // Close stops HeadTracker service. func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) Close() error { return ht.StopOnce("HeadTracker", func() error { @@ -159,36 +185,26 @@ func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) HealthReport() map[string]error { return report } -func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) Backfill(ctx context.Context, headWithChain HTH, depth uint) (err error) { - if uint(headWithChain.ChainLength()) >= depth { - return nil +func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) Backfill(ctx context.Context, headWithChain, latestFinalized HTH) (err error) { + if !latestFinalized.IsValid() { + return errors.New("can not perform backfill without a valid latestFinalized head") } - baseHeight := headWithChain.BlockNumber() - int64(depth-1) - if baseHeight < 0 { - baseHeight = 0 + if headWithChain.BlockNumber() < latestFinalized.BlockNumber() { + const errMsg = "invariant violation: expected head of canonical chain to be ahead of the latestFinalized" + ht.log.With("head_block_num", headWithChain.BlockNumber(), + "latest_finalized_block_number", latestFinalized.BlockNumber()). + Criticalf(errMsg) + return errors.New(errMsg) } - return ht.backfill(ctx, headWithChain.EarliestHeadInChain(), baseHeight) + return ht.backfill(ctx, headWithChain, latestFinalized) } func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) LatestChain() HTH { return ht.headSaver.LatestChain() } -func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) getInitialHead(ctx context.Context) (HTH, error) { - head, err := ht.client.HeadByNumber(ctx, nil) - if err != nil { - return ht.getNilHead(), fmt.Errorf("failed to fetch initial head: %w", err) - } - loggerFields := []interface{}{"head", head} - if head.IsValid() { - loggerFields = append(loggerFields, "blockNumber", head.BlockNumber(), "blockHash", head.BlockHash()) - } - ht.log.Debugw("Got initial head", loggerFields...) - return head, nil -} - func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) handleNewHead(ctx context.Context, head HTH) error { prevHead := ht.headSaver.LatestChain() @@ -290,7 +306,13 @@ func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) backfillLoop() { break } { - err := ht.Backfill(ctx, head, uint(ht.config.FinalityDepth())) + latestFinalized, err := ht.calculateLatestFinalized(ctx, head) + if err != nil { + ht.log.Warnw("Failed to calculate finalized block", "err", err) + continue + } + + err = ht.Backfill(ctx, head, latestFinalized) if err != nil { ht.log.Warnw("Unexpected error while backfilling heads", "err", err) } else if ctx.Err() != nil { @@ -302,14 +324,30 @@ func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) backfillLoop() { } } -// backfill fetches all missing heads up until the base height -func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) backfill(ctx context.Context, head types.Head[BLOCK_HASH], baseHeight int64) (err error) { - headBlockNumber := head.BlockNumber() - if headBlockNumber <= baseHeight { - return nil +// calculateLatestFinalized - returns latest finalized block. It's expected that currentHeadNumber - is the head of +// canonical chain. There is no guaranties that returned block belongs to the canonical chain. Additional verification +// must be performed before usage. +func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) calculateLatestFinalized(ctx context.Context, currentHead HTH) (h HTH, err error) { + if ht.config.FinalityTagEnabled() { + return ht.client.LatestFinalizedBlock(ctx) + } + // no need to make an additional RPC call on chains with instant finality + if ht.config.FinalityDepth() == 0 { + return currentHead, nil } + finalizedBlockNumber := currentHead.BlockNumber() - int64(ht.config.FinalityDepth()) + if finalizedBlockNumber <= 0 { + finalizedBlockNumber = 0 + } + return ht.client.HeadByNumber(ctx, big.NewInt(finalizedBlockNumber)) +} + +// backfill fetches all missing heads up until the latestFinalizedHead +func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) backfill(ctx context.Context, head, latestFinalizedHead HTH) (err error) { + headBlockNumber := head.BlockNumber() mark := time.Now() fetched := 0 + baseHeight := latestFinalizedHead.BlockNumber() l := ht.log.With("blockNumber", headBlockNumber, "n", headBlockNumber-baseHeight, "fromBlockHeight", baseHeight, @@ -337,11 +375,30 @@ func (ht *HeadTracker[HTH, S, ID, BLOCK_HASH]) backfill(ctx context.Context, hea fetched++ if ctx.Err() != nil { ht.log.Debugw("context canceled, aborting backfill", "err", err, "ctx.Err", ctx.Err()) - break + return fmt.Errorf("fetchAndSaveHead failed: %w", ctx.Err()) } else if err != nil { return fmt.Errorf("fetchAndSaveHead failed: %w", err) } } + + if head.BlockHash() != latestFinalizedHead.BlockHash() { + const errMsg = "expected finalized block to be present in canonical chain" + ht.log.With("finalized_block_number", latestFinalizedHead.BlockNumber(), "finalized_hash", latestFinalizedHead.BlockHash(), + "canonical_chain_block_number", head.BlockNumber(), "canonical_chain_hash", head.BlockHash()).Criticalf(errMsg) + return fmt.Errorf(errMsg) + } + + l = l.With("latest_finalized_block_hash", latestFinalizedHead.BlockHash(), + "latest_finalized_block_number", latestFinalizedHead.BlockNumber()) + + err = ht.headSaver.MarkFinalized(ctx, latestFinalizedHead) + if err != nil { + l.Debugw("failed to mark block as finalized", "err", err) + return nil + } + + l.Debugw("marked block as finalized") + return } diff --git a/common/headtracker/types/client.go b/common/headtracker/types/client.go index 906f95bbe54..a1e419809b5 100644 --- a/common/headtracker/types/client.go +++ b/common/headtracker/types/client.go @@ -15,4 +15,6 @@ type Client[H types.Head[BLOCK_HASH], S types.Subscription, ID types.ID, BLOCK_H // SubscribeNewHead is the method in which the client receives new Head. // It can be implemented differently for each chain i.e websocket, polling, etc SubscribeNewHead(ctx context.Context, ch chan<- H) (S, error) + // LatestFinalizedBlock - returns the latest block that was marked as finalized + LatestFinalizedBlock(ctx context.Context) (head H, err error) } diff --git a/common/headtracker/types/config.go b/common/headtracker/types/config.go index ca64f7a2952..019aa9847d9 100644 --- a/common/headtracker/types/config.go +++ b/common/headtracker/types/config.go @@ -5,6 +5,7 @@ import "time" type Config interface { BlockEmissionIdleWarningThreshold() time.Duration FinalityDepth() uint32 + FinalityTagEnabled() bool } type HeadTrackerConfig interface { diff --git a/common/mocks/head_tracker.go b/common/mocks/head_tracker.go index 83ee54b1847..fea31a1d6eb 100644 --- a/common/mocks/head_tracker.go +++ b/common/mocks/head_tracker.go @@ -14,17 +14,17 @@ type HeadTracker[H types.Head[BLOCK_HASH], BLOCK_HASH types.Hashable] struct { mock.Mock } -// Backfill provides a mock function with given fields: ctx, headWithChain, depth -func (_m *HeadTracker[H, BLOCK_HASH]) Backfill(ctx context.Context, headWithChain H, depth uint) error { - ret := _m.Called(ctx, headWithChain, depth) +// Backfill provides a mock function with given fields: ctx, headWithChain, latestFinalized +func (_m *HeadTracker[H, BLOCK_HASH]) Backfill(ctx context.Context, headWithChain H, latestFinalized H) error { + ret := _m.Called(ctx, headWithChain, latestFinalized) if len(ret) == 0 { panic("no return value specified for Backfill") } var r0 error - if rf, ok := ret.Get(0).(func(context.Context, H, uint) error); ok { - r0 = rf(ctx, headWithChain, depth) + if rf, ok := ret.Get(0).(func(context.Context, H, H) error); ok { + r0 = rf(ctx, headWithChain, latestFinalized) } else { r0 = ret.Error(0) } diff --git a/common/types/head_tracker.go b/common/types/head_tracker.go index d8fa8011783..83a2d7b8adb 100644 --- a/common/types/head_tracker.go +++ b/common/types/head_tracker.go @@ -12,9 +12,8 @@ import ( //go:generate mockery --quiet --name HeadTracker --output ../mocks/ --case=underscore type HeadTracker[H Head[BLOCK_HASH], BLOCK_HASH Hashable] interface { services.Service - // Backfill given a head will fill in any missing heads up to the given depth - // (used for testing) - Backfill(ctx context.Context, headWithChain H, depth uint) (err error) + // Backfill given a head will fill in any missing heads up to latestFinalized + Backfill(ctx context.Context, headWithChain, latestFinalized H) (err error) LatestChain() H } @@ -37,12 +36,14 @@ type HeadSaver[H Head[BLOCK_HASH], BLOCK_HASH Hashable] interface { // Save updates the latest block number, if indeed the latest, and persists // this number in case of reboot. Save(ctx context.Context, head H) error - // Load loads latest EvmHeadTrackerHistoryDepth heads, returns the latest chain. - Load(ctx context.Context) (H, error) + // Load loads latest heads up to latestFinalized - historyDepth, returns the latest chain. + Load(ctx context.Context, latestFinalized int64) (H, error) // LatestChain returns the block header with the highest number that has been seen, or nil. LatestChain() H // Chain returns a head for the specified hash, or nil. Chain(hash BLOCK_HASH) H + // MarkFinalized - marks matching block and all it's direct ancestors as finalized + MarkFinalized(ctx context.Context, latestFinalized H) error } // HeadListener is a chain agnostic interface that manages connection of Client that receives heads from the blockchain node diff --git a/core/chains/evm/client/chain_client.go b/core/chains/evm/client/chain_client.go index cd4665aac8c..64e854f1de4 100644 --- a/core/chains/evm/client/chain_client.go +++ b/core/chains/evm/client/chain_client.go @@ -278,3 +278,7 @@ func (c *chainClient) TransactionReceipt(ctx context.Context, txHash common.Hash //return rpc.TransactionReceipt(ctx, txHash) return rpc.TransactionReceiptGeth(ctx, txHash) } + +func (c *chainClient) LatestFinalizedBlock(ctx context.Context) (*evmtypes.Head, error) { + return c.multiNode.LatestFinalizedBlock(ctx) +} diff --git a/core/chains/evm/client/client.go b/core/chains/evm/client/client.go index 70d989ae808..ee33db97fd6 100644 --- a/core/chains/evm/client/client.go +++ b/core/chains/evm/client/client.go @@ -63,6 +63,7 @@ type Client interface { HeadByNumber(ctx context.Context, n *big.Int) (*evmtypes.Head, error) HeadByHash(ctx context.Context, n common.Hash) (*evmtypes.Head, error) SubscribeNewHead(ctx context.Context, ch chan<- *evmtypes.Head) (ethereum.Subscription, error) + LatestFinalizedBlock(ctx context.Context) (head *evmtypes.Head, err error) SendTransactionReturnCode(ctx context.Context, tx *types.Transaction, fromAddress common.Address) (commonclient.SendTxReturnCode, error) @@ -366,3 +367,7 @@ func (client *client) SuggestGasTipCap(ctx context.Context) (tipCap *big.Int, er func (client *client) IsL2() bool { return client.pool.ChainType().IsL2() } + +func (client *client) LatestFinalizedBlock(_ context.Context) (*evmtypes.Head, error) { + return nil, pkgerrors.New("not implemented. client was deprecated. New methods are added only to satisfy type constraints while we are migrating to new alternatives") +} diff --git a/core/chains/evm/client/mocks/client.go b/core/chains/evm/client/mocks/client.go index bbaaafd7615..e6c9da1cbe9 100644 --- a/core/chains/evm/client/mocks/client.go +++ b/core/chains/evm/client/mocks/client.go @@ -565,6 +565,36 @@ func (_m *Client) LatestBlockHeight(ctx context.Context) (*big.Int, error) { return r0, r1 } +// LatestFinalizedBlock provides a mock function with given fields: ctx +func (_m *Client) LatestFinalizedBlock(ctx context.Context) (*evmtypes.Head, error) { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for LatestFinalizedBlock") + } + + var r0 *evmtypes.Head + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) (*evmtypes.Head, error)); ok { + return rf(ctx) + } + if rf, ok := ret.Get(0).(func(context.Context) *evmtypes.Head); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*evmtypes.Head) + } + } + + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // NodeStates provides a mock function with given fields: func (_m *Client) NodeStates() map[string]string { ret := _m.Called() diff --git a/core/chains/evm/client/mocks/rpc_client.go b/core/chains/evm/client/mocks/rpc_client.go index 26d5744a1ab..9fd9d6a9e79 100644 --- a/core/chains/evm/client/mocks/rpc_client.go +++ b/core/chains/evm/client/mocks/rpc_client.go @@ -590,6 +590,36 @@ func (_m *RPCClient) LatestBlockHeight(_a0 context.Context) (*big.Int, error) { return r0, r1 } +// LatestFinalizedBlock provides a mock function with given fields: ctx +func (_m *RPCClient) LatestFinalizedBlock(ctx context.Context) (*types.Head, error) { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for LatestFinalizedBlock") + } + + var r0 *types.Head + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) (*types.Head, error)); ok { + return rf(ctx) + } + if rf, ok := ret.Get(0).(func(context.Context) *types.Head); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*types.Head) + } + } + + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // PendingCallContract provides a mock function with given fields: ctx, msg func (_m *RPCClient) PendingCallContract(ctx context.Context, msg interface{}) ([]byte, error) { ret := _m.Called(ctx, msg) diff --git a/core/chains/evm/client/null_client.go b/core/chains/evm/client/null_client.go index 3cbae9e9dde..e4bd7d1dd9a 100644 --- a/core/chains/evm/client/null_client.go +++ b/core/chains/evm/client/null_client.go @@ -11,6 +11,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/assets" "github.com/smartcontractkit/chainlink-common/pkg/logger" + commonclient "github.com/smartcontractkit/chainlink/v2/common/client" evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" ) @@ -226,3 +227,7 @@ func (nc *NullClient) IsL2() bool { nc.lggr.Debug("IsL2") return false } + +func (nc *NullClient) LatestFinalizedBlock(_ context.Context) (*evmtypes.Head, error) { + return nil, nil +} diff --git a/core/chains/evm/client/rpc_client.go b/core/chains/evm/client/rpc_client.go index 38d6a123f49..f9745cfda11 100644 --- a/core/chains/evm/client/rpc_client.go +++ b/core/chains/evm/client/rpc_client.go @@ -479,9 +479,17 @@ func (r *rpcClient) HeaderByHash(ctx context.Context, hash common.Hash) (header return } +func (r *rpcClient) LatestFinalizedBlock(ctx context.Context) (head *evmtypes.Head, err error) { + return r.blockByNumber(ctx, rpc.FinalizedBlockNumber.String()) +} + func (r *rpcClient) BlockByNumber(ctx context.Context, number *big.Int) (head *evmtypes.Head, err error) { hex := ToBlockNumArg(number) - err = r.CallContext(ctx, &head, "eth_getBlockByNumber", hex, false) + return r.blockByNumber(ctx, hex) +} + +func (r *rpcClient) blockByNumber(ctx context.Context, number string) (head *evmtypes.Head, err error) { + err = r.CallContext(ctx, &head, "eth_getBlockByNumber", number, false) if err != nil { return nil, err } diff --git a/core/chains/evm/client/simulated_backend_client.go b/core/chains/evm/client/simulated_backend_client.go index 631ed1ffaca..5750887126a 100644 --- a/core/chains/evm/client/simulated_backend_client.go +++ b/core/chains/evm/client/simulated_backend_client.go @@ -693,6 +693,17 @@ func (c *SimulatedBackendClient) ethGetHeaderByNumber(ctx context.Context, resul return nil } +func (c *SimulatedBackendClient) LatestFinalizedBlock(ctx context.Context) (*evmtypes.Head, error) { + block := c.b.Blockchain().CurrentFinalBlock() + return &evmtypes.Head{ + EVMChainID: ubig.NewI(c.chainId.Int64()), + Hash: block.Hash(), + Number: block.Number.Int64(), + ParentHash: block.ParentHash, + Timestamp: time.Unix(int64(block.Time), 0), + }, nil +} + func (c *SimulatedBackendClient) ethGetLogs(ctx context.Context, result interface{}, args ...interface{}) error { var from, to *big.Int var hash *common.Hash diff --git a/core/chains/evm/headtracker/config.go b/core/chains/evm/headtracker/config.go index 54ccb1f933f..85fe084470d 100644 --- a/core/chains/evm/headtracker/config.go +++ b/core/chains/evm/headtracker/config.go @@ -12,6 +12,7 @@ import ( type Config interface { BlockEmissionIdleWarningThreshold() time.Duration FinalityDepth() uint32 + FinalityTagEnabled() bool } type HeadTrackerConfig interface { diff --git a/core/chains/evm/headtracker/head_broadcaster_test.go b/core/chains/evm/headtracker/head_broadcaster_test.go index dcbb9bd0396..7c55f27c2fd 100644 --- a/core/chains/evm/headtracker/head_broadcaster_test.go +++ b/core/chains/evm/headtracker/head_broadcaster_test.go @@ -14,6 +14,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest" "github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox/mailboxtest" + commonhtrk "github.com/smartcontractkit/chainlink/v2/common/headtracker" commonmocks "github.com/smartcontractkit/chainlink/v2/common/types/mocks" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/headtracker" @@ -61,8 +62,8 @@ func TestHeadBroadcaster_Subscribe(t *testing.T) { chchHeaders <- args.Get(1).(chan<- *evmtypes.Head) }). Return(sub, nil) - ethClient.On("HeadByNumber", mock.Anything, mock.Anything).Return(cltest.Head(1), nil).Once() - ethClient.On("HeadByHash", mock.Anything, mock.Anything).Return(cltest.Head(1), nil) + // 2 for initial and 2 for backfill + ethClient.On("HeadByNumber", mock.Anything, mock.Anything).Return(cltest.Head(1), nil).Times(4) sub.On("Unsubscribe").Return() sub.On("Err").Return(nil) diff --git a/core/chains/evm/headtracker/head_saver.go b/core/chains/evm/headtracker/head_saver.go index 92eedaf153e..218f9d8366f 100644 --- a/core/chains/evm/headtracker/head_saver.go +++ b/core/chains/evm/headtracker/head_saver.go @@ -2,10 +2,12 @@ package headtracker import ( "context" + "fmt" "github.com/ethereum/go-ethereum/common" "github.com/smartcontractkit/chainlink-common/pkg/logger" + commontypes "github.com/smartcontractkit/chainlink/v2/common/types" httypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/headtracker/types" evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" @@ -36,23 +38,26 @@ func (hs *headSaver) Save(ctx context.Context, head *evmtypes.Head) error { return err } - historyDepth := uint(hs.htConfig.HistoryDepth()) - hs.heads.AddHeads(historyDepth, head) + hs.heads.AddHeads(head) - return hs.orm.TrimOldHeads(ctx, historyDepth) + return nil } -func (hs *headSaver) Load(ctx context.Context) (chain *evmtypes.Head, err error) { - historyDepth := uint(hs.htConfig.HistoryDepth()) - heads, err := hs.orm.LatestHeads(ctx, historyDepth) +func (hs *headSaver) Load(ctx context.Context, latestFinalized int64) (chain *evmtypes.Head, err error) { + minBlockNumber := hs.calculateMinBlockToKeep(latestFinalized) + heads, err := hs.orm.LatestHeads(ctx, minBlockNumber) if err != nil { return nil, err } - hs.heads.AddHeads(historyDepth, heads...) + hs.heads.AddHeads(heads...) return hs.heads.LatestHead(), nil } +func (hs *headSaver) calculateMinBlockToKeep(latestFinalized int64) int64 { + return max(latestFinalized-int64(hs.htConfig.HistoryDepth()), 0) +} + func (hs *headSaver) LatestHeadFromDB(ctx context.Context) (head *evmtypes.Head, err error) { return hs.orm.LatestHead(ctx) } @@ -72,12 +77,26 @@ func (hs *headSaver) Chain(hash common.Hash) *evmtypes.Head { return hs.heads.HeadByHash(hash) } +func (hs *headSaver) MarkFinalized(ctx context.Context, finalized *evmtypes.Head) error { + minBlockToKeep := hs.calculateMinBlockToKeep(finalized.BlockNumber()) + if !hs.heads.MarkFinalized(finalized.BlockHash(), minBlockToKeep) { + return fmt.Errorf("failed to find %s block in the canonical chain to mark it as finalized", finalized) + } + + return hs.orm.TrimOldHeads(ctx, minBlockToKeep) +} + var NullSaver httypes.HeadSaver = &nullSaver{} type nullSaver struct{} -func (*nullSaver) Save(ctx context.Context, head *evmtypes.Head) error { return nil } -func (*nullSaver) Load(ctx context.Context) (*evmtypes.Head, error) { return nil, nil } +func (*nullSaver) Save(ctx context.Context, head *evmtypes.Head) error { return nil } +func (*nullSaver) Load(ctx context.Context, latestFinalized int64) (*evmtypes.Head, error) { + return nil, nil +} func (*nullSaver) LatestHeadFromDB(ctx context.Context) (*evmtypes.Head, error) { return nil, nil } func (*nullSaver) LatestChain() *evmtypes.Head { return nil } func (*nullSaver) Chain(hash common.Hash) *evmtypes.Head { return nil } +func (*nullSaver) MarkFinalized(ctx context.Context, latestFinalized *evmtypes.Head) error { + return nil +} diff --git a/core/chains/evm/headtracker/head_saver_test.go b/core/chains/evm/headtracker/head_saver_test.go index f541330bc98..e06c36c674c 100644 --- a/core/chains/evm/headtracker/head_saver_test.go +++ b/core/chains/evm/headtracker/head_saver_test.go @@ -1,14 +1,20 @@ package headtracker_test import ( + "math/big" "testing" "time" + "github.com/ethereum/go-ethereum/common" "github.com/stretchr/testify/require" "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/headtracker" httypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/headtracker/types" + evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils" + ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big" "github.com/smartcontractkit/chainlink/v2/core/internal/cltest" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/configtest" @@ -34,6 +40,7 @@ func (h *headTrackerConfig) MaxBufferSize() uint32 { type config struct { finalityDepth uint32 blockEmissionIdleWarningThreshold time.Duration + finalityTagEnabled bool } func (c *config) FinalityDepth() uint32 { return c.finalityDepth } @@ -41,20 +48,31 @@ func (c *config) BlockEmissionIdleWarningThreshold() time.Duration { return c.blockEmissionIdleWarningThreshold } -func configureSaver(t *testing.T) (httypes.HeadSaver, headtracker.ORM) { +func (c *config) FinalityTagEnabled() bool { + return c.finalityTagEnabled +} + +type saverOpts struct { + headTrackerConfig *headTrackerConfig +} + +func configureSaver(t *testing.T, opts saverOpts) (httypes.HeadSaver, headtracker.ORM) { + if opts.headTrackerConfig == nil { + opts.headTrackerConfig = &headTrackerConfig{historyDepth: 6} + } db := pgtest.NewSqlxDB(t) lggr := logger.Test(t) cfg := configtest.NewGeneralConfig(t, nil) htCfg := &config{finalityDepth: uint32(1)} orm := headtracker.NewORM(db, lggr, cfg.Database(), cltest.FixtureChainID) - saver := headtracker.NewHeadSaver(lggr, orm, htCfg, &headTrackerConfig{historyDepth: 6}) + saver := headtracker.NewHeadSaver(lggr, orm, htCfg, opts.headTrackerConfig) return saver, orm } func TestHeadSaver_Save(t *testing.T) { t.Parallel() - saver, _ := configureSaver(t) + saver, _ := configureSaver(t, saverOpts{}) head := cltest.Head(1) err := saver.Save(testutils.Context(t), head) @@ -76,19 +94,56 @@ func TestHeadSaver_Save(t *testing.T) { func TestHeadSaver_Load(t *testing.T) { t.Parallel() - saver, orm := configureSaver(t) - - for i := 0; i < 5; i++ { - err := orm.IdempotentInsertHead(testutils.Context(t), cltest.Head(i)) + saver, orm := configureSaver(t, saverOpts{ + headTrackerConfig: &headTrackerConfig{historyDepth: 4}, + }) + + // create chain + // H0 <- H1 <- H2 <- H3 <- H4 <- H5 + // \ + // H2Uncle + // + newHead := func(num int, parent common.Hash) *evmtypes.Head { + h := evmtypes.NewHead(big.NewInt(int64(num)), utils.NewHash(), parent, uint64(time.Now().Unix()), ubig.NewI(0)) + return &h + } + h0 := newHead(0, utils.NewHash()) + h1 := newHead(1, h0.Hash) + h2 := newHead(2, h1.Hash) + h3 := newHead(3, h2.Hash) + h4 := newHead(4, h3.Hash) + h5 := newHead(5, h4.Hash) + h2Uncle := newHead(2, h1.Hash) + + allHeads := []*evmtypes.Head{h0, h1, h2, h2Uncle, h3, h4, h5} + + for _, h := range allHeads { + err := orm.IdempotentInsertHead(testutils.Context(t), h) require.NoError(t, err) } - latestHead, err := saver.Load(testutils.Context(t)) + verifyLatestHead := func(latestHead *evmtypes.Head) { + // latest head matches h5 and chain does not include h0 + require.NotNil(t, latestHead) + require.Equal(t, int64(5), latestHead.Number) + require.Equal(t, uint32(5), latestHead.ChainLength()) + require.Greater(t, latestHead.EarliestHeadInChain().BlockNumber(), int64(0)) + } + + // load all from [h5-historyDepth, h5] + latestHead, err := saver.Load(testutils.Context(t), h5.BlockNumber()) require.NoError(t, err) + // verify latest head loaded from db + verifyLatestHead(latestHead) + + //verify latest head loaded from memory store + latestHead = saver.LatestChain() require.NotNil(t, latestHead) - require.Equal(t, int64(4), latestHead.Number) + verifyLatestHead(latestHead) + + // h2Uncle was loaded and has chain up to h1 + uncleChain := saver.Chain(h2Uncle.Hash) + require.NotNil(t, uncleChain) + require.Equal(t, uint32(2), uncleChain.ChainLength()) // h2Uncle -> h1 - latestChain := saver.LatestChain() - require.NotNil(t, latestChain) - require.Equal(t, int64(4), latestChain.Number) } diff --git a/core/chains/evm/headtracker/head_tracker.go b/core/chains/evm/headtracker/head_tracker.go index 3cddfb71d09..1fed1aa0c51 100644 --- a/core/chains/evm/headtracker/head_tracker.go +++ b/core/chains/evm/headtracker/head_tracker.go @@ -53,7 +53,7 @@ func (*nullTracker) Ready() error { return nil } func (*nullTracker) HealthReport() map[string]error { return map[string]error{} } func (*nullTracker) Name() string { return "" } func (*nullTracker) SetLogLevel(zapcore.Level) {} -func (*nullTracker) Backfill(ctx context.Context, headWithChain *evmtypes.Head, depth uint) (err error) { +func (*nullTracker) Backfill(ctx context.Context, headWithChain, latestFinalized *evmtypes.Head) (err error) { return nil } func (*nullTracker) LatestChain() *evmtypes.Head { return nil } diff --git a/core/chains/evm/headtracker/head_tracker_test.go b/core/chains/evm/headtracker/head_tracker_test.go index 22e931d6d0f..a2e45c59f09 100644 --- a/core/chains/evm/headtracker/head_tracker_test.go +++ b/core/chains/evm/headtracker/head_tracker_test.go @@ -16,8 +16,12 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "go.uber.org/zap" + "go.uber.org/zap/zaptest/observer" "golang.org/x/exp/maps" + "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" + "github.com/jmoiron/sqlx" commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config" @@ -27,7 +31,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox/mailboxtest" commonmocks "github.com/smartcontractkit/chainlink/v2/common/types/mocks" - evmclient "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client" + evmclimocks "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client/mocks" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/headtracker" httypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/headtracker/types" evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" @@ -56,6 +60,8 @@ func TestHeadTracker_New(t *testing.T) { config := configtest.NewGeneralConfig(t, nil) ethClient := evmtest.NewEthClientMockWithDefaultChain(t) ethClient.On("HeadByNumber", mock.Anything, (*big.Int)(nil)).Return(cltest.Head(0), nil) + // finalized + ethClient.On("HeadByNumber", mock.Anything, big.NewInt(0)).Return(cltest.Head(0), nil) orm := headtracker.NewORM(db, logger, config.Database(), cltest.FixtureChainID) assert.Nil(t, orm.IdempotentInsertHead(testutils.Context(t), cltest.Head(1))) @@ -72,12 +78,15 @@ func TestHeadTracker_New(t *testing.T) { assert.Equal(t, last.Number, latest.Number) } -func TestHeadTracker_Save_InsertsAndTrimsTable(t *testing.T) { +func TestHeadTracker_MarkFinalized_MarksAndTrimsTable(t *testing.T) { t.Parallel() db := pgtest.NewSqlxDB(t) logger := logger.Test(t) - config := cltest.NewTestChainScopedConfig(t) + gCfg := configtest.NewGeneralConfig(t, func(c *chainlink.Config, _ *chainlink.Secrets) { + c.EVM[0].HeadTracker.HistoryDepth = ptr[uint32](100) + }) + config := evmtest.NewChainScopedConfig(t, gCfg) ethClient := evmtest.NewEthClientMockWithDefaultChain(t) orm := headtracker.NewORM(db, logger, config.Database(), cltest.FixtureChainID) @@ -86,18 +95,21 @@ func TestHeadTracker_Save_InsertsAndTrimsTable(t *testing.T) { assert.Nil(t, orm.IdempotentInsertHead(testutils.Context(t), cltest.Head(idx))) } - ht := createHeadTracker(t, ethClient, config.EVM(), config.EVM().HeadTracker(), orm) + latest := cltest.Head(201) + assert.Nil(t, orm.IdempotentInsertHead(testutils.Context(t), latest)) - h := cltest.Head(200) - require.NoError(t, ht.headSaver.Save(testutils.Context(t), h)) - assert.Equal(t, big.NewInt(200), ht.headSaver.LatestChain().ToInt()) + ht := createHeadTracker(t, ethClient, config.EVM(), config.EVM().HeadTracker(), orm) + _, err := ht.headSaver.Load(testutils.Context(t), latest.Number) + require.NoError(t, err) + require.NoError(t, ht.headSaver.MarkFinalized(testutils.Context(t), latest)) + assert.Equal(t, big.NewInt(201), ht.headSaver.LatestChain().ToInt()) firstHead := firstHead(t, db) assert.Equal(t, big.NewInt(101), firstHead.ToInt()) lastHead, err := orm.LatestHead(testutils.Context(t)) require.NoError(t, err) - assert.Equal(t, int64(200), lastHead.Number) + assert.Equal(t, int64(201), lastHead.Number) } func TestHeadTracker_Get(t *testing.T) { @@ -176,7 +188,11 @@ func TestHeadTracker_Start_NewHeads(t *testing.T) { chStarted := make(chan struct{}) mockEth := &evmtest.MockEth{EthClient: ethClient} sub := mockEth.NewSub(t) - ethClient.On("HeadByNumber", mock.Anything, (*big.Int)(nil)).Return(cltest.Head(0), nil) + // for initial load + ethClient.On("HeadByNumber", mock.Anything, (*big.Int)(nil)).Return(cltest.Head(0), nil).Once() + ethClient.On("HeadByNumber", mock.Anything, mock.Anything).Return(cltest.Head(0), nil).Once() + // for backfill + ethClient.On("HeadByNumber", mock.Anything, mock.Anything).Return(cltest.Head(0), nil).Maybe() ethClient.On("SubscribeNewHead", mock.Anything, mock.Anything). Run(func(mock.Arguments) { close(chStarted) @@ -189,43 +205,73 @@ func TestHeadTracker_Start_NewHeads(t *testing.T) { <-chStarted } -func TestHeadTracker_Start_CancelContext(t *testing.T) { +func TestHeadTracker_Start(t *testing.T) { t.Parallel() - db := pgtest.NewSqlxDB(t) - logger := logger.Test(t) - config := cltest.NewTestChainScopedConfig(t) - orm := headtracker.NewORM(db, logger, config.Database(), cltest.FixtureChainID) - ethClient := evmtest.NewEthClientMockWithDefaultChain(t) - chStarted := make(chan struct{}) - ethClient.On("HeadByNumber", mock.Anything, (*big.Int)(nil)).Run(func(args mock.Arguments) { - ctx := args.Get(0).(context.Context) - select { - case <-ctx.Done(): - return - case <-time.After(10 * time.Second): - assert.FailNow(t, "context was not cancelled within 10s") - } - }).Return(cltest.Head(0), nil) - mockEth := &evmtest.MockEth{EthClient: ethClient} - sub := mockEth.NewSub(t) - ethClient.On("SubscribeNewHead", mock.Anything, mock.Anything). - Run(func(mock.Arguments) { - close(chStarted) - }). - Return(sub, nil). - Maybe() - - ht := createHeadTracker(t, ethClient, config.EVM(), config.EVM().HeadTracker(), orm) + const historyDepth = 100 + newHeadTracker := func(t *testing.T) *headTrackerUniverse { + db := pgtest.NewSqlxDB(t) + gCfg := configtest.NewGeneralConfig(t, func(c *chainlink.Config, _ *chainlink.Secrets) { + c.EVM[0].FinalityTagEnabled = ptr[bool](true) + c.EVM[0].HeadTracker.HistoryDepth = ptr[uint32](historyDepth) + }) + config := evmtest.NewChainScopedConfig(t, gCfg) + orm := headtracker.NewORM(db, logger.Test(t), config.Database(), cltest.FixtureChainID) + ethClient := evmtest.NewEthClientMockWithDefaultChain(t) + return createHeadTracker(t, ethClient, config.EVM(), config.EVM().HeadTracker(), orm) + } - ctx, cancel := context.WithCancel(testutils.Context(t)) - go func() { - time.Sleep(1 * time.Second) - cancel() - }() - err := ht.headTracker.Start(ctx) - require.NoError(t, err) - require.NoError(t, ht.headTracker.Close()) + t.Run("Fail start if context was canceled", func(t *testing.T) { + ctx, cancel := context.WithCancel(testutils.Context(t)) + ht := newHeadTracker(t) + ht.ethClient.On("HeadByNumber", mock.Anything, (*big.Int)(nil)).Run(func(args mock.Arguments) { + cancel() + }).Return(cltest.Head(0), context.Canceled) + err := ht.headTracker.Start(ctx) + require.ErrorIs(t, err, context.Canceled) + }) + t.Run("Starts even if failed to get initialHead", func(t *testing.T) { + ht := newHeadTracker(t) + ht.ethClient.On("HeadByNumber", mock.Anything, (*big.Int)(nil)).Return(cltest.Head(0), errors.New("failed to get init head")) + ht.Start(t) + tests.AssertLogEventually(t, ht.observer, "Error handling initial head") + }) + t.Run("Starts even if received invalid head", func(t *testing.T) { + ht := newHeadTracker(t) + ht.ethClient.On("HeadByNumber", mock.Anything, (*big.Int)(nil)).Return(nil, nil) + ht.Start(t) + tests.AssertLogEventually(t, ht.observer, "Got nil initial head") + }) + t.Run("Starts even if fails to get finalizedHead", func(t *testing.T) { + ht := newHeadTracker(t) + head := cltest.Head(1000) + ht.ethClient.On("HeadByNumber", mock.Anything, (*big.Int)(nil)).Return(head, nil).Once() + ht.ethClient.On("LatestFinalizedBlock", mock.Anything).Return(nil, errors.New("failed to load latest finalized")).Once() + ht.Start(t) + tests.AssertLogEventually(t, ht.observer, "Error handling initial head") + }) + t.Run("Starts even if latest finalizedHead is nil", func(t *testing.T) { + ht := newHeadTracker(t) + head := cltest.Head(1000) + ht.ethClient.On("HeadByNumber", mock.Anything, (*big.Int)(nil)).Return(head, nil).Once() + ht.ethClient.On("LatestFinalizedBlock", mock.Anything).Return(nil, nil).Once() + ht.Start(t) + tests.AssertLogEventually(t, ht.observer, "Error handling initial head") + }) + t.Run("Happy path", func(t *testing.T) { + head := cltest.Head(1000) + ht := newHeadTracker(t) + ctx := testutils.Context(t) + require.NoError(t, ht.orm.IdempotentInsertHead(ctx, cltest.Head(799))) + ht.ethClient.On("HeadByNumber", mock.Anything, (*big.Int)(nil)).Return(head, nil).Once() + finalizedHead := cltest.Head(800) + // on start + ht.ethClient.On("LatestFinalizedBlock", mock.Anything).Return(finalizedHead, nil).Once() + // on backfill + ht.ethClient.On("LatestFinalizedBlock", mock.Anything).Return(nil, errors.New("backfill call to finalized failed")).Maybe() + ht.Start(t) + tests.AssertLogEventually(t, ht.observer, "Loaded chain from DB") + }) } func TestHeadTracker_CallsHeadTrackableCallbacks(t *testing.T) { @@ -289,7 +335,7 @@ func TestHeadTracker_ReconnectOnError(t *testing.T) { func(ctx context.Context, ch chan<- *evmtypes.Head) ethereum.Subscription { return mockEth.NewSub(t) }, func(ctx context.Context, ch chan<- *evmtypes.Head) error { return nil }, ) - ethClient.On("HeadByNumber", mock.Anything, (*big.Int)(nil)).Return(cltest.Head(0), nil) + ethClient.On("HeadByNumber", mock.Anything, mock.Anything).Return(cltest.Head(0), nil) checker := &cltest.MockHeadTrackable{} ht := createHeadTrackerWithChecker(t, ethClient, config.EVM(), config.EVM().HeadTracker(), orm, checker) @@ -325,7 +371,7 @@ func TestHeadTracker_ResubscribeOnSubscriptionError(t *testing.T) { }, func(ctx context.Context, ch chan<- *evmtypes.Head) error { return nil }, ) - ethClient.On("HeadByNumber", mock.Anything, mock.Anything).Return(cltest.Head(0), nil).Once() + ethClient.On("HeadByNumber", mock.Anything, mock.Anything).Return(cltest.Head(0), nil) ethClient.On("HeadByHash", mock.Anything, mock.Anything).Return(cltest.Head(0), nil).Maybe() checker := &cltest.MockHeadTrackable{} @@ -373,6 +419,7 @@ func TestHeadTracker_Start_LoadsLatestChain(t *testing.T) { parentHash = heads[i].Hash } ethClient.On("HeadByNumber", mock.Anything, (*big.Int)(nil)).Return(heads[3], nil).Maybe() + ethClient.On("HeadByNumber", mock.Anything, big.NewInt(0)).Return(heads[0], nil).Maybe() ethClient.On("HeadByHash", mock.Anything, heads[2].Hash).Return(heads[2], nil).Maybe() ethClient.On("HeadByHash", mock.Anything, heads[1].Hash).Return(heads[1], nil).Maybe() ethClient.On("HeadByHash", mock.Anything, heads[0].Hash).Return(heads[0], nil).Maybe() @@ -454,6 +501,8 @@ func TestHeadTracker_SwitchesToLongestChainWithHeadSamplingEnabled(t *testing.T) head0 := blocks.Head(0) // Initial query ethClient.On("HeadByNumber", mock.Anything, (*big.Int)(nil)).Return(head0, nil) + // backfill query + ethClient.On("HeadByNumber", mock.Anything, big.NewInt(0)).Return(head0, nil) ht.Start(t) headSeq := cltest.NewHeadBuffer(t) @@ -582,6 +631,8 @@ func TestHeadTracker_SwitchesToLongestChainWithHeadSamplingDisabled(t *testing.T head0 := blocks.Head(0) // evmtypes.Head{Number: 0, Hash: utils.NewHash(), ParentHash: utils.NewHash(), Timestamp: time.Unix(0, 0)} // Initial query ethClient.On("HeadByNumber", mock.Anything, (*big.Int)(nil)).Return(head0, nil) + // backfill + ethClient.On("HeadByNumber", mock.Anything, big.NewInt(0)).Return(head0, nil) headSeq := cltest.NewHeadBuffer(t) headSeq.Append(blocks.Head(0)) @@ -773,45 +824,69 @@ func TestHeadTracker_Backfill(t *testing.T) { ctx := testutils.Context(t) - t.Run("does nothing if all the heads are in database", func(t *testing.T) { - db := pgtest.NewSqlxDB(t) + type opts struct { + Heads []evmtypes.Head + } + newHeadTrackerUniverse := func(t *testing.T, opts opts) *headTrackerUniverse { cfg := configtest.NewGeneralConfig(t, nil) - logger := logger.Test(t) - orm := headtracker.NewORM(db, logger, cfg.Database(), cltest.FixtureChainID) - for i := range heads { - require.NoError(t, orm.IdempotentInsertHead(testutils.Context(t), &heads[i])) + evmcfg := evmtest.NewChainScopedConfig(t, cfg) + lggr := logger.Test(t) + db := pgtest.NewSqlxDB(t) + orm := headtracker.NewORM(db, lggr, cfg.Database(), cltest.FixtureChainID) + for i := range opts.Heads { + require.NoError(t, orm.IdempotentInsertHead(testutils.Context(t), &opts.Heads[i])) } - ethClient := evmtest.NewEthClientMock(t) ethClient.On("ConfiguredChainID", mock.Anything).Return(evmtest.MustGetDefaultChainID(t, cfg.EVMConfigs()), nil) - ht := createHeadTrackerWithNeverSleeper(t, ethClient, cfg, orm) - - err := ht.Backfill(ctx, &h12, 2) + ht := createHeadTracker(t, ethClient, evmcfg.EVM(), evmcfg.EVM().HeadTracker(), orm) + _, err := ht.headSaver.Load(testutils.Context(t), 0) require.NoError(t, err) + return ht + } + + t.Run("returns error if latestFinalized is not valid", func(t *testing.T) { + htu := newHeadTrackerUniverse(t, opts{}) + + err := htu.headTracker.Backfill(ctx, &h12, nil) + require.EqualError(t, err, "can not perform backfill without a valid latestFinalized head") }) + t.Run("Returns error if finalized head is ahead of canonical", func(t *testing.T) { + htu := newHeadTrackerUniverse(t, opts{}) - t.Run("fetches a missing head", func(t *testing.T) { - db := pgtest.NewSqlxDB(t) - cfg := configtest.NewGeneralConfig(t, nil) - logger := logger.Test(t) - orm := headtracker.NewORM(db, logger, cfg.Database(), cltest.FixtureChainID) - for i := range heads { - require.NoError(t, orm.IdempotentInsertHead(testutils.Context(t), &heads[i])) - } + err := htu.headTracker.Backfill(ctx, &h12, &h14Orphaned) + require.EqualError(t, err, "invariant violation: expected head of canonical chain to be ahead of the latestFinalized") + }) + t.Run("Returns error if finalizedHead is not present in the canonical chain", func(t *testing.T) { + htu := newHeadTrackerUniverse(t, opts{Heads: heads}) - ethClient := evmtest.NewEthClientMock(t) - ethClient.On("ConfiguredChainID", mock.Anything).Return(evmtest.MustGetDefaultChainID(t, cfg.EVMConfigs()), nil) - ethClient.On("HeadByHash", mock.Anything, head10.Hash). - Return(&head10, nil) + err := htu.headTracker.Backfill(ctx, &h15, &h14Orphaned) + require.EqualError(t, err, "expected finalized block to be present in canonical chain") + }) + t.Run("Marks all blocks in chain that are older than finalized", func(t *testing.T) { + htu := newHeadTrackerUniverse(t, opts{Heads: heads}) + + assertFinalized := func(expectedFinalized bool, msg string, heads ...evmtypes.Head) { + for _, h := range heads { + storedHead := htu.headSaver.Chain(h.Hash) + assert.Equal(t, expectedFinalized, storedHead != nil && storedHead.IsFinalized, msg, "block_number", h.Number) + } + } - ht := createHeadTrackerWithNeverSleeper(t, ethClient, cfg, orm) + err := htu.headTracker.Backfill(ctx, &h15, &h14) + require.NoError(t, err) + assertFinalized(true, "expected heads to be marked as finalized after backfill", h14, h13, h12, h11) + assertFinalized(false, "expected heads to remain unfinalized", h15, head10) + }) - var depth uint = 3 + t.Run("fetches a missing head", func(t *testing.T) { + htu := newHeadTrackerUniverse(t, opts{Heads: heads}) + htu.ethClient.On("HeadByHash", mock.Anything, head10.Hash). + Return(&head10, nil) - err := ht.Backfill(ctx, &h12, depth) + err := htu.headTracker.Backfill(ctx, &h12, &h9) require.NoError(t, err) - h := ht.headSaver.Chain(h12.Hash) + h := htu.headSaver.Chain(h12.Hash) assert.Equal(t, int64(12), h.Number) require.NotNil(t, h.Parent) @@ -821,37 +896,23 @@ func TestHeadTracker_Backfill(t *testing.T) { require.NotNil(t, h.Parent.Parent.Parent) assert.Equal(t, int64(9), h.Parent.Parent.Parent.Number) - writtenHead, err := orm.HeadByHash(testutils.Context(t), head10.Hash) + writtenHead, err := htu.orm.HeadByHash(testutils.Context(t), head10.Hash) require.NoError(t, err) assert.Equal(t, int64(10), writtenHead.Number) }) t.Run("fetches only heads that are missing", func(t *testing.T) { - db := pgtest.NewSqlxDB(t) - cfg := configtest.NewGeneralConfig(t, nil) - logger := logger.Test(t) - orm := headtracker.NewORM(db, logger, cfg.Database(), cltest.FixtureChainID) - for i := range heads { - require.NoError(t, orm.IdempotentInsertHead(testutils.Context(t), &heads[i])) - } - - ethClient := evmtest.NewEthClientMock(t) - ethClient.On("ConfiguredChainID", mock.Anything).Return(evmtest.MustGetDefaultChainID(t, cfg.EVMConfigs()), nil) - - ht := createHeadTrackerWithNeverSleeper(t, ethClient, cfg, orm) + htu := newHeadTrackerUniverse(t, opts{Heads: heads}) - ethClient.On("HeadByHash", mock.Anything, head10.Hash). + htu.ethClient.On("HeadByHash", mock.Anything, head10.Hash). Return(&head10, nil) - ethClient.On("HeadByHash", mock.Anything, head8.Hash). + htu.ethClient.On("HeadByHash", mock.Anything, head8.Hash). Return(&head8, nil) - // Needs to be 8 because there are 8 heads in chain (15,14,13,12,11,10,9,8) - var depth uint = 8 - - err := ht.Backfill(ctx, &h15, depth) + err := htu.headTracker.Backfill(ctx, &h15, &head8) require.NoError(t, err) - h := ht.headSaver.Chain(h15.Hash) + h := htu.headSaver.Chain(h15.Hash) require.Equal(t, uint32(8), h.ChainLength()) earliestInChain := h.EarliestInChain() @@ -859,77 +920,20 @@ func TestHeadTracker_Backfill(t *testing.T) { assert.Equal(t, head8.Hash, earliestInChain.BlockHash()) }) - t.Run("does not backfill if chain length is already greater than or equal to depth", func(t *testing.T) { - db := pgtest.NewSqlxDB(t) - cfg := configtest.NewGeneralConfig(t, nil) - logger := logger.Test(t) - orm := headtracker.NewORM(db, logger, cfg.Database(), cltest.FixtureChainID) - for i := range heads { - require.NoError(t, orm.IdempotentInsertHead(testutils.Context(t), &heads[i])) - } - - ethClient := evmtest.NewEthClientMock(t) - ethClient.On("ConfiguredChainID", mock.Anything).Return(evmtest.MustGetDefaultChainID(t, cfg.EVMConfigs()), nil) - - ht := createHeadTrackerWithNeverSleeper(t, ethClient, cfg, orm) - - err := ht.Backfill(ctx, &h15, 3) - require.NoError(t, err) - - err = ht.Backfill(ctx, &h15, 5) - require.NoError(t, err) - }) - - t.Run("only backfills to height 0 if chain length would otherwise cause it to try and fetch a negative head", func(t *testing.T) { - db := pgtest.NewSqlxDB(t) - cfg := configtest.NewGeneralConfig(t, nil) - logger := logger.Test(t) - orm := headtracker.NewORM(db, logger, cfg.Database(), cltest.FixtureChainID) - - ethClient := evmtest.NewEthClientMock(t) - ethClient.On("ConfiguredChainID", mock.Anything).Return(evmtest.MustGetDefaultChainID(t, cfg.EVMConfigs()), nil) - ethClient.On("HeadByHash", mock.Anything, head0.Hash). - Return(&head0, nil) - - require.NoError(t, orm.IdempotentInsertHead(testutils.Context(t), &h1)) - - ht := createHeadTrackerWithNeverSleeper(t, ethClient, cfg, orm) - - err := ht.Backfill(ctx, &h1, 400) - require.NoError(t, err) - - h := ht.headSaver.Chain(h1.Hash) - require.NotNil(t, h) - - require.Equal(t, uint32(2), h.ChainLength()) - require.Equal(t, int64(0), h.EarliestInChain().BlockNumber()) - }) - t.Run("abandons backfill and returns error if the eth node returns not found", func(t *testing.T) { - db := pgtest.NewSqlxDB(t) - cfg := configtest.NewGeneralConfig(t, nil) - logger := logger.Test(t) - orm := headtracker.NewORM(db, logger, cfg.Database(), cltest.FixtureChainID) - for i := range heads { - require.NoError(t, orm.IdempotentInsertHead(testutils.Context(t), &heads[i])) - } - - ethClient := evmtest.NewEthClientMock(t) - ethClient.On("ConfiguredChainID", mock.Anything).Return(evmtest.MustGetDefaultChainID(t, cfg.EVMConfigs()), nil) - ethClient.On("HeadByHash", mock.Anything, head10.Hash). + htu := newHeadTrackerUniverse(t, opts{Heads: heads}) + htu.ethClient.On("HeadByHash", mock.Anything, head10.Hash). Return(&head10, nil). Once() - ethClient.On("HeadByHash", mock.Anything, head8.Hash). + htu.ethClient.On("HeadByHash", mock.Anything, head8.Hash). Return(nil, ethereum.NotFound). Once() - ht := createHeadTrackerWithNeverSleeper(t, ethClient, cfg, orm) - - err := ht.Backfill(ctx, &h12, 400) + err := htu.headTracker.Backfill(ctx, &h12, &head8) require.Error(t, err) require.EqualError(t, err, "fetchAndSaveHead failed: not found") - h := ht.headSaver.Chain(h12.Hash) + h := htu.headSaver.Chain(h12.Hash) // Should contain 12, 11, 10, 9 assert.Equal(t, 4, int(h.ChainLength())) @@ -937,28 +941,20 @@ func TestHeadTracker_Backfill(t *testing.T) { }) t.Run("abandons backfill and returns error if the context time budget is exceeded", func(t *testing.T) { - db := pgtest.NewSqlxDB(t) - cfg := configtest.NewGeneralConfig(t, nil) - logger := logger.Test(t) - orm := headtracker.NewORM(db, logger, cfg.Database(), cltest.FixtureChainID) - for i := range heads { - require.NoError(t, orm.IdempotentInsertHead(testutils.Context(t), &heads[i])) - } - - ethClient := evmtest.NewEthClientMock(t) - ethClient.On("ConfiguredChainID", mock.Anything).Return(evmtest.MustGetDefaultChainID(t, cfg.EVMConfigs()), nil) - ethClient.On("HeadByHash", mock.Anything, head10.Hash). + htu := newHeadTrackerUniverse(t, opts{Heads: heads}) + htu.ethClient.On("HeadByHash", mock.Anything, head10.Hash). Return(&head10, nil) - ethClient.On("HeadByHash", mock.Anything, head8.Hash). - Return(nil, context.DeadlineExceeded) - - ht := createHeadTrackerWithNeverSleeper(t, ethClient, cfg, orm) + lctx, cancel := context.WithCancel(ctx) + htu.ethClient.On("HeadByHash", mock.Anything, head8.Hash). + Return(nil, context.DeadlineExceeded).Run(func(args mock.Arguments) { + cancel() + }) - err := ht.Backfill(ctx, &h12, 400) + err := htu.headTracker.Backfill(lctx, &h12, &head8) require.Error(t, err) - require.EqualError(t, err, "fetchAndSaveHead failed: context deadline exceeded") + require.EqualError(t, err, "fetchAndSaveHead failed: context canceled") - h := ht.headSaver.Chain(h12.Hash) + h := htu.headSaver.Chain(h12.Hash) // Should contain 12, 11, 10, 9 assert.Equal(t, 4, int(h.ChainLength())) @@ -966,33 +962,40 @@ func TestHeadTracker_Backfill(t *testing.T) { }) t.Run("abandons backfill and returns error when fetching a block by hash fails, indicating a reorg", func(t *testing.T) { - db := pgtest.NewSqlxDB(t) - cfg := configtest.NewGeneralConfig(t, nil) - logger := logger.Test(t) - orm := headtracker.NewORM(db, logger, cfg.Database(), cltest.FixtureChainID) - ethClient := evmtest.NewEthClientMock(t) - ethClient.On("ConfiguredChainID", mock.Anything).Return(evmtest.MustGetDefaultChainID(t, cfg.EVMConfigs()), nil) - ethClient.On("HeadByHash", mock.Anything, h14.Hash).Return(&h14, nil).Once() - ethClient.On("HeadByHash", mock.Anything, h13.Hash).Return(&h13, nil).Once() - ethClient.On("HeadByHash", mock.Anything, h12.Hash).Return(nil, errors.New("not found")).Once() + htu := newHeadTrackerUniverse(t, opts{}) + htu.ethClient.On("HeadByHash", mock.Anything, h14.Hash).Return(&h14, nil).Once() + htu.ethClient.On("HeadByHash", mock.Anything, h13.Hash).Return(&h13, nil).Once() + htu.ethClient.On("HeadByHash", mock.Anything, h12.Hash).Return(nil, errors.New("not found")).Once() - ht := createHeadTrackerWithNeverSleeper(t, ethClient, cfg, orm) - - err := ht.Backfill(ctx, &h15, 400) + err := htu.headTracker.Backfill(ctx, &h15, &h11) require.Error(t, err) require.EqualError(t, err, "fetchAndSaveHead failed: not found") - h := ht.headSaver.Chain(h14.Hash) + h := htu.headSaver.Chain(h14.Hash) // Should contain 14, 13 (15 was never added). When trying to get the parent of h13 by hash, a reorg happened and backfill exited. assert.Equal(t, 2, int(h.ChainLength())) assert.Equal(t, int64(13), h.EarliestInChain().BlockNumber()) }) + t.Run("marks head as finalized, if latestHead = finalizedHead (0 finality depth)", func(t *testing.T) { + htu := newHeadTrackerUniverse(t, opts{Heads: []evmtypes.Head{h15}}) + finalizedH15 := h15 // copy h15 to have different addresses + err := htu.headTracker.Backfill(ctx, &h15, &finalizedH15) + require.NoError(t, err) + + h := htu.headSaver.LatestChain() + + // Should contain 14, 13 (15 was never added). When trying to get the parent of h13 by hash, a reorg happened and backfill exited. + assert.Equal(t, 1, int(h.ChainLength())) + assert.True(t, h.IsFinalized) + assert.Equal(t, h15.BlockNumber(), h.BlockNumber()) + assert.Equal(t, h15.Hash, h.Hash) + }) } -func createHeadTracker(t *testing.T, ethClient evmclient.Client, config headtracker.Config, htConfig headtracker.HeadTrackerConfig, orm headtracker.ORM) *headTrackerUniverse { - lggr := logger.Test(t) +func createHeadTracker(t *testing.T, ethClient *evmclimocks.Client, config headtracker.Config, htConfig headtracker.HeadTrackerConfig, orm headtracker.ORM) *headTrackerUniverse { + lggr, ob := logger.TestObserved(t, zap.DebugLevel) hb := headtracker.NewHeadBroadcaster(lggr) hs := headtracker.NewHeadSaver(lggr, orm, config, htConfig) mailMon := mailboxtest.NewMonitor(t) @@ -1002,29 +1005,14 @@ func createHeadTracker(t *testing.T, ethClient evmclient.Client, config headtrac headBroadcaster: hb, headSaver: hs, mailMon: mailMon, + observer: ob, + orm: orm, + ethClient: ethClient, } } -func createHeadTrackerWithNeverSleeper(t *testing.T, ethClient evmclient.Client, cfg chainlink.GeneralConfig, orm headtracker.ORM) *headTrackerUniverse { - evmcfg := evmtest.NewChainScopedConfig(t, cfg) - lggr := logger.Test(t) - hb := headtracker.NewHeadBroadcaster(lggr) - hs := headtracker.NewHeadSaver(lggr, orm, evmcfg.EVM(), evmcfg.EVM().HeadTracker()) - mailMon := mailboxtest.NewMonitor(t) - ht := headtracker.NewHeadTracker(lggr, ethClient, evmcfg.EVM(), evmcfg.EVM().HeadTracker(), hb, hs, mailMon) - _, err := hs.Load(testutils.Context(t)) - require.NoError(t, err) - return &headTrackerUniverse{ - mu: new(sync.Mutex), - headTracker: ht, - headBroadcaster: hb, - headSaver: hs, - mailMon: mailMon, - } -} - -func createHeadTrackerWithChecker(t *testing.T, ethClient evmclient.Client, config headtracker.Config, htConfig headtracker.HeadTrackerConfig, orm headtracker.ORM, checker httypes.HeadTrackable) *headTrackerUniverse { - lggr := logger.Test(t) +func createHeadTrackerWithChecker(t *testing.T, ethClient *evmclimocks.Client, config headtracker.Config, htConfig headtracker.HeadTrackerConfig, orm headtracker.ORM, checker httypes.HeadTrackable) *headTrackerUniverse { + lggr, ob := logger.TestObserved(t, zap.DebugLevel) hb := headtracker.NewHeadBroadcaster(lggr) hs := headtracker.NewHeadSaver(lggr, orm, config, htConfig) hb.Subscribe(checker) @@ -1036,6 +1024,9 @@ func createHeadTrackerWithChecker(t *testing.T, ethClient evmclient.Client, conf headBroadcaster: hb, headSaver: hs, mailMon: mailMon, + observer: ob, + orm: orm, + ethClient: ethClient, } } @@ -1046,10 +1037,13 @@ type headTrackerUniverse struct { headBroadcaster httypes.HeadBroadcaster headSaver httypes.HeadSaver mailMon *mailbox.Monitor + observer *observer.ObservedLogs + orm headtracker.ORM + ethClient *evmclimocks.Client } -func (u *headTrackerUniverse) Backfill(ctx context.Context, head *evmtypes.Head, depth uint) error { - return u.headTracker.Backfill(ctx, head, depth) +func (u *headTrackerUniverse) Backfill(ctx context.Context, head, finalizedHead *evmtypes.Head) error { + return u.headTracker.Backfill(ctx, head, finalizedHead) } func (u *headTrackerUniverse) Start(t *testing.T) { diff --git a/core/chains/evm/headtracker/heads.go b/core/chains/evm/headtracker/heads.go index ccb7d9b7336..1edfb3e3788 100644 --- a/core/chains/evm/headtracker/heads.go +++ b/core/chains/evm/headtracker/heads.go @@ -17,9 +17,12 @@ type Heads interface { HeadByHash(hash common.Hash) *evmtypes.Head // AddHeads adds newHeads to the collection, eliminates duplicates, // sorts by head number, fixes parents and cuts off old heads (historyDepth). - AddHeads(historyDepth uint, newHeads ...*evmtypes.Head) + AddHeads(newHeads ...*evmtypes.Head) // Count returns number of heads in the collection. Count() int + // MarkFinalized - finds `finalized` in the LatestHead and marks it and all direct ancestors as finalized. + // Trims old blocks whose height is smaller than minBlockToKeep + MarkFinalized(finalized common.Hash, minBlockToKeep int64) bool } type heads struct { @@ -60,31 +63,62 @@ func (h *heads) Count() int { return len(h.heads) } -func (h *heads) AddHeads(historyDepth uint, newHeads ...*evmtypes.Head) { +// MarkFinalized - marks block with has equal to finalized and all it's direct ancestors as finalized. +// Trims old blocks whose height is smaller than minBlockToKeep +func (h *heads) MarkFinalized(finalized common.Hash, minBlockToKeep int64) bool { h.mu.Lock() defer h.mu.Unlock() - headsMap := make(map[common.Hash]*evmtypes.Head, len(h.heads)+len(newHeads)) - for _, head := range append(h.heads, newHeads...) { + if len(h.heads) == 0 { + return false + } + + // deep copy to avoid race on head.Parent + h.heads = deepCopy(h.heads, minBlockToKeep) + + head := h.heads[0] + foundFinalized := false + for head != nil { + if head.Hash == finalized { + foundFinalized = true + } + + // we might see finalized to move back in chain due to request to lagging RPC, + // we should not override the flag in such cases + head.IsFinalized = head.IsFinalized || foundFinalized + head = head.Parent + } + + return foundFinalized +} + +func deepCopy(oldHeads []*evmtypes.Head, minBlockToKeep int64) []*evmtypes.Head { + headsMap := make(map[common.Hash]*evmtypes.Head, len(oldHeads)) + for _, head := range oldHeads { if head.Hash == head.ParentHash { // shouldn't happen but it is untrusted input continue } + if head.BlockNumber() < minBlockToKeep { + // trim redundant blocks + continue + } // copy all head objects to avoid races when a previous head chain is used // elsewhere (since we mutate Parent here) headCopy := *head headCopy.Parent = nil // always build it from scratch in case it points to a head too old to be included // map eliminates duplicates - headsMap[head.Hash] = &headCopy + // prefer head that was already in heads as it might have been marked as finalized on previous run + if _, ok := headsMap[head.Hash]; !ok { + headsMap[head.Hash] = &headCopy + } } - heads := make([]*evmtypes.Head, len(headsMap)) + heads := make([]*evmtypes.Head, 0, len(headsMap)) // unsorted unique heads { - var i int for _, head := range headsMap { - heads[i] = head - i++ + heads = append(heads, head) } } @@ -94,13 +128,8 @@ func (h *heads) AddHeads(historyDepth uint, newHeads ...*evmtypes.Head) { return heads[i].Number > heads[j].Number }) - // cut off the oldest - if uint(len(heads)) > historyDepth { - heads = heads[:historyDepth] - } - // assign parents - for i := 0; i < len(heads)-1; i++ { + for i := 0; i < len(heads); i++ { head := heads[i] parent, exists := headsMap[head.ParentHash] if exists { @@ -108,6 +137,13 @@ func (h *heads) AddHeads(historyDepth uint, newHeads ...*evmtypes.Head) { } } - // set - h.heads = heads + return heads +} + +func (h *heads) AddHeads(newHeads ...*evmtypes.Head) { + h.mu.Lock() + defer h.mu.Unlock() + + // deep copy to avoid race on head.Parent + h.heads = deepCopy(append(h.heads, newHeads...), 0) } diff --git a/core/chains/evm/headtracker/heads_test.go b/core/chains/evm/headtracker/heads_test.go index 9fa5ed4e548..4241b462363 100644 --- a/core/chains/evm/headtracker/heads_test.go +++ b/core/chains/evm/headtracker/heads_test.go @@ -6,6 +6,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/headtracker" @@ -19,18 +20,18 @@ func TestHeads_LatestHead(t *testing.T) { t.Parallel() heads := headtracker.NewHeads() - heads.AddHeads(3, cltest.Head(100), cltest.Head(200), cltest.Head(300)) + heads.AddHeads(cltest.Head(100), cltest.Head(200), cltest.Head(300)) latest := heads.LatestHead() require.NotNil(t, latest) require.Equal(t, int64(300), latest.Number) - heads.AddHeads(3, cltest.Head(250)) + heads.AddHeads(cltest.Head(250)) latest = heads.LatestHead() require.NotNil(t, latest) require.Equal(t, int64(300), latest.Number) - heads.AddHeads(3, cltest.Head(400)) + heads.AddHeads(cltest.Head(400)) latest = heads.LatestHead() require.NotNil(t, latest) require.Equal(t, int64(400), latest.Number) @@ -45,7 +46,7 @@ func TestHeads_HeadByHash(t *testing.T) { cltest.Head(300), } heads := headtracker.NewHeads() - heads.AddHeads(3, testHeads...) + heads.AddHeads(testHeads...) head := heads.HeadByHash(testHeads[1].Hash) require.NotNil(t, head) @@ -61,11 +62,11 @@ func TestHeads_Count(t *testing.T) { heads := headtracker.NewHeads() require.Zero(t, heads.Count()) - heads.AddHeads(3, cltest.Head(100), cltest.Head(200), cltest.Head(300)) + heads.AddHeads(cltest.Head(100), cltest.Head(200), cltest.Head(300)) require.Equal(t, 3, heads.Count()) - heads.AddHeads(1, cltest.Head(400)) - require.Equal(t, 1, heads.Count()) + heads.AddHeads(cltest.Head(400)) + require.Equal(t, 4, heads.Count()) } func TestHeads_AddHeads(t *testing.T) { @@ -88,9 +89,10 @@ func TestHeads_AddHeads(t *testing.T) { parentHash = hash } - heads.AddHeads(6, testHeads...) + heads.AddHeads(testHeads...) + require.Equal(t, 6, heads.Count()) // Add duplicates (should be ignored) - heads.AddHeads(6, testHeads[2:5]...) + heads.AddHeads(testHeads[2:5]...) require.Equal(t, 6, heads.Count()) head := heads.LatestHead() @@ -100,11 +102,62 @@ func TestHeads_AddHeads(t *testing.T) { head = heads.HeadByHash(uncleHash) require.NotNil(t, head) require.Equal(t, 3, int(head.ChainLength())) +} + +func TestHeads_MarkFinalized(t *testing.T) { + t.Parallel() + + heads := headtracker.NewHeads() + + // create chain + // H0 <- H1 <- H2 <- H3 <- H4 <- H5 + // \ \ + // H1Uncle H2Uncle + // + newHead := func(num int, parent common.Hash) *evmtypes.Head { + h := evmtypes.NewHead(big.NewInt(int64(num)), utils.NewHash(), parent, uint64(time.Now().Unix()), ubig.NewI(0)) + return &h + } + h0 := newHead(0, utils.NewHash()) + h1 := newHead(1, h0.Hash) + h1Uncle := newHead(1, h0.Hash) + h2 := newHead(2, h1.Hash) + h3 := newHead(3, h2.Hash) + h4 := newHead(4, h3.Hash) + h5 := newHead(5, h4.Hash) + h2Uncle := newHead(2, h1.Hash) + + allHeads := []*evmtypes.Head{h0, h1, h1Uncle, h2, h2Uncle, h3, h4, h5} + heads.AddHeads(allHeads...) + // mark h3 and all ancestors as finalized + require.True(t, heads.MarkFinalized(h3.Hash, h1.BlockNumber()), "expected MarkFinalized succeed") + + // original heads remain unchanged + for _, h := range allHeads { + assert.False(t, h.IsFinalized, "expected original heads to remain unfinalized") + } + + // h0 is too old. It should not be available directly or through its children + assert.Nil(t, heads.HeadByHash(h0.Hash)) + assert.Nil(t, heads.HeadByHash(h1.Hash).Parent) + assert.Nil(t, heads.HeadByHash(h1Uncle.Hash).Parent) + assert.Nil(t, heads.HeadByHash(h2Uncle.Hash).Parent.Parent) + + require.False(t, heads.MarkFinalized(utils.NewHash(), 0), "expected false if finalized hash was not found in existing LatestHead chain") + + ensureProperFinalization := func(t *testing.T) { + t.Helper() + for _, head := range []*evmtypes.Head{h5, h4} { + require.False(t, heads.HeadByHash(head.Hash).IsFinalized, "expected h4-h5 not to be finalized", head.BlockNumber()) + } + for _, head := range []*evmtypes.Head{h3, h2, h1} { + require.True(t, heads.HeadByHash(head.Hash).IsFinalized, "expected h3 and all ancestors to be finalized", head.BlockNumber()) + } + require.False(t, heads.HeadByHash(h2Uncle.Hash).IsFinalized, "expected uncle block not to be marked as finalized") + + } + t.Run("blocks were correctly marked as finalized", ensureProperFinalization) + heads.AddHeads(h0, h1, h2, h2Uncle, h3, h4, h5) + t.Run("blocks remain finalized after re adding them to the Heads", ensureProperFinalization) - // Adding beyond the limit truncates - heads.AddHeads(2, testHeads...) - require.Equal(t, 2, heads.Count()) - head = heads.LatestHead() - require.NotNil(t, head) - require.Equal(t, 2, int(head.ChainLength())) } diff --git a/core/chains/evm/headtracker/mocks/config.go b/core/chains/evm/headtracker/mocks/config.go index 74376a71362..6cc3900ba42 100644 --- a/core/chains/evm/headtracker/mocks/config.go +++ b/core/chains/evm/headtracker/mocks/config.go @@ -49,6 +49,24 @@ func (_m *Config) FinalityDepth() uint32 { return r0 } +// FinalityTagEnabled provides a mock function with given fields: +func (_m *Config) FinalityTagEnabled() bool { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for FinalityTagEnabled") + } + + var r0 bool + if rf, ok := ret.Get(0).(func() bool); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + // NewConfig creates a new instance of Config. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewConfig(t interface { diff --git a/core/chains/evm/headtracker/orm.go b/core/chains/evm/headtracker/orm.go index a1957388b9b..d2c32581d2f 100644 --- a/core/chains/evm/headtracker/orm.go +++ b/core/chains/evm/headtracker/orm.go @@ -11,6 +11,7 @@ import ( "github.com/jmoiron/sqlx" "github.com/smartcontractkit/chainlink-common/pkg/logger" + evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big" "github.com/smartcontractkit/chainlink/v2/core/services/pg" @@ -20,12 +21,12 @@ type ORM interface { // IdempotentInsertHead inserts a head only if the hash is new. Will do nothing if hash exists already. // No advisory lock required because this is thread safe. IdempotentInsertHead(ctx context.Context, head *evmtypes.Head) error - // TrimOldHeads deletes heads such that only the top N block numbers remain - TrimOldHeads(ctx context.Context, n uint) (err error) + // TrimOldHeads deletes heads such that only blocks >= minBlockNumber remain + TrimOldHeads(ctx context.Context, minBlockNumber int64) (err error) // LatestHead returns the highest seen head LatestHead(ctx context.Context) (head *evmtypes.Head, err error) - // LatestHeads returns the latest heads up to given limit - LatestHeads(ctx context.Context, limit uint) (heads []*evmtypes.Head, err error) + // LatestHeads returns the latest heads with blockNumbers >= minBlockNumber + LatestHeads(ctx context.Context, minBlockNumber int64) (heads []*evmtypes.Head, err error) // HeadByHash fetches the head with the given hash from the db, returns nil if none exists HeadByHash(ctx context.Context, hash common.Hash) (head *evmtypes.Head, err error) } @@ -50,19 +51,11 @@ func (orm *orm) IdempotentInsertHead(ctx context.Context, head *evmtypes.Head) e return pkgerrors.Wrap(err, "IdempotentInsertHead failed to insert head") } -func (orm *orm) TrimOldHeads(ctx context.Context, n uint) (err error) { +func (orm *orm) TrimOldHeads(ctx context.Context, minBlockNumber int64) (err error) { q := orm.q.WithOpts(pg.WithParentCtx(ctx)) return q.ExecQ(` DELETE FROM evm.heads - WHERE evm_chain_id = $1 AND number < ( - SELECT min(number) FROM ( - SELECT number - FROM evm.heads - WHERE evm_chain_id = $1 - ORDER BY number DESC - LIMIT $2 - ) numbers - )`, orm.chainID, n) + WHERE evm_chain_id = $1 AND number < $2`, orm.chainID, minBlockNumber) } func (orm *orm) LatestHead(ctx context.Context) (head *evmtypes.Head, err error) { @@ -76,9 +69,9 @@ func (orm *orm) LatestHead(ctx context.Context) (head *evmtypes.Head, err error) return } -func (orm *orm) LatestHeads(ctx context.Context, limit uint) (heads []*evmtypes.Head, err error) { +func (orm *orm) LatestHeads(ctx context.Context, minBlockNumber int64) (heads []*evmtypes.Head, err error) { q := orm.q.WithOpts(pg.WithParentCtx(ctx)) - err = q.Select(&heads, `SELECT * FROM evm.heads WHERE evm_chain_id = $1 ORDER BY number DESC, created_at DESC, id DESC LIMIT $2`, orm.chainID, limit) + err = q.Select(&heads, `SELECT * FROM evm.heads WHERE evm_chain_id = $1 AND number >= $2 ORDER BY number DESC, created_at DESC, id DESC`, orm.chainID, minBlockNumber) err = pkgerrors.Wrap(err, "LatestHeads failed") return } diff --git a/core/chains/evm/headtracker/orm_test.go b/core/chains/evm/headtracker/orm_test.go index c9a2146daf2..7f99e535093 100644 --- a/core/chains/evm/headtracker/orm_test.go +++ b/core/chains/evm/headtracker/orm_test.go @@ -12,6 +12,7 @@ import ( "github.com/stretchr/testify/require" "github.com/smartcontractkit/chainlink-common/pkg/logger" + "github.com/smartcontractkit/chainlink/v2/core/chains/evm/headtracker" "github.com/smartcontractkit/chainlink/v2/core/internal/cltest" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" @@ -56,13 +57,17 @@ func TestORM_TrimOldHeads(t *testing.T) { require.NoError(t, orm.IdempotentInsertHead(testutils.Context(t), head)) } + uncleHead := cltest.Head(5) + require.NoError(t, orm.IdempotentInsertHead(testutils.Context(t), uncleHead)) + err := orm.TrimOldHeads(testutils.Context(t), 5) require.NoError(t, err) - heads, err := orm.LatestHeads(testutils.Context(t), 10) + heads, err := orm.LatestHeads(testutils.Context(t), 0) require.NoError(t, err) - require.Equal(t, 5, len(heads)) + // uncle block was loaded too + require.Equal(t, 6, len(heads)) for i := 0; i < 5; i++ { require.LessOrEqual(t, int64(5), heads[i].Number) } diff --git a/core/chains/evm/types/head_test.go b/core/chains/evm/types/head_test.go new file mode 100644 index 00000000000..b4f1de25c6e --- /dev/null +++ b/core/chains/evm/types/head_test.go @@ -0,0 +1,51 @@ +package types + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestHead_LatestFinalizedHead(t *testing.T) { + t.Parallel() + cases := []struct { + Name string + Head *Head + Finalized *Head + }{ + { + Name: "Empty chain returns nil on finalized", + Head: nil, + Finalized: nil, + }, + { + Name: "Chain without finalized returns nil", + Head: &Head{Parent: &Head{Parent: &Head{}}}, + Finalized: nil, + }, + { + Name: "Returns head if it's finalized", + Head: &Head{Number: 2, IsFinalized: true, Parent: &Head{Number: 1, IsFinalized: true}}, + Finalized: &Head{Number: 2}, + }, + { + Name: "Returns first block in chain if it's finalized", + Head: &Head{Number: 3, IsFinalized: false, Parent: &Head{Number: 2, IsFinalized: true, Parent: &Head{Number: 1, IsFinalized: true}}}, + Finalized: &Head{Number: 2}, + }, + } + + for _, tc := range cases { + t.Run(tc.Name, func(t *testing.T) { + actual := tc.Head.LatestFinalizedHead() + if tc.Finalized == nil { + assert.Nil(t, actual) + } else { + require.NotNil(t, actual) + assert.Equal(t, tc.Finalized.Number, actual.BlockNumber()) + } + }) + } + +} diff --git a/core/chains/evm/types/models.go b/core/chains/evm/types/models.go index 464eb901005..7f312401f7d 100644 --- a/core/chains/evm/types/models.go +++ b/core/chains/evm/types/models.go @@ -17,6 +17,7 @@ import ( "github.com/ugorji/go/codec" "github.com/smartcontractkit/chainlink-common/pkg/utils/hex" + htrktypes "github.com/smartcontractkit/chainlink/v2/common/headtracker/types" commontypes "github.com/smartcontractkit/chainlink/v2/common/types" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets" @@ -43,6 +44,7 @@ type Head struct { StateRoot common.Hash Difficulty *big.Int TotalDifficulty *big.Int + IsFinalized bool } var _ commontypes.Head[common.Hash] = &Head{} @@ -165,6 +167,14 @@ func (h *Head) ChainHashes() []common.Hash { return hashes } +func (h *Head) LatestFinalizedHead() commontypes.Head[common.Hash] { + for h != nil && !h.IsFinalized { + h = h.Parent + } + + return h +} + func (h *Head) ChainID() *big.Int { return h.EVMChainID.ToInt() } diff --git a/core/config/docs/chains-evm.toml b/core/config/docs/chains-evm.toml index 7ddd24276c6..dcf2f6e688e 100644 --- a/core/config/docs/chains-evm.toml +++ b/core/config/docs/chains-evm.toml @@ -288,8 +288,8 @@ TransactionPercentile = 60 # Default # # In addition to these settings, it log warnings if `EVM.NoNewHeadsThreshold` is exceeded without any new blocks being emitted. [EVM.HeadTracker] -# HistoryDepth tracks the top N block numbers to keep in the `heads` database table. -# Note that this can easily result in MORE than N records since in the case of re-orgs we keep multiple heads for a particular block height. +# HistoryDepth tracks the top N blocks on top of the latest finalized block to keep in the `heads` database table. +# Note that this can easily result in MORE than `N + finality depth` records since in the case of re-orgs we keep multiple heads for a particular block height. # This number should be at least as large as `FinalityDepth`. # There may be a small performance penalty to setting this to something very large (10,000+) HistoryDepth = 100 # Default diff --git a/core/internal/cltest/cltest.go b/core/internal/cltest/cltest.go index 08766d64c8b..6fcd1006fd7 100644 --- a/core/internal/cltest/cltest.go +++ b/core/internal/cltest/cltest.go @@ -469,7 +469,7 @@ func NewEthMocksWithStartupAssertions(t testing.TB) *evmclimocks.Client { c.On("Dial", mock.Anything).Maybe().Return(nil) c.On("SubscribeNewHead", mock.Anything, mock.Anything).Maybe().Return(EmptyMockSubscription(t), nil) c.On("SendTransaction", mock.Anything, mock.Anything).Maybe().Return(nil) - c.On("HeadByNumber", mock.Anything, (*big.Int)(nil)).Maybe().Return(Head(0), nil) + c.On("HeadByNumber", mock.Anything, mock.Anything).Maybe().Return(Head(0), nil) c.On("ConfiguredChainID").Maybe().Return(&FixtureChainID) c.On("CallContract", mock.Anything, mock.Anything, mock.Anything).Maybe().Return([]byte{}, nil) c.On("SubscribeFilterLogs", mock.Anything, mock.Anything, mock.Anything).Maybe().Return(nil, errors.New("mocked")) @@ -497,6 +497,7 @@ func NewEthMocksWithTransactionsOnBlocksAssertions(t testing.TB) *evmclimocks.Cl h1 := HeadWithHash(1, h2.ParentHash) h0 := HeadWithHash(0, h1.ParentHash) c.On("HeadByNumber", mock.Anything, (*big.Int)(nil)).Maybe().Return(h2, nil) + c.On("HeadByNumber", mock.Anything, big.NewInt(0)).Maybe().Return(h0, nil) // finalized block c.On("HeadByHash", mock.Anything, h1.Hash).Maybe().Return(h1, nil) c.On("HeadByHash", mock.Anything, h0.Hash).Maybe().Return(h0, nil) c.On("BatchCallContext", mock.Anything, mock.Anything).Maybe().Return(nil).Run(func(args mock.Arguments) { diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index e66bfc2f4ab..48ec28cbbeb 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -29,6 +29,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed - Minimum required version of Postgres is now >= 12. Postgres 11 was EOL'd in November 2023. Added a new version check that will prevent Chainlink from running on EOL'd Postgres. If you are running Postgres <= 11 you should upgrade to the latest version. The check can be forcibly overridden by setting SKIP_PG_VERSION_CHECK=true. +- HeadTracker now respects the `FinalityTagEnabled` config option. If the flag is enabled, HeadTracker backfills blocks up to the latest finalized block provided by the corresponding RPC call. To address potential misconfigurations, `HistoryDepth` is now calculated from the latest finalized block instead of the head. NOTE: Consumers (e.g. TXM and LogPoller) do not fully utilize Finality Tag yet. - Updated the `LimitDefault` and `LimitMax` configs types to `uint64` diff --git a/docs/CONFIG.md b/docs/CONFIG.md index c7b9edf3cad..025995f115b 100644 --- a/docs/CONFIG.md +++ b/docs/CONFIG.md @@ -6703,8 +6703,8 @@ In addition to these settings, it log warnings if `EVM.NoNewHeadsThreshold` is e ```toml HistoryDepth = 100 # Default ``` -HistoryDepth tracks the top N block numbers to keep in the `heads` database table. -Note that this can easily result in MORE than N records since in the case of re-orgs we keep multiple heads for a particular block height. +HistoryDepth tracks the top N blocks on top of the latest finalized block to keep in the `heads` database table. +Note that this can easily result in MORE than `N + finality depth` records since in the case of re-orgs we keep multiple heads for a particular block height. This number should be at least as large as `FinalityDepth`. There may be a small performance penalty to setting this to something very large (10,000+)