Skip to content

Commit

Permalink
Implement DSL for LogPoller (#12793)
Browse files Browse the repository at this point in the history
* Implement DSL for LogPoller

This commit implements the query DSL within LogPoller by remapping comparer filters to
topic and word filters. A parser then creates an SQL statement from the remapped events
and queries the log poller database directly.

* added binding check on QueryKey

* update int parsing bit size

* added extensive log poller orm tests for query capability

* update topic filter test

* address comments

* add query name to logpoller ORM FilterLogs for query observability

* update tests from common and return empty list on empty query
  • Loading branch information
EasterTheBunny authored May 16, 2024
1 parent 7be3506 commit ecfab64
Show file tree
Hide file tree
Showing 15 changed files with 1,482 additions and 314 deletions.
4 changes: 2 additions & 2 deletions core/chains/evm/logpoller/disabled.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ func (d disabled) LogsDataWordBetween(ctx context.Context, eventSig common.Hash,
return nil, ErrDisabled
}

func (d disabled) FilteredLogs(_ query.KeyFilter, _ query.LimitAndSort) ([]Log, error) {
return nil, nil
func (d disabled) FilteredLogs(_ context.Context, _ query.KeyFilter, _ query.LimitAndSort, _ string) ([]Log, error) {
return nil, ErrDisabled
}

func (d disabled) FindLCA(ctx context.Context) (*LogPollerBlock, error) {
Expand Down
7 changes: 4 additions & 3 deletions core/chains/evm/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ type LogPoller interface {
LogsDataWordGreaterThan(ctx context.Context, eventSig common.Hash, address common.Address, wordIndex int, wordValueMin common.Hash, confs evmtypes.Confirmations) ([]Log, error)
LogsDataWordBetween(ctx context.Context, eventSig common.Hash, address common.Address, wordIndexMin, wordIndexMax int, wordValue common.Hash, confs evmtypes.Confirmations) ([]Log, error)

FilteredLogs(filter query.KeyFilter, limitAndSrt query.LimitAndSort) ([]Log, error)
// chainlink-common query filtering
FilteredLogs(ctx context.Context, filter query.KeyFilter, limitAndSort query.LimitAndSort, queryName string) ([]Log, error)
}

type LogPollerTest interface {
Expand Down Expand Up @@ -1522,6 +1523,6 @@ func EvmWord(i uint64) common.Hash {
return common.BytesToHash(b)
}

func (lp *logPoller) FilteredLogs(queryFilter query.KeyFilter, sortAndLimit query.LimitAndSort) ([]Log, error) {
return lp.orm.FilteredLogs(queryFilter, sortAndLimit)
func (lp *logPoller) FilteredLogs(ctx context.Context, queryFilter query.KeyFilter, limitAndSort query.LimitAndSort, queryName string) ([]Log, error) {
return lp.orm.FilteredLogs(ctx, queryFilter, limitAndSort, queryName)
}
18 changes: 9 additions & 9 deletions core/chains/evm/logpoller/mocks/log_poller.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions core/chains/evm/logpoller/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
"github.com/smartcontractkit/chainlink-common/pkg/types/query"
)

type queryType string
Expand Down Expand Up @@ -261,6 +262,12 @@ func (o *ObservedORM) SelectIndexedLogsTopicRange(ctx context.Context, address c
})
}

func (o *ObservedORM) FilteredLogs(ctx context.Context, filter query.KeyFilter, limitAndSort query.LimitAndSort, queryName string) ([]Log, error) {
return withObservedQueryAndResults(o, queryName, func() ([]Log, error) {
return o.ORM.FilteredLogs(ctx, filter, limitAndSort, queryName)
})
}

func withObservedQueryAndResults[T any](o *ObservedORM, queryName string, query func() ([]T, error)) ([]T, error) {
results, err := withObservedQuery(o, queryName, query)
if err == nil {
Expand Down
42 changes: 31 additions & 11 deletions core/chains/evm/logpoller/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@ type ORM interface {
SelectLogsDataWordRange(ctx context.Context, address common.Address, eventSig common.Hash, wordIndex int, wordValueMin, wordValueMax common.Hash, confs evmtypes.Confirmations) ([]Log, error)
SelectLogsDataWordGreaterThan(ctx context.Context, address common.Address, eventSig common.Hash, wordIndex int, wordValueMin common.Hash, confs evmtypes.Confirmations) ([]Log, error)
SelectLogsDataWordBetween(ctx context.Context, address common.Address, eventSig common.Hash, wordIndexMin int, wordIndexMax int, wordValue common.Hash, confs evmtypes.Confirmations) ([]Log, error)

// FilteredLogs accepts chainlink-common filtering DSL.
FilteredLogs(filter query.KeyFilter, sortAndLimit query.LimitAndSort) ([]Log, error)
FilteredLogs(ctx context.Context, filter query.KeyFilter, limitAndSort query.LimitAndSort, queryName string) ([]Log, error)
}

type DSORM struct {
Expand Down Expand Up @@ -92,10 +93,10 @@ func (o *DSORM) new(ds sqlutil.DataSource) *DSORM { return NewORM(o.chainID, ds,
// InsertBlock is idempotent to support replays.
func (o *DSORM) InsertBlock(ctx context.Context, blockHash common.Hash, blockNumber int64, blockTimestamp time.Time, finalizedBlock int64) error {
args, err := newQueryArgs(o.chainID).
withCustomHashArg("block_hash", blockHash).
withCustomArg("block_number", blockNumber).
withCustomArg("block_timestamp", blockTimestamp).
withCustomArg("finalized_block_number", finalizedBlock).
withField("block_hash", blockHash).
withField("block_number", blockNumber).
withField("block_timestamp", blockTimestamp).
withField("finalized_block_number", finalizedBlock).
toArgs()
if err != nil {
return err
Expand All @@ -115,7 +116,7 @@ func (o *DSORM) InsertBlock(ctx context.Context, blockHash common.Hash, blockNum
func (o *DSORM) InsertFilter(ctx context.Context, filter Filter) (err error) {
topicArrays := []types.HashArray{filter.Topic2, filter.Topic3, filter.Topic4}
args, err := newQueryArgs(o.chainID).
withCustomArg("name", filter.Name).
withField("name", filter.Name).
withRetention(filter.Retention).
withMaxLogsKept(filter.MaxLogsKept).
withLogsPerBlock(filter.LogsPerBlock).
Expand Down Expand Up @@ -930,8 +931,8 @@ func (o *DSORM) SelectIndexedLogsWithSigsExcluding(ctx context.Context, sigA, si
withTopicIndex(topicIndex).
withStartBlock(startBlock).
withEndBlock(endBlock).
withCustomHashArg("sigA", sigA).
withCustomHashArg("sigB", sigB).
withField("sigA", sigA).
withField("sigB", sigB).
withConfs(confs).
toArgs()
if err != nil {
Expand Down Expand Up @@ -970,9 +971,28 @@ func (o *DSORM) SelectIndexedLogsWithSigsExcluding(ctx context.Context, sigA, si
return logs, nil
}

func (o *DSORM) FilteredLogs(_ query.KeyFilter, _ query.LimitAndSort) ([]Log, error) {
//TODO implement me
panic("implement me")
func (o *DSORM) FilteredLogs(ctx context.Context, filter query.KeyFilter, limitAndSort query.LimitAndSort, _ string) ([]Log, error) {
qs, args, err := (&pgDSLParser{}).buildQuery(o.chainID, filter.Expressions, limitAndSort)
if err != nil {
return nil, err
}

values, err := args.toArgs()
if err != nil {
return nil, err
}

query, sqlArgs, err := o.ds.BindNamed(qs, values)
if err != nil {
return nil, err
}

var logs []Log
if err = o.ds.SelectContext(ctx, &logs, query, sqlArgs...); err != nil {
return nil, err
}

return logs, nil
}

func nestedBlockNumberQuery(confs evmtypes.Confirmations) string {
Expand Down
Loading

0 comments on commit ecfab64

Please sign in to comment.