Skip to content

Commit

Permalink
Make LogPoller more robust against local finality violations (#12605)
Browse files Browse the repository at this point in the history
* Reduce unnecessary code duplication

getCurrentBlockMaybeHandleReorg is called just before the for loop over
unfinalized blocks begins, and at the end of each iteration. Simplifying
by moving them both to the beginning of the for loop

* Fix bugs in TestLogPoller_BackupPollAndSaveLogsSkippingLogsThatAreTooOld

This fixes 2 bugs on develop branch in this test, and removes some
unused commented code.

First Bug
=========
The first bug was causing a false positive PASS on develop branch, which
was obscuring a (very minor) bug in BackupPoller that's been fixed in this PR.

The comment here about what the test was intended to test is still correct:

 // Only the 2nd batch + 1 log from a previous batch should be backfilled, because we perform backfill starting
 // from one block behind the latest finalized block

Contrary to the comment, the code was returning 2 logs from the 1st batch (Data=9 & Data=10), plus 9 of 10 logs
from the 2nd batch. This was incorrect behavior, but the test was also checking for the same incorrect behavior
(looking for 11 logs with first one being Data=9) instead of what's described in the comment.

The bug in the production code was that it starts the Backup Poller at Finalized - 1 instead of Finalized.
This is a harmless "bug", just unnecessarily starting a block too early, since there's no reason for backup
logpoller to re-request the same finalized logs that's already been processed.

Now, the code returns the last log from the 1st batch + all but one logs
from the 2nd batch, which is correct. (It can't return the last log
because that goes beyond the last safe block.) So the test checks that
there are 10 logs with first one being Data=10 (last log from the first
batch.)

Second Bug
==========
The second bug was passing firstBatchBlock and secondBatchBlock directly
to markBlockAsFinalized() instead of passing firstBatchBlock - 1 and
secondBatchBlock - 1. This was only working because of a bug in the
version of geth we're currently using: when you request the pending
block from simulated geth, it gives you back the current block (1 block
prior) instead of the current block. (For example, in the first case,
even though we wanted block 11, the latest current block, we request
block 12 and get back block 11.) This has been fixed in the latest
version of geth... so presumably if we don't fix this here the test
would have started failing as soon as we upgrade to the latest version
of geth. It doesn't change any behavior of the test for the present
version of geth, just makes it more clear that we want block 11 not 12.

* Check that all results from batchFetchBlocks() are finalized aside from "latest"

batchFetchBlocks() will now fetch the "finalized" block along with the
rest of each batch, and validate that all of the block numbers (aside from the
special when "lateest" is requested) are <= the finalized block number
returned.

Also, change backfill() to always save the last block of each batch of
logs requested, rather than the last block of the logs returned. This
only makes a difference if the last block requested has no logs matching
the filter, but this change is essential for being able to safely change
lastSafeBlockNumber from latestFinalizedBlock - 1 to latestFinalizedBlock

* Update logpoller tests

* fix merge conflict

* reduce cognitive complexity

* Add validationReqType type definition

* Fix comments

* Add Test_FetchBlocks
  • Loading branch information
reductionista committed Apr 26, 2024
1 parent aab1519 commit 1d9dd46
Show file tree
Hide file tree
Showing 4 changed files with 290 additions and 89 deletions.
5 changes: 5 additions & 0 deletions .changeset/fresh-moles-explode.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

core/chains/evm/logpoller: Stricter finality checks in LogPoller, to be more robust during rpc failover events #updated
189 changes: 134 additions & 55 deletions core/chains/evm/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -699,8 +699,8 @@ func (lp *logPoller) BackupPollAndSaveLogs(ctx context.Context) {
}
return
}
// If this is our first run, start from block min(lastProcessed.FinalizedBlockNumber-1, lastProcessed.BlockNumber-backupPollerBlockDelay)
backupStartBlock := mathutil.Min(lastProcessed.FinalizedBlockNumber-1, lastProcessed.BlockNumber-lp.backupPollerBlockDelay)
// If this is our first run, start from block min(lastProcessed.FinalizedBlockNumber, lastProcessed.BlockNumber-backupPollerBlockDelay)
backupStartBlock := mathutil.Min(lastProcessed.FinalizedBlockNumber, lastProcessed.BlockNumber-lp.backupPollerBlockDelay)
// (or at block 0 if whole blockchain is too short)
lp.backupPollerNextBlock = mathutil.Max(backupStartBlock, 0)
}
Expand Down Expand Up @@ -771,11 +771,16 @@ func convertTopics(topics []common.Hash) [][]byte {
return topicsForDB
}

func (lp *logPoller) blocksFromLogs(ctx context.Context, logs []types.Log) (blocks []LogPollerBlock, err error) {
// blocksFromLogs fetches all of the blocks associated with a given list of logs. It will also unconditionally fetch endBlockNumber,
// whether or not there are any logs in the list from that block
func (lp *logPoller) blocksFromLogs(ctx context.Context, logs []types.Log, endBlockNumber uint64) (blocks []LogPollerBlock, err error) {
var numbers []uint64
for _, log := range logs {
numbers = append(numbers, log.BlockNumber)
}
if numbers[len(numbers)-1] != endBlockNumber {
numbers = append(numbers, endBlockNumber)
}
return lp.GetBlocksRange(ctx, numbers)
}

Expand All @@ -789,6 +794,7 @@ func (lp *logPoller) backfill(ctx context.Context, start, end int64) error {
batchSize := lp.backfillBatchSize
for from := start; from <= end; from += batchSize {
to := mathutil.Min(from+batchSize-1, end)

gethLogs, err := lp.ec.FilterLogs(ctx, lp.Filter(big.NewInt(from), big.NewInt(to), nil))
if err != nil {
var rpcErr client.JsonError
Expand All @@ -810,13 +816,19 @@ func (lp *logPoller) backfill(ctx context.Context, start, end int64) error {
if len(gethLogs) == 0 {
continue
}
blocks, err := lp.blocksFromLogs(ctx, gethLogs)
blocks, err := lp.blocksFromLogs(ctx, gethLogs, uint64(to))
if err != nil {
return err
}

endblock := blocks[len(blocks)-1]
if gethLogs[len(gethLogs)-1].BlockNumber != uint64(to) {
// Pop endblock if there were no logs for it, so that length of blocks & gethLogs are the same to pass to convertLogs
blocks = blocks[:len(blocks)-1]
}

lp.lggr.Debugw("Backfill found logs", "from", from, "to", to, "logs", len(gethLogs), "blocks", blocks)
err = lp.orm.InsertLogsWithBlock(ctx, convertLogs(gethLogs, blocks, lp.lggr, lp.ec.ConfiguredChainID()), blocks[len(blocks)-1])
err = lp.orm.InsertLogsWithBlock(ctx, convertLogs(gethLogs, blocks, lp.lggr, lp.ec.ConfiguredChainID()), endblock)
if err != nil {
lp.lggr.Warnw("Unable to insert logs, retrying", "err", err, "from", from, "to", to)
return err
Expand Down Expand Up @@ -955,19 +967,18 @@ func (lp *logPoller) PollAndSaveLogs(ctx context.Context, currentBlockNumber int
currentBlockNumber = lastSafeBackfillBlock + 1
}

if currentBlockNumber > currentBlock.Number {
// If we successfully backfilled we have logs up to and including lastSafeBackfillBlock,
// now load the first unfinalized block.
currentBlock, err = lp.getCurrentBlockMaybeHandleReorg(ctx, currentBlockNumber, nil)
if err != nil {
// If there's an error handling the reorg, we can't be sure what state the db was left in.
// Resume from the latest block saved.
lp.lggr.Errorw("Unable to get current block", "err", err)
return
for {
if currentBlockNumber > currentBlock.Number {
currentBlock, err = lp.getCurrentBlockMaybeHandleReorg(ctx, currentBlockNumber, nil)
if err != nil {
// If there's an error handling the reorg, we can't be sure what state the db was left in.
// Resume from the latest block saved.
lp.lggr.Errorw("Unable to get current block", "err", err)
return
}
currentBlockNumber = currentBlock.Number
}
}

for {
h := currentBlock.Hash
var logs []types.Log
logs, err = lp.ec.FilterLogs(ctx, lp.Filter(nil, nil, &h))
Expand All @@ -992,14 +1003,6 @@ func (lp *logPoller) PollAndSaveLogs(ctx context.Context, currentBlockNumber int
if currentBlockNumber > latestBlockNumber {
break
}
currentBlock, err = lp.getCurrentBlockMaybeHandleReorg(ctx, currentBlockNumber, nil)
if err != nil {
// If there's an error handling the reorg, we can't be sure what state the db was left in.
// Resume from the latest block saved.
lp.lggr.Errorw("Unable to get current block", "err", err)
return
}
currentBlockNumber = currentBlock.Number
}
}

Expand Down Expand Up @@ -1252,12 +1255,16 @@ func (lp *logPoller) GetBlocksRange(ctx context.Context, numbers []uint64) ([]Lo
return blocks, nil
}

// fillRemainingBlocksFromRPC sends a batch request for each block in blocksRequested, and converts them from
// geth blocks into LogPollerBlock structs. This is only intended to be used for requesting finalized blocks,
// if any of the blocks coming back are not finalized, an error will be returned
func (lp *logPoller) fillRemainingBlocksFromRPC(
ctx context.Context,
blocksRequested map[uint64]struct{},
blocksFound map[uint64]LogPollerBlock,
) (map[uint64]LogPollerBlock, error) {
var remainingBlocks []string

for num := range blocksRequested {
if _, ok := blocksFound[num]; !ok {
remainingBlocks = append(remainingBlocks, hexutil.EncodeBig(new(big.Int).SetUint64(num)))
Expand Down Expand Up @@ -1287,54 +1294,126 @@ func (lp *logPoller) fillRemainingBlocksFromRPC(
return logPollerBlocks, nil
}

func (lp *logPoller) batchFetchBlocks(ctx context.Context, blocksRequested []string, batchSize int64) ([]*evmtypes.Head, error) {
reqs := make([]rpc.BatchElem, 0, len(blocksRequested))
for _, num := range blocksRequested {
req := rpc.BatchElem{
Method: "eth_getBlockByNumber",
Args: []interface{}{num, false},
Result: &evmtypes.Head{},
}
reqs = append(reqs, req)
// newBlockReq constructs an eth_getBlockByNumber request for particular block number
func newBlockReq(num string) rpc.BatchElem {
return rpc.BatchElem{
Method: "eth_getBlockByNumber",
Args: []interface{}{num, false},
Result: &evmtypes.Head{},
}
}

for i := 0; i < len(reqs); i += int(batchSize) {
j := i + int(batchSize)
if j > len(reqs) {
j = len(reqs)
}
type blockValidationType string

err := lp.ec.BatchCallContext(ctx, reqs[i:j])
if err != nil {
return nil, err
var (
latestBlock blockValidationType = blockValidationType(rpc.LatestBlockNumber.String())
finalizedBlock blockValidationType = blockValidationType(rpc.FinalizedBlockNumber.String())
)

// fetchBlocks fetches a list of blocks in a single batch. validationReq is the string to use for the
// additional validation request (either the "finalized" or "latest" string defined in rpc module), which
// will be used to validate the finality of the other blocks.
func (lp *logPoller) fetchBlocks(ctx context.Context, blocksRequested []string, validationReq blockValidationType) (blocks []*evmtypes.Head, err error) {
n := len(blocksRequested)
blocks = make([]*evmtypes.Head, 0, n+1)
reqs := make([]rpc.BatchElem, 0, n+1)

validationBlockIndex := n
for k, num := range blocksRequested {
if num == string(validationReq) {
validationBlockIndex = k
}
reqs = append(reqs, newBlockReq(num))
}

if validationBlockIndex == n {
// Add validation req if it wasn't in there already
reqs = append(reqs, newBlockReq(string(validationReq)))
}

err = lp.ec.BatchCallContext(ctx, reqs)
if err != nil {
return nil, err
}

var blocks = make([]*evmtypes.Head, 0, len(reqs))
for _, r := range reqs {
if r.Error != nil {
return nil, r.Error
validationBlock, err := validateBlockResponse(reqs[validationBlockIndex])
if err != nil {
return nil, err
}
latestFinalizedBlockNumber := validationBlock.Number
if validationReq == latestBlock {
// subtract finalityDepth from "latest" to get finalized, when useFinalityTags = false
latestFinalizedBlockNumber = mathutil.Max(latestFinalizedBlockNumber-lp.finalityDepth, 0)
}
if len(reqs) == n+1 {
reqs = reqs[:n] // ignore last req if we added it explicitly for validation
}

for k, r := range reqs {
if k == validationBlockIndex {
// Already validated this one, just insert it in proper place
blocks = append(blocks, validationBlock)
continue
}
block, is := r.Result.(*evmtypes.Head)

if !is {
return nil, pkgerrors.Errorf("expected result to be a %T, got %T", &evmtypes.Head{}, r.Result)
block, err2 := validateBlockResponse(r)
if err2 != nil {
return nil, err2
}
if block == nil {
return nil, pkgerrors.New("invariant violation: got nil block")

blockRequested := r.Args[0].(string)
if blockRequested != string(latestBlock) && block.Number > latestFinalizedBlockNumber {
return nil, fmt.Errorf(
"Received unfinalized block %d while expecting finalized block (latestFinalizedBlockNumber = %d)",
block.Number, latestFinalizedBlockNumber)
}
if block.Hash == (common.Hash{}) {
return nil, pkgerrors.Errorf("missing block hash for block number: %d", block.Number)

blocks = append(blocks, block)
}
return blocks, nil
}

func (lp *logPoller) batchFetchBlocks(ctx context.Context, blocksRequested []string, batchSize int64) ([]*evmtypes.Head, error) {
var blocks = make([]*evmtypes.Head, 0, len(blocksRequested)+1)

validationReq := finalizedBlock
if !lp.useFinalityTag {
validationReq = latestBlock
}

for i := 0; i < len(blocksRequested); i += int(batchSize) {
j := i + int(batchSize)
if j > len(blocksRequested) {
j = len(blocksRequested)
}
if block.Number < 0 {
return nil, pkgerrors.Errorf("expected block number to be >= to 0, got %d", block.Number)
moreBlocks, err := lp.fetchBlocks(ctx, blocksRequested[i:j], validationReq)
if err != nil {
return nil, err
}
blocks = append(blocks, block)
blocks = append(blocks, moreBlocks...)
}

return blocks, nil
}

func validateBlockResponse(r rpc.BatchElem) (*evmtypes.Head, error) {
block, is := r.Result.(*evmtypes.Head)

if !is {
return nil, pkgerrors.Errorf("expected result to be a %T, got %T", &evmtypes.Head{}, r.Result)
}
if block == nil {
return nil, pkgerrors.New("invariant violation: got nil block")
}
if block.Hash == (common.Hash{}) {
return nil, pkgerrors.Errorf("missing block hash for block number: %d", block.Number)
}
if block.Number < 0 {
return nil, pkgerrors.Errorf("expected block number to be >= to 0, got %d", block.Number)
}
return block, nil
}

// IndexedLogsWithSigsExcluding returns the set difference(A-B) of logs with signature sigA and sigB, matching is done on the topics index
//
// For example, query to retrieve unfulfilled requests by querying request log events without matching fulfillment log events.
Expand Down
Loading

0 comments on commit 1d9dd46

Please sign in to comment.