Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replacing created_at with the block_timestamp #9728

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions core/chains/evm/logpoller/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (o *ORM) SelectLatestLogEventSigWithConfs(eventSig common.Hash, address com
WHERE evm_chain_id = $1
AND event_sig = $2
AND address = $3
AND (block_number + $4) <= (SELECT COALESCE(block_number, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = $1 ORDER BY block_number DESC LIMIT 1)
AND block_number <= (SELECT COALESCE(block_number, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = $1 ORDER BY block_number DESC LIMIT 1) - $4
ORDER BY (block_number, log_index) DESC LIMIT 1`, utils.NewBig(o.chainID), eventSig, address, confs); err != nil {
return nil, err
}
Expand Down Expand Up @@ -239,9 +239,9 @@ func (o *ORM) SelectLogsCreatedAfter(eventSig []byte, address common.Address, af
WHERE evm_chain_id = $1
AND address = $2
AND event_sig = $3
AND created_at > $4
AND (block_number + $5) <= (SELECT COALESCE(block_number, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = $1 ORDER BY block_number DESC LIMIT 1)
ORDER BY created_at ASC`, utils.NewBig(o.chainID), address, eventSig, after, confs)
AND block_number >= (SELECT COALESCE(block_number, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = $1 and block_timestamp > $4 ORDER BY block_number LIMIT 1)
AND block_number <= (SELECT COALESCE(block_number, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = $1 ORDER BY block_number DESC LIMIT 1) - $5
ORDER BY (block_number, log_index)`, utils.NewBig(o.chainID), address, eventSig, after, confs)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -504,12 +504,13 @@ func (o *ORM) SelectIndexedLogsCreatedAfter(address common.Address, eventSig com
// Add 1 since postgresql arrays are 1-indexed.
err := q.Select(&logs, `
SELECT * FROM evm.logs
WHERE evm.logs.evm_chain_id = $1
AND address = $2 AND event_sig = $3
WHERE evm_chain_id = $1
AND address = $2
AND event_sig = $3
AND topics[$4] = ANY($5)
AND created_at > $6
AND block_number >= (SELECT COALESCE(block_number, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = $1 and block_timestamp > $6 ORDER BY block_number LIMIT 1)
AND block_number <= (SELECT COALESCE(block_number, 0) FROM evm.log_poller_blocks WHERE evm_chain_id = $1 ORDER BY block_number DESC LIMIT 1) - $7
ORDER BY created_at ASC`, utils.NewBig(o.chainID), address, eventSig.Bytes(), topicIndex+1, topicValuesBytes, after, confs)
ORDER BY (block_number, log_index)`, utils.NewBig(o.chainID), address, eventSig.Bytes(), topicIndex+1, topicValuesBytes, after, confs)
if err != nil {
return nil, err
}
Expand Down
156 changes: 156 additions & 0 deletions core/chains/evm/logpoller/orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ func GenLog(chainID *big.Int, logIndex int64, blockNum int64, blockHash string,
}
}

func GenTimestampedLog(chainID *big.Int, logIndex int64, blockNum int64, blockHash string, topic1 []byte, address common.Address, timestamp time.Time) logpoller.Log {
log := GenLog(chainID, logIndex, blockNum, blockHash, topic1, address)
log.BlockTimestamp = timestamp
return log
}

func TestLogPoller_Batching(t *testing.T) {
t.Parallel()
th := SetupTH(t, 2, 3, 2)
Expand Down Expand Up @@ -1017,6 +1023,156 @@ func TestSelectLogsWithSigsExcluding(t *testing.T) {
require.Len(t, logs, 0)
}

func TestOrderingInSelectLogsAndIndexedLogsCreatedAfter(t *testing.T) {
th := SetupTH(t, 2, 3, 2)

currentTime := time.Now()
past := currentTime.Add(-24 * time.Hour)
expectedOrder := []string{"0x6", "0x2", "0x1", "0x4"}

event := EmitterABI.Events["Log1"].ID
address := common.HexToAddress("0xA")

logs := []logpoller.Log{
GenTimestampedLog(th.ChainID, 3, 1, "0x1", event[:], address, currentTime),
GenTimestampedLog(th.ChainID, 2, 1, "0x2", event[:], address, currentTime),
GenTimestampedLog(th.ChainID, 4, 1, "0x4", event[:], address, currentTime),
GenTimestampedLog(th.ChainID, 1, 1, "0x6", event[:], address, currentTime),
}
require.NoError(t, th.ORM.InsertLogs(logs))
for _, l := range logs {
require.NoError(t, th.ORM.InsertBlock(l.BlockHash, l.BlockNumber, l.BlockTimestamp))
}

indexedLogsCreatedAfter, err := th.ORM.SelectIndexedLogsCreatedAfter(address, event, 0, []common.Hash{event}, past, 0)
require.NoError(t, err)
require.Len(t, indexedLogsCreatedAfter, len(expectedOrder))

logsCreatedAfter, err := th.ORM.SelectLogsCreatedAfter(event[:], address, past, 0)
require.NoError(t, err)
require.Len(t, logsCreatedAfter, len(expectedOrder))

for i, hash := range expectedOrder {
require.Equal(t, indexedLogsCreatedAfter[i].BlockHash, common.HexToHash(hash))
require.Equal(t, logsCreatedAfter[i].BlockHash, common.HexToHash(hash))
}
}

func TestSelectLogsAndIndexedLogsCreatedAfter(t *testing.T) {
th := SetupTH(t, 2, 3, 2)

firstBlockTimestamp := time.Date(2010, 1, 1, 10, 0, 0, 0, time.UTC)
secondBlockTimestamp := time.Date(2012, 1, 1, 10, 0, 0, 0, time.UTC)
thirdBlockTimestamp := time.Date(2014, 1, 1, 10, 0, 0, 0, time.UTC)

beforeFirstBlock := firstBlockTimestamp.Add(-(time.Hour * 10))
beforeSecondBlock := secondBlockTimestamp.Add(-(time.Hour * 10))

event := EmitterABI.Events["Log1"].ID
address1 := common.HexToAddress("0xA")
address2 := common.HexToAddress("0xB")

logs := []logpoller.Log{
GenTimestampedLog(th.ChainID, 1, 1, "0x1", event[:], address2, firstBlockTimestamp),
GenTimestampedLog(th.ChainID, 2, 1, "0x2", event[:], address1, firstBlockTimestamp),
GenTimestampedLog(th.ChainID, 2, 2, "0x4", event[:], address1, secondBlockTimestamp),
GenTimestampedLog(th.ChainID, 2, 3, "0x6", event[:], address1, thirdBlockTimestamp),
}
require.NoError(t, th.ORM.InsertLogs(logs))

for _, l := range logs {
require.NoError(t, th.ORM.InsertBlock(l.BlockHash, l.BlockNumber, l.BlockTimestamp))
}

tests := []struct {
name string
addr common.Address
event common.Hash
topicValues []common.Hash
after time.Time
confs int
expectedLogs []string
}{
{
name: "pick all logs",
addr: address1,
event: event,
topicValues: []common.Hash{event},
after: beforeFirstBlock,
confs: 0,
expectedLogs: []string{"0x2", "0x4", "0x6"},
},
{
name: "pick only logs after first timestamp",
addr: address1,
event: event,
topicValues: []common.Hash{event},
after: beforeSecondBlock,
confs: 0,
expectedLogs: []string{"0x4", "0x6"},
},
{
name: "pick only the logs after first timestamp with at least 1 confirmation",
addr: address1,
event: event,
topicValues: []common.Hash{event},
after: beforeSecondBlock,
confs: 1,
expectedLogs: []string{"0x4"},
},
{
name: "no results when created at is after all blocks",
addr: address1,
event: event,
topicValues: []common.Hash{event},
after: time.Date(2030, 1, 1, 10, 0, 0, 0, time.UTC),
confs: 0,
expectedLogs: nil,
},
{
name: "no results when too many confirmations are required",
addr: address1,
event: event,
topicValues: []common.Hash{event},
after: beforeFirstBlock,
confs: 3,
expectedLogs: nil,
},
{
name: "no results when address doesn't match",
addr: common.HexToAddress("0xC"),
event: event,
topicValues: []common.Hash{event},
after: beforeFirstBlock,
confs: 0,
expectedLogs: nil,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
indexedLogsCreatedAfter, err := th.ORM.SelectIndexedLogsCreatedAfter(tt.addr, tt.event, 0, tt.topicValues, tt.after, tt.confs)
require.NoError(t, err)

logsCreatedAfter, err := th.ORM.SelectLogsCreatedAfter(tt.event[:], tt.addr, tt.after, tt.confs)
require.NoError(t, err)

if tt.expectedLogs == nil {
assert.Nil(t, indexedLogsCreatedAfter)
assert.Nil(t, logsCreatedAfter)
return
}

assert.Len(t, indexedLogsCreatedAfter, len(tt.expectedLogs))
assert.Len(t, logsCreatedAfter, len(tt.expectedLogs))
for i, hash := range tt.expectedLogs {
assert.Equal(t, indexedLogsCreatedAfter[i].BlockHash, common.HexToHash(hash))
assert.Equal(t, logsCreatedAfter[i].BlockHash, common.HexToHash(hash))
}
})
}
}

func TestSelectLatestBlockNumberEventSigsAddrsWithConfs(t *testing.T) {
th := SetupTH(t, 2, 3, 2)
event1 := EmitterABI.Events["Log1"].ID
Expand Down
Loading