Skip to content

Commit

Permalink
Post review fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
mateusz-sekara committed Mar 7, 2024
1 parent 8e60ec1 commit b3f1c9b
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 11 deletions.
19 changes: 11 additions & 8 deletions core/chains/evm/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,16 @@ type logPoller struct {
cachedAddresses []common.Address
cachedEventSigs []common.Hash

replayStart chan int64
replayComplete chan error
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
replayStart chan int64
replayComplete chan error
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
// This flag is raised whenever the log poller detects that the chain's finality has been violated.
// It can happen when reorg is deeper than the latest finalized block that LogPoller saw in a previous PollAndSave tick.
// Usually the only way to recover is to manually remove the offending logs and block from the database.
// LogPoller keeps running in infinite loop, so whenever the invalid state is removed from the database it should
// recover automatically without needing to restart the LogPoller.
finalityViolated *atomic.Bool
}

Expand All @@ -137,8 +142,6 @@ type logPoller struct {
func NewLogPoller(orm ORM, ec Client, lggr logger.Logger, pollPeriod time.Duration,
useFinalityTag bool, finalityDepth int64, backfillBatchSize int64, rpcBatchSize int64, keepFinalizedBlocksDepth int64) *logPoller {
ctx, cancel := context.WithCancel(context.Background())
isReorged := atomic.Bool{}
isReorged.Store(false)
return &logPoller{
ctx: ctx,
cancel: cancel,
Expand All @@ -155,7 +158,7 @@ func NewLogPoller(orm ORM, ec Client, lggr logger.Logger, pollPeriod time.Durati
keepFinalizedBlocksDepth: keepFinalizedBlocksDepth,
filters: make(map[string]Filter),
filterDirty: true, // Always build Filter on first call to cache an empty filter if nothing registered yet.
finalityViolated: &isReorged,
finalityViolated: new(atomic.Bool),
}
}

Expand Down
6 changes: 3 additions & 3 deletions core/chains/evm/logpoller/log_poller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1029,7 +1029,7 @@ func TestLogPoller_ReorgDeeperThanFinality(t *testing.T) {

// Fork deeper than finality depth
// Chain gen <- 1 <- 2 <- 3 (finalized) <- 4 (L1_1)
// \ 2' <- 3' <- 4' <- 5' <- 6' (finalized) <- 7' <- 8' <- 9' <- 10' (L1_2)
// \ 2' <- 3' <- 4' <- 5' <- 6' (finalized) <- 7' <- 8' <- 9' <- 10' (L1_2)
lca, err := th.Client.BlockByNumber(testutils.Context(t), big.NewInt(1))
require.NoError(t, err)
require.NoError(t, th.Client.Fork(testutils.Context(t), lca.Hash()))
Expand All @@ -1049,12 +1049,12 @@ func TestLogPoller_ReorgDeeperThanFinality(t *testing.T) {

secondPoll := th.PollAndSaveLogs(testutils.Context(t), firstPoll)
assert.Equal(t, firstPoll, secondPoll)
assert.Error(t, th.LogPoller.Ready())
assert.Equal(t, logpoller.ErrFinalityViolated, th.LogPoller.Ready())

// Manually remove latest block from the log poller to bring it back to life
// LogPoller should be healthy again after first poll
// Chain gen <- 1
// \ 2' <- 3' <- 4' <- 5' <- 6' (finalized) <- 7' <- 8' <- 9' <- 10' (L1_2)
// \ 2' <- 3' <- 4' <- 5' <- 6' (finalized) <- 7' <- 8' <- 9' <- 10' (L1_2)
require.NoError(t, th.ORM.DeleteLogsAndBlocksAfter(2))
// Poll from latest
recoveryPoll := th.PollAndSaveLogs(testutils.Context(t), 1)
Expand Down

0 comments on commit b3f1c9b

Please sign in to comment.