diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/block_subscriber.go b/core/services/ocr2/plugins/ocr2keeper/evm21/block_subscriber.go index 87b46c9785b..bcc73ad5ba9 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/block_subscriber.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/block_subscriber.go @@ -27,10 +27,8 @@ const ( lookbackDepth = 1024 // blockHistorySize decides the block history size sent to subscribers blockHistorySize = int64(256) -) -var ( - BlockSubscriberServiceName = "BlockSubscriber" + blockSubscriberServiceName = "BlockSubscriber" ) type BlockSubscriber struct { @@ -66,7 +64,7 @@ func NewBlockSubscriber(hb httypes.HeadBroadcaster, lp logpoller.LogPoller, lggr blockHistorySize: blockHistorySize, blockSize: lookbackDepth, latestBlock: atomic.Pointer[ocr2keepers.BlockKey]{}, - lggr: lggr.Named("BlockSubscriber"), + lggr: lggr.Named(blockSubscriberServiceName), } } @@ -148,7 +146,7 @@ func (bs *BlockSubscriber) initialize(ctx context.Context) { } func (bs *BlockSubscriber) Start(ctx context.Context) error { - return bs.StartOnce(BlockSubscriberServiceName, func() error { + return bs.StartOnce(blockSubscriberServiceName, func() error { bs.lggr.Info("block subscriber started.") bs.initialize(ctx) // poll from head broadcaster channel and push to subscribers @@ -184,7 +182,7 @@ func (bs *BlockSubscriber) Start(ctx context.Context) error { } func (bs *BlockSubscriber) Close() error { - return bs.StopOnce(BlockSubscriberServiceName, func() error { + return bs.StopOnce(blockSubscriberServiceName, func() error { bs.lggr.Info("stop block subscriber") bs.threadCtrl.Close() bs.unsubscribe() diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer.go index b06a3ca809f..15f9080c563 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/buffer.go @@ -152,7 +152,7 @@ type logEventBuffer struct { func newLogEventBuffer(lggr logger.Logger, size, maxBlockLogs, maxUpkeepLogsPerBlock int) *logEventBuffer { return &logEventBuffer{ - lggr: lggr.Named("KeepersRegistry.LogEventBuffer"), + lggr: lggr.Named("LogEventBuffer"), size: int32(size), blocks: make([]fetchedBlock, size), maxBlockLogs: maxBlockLogs, diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/integration_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/integration_test.go index b5f229f6015..77d9d4ec044 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/integration_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/integration_test.go @@ -175,7 +175,7 @@ func TestIntegration_LogEventProvider_UpdateConfig(t *testing.T) { TriggerConfig: cfg, UpdateBlock: bn.Uint64() - 1, }) - require.Error(t, err) + require.NoError(t, err) // new block b, err = ethClient.BlockByHash(ctx, backend.Commit()) require.NoError(t, err) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider.go index b62fb370847..a3d84f010a6 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider.go @@ -26,7 +26,7 @@ import ( ) var ( - LogProviderServiceName = "LogEventProvider" + logProviderServiceName = "LogEventProvider" ErrHeadNotAvailable = fmt.Errorf("head not available") ErrBlockLimitExceeded = fmt.Errorf("block limit exceeded") @@ -104,7 +104,7 @@ type logEventProvider struct { func NewLogProvider(lggr logger.Logger, poller logpoller.LogPoller, packer LogDataPacker, filterStore UpkeepFilterStore, opts LogTriggersOptions) *logEventProvider { return &logEventProvider{ threadCtrl: utils.NewThreadControl(), - lggr: lggr.Named("KeepersRegistry.LogEventProvider"), + lggr: lggr.Named(logProviderServiceName), packer: packer, buffer: newLogEventBuffer(lggr, int(opts.LookbackBlocks), maxLogsPerBlock, maxLogsPerUpkeepInBlock), poller: poller, @@ -114,20 +114,21 @@ func NewLogProvider(lggr logger.Logger, poller logpoller.LogPoller, packer LogDa } func (p *logEventProvider) Start(context.Context) error { - return p.StartOnce(LogProviderServiceName, func() error { + return p.StartOnce(logProviderServiceName, func() error { readQ := make(chan []*big.Int, readJobQueueSize) p.lggr.Infow("starting log event provider", "readInterval", p.opts.ReadInterval, "readMaxBatchSize", readMaxBatchSize, "readers", readerThreads) for i := 0; i < readerThreads; i++ { + tid := i + 1 p.threadCtrl.Go(func(ctx context.Context) { - p.startReader(ctx, readQ) + p.startReader(ctx, readQ, tid) }) } p.threadCtrl.Go(func(ctx context.Context) { - lggr := p.lggr.With("where", "scheduler") + lggr := p.lggr.Named("Scheduler") p.scheduleReadJobs(ctx, func(ids []*big.Int) { select { @@ -144,14 +145,14 @@ func (p *logEventProvider) Start(context.Context) error { } func (p *logEventProvider) Close() error { - return p.StopOnce(LogProviderServiceName, func() error { + return p.StopOnce(logProviderServiceName, func() error { p.threadCtrl.Close() return nil }) } func (p *logEventProvider) HealthReport() map[string]error { - return map[string]error{LogProviderServiceName: p.Healthy()} + return map[string]error{p.lggr.Name(): p.Healthy()} } func (p *logEventProvider) GetLatestPayloads(ctx context.Context) ([]ocr2keepers.UpkeepPayload, error) { @@ -251,11 +252,11 @@ func (p *logEventProvider) scheduleReadJobs(pctx context.Context, execute func([ } // startReader starts a reader that reads logs from the ids coming from readQ. -func (p *logEventProvider) startReader(pctx context.Context, readQ <-chan []*big.Int) { +func (p *logEventProvider) startReader(pctx context.Context, readQ <-chan []*big.Int, tid int) { ctx, cancel := context.WithCancel(pctx) defer cancel() - lggr := p.lggr.With("where", "reader") + lggr := p.lggr.Named(fmt.Sprintf("ReaderThread-%d", tid)) for { select { diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider_life_cycle.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider_life_cycle.go index ab816adb1b3..2796c4b3b0e 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider_life_cycle.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider_life_cycle.go @@ -76,7 +76,8 @@ func (p *logEventProvider) RegisterFilter(ctx context.Context, opts FilterOption if currentFilter != nil { if currentFilter.configUpdateBlock > opts.UpdateBlock { // already registered with a config from a higher block number - return fmt.Errorf("filter for upkeep with id %s already registered with newer config", upkeepID.String()) + p.lggr.Debugf("filter for upkeep with id %s already registered with newer config", upkeepID.String()) + return nil } else if currentFilter.configUpdateBlock == opts.UpdateBlock { // already registered with the same config p.lggr.Debugf("filter for upkeep with id %s already registered with the same config", upkeepID.String()) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider_life_cycle_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider_life_cycle_test.go index 4b1ff06f316..2c19d37df6c 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider_life_cycle_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/provider_life_cycle_test.go @@ -70,7 +70,7 @@ func TestLogEventProvider_LifeCycle(t *testing.T) { }, { "existing config with old block", - true, + false, big.NewInt(111), LogTriggerConfig{ ContractAddress: common.BytesToAddress(common.LeftPadBytes([]byte{1, 2, 3, 4}, 20)), diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go index c5b06701737..a8dd8a6aed3 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go @@ -181,7 +181,7 @@ func (r *logRecoverer) Close() error { } func (r *logRecoverer) HealthReport() map[string]error { - return map[string]error{LogRecovererServiceName: r.Healthy()} + return map[string]error{r.lggr.Name(): r.Healthy()} } func (r *logRecoverer) GetProposalData(ctx context.Context, proposal ocr2keepers.CoordinatedBlockProposal) ([]byte, error) { @@ -572,14 +572,14 @@ func (r *logRecoverer) clean(ctx context.Context) { } } r.lock.RUnlock() - lggr := r.lggr.With("where", "clean") + if len(expired) == 0 { - lggr.Debug("no expired upkeeps") + r.lggr.Debug("no expired upkeeps") return } err := r.tryExpire(ctx, expired...) if err != nil { - lggr.Warnw("failed to clean visited upkeeps", "err", err) + r.lggr.Warnw("failed to clean visited upkeeps", "err", err) } } @@ -595,7 +595,6 @@ func (r *logRecoverer) tryExpire(ctx context.Context, ids ...string) error { if err != nil { return fmt.Errorf("failed to get states: %w", err) } - lggr := r.lggr.With("where", "clean") start, _ := r.getRecoveryWindow(latestBlock) r.lock.Lock() defer r.lock.Unlock() @@ -612,7 +611,7 @@ func (r *logRecoverer) tryExpire(ctx context.Context, ids ...string) error { } if logBlock := rec.payload.Trigger.LogTriggerExtension.BlockNumber; int64(logBlock) < start { // we can't recover this log anymore, so we remove it from the visited list - lggr.Debugw("removing expired log: old block", "upkeepID", rec.payload.UpkeepID, + r.lggr.Debugw("removing expired log: old block", "upkeepID", rec.payload.UpkeepID, "latestBlock", latestBlock, "logBlock", logBlock, "start", start) r.removePending(rec.payload.WorkID) delete(r.visited, ids[i]) @@ -629,7 +628,7 @@ func (r *logRecoverer) tryExpire(ctx context.Context, ids ...string) error { } if removed > 0 { - lggr.Debugw("expired upkeeps", "expired", len(ids), "cleaned", removed) + r.lggr.Debugw("expired upkeeps", "expired", len(ids), "cleaned", removed) } return nil diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/registry.go b/core/services/ocr2/plugins/ocr2keeper/evm21/registry.go index 590e5138b3e..08d75184c23 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/registry.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/registry.go @@ -41,7 +41,7 @@ const ( ) var ( - RegistryServiceName = "AutomationRegistry" + registryServiceName = "EvmRegistry" ErrLogReadFailure = fmt.Errorf("failure reading logs") ErrHeadNotAvailable = fmt.Errorf("head not available") @@ -87,7 +87,7 @@ func NewEvmRegistry( return &EvmRegistry{ ctx: context.Background(), threadCtrl: utils.NewThreadControl(), - lggr: lggr.Named("EvmRegistry"), + lggr: lggr.Named(registryServiceName), poller: client.LogPoller(), addr: addr, client: client.Client(), @@ -159,16 +159,15 @@ func (r *EvmRegistry) Name() string { } func (r *EvmRegistry) Start(ctx context.Context) error { - return r.StartOnce(RegistryServiceName, func() error { + return r.StartOnce(registryServiceName, func() error { if err := r.registerEvents(r.chainID, r.addr); err != nil { return fmt.Errorf("logPoller error while registering automation events: %w", err) } r.threadCtrl.Go(func(ctx context.Context) { - lggr := r.lggr.With("where", "upkeeps_referesh") err := r.refreshActiveUpkeeps() if err != nil { - lggr.Errorf("failed to initialize upkeeps", err) + r.lggr.Errorf("failed to initialize upkeeps", err) } ticker := time.NewTicker(refreshInterval) @@ -179,7 +178,7 @@ func (r *EvmRegistry) Start(ctx context.Context) error { case <-ticker.C: err = r.refreshActiveUpkeeps() if err != nil { - lggr.Errorf("failed to refresh upkeeps", err) + r.lggr.Errorf("failed to refresh upkeeps", err) } case <-ctx.Done(): return @@ -188,7 +187,6 @@ func (r *EvmRegistry) Start(ctx context.Context) error { }) r.threadCtrl.Go(func(ctx context.Context) { - lggr := r.lggr.With("where", "logs_polling") ticker := time.NewTicker(time.Second) defer ticker.Stop() @@ -197,7 +195,7 @@ func (r *EvmRegistry) Start(ctx context.Context) error { case <-ticker.C: err := r.pollUpkeepStateLogs() if err != nil { - lggr.Errorf("failed to poll logs for upkeeps", err) + r.lggr.Errorf("failed to poll logs for upkeeps", err) } case <-ctx.Done(): return @@ -206,7 +204,6 @@ func (r *EvmRegistry) Start(ctx context.Context) error { }) r.threadCtrl.Go(func(ctx context.Context) { - lggr := r.lggr.With("where", "logs_processing") ch := r.chLog for { @@ -214,7 +211,7 @@ func (r *EvmRegistry) Start(ctx context.Context) error { case l := <-ch: err := r.processUpkeepStateLog(l) if err != nil { - lggr.Errorf("failed to process log for upkeep", err) + r.lggr.Errorf("failed to process log for upkeep", err) } case <-ctx.Done(): return @@ -227,14 +224,14 @@ func (r *EvmRegistry) Start(ctx context.Context) error { } func (r *EvmRegistry) Close() error { - return r.StopOnce(RegistryServiceName, func() error { + return r.StopOnce(registryServiceName, func() error { r.threadCtrl.Close() return nil }) } func (r *EvmRegistry) HealthReport() map[string]error { - return map[string]error{RegistryServiceName: r.Healthy()} + return map[string]error{r.lggr.Name(): r.Healthy()} } func (r *EvmRegistry) refreshActiveUpkeeps() error { diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup.go b/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup.go index 2155b383002..e2a3d083219 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/streams_lookup.go @@ -88,7 +88,7 @@ type UpkeepPrivilegeConfig struct { // streamsLookup looks through check upkeep results looking for any that need off chain lookup func (r *EvmRegistry) streamsLookup(ctx context.Context, checkResults []ocr2keepers.CheckResult) []ocr2keepers.CheckResult { - lggr := r.lggr.With("where", "StreamsLookup") + lggr := r.lggr.Named("StreamsLookup") lookups := map[int]*StreamsLookup{} for i, res := range checkResults { if res.IneligibilityReason != uint8(encoding.UpkeepFailureReasonTargetCheckReverted) { diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go index 34bd6822d69..a54ec662518 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/upkeepstate/store.go @@ -17,7 +17,7 @@ import ( ) const ( - UpkeepStateStoreServiceName = "UpkeepStateStore" + upkeepStateStoreServiceName = "UpkeepStateStore" // CacheExpiration is the amount of time that we keep a record in the cache. CacheExpiration = 24 * time.Hour // GCInterval is the amount of time between cache cleanups. @@ -81,7 +81,7 @@ type upkeepStateStore struct { func NewUpkeepStateStore(orm ORM, lggr logger.Logger, scanner PerformedLogsScanner) *upkeepStateStore { return &upkeepStateStore{ orm: orm, - lggr: lggr.Named(UpkeepStateStoreServiceName), + lggr: lggr.Named(upkeepStateStoreServiceName), cache: map[string]*upkeepStateRecord{}, scanner: scanner, retention: CacheExpiration, @@ -97,7 +97,7 @@ func NewUpkeepStateStore(orm ORM, lggr logger.Logger, scanner PerformedLogsScann // it does background cleanup of the cache every GCInterval, // and flush records to DB every flushCadence. func (u *upkeepStateStore) Start(pctx context.Context) error { - return u.StartOnce(UpkeepStateStoreServiceName, func() error { + return u.StartOnce(upkeepStateStoreServiceName, func() error { if err := u.scanner.Start(pctx); err != nil { return fmt.Errorf("failed to start scanner") } @@ -159,14 +159,14 @@ func (u *upkeepStateStore) flush(ctx context.Context) { // Close stops the service of pruning stale data; implements io.Closer func (u *upkeepStateStore) Close() error { - return u.StopOnce(UpkeepStateStoreServiceName, func() error { + return u.StopOnce(upkeepStateStoreServiceName, func() error { u.threadCtrl.Close() return nil }) } func (u *upkeepStateStore) HealthReport() map[string]error { - return map[string]error{UpkeepStateStoreServiceName: u.Healthy()} + return map[string]error{u.lggr.Name(): u.Healthy()} } // SelectByWorkIDs returns the current state of the upkeep for the provided ids.