From a6ac8b32dfa6bd00aaf7a6a1f553aa7e32ed1b99 Mon Sep 17 00:00:00 2001 From: Mateusz Sekara Date: Fri, 30 Jun 2023 17:04:32 -0400 Subject: [PATCH 1/2] Replacing created_at with the block_timestamp. Ordering alignment with other log poller functions (block_no, log_index) --- core/chains/evm/logpoller/orm.go | 13 ++- core/chains/evm/logpoller/orm_test.go | 156 ++++++++++++++++++++++++++ 2 files changed, 163 insertions(+), 6 deletions(-) diff --git a/core/chains/evm/logpoller/orm.go b/core/chains/evm/logpoller/orm.go index 3a0fb52cd6c..a2de51b7c56 100644 --- a/core/chains/evm/logpoller/orm.go +++ b/core/chains/evm/logpoller/orm.go @@ -239,9 +239,9 @@ func (o *ORM) SelectLogsCreatedAfter(eventSig []byte, address common.Address, af WHERE evm_chain_id = $1 AND address = $2 AND event_sig = $3 - AND created_at > $4 + AND block_number >= (SELECT COALESCE(block_number, 0) FROM evm_log_poller_blocks WHERE evm_chain_id = $1 and block_timestamp > $4 ORDER BY block_number LIMIT 1) AND (block_number + $5) <= (SELECT COALESCE(block_number, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = $1 ORDER BY block_number DESC LIMIT 1) - ORDER BY created_at ASC`, utils.NewBig(o.chainID), address, eventSig, after, confs) + ORDER BY (block_number, log_index)`, utils.NewBig(o.chainID), address, eventSig, after, confs) if err != nil { return nil, err } @@ -504,12 +504,13 @@ func (o *ORM) SelectIndexedLogsCreatedAfter(address common.Address, eventSig com // Add 1 since postgresql arrays are 1-indexed. err := q.Select(&logs, ` SELECT * FROM evm.logs - WHERE evm.logs.evm_chain_id = $1 - AND address = $2 AND event_sig = $3 + WHERE evm_chain_id = $1 + AND address = $2 + AND event_sig = $3 AND topics[$4] = ANY($5) - AND created_at > $6 + AND block_number >= (SELECT COALESCE(block_number, 0) FROM evm_log_poller_blocks WHERE evm_chain_id = $1 and block_timestamp > $6 ORDER BY block_number LIMIT 1) AND block_number <= (SELECT COALESCE(block_number, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = $1 ORDER BY block_number DESC LIMIT 1) - $7 - ORDER BY created_at ASC`, utils.NewBig(o.chainID), address, eventSig.Bytes(), topicIndex+1, topicValuesBytes, after, confs) + ORDER BY (block_number, log_index)`, utils.NewBig(o.chainID), address, eventSig.Bytes(), topicIndex+1, topicValuesBytes, after, confs) if err != nil { return nil, err } diff --git a/core/chains/evm/logpoller/orm_test.go b/core/chains/evm/logpoller/orm_test.go index e8b5a90ce0c..7cf945dedb3 100644 --- a/core/chains/evm/logpoller/orm_test.go +++ b/core/chains/evm/logpoller/orm_test.go @@ -40,6 +40,12 @@ func GenLog(chainID *big.Int, logIndex int64, blockNum int64, blockHash string, } } +func GenTimestampedLog(chainID *big.Int, logIndex int64, blockNum int64, blockHash string, topic1 []byte, address common.Address, timestamp time.Time) logpoller.Log { + log := GenLog(chainID, logIndex, blockNum, blockHash, topic1, address) + log.BlockTimestamp = timestamp + return log +} + func TestLogPoller_Batching(t *testing.T) { t.Parallel() th := SetupTH(t, 2, 3, 2) @@ -1017,6 +1023,156 @@ func TestSelectLogsWithSigsExcluding(t *testing.T) { require.Len(t, logs, 0) } +func TestOrderingInSelectLogsAndIndexedLogsCreatedAfter(t *testing.T) { + th := SetupTH(t, 2, 3, 2) + + currentTime := time.Now() + past := currentTime.Add(-24 * time.Hour) + expectedOrder := []string{"0x6", "0x2", "0x1", "0x4"} + + event := EmitterABI.Events["Log1"].ID + address := common.HexToAddress("0xA") + + logs := []logpoller.Log{ + GenTimestampedLog(th.ChainID, 3, 1, "0x1", event[:], address, currentTime), + GenTimestampedLog(th.ChainID, 2, 1, "0x2", event[:], address, currentTime), + GenTimestampedLog(th.ChainID, 4, 1, "0x4", event[:], address, currentTime), + GenTimestampedLog(th.ChainID, 1, 1, "0x6", event[:], address, currentTime), + } + require.NoError(t, th.ORM.InsertLogs(logs)) + for _, l := range logs { + require.NoError(t, th.ORM.InsertBlock(l.BlockHash, l.BlockNumber, l.BlockTimestamp)) + } + + indexedLogsCreatedAfter, err := th.ORM.SelectIndexedLogsCreatedAfter(address, event, 0, []common.Hash{event}, past, 0) + require.NoError(t, err) + require.Len(t, indexedLogsCreatedAfter, len(expectedOrder)) + + logsCreatedAfter, err := th.ORM.SelectLogsCreatedAfter(event[:], address, past, 0) + require.NoError(t, err) + require.Len(t, logsCreatedAfter, len(expectedOrder)) + + for i, hash := range expectedOrder { + require.Equal(t, indexedLogsCreatedAfter[i].BlockHash, common.HexToHash(hash)) + require.Equal(t, logsCreatedAfter[i].BlockHash, common.HexToHash(hash)) + } +} + +func TestSelectLogsAndIndexedLogsCreatedAfter(t *testing.T) { + th := SetupTH(t, 2, 3, 2) + + firstBlockTimestamp := time.Date(2010, 1, 1, 10, 0, 0, 0, time.UTC) + secondBlockTimestamp := time.Date(2012, 1, 1, 10, 0, 0, 0, time.UTC) + thirdBlockTimestamp := time.Date(2014, 1, 1, 10, 0, 0, 0, time.UTC) + + beforeFirstBlock := firstBlockTimestamp.Add(-(time.Hour * 10)) + beforeSecondBlock := secondBlockTimestamp.Add(-(time.Hour * 10)) + + event := EmitterABI.Events["Log1"].ID + address1 := common.HexToAddress("0xA") + address2 := common.HexToAddress("0xB") + + logs := []logpoller.Log{ + GenTimestampedLog(th.ChainID, 1, 1, "0x1", event[:], address2, firstBlockTimestamp), + GenTimestampedLog(th.ChainID, 2, 1, "0x2", event[:], address1, firstBlockTimestamp), + GenTimestampedLog(th.ChainID, 2, 2, "0x4", event[:], address1, secondBlockTimestamp), + GenTimestampedLog(th.ChainID, 2, 3, "0x6", event[:], address1, thirdBlockTimestamp), + } + require.NoError(t, th.ORM.InsertLogs(logs)) + + for _, l := range logs { + require.NoError(t, th.ORM.InsertBlock(l.BlockHash, l.BlockNumber, l.BlockTimestamp)) + } + + tests := []struct { + name string + addr common.Address + event common.Hash + topicValues []common.Hash + after time.Time + confs int + expectedLogs []string + }{ + { + name: "pick all logs", + addr: address1, + event: event, + topicValues: []common.Hash{event}, + after: beforeFirstBlock, + confs: 0, + expectedLogs: []string{"0x2", "0x4", "0x6"}, + }, + { + name: "pick only logs after first timestamp", + addr: address1, + event: event, + topicValues: []common.Hash{event}, + after: beforeSecondBlock, + confs: 0, + expectedLogs: []string{"0x4", "0x6"}, + }, + { + name: "pick only the logs after first timestamp with at least 1 confirmation", + addr: address1, + event: event, + topicValues: []common.Hash{event}, + after: beforeSecondBlock, + confs: 1, + expectedLogs: []string{"0x4"}, + }, + { + name: "no results when created at is after all blocks", + addr: address1, + event: event, + topicValues: []common.Hash{event}, + after: time.Date(2030, 1, 1, 10, 0, 0, 0, time.UTC), + confs: 0, + expectedLogs: nil, + }, + { + name: "no results when too many confirmations are required", + addr: address1, + event: event, + topicValues: []common.Hash{event}, + after: beforeFirstBlock, + confs: 3, + expectedLogs: nil, + }, + { + name: "no results when address doesn't match", + addr: common.HexToAddress("0xC"), + event: event, + topicValues: []common.Hash{event}, + after: beforeFirstBlock, + confs: 0, + expectedLogs: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + indexedLogsCreatedAfter, err := th.ORM.SelectIndexedLogsCreatedAfter(tt.addr, tt.event, 0, tt.topicValues, tt.after, tt.confs) + require.NoError(t, err) + + logsCreatedAfter, err := th.ORM.SelectLogsCreatedAfter(tt.event[:], tt.addr, tt.after, tt.confs) + require.NoError(t, err) + + if tt.expectedLogs == nil { + assert.Nil(t, indexedLogsCreatedAfter) + assert.Nil(t, logsCreatedAfter) + return + } + + assert.Len(t, indexedLogsCreatedAfter, len(tt.expectedLogs)) + assert.Len(t, logsCreatedAfter, len(tt.expectedLogs)) + for i, hash := range tt.expectedLogs { + assert.Equal(t, indexedLogsCreatedAfter[i].BlockHash, common.HexToHash(hash)) + assert.Equal(t, logsCreatedAfter[i].BlockHash, common.HexToHash(hash)) + } + }) + } +} + func TestSelectLatestBlockNumberEventSigsAddrsWithConfs(t *testing.T) { th := SetupTH(t, 2, 3, 2) event1 := EmitterABI.Events["Log1"].ID From c62b4171701b2c2b04ef56b340ed320b0fd73284 Mon Sep 17 00:00:00 2001 From: Mateusz Sekara Date: Sun, 30 Jul 2023 17:19:28 +0200 Subject: [PATCH 2/2] Post rebase fixes --- core/chains/evm/logpoller/orm.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/chains/evm/logpoller/orm.go b/core/chains/evm/logpoller/orm.go index a2de51b7c56..22b0df298ca 100644 --- a/core/chains/evm/logpoller/orm.go +++ b/core/chains/evm/logpoller/orm.go @@ -122,7 +122,7 @@ func (o *ORM) SelectLatestLogEventSigWithConfs(eventSig common.Hash, address com WHERE evm_chain_id = $1 AND event_sig = $2 AND address = $3 - AND (block_number + $4) <= (SELECT COALESCE(block_number, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = $1 ORDER BY block_number DESC LIMIT 1) + AND block_number <= (SELECT COALESCE(block_number, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = $1 ORDER BY block_number DESC LIMIT 1) - $4 ORDER BY (block_number, log_index) DESC LIMIT 1`, utils.NewBig(o.chainID), eventSig, address, confs); err != nil { return nil, err } @@ -239,8 +239,8 @@ func (o *ORM) SelectLogsCreatedAfter(eventSig []byte, address common.Address, af WHERE evm_chain_id = $1 AND address = $2 AND event_sig = $3 - AND block_number >= (SELECT COALESCE(block_number, 0) FROM evm_log_poller_blocks WHERE evm_chain_id = $1 and block_timestamp > $4 ORDER BY block_number LIMIT 1) - AND (block_number + $5) <= (SELECT COALESCE(block_number, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = $1 ORDER BY block_number DESC LIMIT 1) + AND block_number >= (SELECT COALESCE(block_number, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = $1 and block_timestamp > $4 ORDER BY block_number LIMIT 1) + AND block_number <= (SELECT COALESCE(block_number, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = $1 ORDER BY block_number DESC LIMIT 1) - $5 ORDER BY (block_number, log_index)`, utils.NewBig(o.chainID), address, eventSig, after, confs) if err != nil { return nil, err @@ -508,7 +508,7 @@ func (o *ORM) SelectIndexedLogsCreatedAfter(address common.Address, eventSig com AND address = $2 AND event_sig = $3 AND topics[$4] = ANY($5) - AND block_number >= (SELECT COALESCE(block_number, 0) FROM evm_log_poller_blocks WHERE evm_chain_id = $1 and block_timestamp > $6 ORDER BY block_number LIMIT 1) + AND block_number >= (SELECT COALESCE(block_number, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = $1 and block_timestamp > $6 ORDER BY block_number LIMIT 1) AND block_number <= (SELECT COALESCE(block_number, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = $1 ORDER BY block_number DESC LIMIT 1) - $7 ORDER BY (block_number, log_index)`, utils.NewBig(o.chainID), address, eventSig.Bytes(), topicIndex+1, topicValuesBytes, after, confs) if err != nil {