Skip to content

Commit

Permalink
Backporting LogPoller's function used in CCIP (#10645)
Browse files Browse the repository at this point in the history
* Backporting LogPoller's function used in CCIP

* Missing tests added as well
  • Loading branch information
mateusz-sekara authored Sep 18, 2023
1 parent 157870f commit fb645c1
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 1 deletion.
4 changes: 4 additions & 0 deletions core/chains/evm/logpoller/disabled.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,7 @@ func (d disabled) IndexedLogsCreatedAfter(eventSig common.Hash, address common.A
func (d disabled) LatestBlockByEventSigsAddrsWithConfs(fromBlock int64, eventSigs []common.Hash, addresses []common.Address, confs int, qopts ...pg.QOpt) (int64, error) {
return 0, ErrDisabled
}

func (d disabled) LogsUntilBlockHashDataWordGreaterThan(eventSig common.Hash, address common.Address, wordIndex int, wordValueMin common.Hash, untilBlockHash common.Hash, qopts ...pg.QOpt) ([]Log, error) {
return nil, ErrDisabled
}
7 changes: 7 additions & 0 deletions core/chains/evm/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type LogPoller interface {
IndexedLogsWithSigsExcluding(address common.Address, eventSigA, eventSigB common.Hash, topicIndex int, fromBlock, toBlock int64, confs int, qopts ...pg.QOpt) ([]Log, error)
LogsDataWordRange(eventSig common.Hash, address common.Address, wordIndex int, wordValueMin, wordValueMax common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error)
LogsDataWordGreaterThan(eventSig common.Hash, address common.Address, wordIndex int, wordValueMin common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error)
LogsUntilBlockHashDataWordGreaterThan(eventSig common.Hash, address common.Address, wordIndex int, wordValueMin common.Hash, untilBlockHash common.Hash, qopts ...pg.QOpt) ([]Log, error)
}

type LogPollerTest interface {
Expand Down Expand Up @@ -976,6 +977,12 @@ func (lp *logPoller) IndexedLogsTopicGreaterThan(eventSig common.Hash, address c
return lp.orm.SelectIndexLogsTopicGreaterThan(address, eventSig, topicIndex, topicValueMin, confs, qopts...)
}

// LogsUntilBlockHashDataWordGreaterThan note index is 0 based.
// If the blockhash is not found (i.e. a stale fork) it will error.
func (lp *logPoller) LogsUntilBlockHashDataWordGreaterThan(eventSig common.Hash, address common.Address, wordIndex int, wordValueMin common.Hash, untilBlockHash common.Hash, qopts ...pg.QOpt) ([]Log, error) {
return lp.orm.SelectUntilBlockHashDataWordGreaterThan(address, eventSig, wordIndex, wordValueMin, untilBlockHash, qopts...)
}

func (lp *logPoller) IndexedLogsTopicRange(eventSig common.Hash, address common.Address, topicIndex int, topicValueMin common.Hash, topicValueMax common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error) {
return lp.orm.SelectIndexLogsTopicRange(address, eventSig, topicIndex, topicValueMin, topicValueMax, confs, qopts...)
}
Expand Down
33 changes: 33 additions & 0 deletions core/chains/evm/logpoller/mocks/log_poller.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions core/chains/evm/logpoller/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,12 @@ func (o *ObservedLogPoller) LogsDataWordGreaterThan(eventSig common.Hash, addres
})
}

func (o *ObservedLogPoller) LogsUntilBlockHashDataWordGreaterThan(eventSig common.Hash, address common.Address, wordIndex int, wordValueMin common.Hash, untilBlockHash common.Hash, qopts ...pg.QOpt) ([]Log, error) {
return withObservedQueryAndResults(o, "LogsUntilBlockHashDataWordGreaterThan", func() ([]Log, error) {
return o.LogPoller.LogsUntilBlockHashDataWordGreaterThan(eventSig, address, wordIndex, wordValueMin, untilBlockHash, qopts...)
})
}

func withObservedQueryAndResults[T any](o *ObservedLogPoller, queryName string, query func() ([]T, error)) ([]T, error) {
results, err := withObservedQuery(o, queryName, query)
if err == nil {
Expand Down
3 changes: 2 additions & 1 deletion core/chains/evm/logpoller/observability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ func TestMultipleMetricsArePublished(t *testing.T) {
_, _ = lp.LatestLogByEventSigWithConfs(common.Hash{}, common.Address{}, 0, pg.WithParentCtx(ctx))
_, _ = lp.LatestLogEventSigsAddrsWithConfs(0, []common.Hash{{}}, []common.Address{{}}, 1, pg.WithParentCtx(ctx))
_, _ = lp.IndexedLogsCreatedAfter(common.Hash{}, common.Address{}, 0, []common.Hash{}, time.Now(), 0, pg.WithParentCtx(ctx))
_, _ = lp.LogsUntilBlockHashDataWordGreaterThan(common.Hash{}, common.Address{}, 0, common.Hash{}, common.Hash{}, pg.WithParentCtx(ctx))

require.Equal(t, 11, testutil.CollectAndCount(lp.queryDuration))
require.Equal(t, 12, testutil.CollectAndCount(lp.queryDuration))
require.Equal(t, 10, testutil.CollectAndCount(lp.datasetSize))
resetMetrics(*lp)
}
Expand Down
25 changes: 25 additions & 0 deletions core/chains/evm/logpoller/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,31 @@ func (o *ORM) SelectIndexLogsTopicGreaterThan(address common.Address, eventSig c
return logs, nil
}

func (o *ORM) SelectUntilBlockHashDataWordGreaterThan(address common.Address, eventSig common.Hash, wordIndex int, wordValueMin common.Hash, untilBlockHash common.Hash, qopts ...pg.QOpt) ([]Log, error) {
var logs []Log
q := o.q.WithOpts(qopts...)
err := q.Transaction(func(tx pg.Queryer) error {
// We want to mimic the behaviour of the ETH RPC which errors if blockhash not found.
var block LogPollerBlock
if err := tx.Get(&block,
`SELECT * FROM evm.log_poller_blocks
WHERE evm_chain_id = $1 AND block_hash = $2`, utils.NewBig(o.chainID), untilBlockHash); err != nil {
return err
}
return q.Select(&logs,
`SELECT * FROM evm.logs
WHERE evm_chain_id = $1
AND address = $2 AND event_sig = $3
AND substring(data from 32*$4+1 for 32) >= $5
AND block_number <= $6
ORDER BY (block_number, log_index)`, utils.NewBig(o.chainID), address, eventSig.Bytes(), wordIndex, wordValueMin.Bytes(), block.BlockNumber)
})
if err != nil {
return nil, err
}
return logs, nil
}

func (o *ORM) SelectIndexLogsTopicRange(address common.Address, eventSig common.Hash, topicIndex int, topicValueMin, topicValueMax common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error) {
if err := validateTopicIndex(topicIndex); err != nil {
return nil, err
Expand Down
15 changes: 15 additions & 0 deletions core/chains/evm/logpoller/orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,21 @@ func TestORM_DataWords(t *testing.T) {
lgs, err = o1.SelectDataWordGreaterThan(addr, eventSig, 0, logpoller.EvmWord(1), 0)
require.NoError(t, err)
assert.Equal(t, 2, len(lgs))

// Unknown hash should an error
lgs, err = o1.SelectUntilBlockHashDataWordGreaterThan(addr, eventSig, 0, logpoller.EvmWord(1), common.HexToHash("0x3"))
require.Error(t, err)
assert.Equal(t, 0, len(lgs))

// 1 block should include first log
lgs, err = o1.SelectUntilBlockHashDataWordGreaterThan(addr, eventSig, 0, logpoller.EvmWord(1), common.HexToHash("0x1"))
require.NoError(t, err)
assert.Equal(t, 1, len(lgs))

// 2 block should include both
lgs, err = o1.SelectUntilBlockHashDataWordGreaterThan(addr, eventSig, 0, logpoller.EvmWord(1), common.HexToHash("0x2"))
require.NoError(t, err)
assert.Equal(t, 2, len(lgs))
}

func TestORM_SelectLogsWithSigsByBlockRangeFilter(t *testing.T) {
Expand Down

0 comments on commit fb645c1

Please sign in to comment.