diff --git a/.changeset/silent-pets-sip.md b/.changeset/silent-pets-sip.md new file mode 100644 index 00000000000..ba2417f0922 --- /dev/null +++ b/.changeset/silent-pets-sip.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +Exposing information about LogPoller finality violation via Healthy method. It's raised whenever LogPoller sees reorg deeper than the finality depth diff --git a/core/chains/evm/logpoller/disabled.go b/core/chains/evm/logpoller/disabled.go index 8287aed22a4..0f8faba2695 100644 --- a/core/chains/evm/logpoller/disabled.go +++ b/core/chains/evm/logpoller/disabled.go @@ -21,6 +21,10 @@ func (disabled) Start(ctx context.Context) error { return ErrDisabled } func (disabled) Close() error { return ErrDisabled } +func (disabled) Healthy() error { + return ErrDisabled +} + func (disabled) Ready() error { return ErrDisabled } func (disabled) HealthReport() map[string]error { diff --git a/core/chains/evm/logpoller/log_poller.go b/core/chains/evm/logpoller/log_poller.go index ffcfa96b4d2..490a104aa05 100644 --- a/core/chains/evm/logpoller/log_poller.go +++ b/core/chains/evm/logpoller/log_poller.go @@ -11,6 +11,7 @@ import ( "sort" "strings" "sync" + "sync/atomic" "time" "github.com/ethereum/go-ethereum" @@ -34,6 +35,7 @@ import ( //go:generate mockery --quiet --name LogPoller --output ./mocks/ --case=underscore --structname LogPoller --filename log_poller.go type LogPoller interface { services.Service + Healthy() error Replay(ctx context.Context, fromBlock int64) error ReplayAsync(fromBlock int64) RegisterFilter(ctx context.Context, filter Filter) error @@ -92,6 +94,7 @@ var ( ErrReplayRequestAborted = pkgerrors.New("aborted, replay request cancelled") ErrReplayInProgress = pkgerrors.New("replay request cancelled, but replay is already in progress") ErrLogPollerShutdown = pkgerrors.New("replay aborted due to log poller shutdown") + ErrFinalityViolated = pkgerrors.New("finality violated") ) type logPoller struct { @@ -120,6 +123,12 @@ type logPoller struct { ctx 
context.Context cancel context.CancelFunc wg sync.WaitGroup + // This flag is raised whenever the log poller detects that the chain's finality has been violated. + // It can happen when reorg is deeper than the latest finalized block that LogPoller saw in a previous PollAndSave tick. + // Usually the only way to recover is to manually remove the offending logs and block from the database. + // LogPoller keeps running in infinite loop, so whenever the invalid state is removed from the database it should + // recover automatically without needing to restart the LogPoller. + finalityViolated *atomic.Bool } type Opts struct { @@ -163,6 +172,7 @@ func NewLogPoller(orm ORM, ec Client, lggr logger.Logger, opts Opts) *logPoller logPrunePageSize: opts.LogPrunePageSize, filters: make(map[string]Filter), filterDirty: true, // Always build Filter on first call to cache an empty filter if nothing registered yet. + finalityViolated: new(atomic.Bool), } } @@ -466,6 +476,13 @@ func (lp *logPoller) Close() error { }) } +func (lp *logPoller) Healthy() error { + if lp.finalityViolated.Load() { + return ErrFinalityViolated + } + return nil +} + func (lp *logPoller) Name() string { return lp.lggr.Name() } @@ -786,7 +803,13 @@ func (lp *logPoller) backfill(ctx context.Context, start, end int64) error { // 1. Find the LCA by following parent hashes. // 2. Delete all logs and blocks after the LCA // 3. Return the LCA+1, i.e. our new current (unprocessed) block. -func (lp *logPoller) getCurrentBlockMaybeHandleReorg(ctx context.Context, currentBlockNumber int64, currentBlock *evmtypes.Head) (*evmtypes.Head, error) { +func (lp *logPoller) getCurrentBlockMaybeHandleReorg(ctx context.Context, currentBlockNumber int64, currentBlock *evmtypes.Head) (head *evmtypes.Head, err error) { + defer func() { + if err == nil { + lp.finalityViolated.Store(false) + } + }() + var err1 error if currentBlock == nil { // If we don't have the current block already, lets get it. 
@@ -1012,6 +1035,7 @@ func (lp *logPoller) findBlockAfterLCA(ctx context.Context, current *evmtypes.He lp.lggr.Criticalw("Reorg greater than finality depth detected", "finalityTag", lp.useFinalityTag, "current", current.Number, "latestFinalized", latestFinalizedBlockNumber) rerr := pkgerrors.New("Reorg greater than finality depth") lp.SvcErrBuffer.Append(rerr) + lp.finalityViolated.Store(true) return nil, rerr } diff --git a/core/chains/evm/logpoller/log_poller_test.go b/core/chains/evm/logpoller/log_poller_test.go index a8301a51e47..6605272c672 100644 --- a/core/chains/evm/logpoller/log_poller_test.go +++ b/core/chains/evm/logpoller/log_poller_test.go @@ -1040,6 +1040,93 @@ func TestLogPoller_PollAndSaveLogs(t *testing.T) { } } +func TestLogPoller_ReorgDeeperThanFinality(t *testing.T) { + tests := []struct { + name string + finalityDepth int64 + finalityTag bool + }{ + { + name: "fixed finality depth without finality tag", + finalityDepth: 1, + finalityTag: false, + }, + { + name: "chain finality in use", + finalityDepth: 0, + finalityTag: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + th := SetupTH(t, logpoller.Opts{ + UseFinalityTag: tt.finalityTag, + FinalityDepth: tt.finalityDepth, + BackfillBatchSize: 3, + RpcBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, + BackupPollerBlockDelay: 100, + }) + // Set up a log poller listening for log emitter logs. + err := th.LogPoller.RegisterFilter(testutils.Context(t), logpoller.Filter{ + Name: "Test Emitter", + EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID}, + Addresses: []common.Address{th.EmitterAddress1}, + }) + require.NoError(t, err) + + // Test scenario + // Chain gen <- 1 <- 2 <- 3 (finalized) <- 4 (L1_1) + _, err = th.Emitter1.EmitLog1(th.Owner, []*big.Int{big.NewInt(1)}) + require.NoError(t, err) + th.Client.Commit() + th.Client.Commit() + th.Client.Commit() + markBlockAsFinalized(t, th, 3) + + // Polling should get us the L1 log. 
+ firstPoll := th.PollAndSaveLogs(testutils.Context(t), 1) + assert.Equal(t, int64(5), firstPoll) + assert.NoError(t, th.LogPoller.Healthy()) + + // Fork deeper than finality depth + // Chain gen <- 1 <- 2 <- 3 (finalized) <- 4 (L1_1) + // \ 2' <- 3' <- 4' <- 5' <- 6' (finalized) <- 7' <- 8' <- 9' <- 10' (L1_2) + lca, err := th.Client.BlockByNumber(testutils.Context(t), big.NewInt(1)) + require.NoError(t, err) + require.NoError(t, th.Client.Fork(testutils.Context(t), lca.Hash())) + + // Create 2' + _, err = th.Emitter1.EmitLog1(th.Owner, []*big.Int{big.NewInt(2)}) + require.NoError(t, err) + th.Client.Commit() + + // Create 3-10 + for i := 3; i < 10; i++ { + _, err = th.Emitter1.EmitLog1(th.Owner, []*big.Int{big.NewInt(int64(i))}) + require.NoError(t, err) + th.Client.Commit() + } + markBlockAsFinalized(t, th, 6) + + secondPoll := th.PollAndSaveLogs(testutils.Context(t), firstPoll) + assert.Equal(t, firstPoll, secondPoll) + assert.Equal(t, logpoller.ErrFinalityViolated, th.LogPoller.Healthy()) + + // Manually remove latest block from the log poller to bring it back to life + // LogPoller should be healthy again after first poll + // Chain gen <- 1 + // \ 2' <- 3' <- 4' <- 5' <- 6' (finalized) <- 7' <- 8' <- 9' <- 10' (L1_2) + require.NoError(t, th.ORM.DeleteLogsAndBlocksAfter(testutils.Context(t), 2)) + // Poll from latest + recoveryPoll := th.PollAndSaveLogs(testutils.Context(t), 1) + assert.Equal(t, int64(10), recoveryPoll) + assert.NoError(t, th.LogPoller.Healthy()) + }) + } +} + func TestLogPoller_PollAndSaveLogsDeepReorg(t *testing.T) { t.Parallel() @@ -1089,6 +1176,7 @@ func TestLogPoller_PollAndSaveLogsDeepReorg(t *testing.T) { // Polling should get us the L1 log. 
newStart := th.PollAndSaveLogs(testutils.Context(t), 1) + assert.NoError(t, th.LogPoller.Healthy()) assert.Equal(t, int64(3), newStart) // Check that L1_1 has a proper data payload lgs, err := th.ORM.SelectLogsByBlockRange(testutils.Context(t), 2, 2) @@ -1115,6 +1203,7 @@ func TestLogPoller_PollAndSaveLogsDeepReorg(t *testing.T) { newStart = th.PollAndSaveLogs(testutils.Context(t), newStart) assert.Equal(t, int64(10), newStart) + assert.NoError(t, th.LogPoller.Healthy()) // Expect L1_2 to be properly updated lgs, err = th.ORM.SelectLogsByBlockRange(testutils.Context(t), 2, 2) diff --git a/core/chains/evm/logpoller/mocks/log_poller.go b/core/chains/evm/logpoller/mocks/log_poller.go index 2bf24881405..ea129dfc4ef 100644 --- a/core/chains/evm/logpoller/mocks/log_poller.go +++ b/core/chains/evm/logpoller/mocks/log_poller.go @@ -105,6 +105,24 @@ func (_m *LogPoller) HealthReport() map[string]error { return r0 } +// Healthy provides a mock function with given fields: +func (_m *LogPoller) Healthy() error { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Healthy") + } + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + // IndexedLogs provides a mock function with given fields: ctx, eventSig, address, topicIndex, topicValues, confs func (_m *LogPoller) IndexedLogs(ctx context.Context, eventSig common.Hash, address common.Address, topicIndex int, topicValues []common.Hash, confs logpoller.Confirmations) ([]logpoller.Log, error) { ret := _m.Called(ctx, eventSig, address, topicIndex, topicValues, confs)