Automation v2.1: fixes and leftovers #10599

Closed · wants to merge 9 commits
@@ -27,10 +27,8 @@ const (
lookbackDepth = 1024
// blockHistorySize decides the block history size sent to subscribers
blockHistorySize = int64(256)
)

var (
BlockSubscriberServiceName = "BlockSubscriber"
blockSubscriberServiceName = "BlockSubscriber"
)

type BlockSubscriber struct {
@@ -66,7 +64,7 @@ func NewBlockSubscriber(hb httypes.HeadBroadcaster, lp logpoller.LogPoller, lggr
blockHistorySize: blockHistorySize,
blockSize: lookbackDepth,
latestBlock: atomic.Pointer[ocr2keepers.BlockKey]{},
lggr: lggr.Named("BlockSubscriber"),
lggr: lggr.Named(blockSubscriberServiceName),
}
}

@@ -148,7 +146,7 @@ func (bs *BlockSubscriber) initialize(ctx context.Context) {
}

func (bs *BlockSubscriber) Start(ctx context.Context) error {
return bs.StartOnce(BlockSubscriberServiceName, func() error {
return bs.StartOnce(blockSubscriberServiceName, func() error {
bs.lggr.Info("block subscriber started.")
bs.initialize(ctx)
// poll from head broadcaster channel and push to subscribers
@@ -184,7 +182,7 @@ func (bs *BlockSubscriber) Start(ctx context.Context) error {
}

func (bs *BlockSubscriber) Close() error {
return bs.StopOnce(BlockSubscriberServiceName, func() error {
return bs.StopOnce(blockSubscriberServiceName, func() error {
bs.lggr.Info("stop block subscriber")
bs.threadCtrl.Close()
bs.unsubscribe()
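
The rename from BlockSubscriberServiceName to blockSubscriberServiceName works because the name is only used inside the package, as the StartOnce/StopOnce key and as the logger name, so nothing outside needs the exported symbol. The sketch below illustrates that start-once/stop-once lifecycle pattern with a toy guard type; it is a minimal stand-in, not the actual chainlink services/utils implementation.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// startStopOnce is a minimal stand-in for the start-once/stop-once guard
// the services in this PR embed; it is illustrative only.
type startStopOnce struct {
	mu      sync.Mutex
	started bool
	stopped bool
}

func (s *startStopOnce) StartOnce(name string, fn func() error) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.started {
		return fmt.Errorf("%s has already been started", name)
	}
	s.started = true
	return fn()
}

func (s *startStopOnce) StopOnce(name string, fn func() error) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if !s.started || s.stopped {
		return errors.New(name + " is not running")
	}
	s.stopped = true
	return fn()
}

// blockSubscriberServiceName stays unexported: it is only referenced inside
// the package, as the StartOnce/StopOnce key and the logger name.
const blockSubscriberServiceName = "BlockSubscriber"

type blockSubscriber struct {
	startStopOnce
}

func (bs *blockSubscriber) Start() error {
	return bs.StartOnce(blockSubscriberServiceName, func() error {
		fmt.Println("block subscriber started")
		return nil
	})
}

func (bs *blockSubscriber) Close() error {
	return bs.StopOnce(blockSubscriberServiceName, func() error {
		fmt.Println("stop block subscriber")
		return nil
	})
}

func main() {
	bs := &blockSubscriber{}
	fmt.Println(bs.Start()) // <nil>
	fmt.Println(bs.Start()) // BlockSubscriber has already been started
	fmt.Println(bs.Close()) // <nil>
}
```
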
@@ -152,7 +152,7 @@ type logEventBuffer struct {

func newLogEventBuffer(lggr logger.Logger, size, maxBlockLogs, maxUpkeepLogsPerBlock int) *logEventBuffer {
return &logEventBuffer{
lggr: lggr.Named("KeepersRegistry.LogEventBuffer"),
lggr: lggr.Named("LogEventBuffer"),
amirylm marked this conversation as resolved.
size: int32(size),
blocks: make([]fetchedBlock, size),
maxBlockLogs: maxBlockLogs,
@@ -175,7 +175,7 @@ func TestIntegration_LogEventProvider_UpdateConfig(t *testing.T) {
TriggerConfig: cfg,
UpdateBlock: bn.Uint64() - 1,
})
require.Error(t, err)
require.NoError(t, err)
Contributor:
This seems like the opposite assert now. Not sure why this needs to change with the naming changes?

Collaborator (Author):
We will log (debug) rather than returning an error, therefore the error check is irrelevant.

// new block
b, err = ethClient.BlockByHash(ctx, backend.Commit())
require.NoError(t, err)
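
To make the behavior change from the exchange above concrete, here is a rough sketch of the new RegisterFilter flow: when a filter is already registered with a config from a higher (or equal) block, the provider logs at debug level and returns nil instead of an error, so callers such as this test no longer see a failure. The types, fields, and logging below are simplified assumptions, not the actual evm21 definitions.

```go
package main

import (
	"fmt"
	"log"
	"math/big"
)

// upkeepFilter and FilterOptions are simplified stand-ins for the real
// evm21 types; only the fields needed for the control flow are kept.
type upkeepFilter struct {
	upkeepID          *big.Int
	configUpdateBlock uint64
}

type FilterOptions struct {
	UpkeepID    *big.Int
	UpdateBlock uint64
}

type provider struct {
	filters map[string]upkeepFilter
}

// RegisterFilter mirrors the new control flow: a registration that is older
// than (or identical to) the stored config is logged and skipped, rather
// than surfaced as an error to the caller.
func (p *provider) RegisterFilter(opts FilterOptions) error {
	key := opts.UpkeepID.String()
	if current, ok := p.filters[key]; ok {
		if current.configUpdateBlock > opts.UpdateBlock {
			log.Printf("filter for upkeep with id %s already registered with newer config", key)
			return nil
		}
		if current.configUpdateBlock == opts.UpdateBlock {
			log.Printf("filter for upkeep with id %s already registered with the same config", key)
			return nil
		}
	}
	p.filters[key] = upkeepFilter{upkeepID: opts.UpkeepID, configUpdateBlock: opts.UpdateBlock}
	return nil
}

func main() {
	p := &provider{filters: map[string]upkeepFilter{}}
	id := big.NewInt(111)
	fmt.Println(p.RegisterFilter(FilterOptions{UpkeepID: id, UpdateBlock: 10})) // <nil>
	// Re-registering with an older config is now a no-op instead of an error,
	// which is why the integration test flipped require.Error to require.NoError.
	fmt.Println(p.RegisterFilter(FilterOptions{UpkeepID: id, UpdateBlock: 5})) // <nil>
}
```
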
@@ -26,7 +26,7 @@ import (
)

var (
LogProviderServiceName = "LogEventProvider"
logProviderServiceName = "LogEventProvider"

ErrHeadNotAvailable = fmt.Errorf("head not available")
ErrBlockLimitExceeded = fmt.Errorf("block limit exceeded")
@@ -104,7 +104,7 @@ type logEventProvider struct {
func NewLogProvider(lggr logger.Logger, poller logpoller.LogPoller, packer LogDataPacker, filterStore UpkeepFilterStore, opts LogTriggersOptions) *logEventProvider {
return &logEventProvider{
threadCtrl: utils.NewThreadControl(),
lggr: lggr.Named("KeepersRegistry.LogEventProvider"),
lggr: lggr.Named(logProviderServiceName),
packer: packer,
buffer: newLogEventBuffer(lggr, int(opts.LookbackBlocks), maxLogsPerBlock, maxLogsPerUpkeepInBlock),
poller: poller,
@@ -114,20 +114,21 @@
}

func (p *logEventProvider) Start(context.Context) error {
return p.StartOnce(LogProviderServiceName, func() error {
return p.StartOnce(logProviderServiceName, func() error {

readQ := make(chan []*big.Int, readJobQueueSize)

p.lggr.Infow("starting log event provider", "readInterval", p.opts.ReadInterval, "readMaxBatchSize", readMaxBatchSize, "readers", readerThreads)

for i := 0; i < readerThreads; i++ {
tid := i + 1
p.threadCtrl.Go(func(ctx context.Context) {
p.startReader(ctx, readQ)
p.startReader(ctx, readQ, tid)
})
}

p.threadCtrl.Go(func(ctx context.Context) {
lggr := p.lggr.With("where", "scheduler")
lggr := p.lggr.Named("Scheduler")

p.scheduleReadJobs(ctx, func(ids []*big.Int) {
select {
@@ -144,14 +145,14 @@ func (p *logEventProvider) Start(context.Context) error {
}

func (p *logEventProvider) Close() error {
return p.StopOnce(LogProviderServiceName, func() error {
return p.StopOnce(logProviderServiceName, func() error {
p.threadCtrl.Close()
return nil
})
}

func (p *logEventProvider) HealthReport() map[string]error {
return map[string]error{LogProviderServiceName: p.Healthy()}
return map[string]error{p.lggr.Name(): p.Healthy()}
}
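
A side effect of keying HealthReport on p.lggr.Name() is that the report entry now carries the full dotted logger hierarchy (whatever chain of Named() calls produced this logger) instead of a fixed exported constant. A toy sketch of that idea, using a hand-rolled logger rather than the chainlink logger package:

```go
package main

import "fmt"

// namedLogger is a toy stand-in for a hierarchical logger: Named appends a
// component to a dot-separated name, as zap-style loggers do.
type namedLogger struct{ name string }

func (l namedLogger) Named(n string) namedLogger {
	if l.name == "" {
		return namedLogger{name: n}
	}
	return namedLogger{name: l.name + "." + n}
}

func (l namedLogger) Name() string { return l.name }

type logEventProvider struct {
	lggr namedLogger
}

func (p *logEventProvider) Healthy() error { return nil }

// HealthReport keys the entry by the logger's full name instead of a
// package-level exported constant, so nested components report under
// their hierarchical name automatically.
func (p *logEventProvider) HealthReport() map[string]error {
	return map[string]error{p.lggr.Name(): p.Healthy()}
}

func main() {
	root := namedLogger{}.Named("EvmRegistry")
	p := &logEventProvider{lggr: root.Named("LogEventProvider")}
	fmt.Println(p.HealthReport()) // map[EvmRegistry.LogEventProvider:<nil>]
}
```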

func (p *logEventProvider) GetLatestPayloads(ctx context.Context) ([]ocr2keepers.UpkeepPayload, error) {
@@ -251,11 +252,11 @@ func (p *logEventProvider) scheduleReadJobs(pctx context.Context, execute func([
}

// startReader starts a reader that reads logs from the ids coming from readQ.
func (p *logEventProvider) startReader(pctx context.Context, readQ <-chan []*big.Int) {
func (p *logEventProvider) startReader(pctx context.Context, readQ <-chan []*big.Int, tid int) {
ctx, cancel := context.WithCancel(pctx)
defer cancel()

lggr := p.lggr.With("where", "reader")
lggr := p.lggr.Named(fmt.Sprintf("ReaderThread-%d", tid))
Collaborator:
Presumably tid is thread ID here? Could we perhaps rename the function to startReaderThread?
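
Two details in this hunk are easy to miss: the loop index is copied into tid before the closure is created, so every reader goroutine sees its own value, and each reader gets its own Named logger such as LogEventProvider.ReaderThread-1 (the rename to startReaderThread suggested above would make the tid parameter's meaning explicit). A self-contained sketch of the same fan-out pattern, with the thread-control helper and logger replaced by plain goroutines and fmt:

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

const readerThreads = 4

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	readQ := make(chan []int, 32)
	var wg sync.WaitGroup

	for i := 0; i < readerThreads; i++ {
		tid := i + 1 // copy the loop counter before the closure captures it
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Each reader works under its own name; with a Named logger this
			// would surface as e.g. "LogEventProvider.ReaderThread-3".
			name := fmt.Sprintf("ReaderThread-%d", tid)
			for {
				select {
				case ids := <-readQ:
					fmt.Printf("[%s] reading logs for %d upkeeps\n", name, len(ids))
				case <-ctx.Done():
					return
				}
			}
		}()
	}

	readQ <- []int{1, 2, 3}
	readQ <- []int{4}
	time.Sleep(50 * time.Millisecond) // crude: let the readers drain the queue in this demo
	cancel()
	wg.Wait()
}
```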


for {
select {
@@ -76,7 +76,7 @@ func (p *logEventProvider) RegisterFilter(ctx context.Context, opts FilterOption
if currentFilter != nil {
if currentFilter.configUpdateBlock > opts.UpdateBlock {
// already registered with a config from a higher block number
return fmt.Errorf("filter for upkeep with id %s already registered with newer config", upkeepID.String())
p.lggr.Debugf("filter for upkeep with id %s already registered with newer config", upkeepID.String())
return nil
} else if currentFilter.configUpdateBlock == opts.UpdateBlock {
// already registered with the same config
p.lggr.Debugf("filter for upkeep with id %s already registered with the same config", upkeepID.String())
@@ -70,7 +70,7 @@ func TestLogEventProvider_LifeCycle(t *testing.T) {
},
{
"existing config with old block",
true,
false,
Collaborator:
I don't expect you to change this, but maybe in future we should strive as a team to use field names for these values, just for more context on what they represent, especially in code reviews.
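
To illustrate the suggestion above: with positional values a bare true or false in a test table gives no hint of what it controls, while named fields carry that context at the call site. The struct and field names below are hypothetical, not the actual test's types:

```go
package example

import "math/big"

// testCase is a hypothetical table-test entry; the real test's struct differs.
type testCase struct {
	name      string
	expectErr bool
	upkeepID  *big.Int
}

// Positional values force the reader to look up the struct definition to
// know what a bare "false" means.
var positional = testCase{"existing config with old block", false, big.NewInt(111)}

// Named fields carry the context at the call site and survive field reordering.
var named = testCase{
	name:      "existing config with old block",
	expectErr: false,
	upkeepID:  big.NewInt(111),
}
```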

big.NewInt(111),
LogTriggerConfig{
ContractAddress: common.BytesToAddress(common.LeftPadBytes([]byte{1, 2, 3, 4}, 20)),
@@ -181,7 +181,7 @@ func (r *logRecoverer) Close() error {
}

func (r *logRecoverer) HealthReport() map[string]error {
return map[string]error{LogRecovererServiceName: r.Healthy()}
return map[string]error{r.lggr.Name(): r.Healthy()}
}

func (r *logRecoverer) GetProposalData(ctx context.Context, proposal ocr2keepers.CoordinatedBlockProposal) ([]byte, error) {
@@ -572,14 +572,14 @@ func (r *logRecoverer) clean(ctx context.Context) {
}
}
r.lock.RUnlock()
lggr := r.lggr.With("where", "clean")

if len(expired) == 0 {
lggr.Debug("no expired upkeeps")
r.lggr.Debug("no expired upkeeps")
return
}
err := r.tryExpire(ctx, expired...)
if err != nil {
lggr.Warnw("failed to clean visited upkeeps", "err", err)
r.lggr.Warnw("failed to clean visited upkeeps", "err", err)
}
}

@@ -595,7 +595,6 @@ func (r *logRecoverer) tryExpire(ctx context.Context, ids ...string) error {
if err != nil {
return fmt.Errorf("failed to get states: %w", err)
}
lggr := r.lggr.With("where", "clean")
start, _ := r.getRecoveryWindow(latestBlock)
r.lock.Lock()
defer r.lock.Unlock()
@@ -612,7 +611,7 @@ func (r *logRecoverer) tryExpire(ctx context.Context, ids ...string) error {
}
if logBlock := rec.payload.Trigger.LogTriggerExtension.BlockNumber; int64(logBlock) < start {
// we can't recover this log anymore, so we remove it from the visited list
lggr.Debugw("removing expired log: old block", "upkeepID", rec.payload.UpkeepID,
r.lggr.Debugw("removing expired log: old block", "upkeepID", rec.payload.UpkeepID,
"latestBlock", latestBlock, "logBlock", logBlock, "start", start)
r.removePending(rec.payload.WorkID)
delete(r.visited, ids[i])
@@ -629,7 +628,7 @@ func (r *logRecoverer) tryExpire(ctx context.Context, ids ...string) error {
}

if removed > 0 {
lggr.Debugw("expired upkeeps", "expired", len(ids), "cleaned", removed)
r.lggr.Debugw("expired upkeeps", "expired", len(ids), "cleaned", removed)
}

return nil
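
For readers less familiar with the recoverer, the hunk above prunes the visited set: once a log's block falls before the start of the recovery window it can never be recovered, so it is dropped from pending and visited. A simplified sketch of that pruning rule; the window computation and record shape here are assumptions, not the actual implementation:

```go
package main

import (
	"fmt"
	"log"
)

// visitedRecord is a simplified stand-in for the recoverer's visited entry.
type visitedRecord struct {
	workID   string
	logBlock int64
}

// recoveryWindowStart is an assumed helper: logs older than this block are
// considered unrecoverable. The real recoverer derives it from latestBlock
// and a configured lookback.
func recoveryWindowStart(latestBlock, lookback int64) int64 {
	return latestBlock - lookback
}

// expireOld drops visited entries whose log block falls before the start of
// the recovery window, mirroring the "removing expired log: old block" path.
func expireOld(visited map[string]visitedRecord, latestBlock, lookback int64) int {
	start := recoveryWindowStart(latestBlock, lookback)
	removed := 0
	for id, rec := range visited {
		if rec.logBlock < start {
			log.Printf("removing expired log: old block (workID=%s logBlock=%d start=%d)",
				rec.workID, rec.logBlock, start)
			delete(visited, id)
			removed++
		}
	}
	return removed
}

func main() {
	visited := map[string]visitedRecord{
		"a": {workID: "a", logBlock: 100},
		"b": {workID: "b", logBlock: 900},
	}
	removed := expireOld(visited, 1000, 200) // window start = 800
	fmt.Println("removed:", removed)         // removed: 1
	fmt.Println("remaining:", len(visited))  // remaining: 1
}
```
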
21 changes: 9 additions & 12 deletions core/services/ocr2/plugins/ocr2keeper/evm21/registry.go
@@ -41,7 +41,7 @@ const (
)

var (
RegistryServiceName = "AutomationRegistry"
registryServiceName = "EvmRegistry"

ErrLogReadFailure = fmt.Errorf("failure reading logs")
ErrHeadNotAvailable = fmt.Errorf("head not available")
@@ -87,7 +87,7 @@ func NewEvmRegistry(
return &EvmRegistry{
ctx: context.Background(),
threadCtrl: utils.NewThreadControl(),
lggr: lggr.Named("EvmRegistry"),
lggr: lggr.Named(registryServiceName),
poller: client.LogPoller(),
addr: addr,
client: client.Client(),
@@ -159,16 +159,15 @@ func (r *EvmRegistry) Name() string {
}

func (r *EvmRegistry) Start(ctx context.Context) error {
return r.StartOnce(RegistryServiceName, func() error {
return r.StartOnce(registryServiceName, func() error {
if err := r.registerEvents(r.chainID, r.addr); err != nil {
return fmt.Errorf("logPoller error while registering automation events: %w", err)
}

r.threadCtrl.Go(func(ctx context.Context) {
lggr := r.lggr.With("where", "upkeeps_referesh")
err := r.refreshActiveUpkeeps()
if err != nil {
lggr.Errorf("failed to initialize upkeeps", err)
r.lggr.Errorf("failed to initialize upkeeps", err)
}

ticker := time.NewTicker(refreshInterval)
@@ -179,7 +178,7 @@ func (r *EvmRegistry) Start(ctx context.Context) error {
case <-ticker.C:
err = r.refreshActiveUpkeeps()
if err != nil {
lggr.Errorf("failed to refresh upkeeps", err)
r.lggr.Errorf("failed to refresh upkeeps", err)
}
case <-ctx.Done():
return
@@ -188,7 +187,6 @@ func (r *EvmRegistry) Start(ctx context.Context) error {
})

r.threadCtrl.Go(func(ctx context.Context) {
lggr := r.lggr.With("where", "logs_polling")
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

@@ -197,7 +195,7 @@ func (r *EvmRegistry) Start(ctx context.Context) error {
case <-ticker.C:
err := r.pollUpkeepStateLogs()
if err != nil {
lggr.Errorf("failed to poll logs for upkeeps", err)
r.lggr.Errorf("failed to poll logs for upkeeps", err)
}
case <-ctx.Done():
return
@@ -206,15 +204,14 @@ func (r *EvmRegistry) Start(ctx context.Context) error {
})

r.threadCtrl.Go(func(ctx context.Context) {
lggr := r.lggr.With("where", "logs_processing")
ch := r.chLog

for {
select {
case l := <-ch:
err := r.processUpkeepStateLog(l)
if err != nil {
lggr.Errorf("failed to process log for upkeep", err)
r.lggr.Errorf("failed to process log for upkeep", err)
}
case <-ctx.Done():
return
@@ -227,14 +224,14 @@ func (r *EvmRegistry) Start(ctx context.Context) error {
}
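
The Start method above now runs its background loops directly against r.lggr rather than creating per-loop With("where", ...) children; the loop shape itself, a ticker plus ctx.Done() select inside a managed goroutine, is unchanged. A stripped-down sketch of one such loop, with a plain goroutine standing in for the thread-control helper:

```go
package main

import (
	"context"
	"log"
	"time"
)

// refreshActiveUpkeeps is a placeholder for the real registry method.
func refreshActiveUpkeeps() error { return nil }

// startRefreshLoop shows the ticker + ctx.Done() shape used by the
// registry's background goroutines: do the work once up front, then
// repeat on every tick until the context is cancelled.
func startRefreshLoop(ctx context.Context, interval time.Duration) {
	go func() {
		if err := refreshActiveUpkeeps(); err != nil {
			log.Printf("failed to initialize upkeeps: %v", err)
		}

		ticker := time.NewTicker(interval)
		defer ticker.Stop()

		for {
			select {
			case <-ticker.C:
				if err := refreshActiveUpkeeps(); err != nil {
					log.Printf("failed to refresh upkeeps: %v", err)
				}
			case <-ctx.Done():
				return
			}
		}
	}()
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	startRefreshLoop(ctx, 10*time.Millisecond)
	<-ctx.Done()
}
```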

func (r *EvmRegistry) Close() error {
return r.StopOnce(RegistryServiceName, func() error {
return r.StopOnce(registryServiceName, func() error {
r.threadCtrl.Close()
return nil
})
}

func (r *EvmRegistry) HealthReport() map[string]error {
return map[string]error{RegistryServiceName: r.Healthy()}
return map[string]error{r.lggr.Name(): r.Healthy()}
}

func (r *EvmRegistry) refreshActiveUpkeeps() error {
@@ -88,7 +88,7 @@ type UpkeepPrivilegeConfig struct {

// streamsLookup looks through check upkeep results looking for any that need off chain lookup
func (r *EvmRegistry) streamsLookup(ctx context.Context, checkResults []ocr2keepers.CheckResult) []ocr2keepers.CheckResult {
lggr := r.lggr.With("where", "StreamsLookup")
lggr := r.lggr.Named("StreamsLookup")
lookups := map[int]*StreamsLookup{}
for i, res := range checkResults {
if res.IneligibilityReason != uint8(encoding.UpkeepFailureReasonTargetCheckReverted) {
@@ -17,7 +17,7 @@ import (
)

const (
UpkeepStateStoreServiceName = "UpkeepStateStore"
upkeepStateStoreServiceName = "UpkeepStateStore"
// CacheExpiration is the amount of time that we keep a record in the cache.
CacheExpiration = 24 * time.Hour
// GCInterval is the amount of time between cache cleanups.
@@ -81,7 +81,7 @@ type upkeepStateStore struct {
func NewUpkeepStateStore(orm ORM, lggr logger.Logger, scanner PerformedLogsScanner) *upkeepStateStore {
return &upkeepStateStore{
orm: orm,
lggr: lggr.Named(UpkeepStateStoreServiceName),
lggr: lggr.Named(upkeepStateStoreServiceName),
cache: map[string]*upkeepStateRecord{},
scanner: scanner,
retention: CacheExpiration,
@@ -97,7 +97,7 @@ func NewUpkeepStateStore(orm ORM, lggr logger.Logger, scanner PerformedLogsScann
// it does background cleanup of the cache every GCInterval,
// and flush records to DB every flushCadence.
func (u *upkeepStateStore) Start(pctx context.Context) error {
return u.StartOnce(UpkeepStateStoreServiceName, func() error {
return u.StartOnce(upkeepStateStoreServiceName, func() error {
if err := u.scanner.Start(pctx); err != nil {
return fmt.Errorf("failed to start scanner")
}
@@ -159,14 +159,14 @@ func (u *upkeepStateStore) flush(ctx context.Context) {

// Close stops the service of pruning stale data; implements io.Closer
func (u *upkeepStateStore) Close() error {
return u.StopOnce(UpkeepStateStoreServiceName, func() error {
return u.StopOnce(upkeepStateStoreServiceName, func() error {
u.threadCtrl.Close()
return nil
})
}

func (u *upkeepStateStore) HealthReport() map[string]error {
return map[string]error{UpkeepStateStoreServiceName: u.Healthy()}
return map[string]error{u.lggr.Name(): u.Healthy()}
}

// SelectByWorkIDs returns the current state of the upkeep for the provided ids.