Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make LogPoller more robust against local finality violations #12605

Merged
merged 9 commits into from
Apr 26, 2024
5 changes: 5 additions & 0 deletions .changeset/fresh-moles-explode.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

core/chains/evm/logpoller: Stricter finality checks in LogPoller, to be more robust during rpc failover events #updated
189 changes: 134 additions & 55 deletions core/chains/evm/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -699,8 +699,8 @@ func (lp *logPoller) BackupPollAndSaveLogs(ctx context.Context) {
}
return
}
// If this is our first run, start from block min(lastProcessed.FinalizedBlockNumber-1, lastProcessed.BlockNumber-backupPollerBlockDelay)
backupStartBlock := mathutil.Min(lastProcessed.FinalizedBlockNumber-1, lastProcessed.BlockNumber-lp.backupPollerBlockDelay)
// If this is our first run, start from block min(lastProcessed.FinalizedBlockNumber, lastProcessed.BlockNumber-backupPollerBlockDelay)
backupStartBlock := mathutil.Min(lastProcessed.FinalizedBlockNumber, lastProcessed.BlockNumber-lp.backupPollerBlockDelay)
reductionista marked this conversation as resolved.
Show resolved Hide resolved
// (or at block 0 if whole blockchain is too short)
lp.backupPollerNextBlock = mathutil.Max(backupStartBlock, 0)
}
Expand Down Expand Up @@ -771,11 +771,16 @@ func convertTopics(topics []common.Hash) [][]byte {
return topicsForDB
}

func (lp *logPoller) blocksFromLogs(ctx context.Context, logs []types.Log) (blocks []LogPollerBlock, err error) {
// blocksFromLogs fetches all of the blocks associated with a given list of logs. It will also unconditionally fetch endBlockNumber,
// whether or not there are any logs in the list from that block
func (lp *logPoller) blocksFromLogs(ctx context.Context, logs []types.Log, endBlockNumber uint64) (blocks []LogPollerBlock, err error) {
var numbers []uint64
for _, log := range logs {
numbers = append(numbers, log.BlockNumber)
}
if numbers[len(numbers)-1] != endBlockNumber {
numbers = append(numbers, endBlockNumber)
}
return lp.GetBlocksRange(ctx, numbers)
}

Expand All @@ -789,6 +794,7 @@ func (lp *logPoller) backfill(ctx context.Context, start, end int64) error {
batchSize := lp.backfillBatchSize
for from := start; from <= end; from += batchSize {
to := mathutil.Min(from+batchSize-1, end)

gethLogs, err := lp.ec.FilterLogs(ctx, lp.Filter(big.NewInt(from), big.NewInt(to), nil))
if err != nil {
var rpcErr client.JsonError
Expand All @@ -810,13 +816,19 @@ func (lp *logPoller) backfill(ctx context.Context, start, end int64) error {
if len(gethLogs) == 0 {
continue
}
blocks, err := lp.blocksFromLogs(ctx, gethLogs)
blocks, err := lp.blocksFromLogs(ctx, gethLogs, uint64(to))
if err != nil {
return err
}

endblock := blocks[len(blocks)-1]
if gethLogs[len(gethLogs)-1].BlockNumber != uint64(to) {
// Pop endblock if there were no logs for it, so that length of blocks & gethLogs are the same to pass to convertLogs
blocks = blocks[:len(blocks)-1]
}

lp.lggr.Debugw("Backfill found logs", "from", from, "to", to, "logs", len(gethLogs), "blocks", blocks)
err = lp.orm.InsertLogsWithBlock(ctx, convertLogs(gethLogs, blocks, lp.lggr, lp.ec.ConfiguredChainID()), blocks[len(blocks)-1])
err = lp.orm.InsertLogsWithBlock(ctx, convertLogs(gethLogs, blocks, lp.lggr, lp.ec.ConfiguredChainID()), endblock)
if err != nil {
lp.lggr.Warnw("Unable to insert logs, retrying", "err", err, "from", from, "to", to)
return err
Expand Down Expand Up @@ -955,19 +967,18 @@ func (lp *logPoller) PollAndSaveLogs(ctx context.Context, currentBlockNumber int
currentBlockNumber = lastSafeBackfillBlock + 1
}

if currentBlockNumber > currentBlock.Number {
// If we successfully backfilled we have logs up to and including lastSafeBackfillBlock,
// now load the first unfinalized block.
currentBlock, err = lp.getCurrentBlockMaybeHandleReorg(ctx, currentBlockNumber, nil)
if err != nil {
// If there's an error handling the reorg, we can't be sure what state the db was left in.
// Resume from the latest block saved.
lp.lggr.Errorw("Unable to get current block", "err", err)
return
for {
if currentBlockNumber > currentBlock.Number {
currentBlock, err = lp.getCurrentBlockMaybeHandleReorg(ctx, currentBlockNumber, nil)
if err != nil {
// If there's an error handling the reorg, we can't be sure what state the db was left in.
// Resume from the latest block saved.
lp.lggr.Errorw("Unable to get current block", "err", err)
return
}
currentBlockNumber = currentBlock.Number
}
}

for {
h := currentBlock.Hash
var logs []types.Log
logs, err = lp.ec.FilterLogs(ctx, lp.Filter(nil, nil, &h))
Expand All @@ -992,14 +1003,6 @@ func (lp *logPoller) PollAndSaveLogs(ctx context.Context, currentBlockNumber int
if currentBlockNumber > latestBlockNumber {
break
}
currentBlock, err = lp.getCurrentBlockMaybeHandleReorg(ctx, currentBlockNumber, nil)
if err != nil {
// If there's an error handling the reorg, we can't be sure what state the db was left in.
// Resume from the latest block saved.
lp.lggr.Errorw("Unable to get current block", "err", err)
return
}
currentBlockNumber = currentBlock.Number
}
}

Expand Down Expand Up @@ -1252,12 +1255,16 @@ func (lp *logPoller) GetBlocksRange(ctx context.Context, numbers []uint64) ([]Lo
return blocks, nil
}

// fillRemainingBlocksFromRPC sends a batch request for each block in blocksRequested, and converts them from
// geth blocks into LogPollerBlock structs. This is only intended to be used for requesting finalized blocks,
// if any of the blocks coming back are not finalized, an error will be returned
func (lp *logPoller) fillRemainingBlocksFromRPC(
ctx context.Context,
blocksRequested map[uint64]struct{},
blocksFound map[uint64]LogPollerBlock,
) (map[uint64]LogPollerBlock, error) {
var remainingBlocks []string

for num := range blocksRequested {
if _, ok := blocksFound[num]; !ok {
remainingBlocks = append(remainingBlocks, hexutil.EncodeBig(new(big.Int).SetUint64(num)))
Expand Down Expand Up @@ -1287,54 +1294,126 @@ func (lp *logPoller) fillRemainingBlocksFromRPC(
return logPollerBlocks, nil
}

func (lp *logPoller) batchFetchBlocks(ctx context.Context, blocksRequested []string, batchSize int64) ([]*evmtypes.Head, error) {
reqs := make([]rpc.BatchElem, 0, len(blocksRequested))
for _, num := range blocksRequested {
req := rpc.BatchElem{
Method: "eth_getBlockByNumber",
Args: []interface{}{num, false},
Result: &evmtypes.Head{},
}
reqs = append(reqs, req)
// newBlockReq constructs an eth_getBlockByNumber request for particular block number
func newBlockReq(num string) rpc.BatchElem {
return rpc.BatchElem{
Method: "eth_getBlockByNumber",
Args: []interface{}{num, false},
Result: &evmtypes.Head{},
}
}

for i := 0; i < len(reqs); i += int(batchSize) {
j := i + int(batchSize)
if j > len(reqs) {
j = len(reqs)
}
type blockValidationType string

err := lp.ec.BatchCallContext(ctx, reqs[i:j])
if err != nil {
return nil, err
var (
latestBlock blockValidationType = blockValidationType(rpc.LatestBlockNumber.String())
finalizedBlock blockValidationType = blockValidationType(rpc.FinalizedBlockNumber.String())
)

// fetchBlocks fetches a list of blocks in a single batch. validationReq is the string to use for the
// additional validation request (either the "finalized" or "latest" string defined in rpc module), which
// will be used to validate the finality of the other blocks.
func (lp *logPoller) fetchBlocks(ctx context.Context, blocksRequested []string, validationReq blockValidationType) (blocks []*evmtypes.Head, err error) {
n := len(blocksRequested)
blocks = make([]*evmtypes.Head, 0, n+1)
reqs := make([]rpc.BatchElem, 0, n+1)

validationBlockIndex := n
for k, num := range blocksRequested {
if num == string(validationReq) {
validationBlockIndex = k
}
reqs = append(reqs, newBlockReq(num))
}

if validationBlockIndex == n {
// Add validation req if it wasn't in there already
reqs = append(reqs, newBlockReq(string(validationReq)))
}

err = lp.ec.BatchCallContext(ctx, reqs)
if err != nil {
return nil, err
}

var blocks = make([]*evmtypes.Head, 0, len(reqs))
for _, r := range reqs {
if r.Error != nil {
return nil, r.Error
validationBlock, err := validateBlockResponse(reqs[validationBlockIndex])
if err != nil {
return nil, err
}
latestFinalizedBlockNumber := validationBlock.Number
if validationReq == latestBlock {
// subtract finalityDepth from "latest" to get finalized, when useFinalityTags = false
latestFinalizedBlockNumber = mathutil.Max(latestFinalizedBlockNumber-lp.finalityDepth, 0)
}
if len(reqs) == n+1 {
reqs = reqs[:n] // ignore last req if we added it explicitly for validation
}

for k, r := range reqs {
if k == validationBlockIndex {
// Already validated this one, just insert it in proper place
blocks = append(blocks, validationBlock)
continue
}
block, is := r.Result.(*evmtypes.Head)

if !is {
return nil, pkgerrors.Errorf("expected result to be a %T, got %T", &evmtypes.Head{}, r.Result)
block, err2 := validateBlockResponse(r)
if err2 != nil {
return nil, err2
}
if block == nil {
return nil, pkgerrors.New("invariant violation: got nil block")

blockRequested := r.Args[0].(string)
if blockRequested != string(latestBlock) && block.Number > latestFinalizedBlockNumber {
return nil, fmt.Errorf(
"Received unfinalized block %d while expecting finalized block (latestFinalizedBlockNumber = %d)",
block.Number, latestFinalizedBlockNumber)
}
if block.Hash == (common.Hash{}) {
return nil, pkgerrors.Errorf("missing block hash for block number: %d", block.Number)

blocks = append(blocks, block)
}
return blocks, nil
}

func (lp *logPoller) batchFetchBlocks(ctx context.Context, blocksRequested []string, batchSize int64) ([]*evmtypes.Head, error) {
var blocks = make([]*evmtypes.Head, 0, len(blocksRequested)+1)

validationReq := finalizedBlock
if !lp.useFinalityTag {
validationReq = latestBlock
}

for i := 0; i < len(blocksRequested); i += int(batchSize) {
j := i + int(batchSize)
if j > len(blocksRequested) {
j = len(blocksRequested)
}
if block.Number < 0 {
return nil, pkgerrors.Errorf("expected block number to be >= to 0, got %d", block.Number)
moreBlocks, err := lp.fetchBlocks(ctx, blocksRequested[i:j], validationReq)
if err != nil {
return nil, err
}
blocks = append(blocks, block)
blocks = append(blocks, moreBlocks...)
}

return blocks, nil
}

func validateBlockResponse(r rpc.BatchElem) (*evmtypes.Head, error) {
block, is := r.Result.(*evmtypes.Head)

if !is {
return nil, pkgerrors.Errorf("expected result to be a %T, got %T", &evmtypes.Head{}, r.Result)
}
if block == nil {
return nil, pkgerrors.New("invariant violation: got nil block")
}
if block.Hash == (common.Hash{}) {
return nil, pkgerrors.Errorf("missing block hash for block number: %d", block.Number)
}
if block.Number < 0 {
return nil, pkgerrors.Errorf("expected block number to be >= to 0, got %d", block.Number)
}
return block, nil
}

// IndexedLogsWithSigsExcluding returns the set difference(A-B) of logs with signature sigA and sigB, matching is done on the topics index
//
// For example, query to retrieve unfulfilled requests by querying request log events without matching fulfillment log events.
Expand Down
Loading
Loading