Skip to content

Commit

Permalink
Returning only block_number from log poller (#9748)
Browse files Browse the repository at this point in the history
* Adding SelectLatestBlockNumberEventSigsAddrsWithConfs that returns only block number, without returning entire evm logs dataset

* Minor formatting fix
  • Loading branch information
mateusz-sekara authored Jul 12, 2023
1 parent c9a4b6b commit 2cec7c5
Show file tree
Hide file tree
Showing 6 changed files with 154 additions and 26 deletions.
5 changes: 4 additions & 1 deletion core/chains/evm/logpoller/disabled.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,12 @@ func (d disabled) IndexedLogsWithSigsExcluding(address common.Address, eventSigA

func (d disabled) LogsCreatedAfter(eventSig common.Hash, address common.Address, time time.Time, confs int, qopts ...pg.QOpt) ([]Log, error) {
return nil, ErrDisabled

}

func (d disabled) IndexedLogsCreatedAfter(eventSig common.Hash, address common.Address, topicIndex int, topicValues []common.Hash, after time.Time, confs int, qopts ...pg.QOpt) ([]Log, error) {
return nil, ErrDisabled
}

func (d disabled) LatestBlockByEventSigsAddrsWithConfs(eventSigs []common.Hash, addresses []common.Address, confs int, qopts ...pg.QOpt) (int64, error) {
return 0, ErrDisabled
}
5 changes: 5 additions & 0 deletions core/chains/evm/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type LogPoller interface {
LogsCreatedAfter(eventSig common.Hash, address common.Address, time time.Time, confs int, qopts ...pg.QOpt) ([]Log, error)
LatestLogByEventSigWithConfs(eventSig common.Hash, address common.Address, confs int, qopts ...pg.QOpt) (*Log, error)
LatestLogEventSigsAddrsWithConfs(fromBlock int64, eventSigs []common.Hash, addresses []common.Address, confs int, qopts ...pg.QOpt) ([]Log, error)
LatestBlockByEventSigsAddrsWithConfs(eventSigs []common.Hash, addresses []common.Address, confs int, qopts ...pg.QOpt) (int64, error)

// Content based querying
IndexedLogs(eventSig common.Hash, address common.Address, topicIndex int, topicValues []common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error)
Expand Down Expand Up @@ -975,6 +976,10 @@ func (lp *logPoller) LatestLogEventSigsAddrsWithConfs(fromBlock int64, eventSigs
return lp.orm.SelectLatestLogEventSigsAddrsWithConfs(fromBlock, addresses, eventSigs, confs, qopts...)
}

func (lp *logPoller) LatestBlockByEventSigsAddrsWithConfs(eventSigs []common.Hash, addresses []common.Address, confs int, qopts ...pg.QOpt) (int64, error) {
return lp.orm.SelectLatestBlockNumberEventSigsAddrsWithConfs(eventSigs, addresses, confs, qopts...)
}

// GetBlocksRange tries to get the specified block numbers from the log pollers
// blocks table. It falls back to the RPC for any unfulfilled requested blocks.
func (lp *logPoller) GetBlocksRange(ctx context.Context, numbers []uint64, qopts ...pg.QOpt) ([]LogPollerBlock, error) {
Expand Down
31 changes: 31 additions & 0 deletions core/chains/evm/logpoller/mocks/log_poller.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions core/chains/evm/logpoller/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@ func (o *ObservedLogPoller) LatestLogEventSigsAddrsWithConfs(fromBlock int64, ev
})
}

func (o *ObservedLogPoller) LatestBlockByEventSigsAddrsWithConfs(eventSigs []common.Hash, addresses []common.Address, confs int, qopts ...pg.QOpt) (int64, error) {
return withObservedQuery(o.histogram, "LatestBlockByEventSigsAddrsWithConfs", common.Address{}, func() (int64, error) {
return o.LogPoller.LatestBlockByEventSigsAddrsWithConfs(eventSigs, addresses, confs, qopts...)
})
}

func (o *ObservedLogPoller) IndexedLogs(eventSig common.Hash, address common.Address, topicIndex int, topicValues []common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error) {
return withObservedQuery(o.histogram, "IndexedLogs", address, func() ([]Log, error) {
return o.LogPoller.IndexedLogs(eventSig, address, topicIndex, topicValues, confs, qopts...)
Expand Down
66 changes: 41 additions & 25 deletions core/chains/evm/logpoller/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,15 +297,8 @@ func (o *ORM) GetBlocksRange(start uint64, end uint64, qopts ...pg.QOpt) ([]LogP
// SelectLatestLogEventSigsAddrsWithConfs finds the latest log by (address, event) combination that matches a list of Addresses and list of events
func (o *ORM) SelectLatestLogEventSigsAddrsWithConfs(fromBlock int64, addresses []common.Address, eventSigs []common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error) {
var logs []Log

var sigs [][]byte
for _, sig := range eventSigs {
sigs = append(sigs, sig.Bytes())
}
var addrs [][]byte
for _, addr := range addresses {
addrs = append(addrs, addr.Bytes())
}
sigs := concatBytes(eventSigs)
addrs := concatBytes(addresses)

q := o.q.WithOpts(qopts...)
err := q.Select(&logs, `
Expand All @@ -319,13 +312,33 @@ func (o *ORM) SelectLatestLogEventSigsAddrsWithConfs(fromBlock int64, addresses
GROUP BY event_sig, address
)
ORDER BY block_number ASC
`, o.chainID.Int64(), pq.ByteaArray(sigs), pq.ByteaArray(addrs), fromBlock, confs)
`, o.chainID.Int64(), sigs, addrs, fromBlock, confs)
if err != nil {
return nil, errors.Wrap(err, "failed to execute query")
}
return logs, nil
}

// SelectLatestBlockNumberEventSigsAddrsWithConfs finds the latest block number that matches a list of Addresses and list of events. It returns 0 if there is no matching block
func (o *ORM) SelectLatestBlockNumberEventSigsAddrsWithConfs(eventSigs []common.Hash, addresses []common.Address, confs int, qopts ...pg.QOpt) (int64, error) {
var blockNumber int64
sigs := concatBytes(eventSigs)
addrs := concatBytes(addresses)

q := o.q.WithOpts(qopts...)
err := q.Get(&blockNumber, `
SELECT COALESCE(MAX(block_number), 0) FROM evm_logs
WHERE evm_chain_id = $1 AND
event_sig = ANY($2) AND
address = ANY($3) AND
(block_number + $4) <= (SELECT COALESCE(block_number, 0) FROM evm_log_poller_blocks WHERE evm_chain_id = $1 ORDER BY block_number DESC LIMIT 1)`,
o.chainID.Int64(), sigs, addrs, confs)
if err != nil {
return 0, err
}
return blockNumber, nil
}

func (o *ORM) SelectDataWordRange(address common.Address, eventSig common.Hash, wordIndex int, wordValueMin, wordValueMax common.Hash, confs int, qopts ...pg.QOpt) ([]Log, error) {
var logs []Log
q := o.q.WithOpts(qopts...)
Expand Down Expand Up @@ -407,18 +420,15 @@ func (o *ORM) SelectIndexedLogs(address common.Address, eventSig common.Hash, to

q := o.q.WithOpts(qopts...)
var logs []Log
var topicValuesBytes [][]byte
for _, topicValue := range topicValues {
topicValuesBytes = append(topicValuesBytes, topicValue.Bytes())
}
topicValuesBytes := concatBytes(topicValues)
// Add 1 since postgresql arrays are 1-indexed.
err := q.Select(&logs, `
SELECT * FROM evm_logs
WHERE evm_logs.evm_chain_id = $1
AND address = $2 AND event_sig = $3
AND topics[$4] = ANY($5)
AND (block_number + $6) <= (SELECT COALESCE(block_number, 0) FROM evm_log_poller_blocks WHERE evm_chain_id = $1 ORDER BY block_number DESC LIMIT 1)
ORDER BY (evm_logs.block_number, evm_logs.log_index)`, utils.NewBig(o.chainID), address, eventSig.Bytes(), topicIndex+1, pq.ByteaArray(topicValuesBytes), confs)
ORDER BY (evm_logs.block_number, evm_logs.log_index)`, utils.NewBig(o.chainID), address, eventSig.Bytes(), topicIndex+1, topicValuesBytes, confs)
if err != nil {
return nil, err
}
Expand All @@ -432,17 +442,14 @@ func (o *ORM) SelectIndexedLogsByBlockRangeFilter(start, end int64, address comm
}

var logs []Log
var topicValuesBytes [][]byte
for _, topicValue := range topicValues {
topicValuesBytes = append(topicValuesBytes, topicValue.Bytes())
}
topicValuesBytes := concatBytes(topicValues)
q := o.q.WithOpts(qopts...)
err := q.Select(&logs, `
SELECT * FROM evm_logs
WHERE evm_logs.block_number >= $1 AND evm_logs.block_number <= $2 AND evm_logs.evm_chain_id = $3
AND address = $4 AND event_sig = $5
AND topics[$6] = ANY($7)
ORDER BY (evm_logs.block_number, evm_logs.log_index)`, start, end, utils.NewBig(o.chainID), address, eventSig.Bytes(), topicIndex+1, pq.ByteaArray(topicValuesBytes))
ORDER BY (evm_logs.block_number, evm_logs.log_index)`, start, end, utils.NewBig(o.chainID), address, eventSig.Bytes(), topicIndex+1, topicValuesBytes)
if err != nil {
return nil, err
}
Expand All @@ -460,10 +467,7 @@ func validateTopicIndex(index int) error {
func (o *ORM) SelectIndexedLogsCreatedAfter(address common.Address, eventSig common.Hash, topicIndex int, topicValues []common.Hash, after time.Time, confs int, qopts ...pg.QOpt) ([]Log, error) {
q := o.q.WithOpts(qopts...)
var logs []Log
var topicValuesBytes [][]byte
for _, topicValue := range topicValues {
topicValuesBytes = append(topicValuesBytes, topicValue.Bytes())
}
topicValuesBytes := concatBytes(topicValues)
// Add 1 since postgresql arrays are 1-indexed.
err := q.Select(&logs, `
SELECT * FROM evm_logs
Expand All @@ -472,7 +476,7 @@ func (o *ORM) SelectIndexedLogsCreatedAfter(address common.Address, eventSig com
AND topics[$4] = ANY($5)
AND created_at > $6
AND (block_number + $7) <= (SELECT COALESCE(block_number, 0) FROM evm_log_poller_blocks WHERE evm_chain_id = $1 ORDER BY block_number DESC LIMIT 1)
ORDER BY created_at ASC`, utils.NewBig(o.chainID), address, eventSig.Bytes(), topicIndex+1, pq.ByteaArray(topicValuesBytes), after, confs)
ORDER BY created_at ASC`, utils.NewBig(o.chainID), address, eventSig.Bytes(), topicIndex+1, topicValuesBytes, after, confs)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -518,3 +522,15 @@ func (o *ORM) SelectIndexedLogsWithSigsExcluding(sigA, sigB common.Hash, topicIn
return logs, nil

}

type bytesProducer interface {
Bytes() []byte
}

func concatBytes[T bytesProducer](byteSlice []T) pq.ByteaArray {
var output [][]byte
for _, b := range byteSlice {
output = append(output, b.Bytes())
}
return output
}
67 changes: 67 additions & 0 deletions core/chains/evm/logpoller/orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1001,3 +1001,70 @@ func TestSelectLogsWithSigsExcluding(t *testing.T) {
require.NoError(t, err)
require.Len(t, logs, 0)
}

func TestSelectLatestBlockNumberEventSigsAddrsWithConfs(t *testing.T) {
th := SetupTH(t, 2, 3, 2)
event1 := EmitterABI.Events["Log1"].ID
event2 := EmitterABI.Events["Log2"].ID
address1 := common.HexToAddress("0xA")
address2 := common.HexToAddress("0xB")

require.NoError(t, th.ORM.InsertLogs([]logpoller.Log{
GenLog(th.ChainID, 1, 1, "0x1", event1[:], address1),
GenLog(th.ChainID, 2, 1, "0x2", event2[:], address2),
GenLog(th.ChainID, 2, 2, "0x4", event2[:], address2),
GenLog(th.ChainID, 2, 3, "0x6", event2[:], address2),
}))
require.NoError(t, th.ORM.InsertBlock(common.HexToHash("0x1"), 3, time.Now()))

tests := []struct {
name string
events []common.Hash
addrs []common.Address
confs int
expectedBlockNumber int64
}{
{
name: "no matching logs returns 0 block number",
events: []common.Hash{event2},
addrs: []common.Address{address1},
confs: 0,
expectedBlockNumber: 0,
},
{
name: "not enough confirmations block returns 0 block number",
events: []common.Hash{event2},
addrs: []common.Address{address2},
confs: 5,
expectedBlockNumber: 0,
},
{
name: "single matching event and address returns last block",
events: []common.Hash{event1},
addrs: []common.Address{address1},
confs: 0,
expectedBlockNumber: 1,
},
{
name: "picks max block from two events",
events: []common.Hash{event1, event2},
addrs: []common.Address{address1, address2},
confs: 0,
expectedBlockNumber: 3,
},
{
name: "picks previous block number for confirmations set to 1",
events: []common.Hash{event2},
addrs: []common.Address{address2},
confs: 1,
expectedBlockNumber: 2,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
blockNumber, err := th.ORM.SelectLatestBlockNumberEventSigsAddrsWithConfs(tt.events, tt.addrs, tt.confs)
require.NoError(t, err)
assert.Equal(t, tt.expectedBlockNumber, blockNumber)
})
}
}

0 comments on commit 2cec7c5

Please sign in to comment.