Skip to content

Commit

Permalink
Upkeep states performed (#10183)
Browse files Browse the repository at this point in the history
* added performed events scanner

scanning for work ids in DedupKeyAdded events

* refactor upkeep states store

* tests

* lint

* lint in logprovider test

* comments and fix scanner close

* fix scanner tests

* test errors
  • Loading branch information
amirylm authored Aug 14, 2023
1 parent 3dbc025 commit 27621ae
Show file tree
Hide file tree
Showing 7 changed files with 649 additions and 237 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,6 @@ func TestIntegration_LogEventProvider(t *testing.T) {
}

func TestIntegration_LogEventProvider_RateLimit(t *testing.T) {
type deferableFunc func()

setupTest := func(
t *testing.T,
opts *logprovider.LogEventProviderOptions,
Expand Down Expand Up @@ -318,7 +316,7 @@ func TestIntegration_LogEventProvider_RateLimit(t *testing.T) {
}

func TestIntegration_LogEventProvider_Backfill(t *testing.T) {
ctx, cancel := context.WithTimeout(testutils.Context(t), time.Second*30)
ctx, cancel := context.WithTimeout(testutils.Context(t), time.Second*60)
defer cancel()

backend, stopMining, accounts := setupBackend(t)
Expand Down Expand Up @@ -350,8 +348,8 @@ func TestIntegration_LogEventProvider_Backfill(t *testing.T) {
b, err := ethClient.BlockByHash(ctx, backend.Commit())
latestBlock := b.Number().Int64()
for {
latestPolled, err := lp.LatestBlock(pg.WithParentCtx(ctx))
require.NoError(t, err)
latestPolled, lberr := lp.LatestBlock(pg.WithParentCtx(ctx))
require.NoError(t, lberr)
if latestPolled >= latestBlock {
break
}
Expand All @@ -360,8 +358,8 @@ func TestIntegration_LogEventProvider_Backfill(t *testing.T) {

// starting the log provider should backfill logs
go func() {
if err := logProvider.Start(ctx); err != nil {
t.Logf("error starting log provider: %s", err)
if startErr := logProvider.Start(ctx); err != nil {
t.Logf("error starting log provider: %s", startErr)
t.Fail()
}
}()
Expand All @@ -370,7 +368,7 @@ func TestIntegration_LogEventProvider_Backfill(t *testing.T) {
t.Log("waiting for log provider to do some backfilling")
for ctx.Err() == nil {
currentPartition := logProvider.CurrentPartitionIdx()
if currentPartition >= 2 {
if currentPartition > 2 { // make sure we went over all items
break
}
}
Expand Down
92 changes: 0 additions & 92 deletions core/services/ocr2/plugins/ocr2keeper/evm21/upkeep_state_store.go

This file was deleted.

137 changes: 0 additions & 137 deletions core/services/ocr2/plugins/ocr2keeper/evm21/upkeep_state_store_test.go

This file was deleted.

86 changes: 86 additions & 0 deletions core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/scanner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package upkeepstate

import (
"context"
"fmt"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
iregistry21 "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated/i_keeper_registry_master_wrapper_2_1"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
)

type PerformedLogsScanner interface {
WorkIDsInRange(ctx context.Context, start, end int64) ([]string, error)
}

type performedEventsScanner struct {
lggr logger.Logger
poller logpoller.LogPoller
registryAddress common.Address
}

func NewPerformedEventsScanner(
lggr logger.Logger,
poller logpoller.LogPoller,
registryAddress common.Address,
) *performedEventsScanner {
return &performedEventsScanner{
lggr: lggr,
poller: poller,
registryAddress: registryAddress,
}
}

func (s *performedEventsScanner) Start(_ context.Context) error {
return s.poller.RegisterFilter(logpoller.Filter{
Name: dedupFilterName(s.registryAddress),
EventSigs: []common.Hash{
// listening to dedup key added event
iregistry21.IKeeperRegistryMasterDedupKeyAdded{}.Topic(),
},
Addresses: []common.Address{s.registryAddress},
})
}

// implements io.Closer, does nothing upon close
func (s *performedEventsScanner) Close() error {
return nil
}

func (s *performedEventsScanner) WorkIDsInRange(ctx context.Context, start, end int64) ([]string, error) {
logs, err := s.poller.LogsWithSigs(
start,
end,
[]common.Hash{
iregistry21.IKeeperRegistryMasterDedupKeyAdded{}.Topic(),
},
s.registryAddress,
pg.WithParentCtx(ctx),
)
if err != nil {
return nil, fmt.Errorf("error fetching logs: %w", err)
}

return s.logsToWorkIDs(logs), nil
}

func (s *performedEventsScanner) logsToWorkIDs(logs []logpoller.Log) []string {
workIDs := make([]string, 0)
for _, log := range logs {
topics := log.GetTopics()
if len(topics) < 2 {
s.lggr.Debugw("unexpected log topics", "topics", topics)
continue
}
workIDs = append(workIDs, hexutil.Encode(topics[1].Bytes()))
}
return workIDs
}

func dedupFilterName(addr common.Address) string {
return logpoller.FilterName("KeepersRegistry UpkeepStates Deduped", addr)
}
Loading

0 comments on commit 27621ae

Please sign in to comment.