Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CCIP-1906 Raising a flag when finality is violated #12564

Merged
merged 1 commit into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/silent-pets-sip.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Exposing information about LogPoller finality violation via Healthy method. It's raised whenever LogPoller sees reorg deeper than the finality
4 changes: 4 additions & 0 deletions core/chains/evm/logpoller/disabled.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ func (disabled) Start(ctx context.Context) error { return ErrDisabled }

func (disabled) Close() error { return ErrDisabled }

func (disabled) Healthy() error {
return ErrDisabled
}

func (disabled) Ready() error { return ErrDisabled }

func (disabled) HealthReport() map[string]error {
Expand Down
26 changes: 25 additions & 1 deletion core/chains/evm/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"sort"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum"
Expand All @@ -34,6 +35,7 @@ import (
//go:generate mockery --quiet --name LogPoller --output ./mocks/ --case=underscore --structname LogPoller --filename log_poller.go
type LogPoller interface {
services.Service
Healthy() error
Replay(ctx context.Context, fromBlock int64) error
ReplayAsync(fromBlock int64)
RegisterFilter(ctx context.Context, filter Filter) error
Expand Down Expand Up @@ -92,6 +94,7 @@ var (
ErrReplayRequestAborted = pkgerrors.New("aborted, replay request cancelled")
ErrReplayInProgress = pkgerrors.New("replay request cancelled, but replay is already in progress")
ErrLogPollerShutdown = pkgerrors.New("replay aborted due to log poller shutdown")
ErrFinalityViolated = pkgerrors.New("finality violated")
)

type logPoller struct {
Expand Down Expand Up @@ -120,6 +123,12 @@ type logPoller struct {
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
// This flag is raised whenever the log poller detects that the chain's finality has been violated.
// It can happen when reorg is deeper than the latest finalized block that LogPoller saw in a previous PollAndSave tick.
// Usually the only way to recover is to manually remove the offending logs and block from the database.
// LogPoller keeps running in infinite loop, so whenever the invalid state is removed from the database it should
// recover automatically without needing to restart the LogPoller.
finalityViolated *atomic.Bool
}

type Opts struct {
Expand Down Expand Up @@ -163,6 +172,7 @@ func NewLogPoller(orm ORM, ec Client, lggr logger.Logger, opts Opts) *logPoller
logPrunePageSize: opts.LogPrunePageSize,
filters: make(map[string]Filter),
filterDirty: true, // Always build Filter on first call to cache an empty filter if nothing registered yet.
finalityViolated: new(atomic.Bool),
}
}

Expand Down Expand Up @@ -466,6 +476,13 @@ func (lp *logPoller) Close() error {
})
}

func (lp *logPoller) Healthy() error {
if lp.finalityViolated.Load() {
return ErrFinalityViolated
}
return nil
}

func (lp *logPoller) Name() string {
return lp.lggr.Name()
}
Expand Down Expand Up @@ -786,7 +803,13 @@ func (lp *logPoller) backfill(ctx context.Context, start, end int64) error {
// 1. Find the LCA by following parent hashes.
// 2. Delete all logs and blocks after the LCA
// 3. Return the LCA+1, i.e. our new current (unprocessed) block.
func (lp *logPoller) getCurrentBlockMaybeHandleReorg(ctx context.Context, currentBlockNumber int64, currentBlock *evmtypes.Head) (*evmtypes.Head, error) {
func (lp *logPoller) getCurrentBlockMaybeHandleReorg(ctx context.Context, currentBlockNumber int64, currentBlock *evmtypes.Head) (head *evmtypes.Head, err error) {
defer func() {
if err == nil {
lp.finalityViolated.Store(false)
}
}()

var err1 error
if currentBlock == nil {
// If we don't have the current block already, lets get it.
Expand Down Expand Up @@ -1012,6 +1035,7 @@ func (lp *logPoller) findBlockAfterLCA(ctx context.Context, current *evmtypes.He
lp.lggr.Criticalw("Reorg greater than finality depth detected", "finalityTag", lp.useFinalityTag, "current", current.Number, "latestFinalized", latestFinalizedBlockNumber)
rerr := pkgerrors.New("Reorg greater than finality depth")
lp.SvcErrBuffer.Append(rerr)
lp.finalityViolated.Store(true)
return nil, rerr
}

Expand Down
89 changes: 89 additions & 0 deletions core/chains/evm/logpoller/log_poller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1040,6 +1040,93 @@ func TestLogPoller_PollAndSaveLogs(t *testing.T) {
}
}

func TestLogPoller_ReorgDeeperThanFinality(t *testing.T) {
tests := []struct {
name string
finalityDepth int64
finalityTag bool
}{
{
name: "fixed finality depth without finality tag",
finalityDepth: 1,
finalityTag: false,
},
{
name: "chain finality in use",
finalityDepth: 0,
finalityTag: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
th := SetupTH(t, logpoller.Opts{
UseFinalityTag: tt.finalityTag,
FinalityDepth: tt.finalityDepth,
BackfillBatchSize: 3,
RpcBatchSize: 2,
KeepFinalizedBlocksDepth: 1000,
BackupPollerBlockDelay: 100,
})
// Set up a log poller listening for log emitter logs.
err := th.LogPoller.RegisterFilter(testutils.Context(t), logpoller.Filter{
Name: "Test Emitter",
EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID},
Addresses: []common.Address{th.EmitterAddress1},
})
require.NoError(t, err)

// Test scenario
// Chain gen <- 1 <- 2 <- 3 (finalized) <- 4 (L1_1)
_, err = th.Emitter1.EmitLog1(th.Owner, []*big.Int{big.NewInt(1)})
require.NoError(t, err)
th.Client.Commit()
th.Client.Commit()
th.Client.Commit()
markBlockAsFinalized(t, th, 3)

// Polling should get us the L1 log.
firstPoll := th.PollAndSaveLogs(testutils.Context(t), 1)
assert.Equal(t, int64(5), firstPoll)
assert.NoError(t, th.LogPoller.Healthy())

// Fork deeper than finality depth
// Chain gen <- 1 <- 2 <- 3 (finalized) <- 4 (L1_1)
// \ 2' <- 3' <- 4' <- 5' <- 6' (finalized) <- 7' <- 8' <- 9' <- 10' (L1_2)
lca, err := th.Client.BlockByNumber(testutils.Context(t), big.NewInt(1))
require.NoError(t, err)
require.NoError(t, th.Client.Fork(testutils.Context(t), lca.Hash()))

// Create 2'
_, err = th.Emitter1.EmitLog1(th.Owner, []*big.Int{big.NewInt(2)})
require.NoError(t, err)
th.Client.Commit()

// Create 3-10
for i := 3; i < 10; i++ {
_, err = th.Emitter1.EmitLog1(th.Owner, []*big.Int{big.NewInt(int64(i))})
require.NoError(t, err)
th.Client.Commit()
}
markBlockAsFinalized(t, th, 6)

secondPoll := th.PollAndSaveLogs(testutils.Context(t), firstPoll)
assert.Equal(t, firstPoll, secondPoll)
assert.Equal(t, logpoller.ErrFinalityViolated, th.LogPoller.Healthy())

// Manually remove latest block from the log poller to bring it back to life
// LogPoller should be healthy again after first poll
// Chain gen <- 1
// \ 2' <- 3' <- 4' <- 5' <- 6' (finalized) <- 7' <- 8' <- 9' <- 10' (L1_2)
require.NoError(t, th.ORM.DeleteLogsAndBlocksAfter(testutils.Context(t), 2))
// Poll from latest
recoveryPoll := th.PollAndSaveLogs(testutils.Context(t), 1)
assert.Equal(t, int64(10), recoveryPoll)
assert.NoError(t, th.LogPoller.Healthy())
})
}
}

func TestLogPoller_PollAndSaveLogsDeepReorg(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -1089,6 +1176,7 @@ func TestLogPoller_PollAndSaveLogsDeepReorg(t *testing.T) {

// Polling should get us the L1 log.
newStart := th.PollAndSaveLogs(testutils.Context(t), 1)
assert.NoError(t, th.LogPoller.Healthy())
assert.Equal(t, int64(3), newStart)
// Check that L1_1 has a proper data payload
lgs, err := th.ORM.SelectLogsByBlockRange(testutils.Context(t), 2, 2)
Expand All @@ -1115,6 +1203,7 @@ func TestLogPoller_PollAndSaveLogsDeepReorg(t *testing.T) {

newStart = th.PollAndSaveLogs(testutils.Context(t), newStart)
assert.Equal(t, int64(10), newStart)
assert.NoError(t, th.LogPoller.Healthy())

// Expect L1_2 to be properly updated
lgs, err = th.ORM.SelectLogsByBlockRange(testutils.Context(t), 2, 2)
Expand Down
18 changes: 18 additions & 0 deletions core/chains/evm/logpoller/mocks/log_poller.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading