Skip to content

Commit

Permalink
avoid redundant named loggers and unexport vars
Browse files Browse the repository at this point in the history
  • Loading branch information
amirylm committed Sep 13, 2023
1 parent 0683e15 commit d633bf6
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 27 deletions.
10 changes: 4 additions & 6 deletions core/services/ocr2/plugins/ocr2keeper/evm21/block_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,8 @@ const (
lookbackDepth = 1024
// blockHistorySize decides the block history size sent to subscribers
blockHistorySize = int64(256)
)

var (
BlockSubscriberServiceName = "BlockSubscriber"
blockSubscriberServiceName = "BlockSubscriber"
)

type BlockSubscriber struct {
Expand Down Expand Up @@ -66,7 +64,7 @@ func NewBlockSubscriber(hb httypes.HeadBroadcaster, lp logpoller.LogPoller, lggr
blockHistorySize: blockHistorySize,
blockSize: lookbackDepth,
latestBlock: atomic.Pointer[ocr2keepers.BlockKey]{},
lggr: lggr.Named(BlockSubscriberServiceName),
lggr: lggr.Named(blockSubscriberServiceName),
}
}

Expand Down Expand Up @@ -148,7 +146,7 @@ func (bs *BlockSubscriber) initialize(ctx context.Context) {
}

func (bs *BlockSubscriber) Start(ctx context.Context) error {
return bs.StartOnce(BlockSubscriberServiceName, func() error {
return bs.StartOnce(blockSubscriberServiceName, func() error {
bs.lggr.Info("block subscriber started.")
bs.initialize(ctx)
// poll from head broadcaster channel and push to subscribers
Expand Down Expand Up @@ -184,7 +182,7 @@ func (bs *BlockSubscriber) Start(ctx context.Context) error {
}

func (bs *BlockSubscriber) Close() error {
return bs.StopOnce(BlockSubscriberServiceName, func() error {
return bs.StopOnce(blockSubscriberServiceName, func() error {
bs.lggr.Info("stop block subscriber")
bs.threadCtrl.Close()
bs.unsubscribe()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
)

var (
LogProviderServiceName = "LogEventProvider"
logProviderServiceName = "LogEventProvider"

ErrHeadNotAvailable = fmt.Errorf("head not available")
ErrBlockLimitExceeded = fmt.Errorf("block limit exceeded")
Expand Down Expand Up @@ -104,7 +104,7 @@ type logEventProvider struct {
func NewLogProvider(lggr logger.Logger, poller logpoller.LogPoller, packer LogDataPacker, filterStore UpkeepFilterStore, opts LogTriggersOptions) *logEventProvider {
return &logEventProvider{
threadCtrl: utils.NewThreadControl(),
lggr: lggr.Named(LogProviderServiceName),
lggr: lggr.Named(logProviderServiceName),
packer: packer,
buffer: newLogEventBuffer(lggr, int(opts.LookbackBlocks), maxLogsPerBlock, maxLogsPerUpkeepInBlock),
poller: poller,
Expand All @@ -114,7 +114,7 @@ func NewLogProvider(lggr logger.Logger, poller logpoller.LogPoller, packer LogDa
}

func (p *logEventProvider) Start(context.Context) error {
return p.StartOnce(LogProviderServiceName, func() error {
return p.StartOnce(logProviderServiceName, func() error {

readQ := make(chan []*big.Int, readJobQueueSize)

Expand Down Expand Up @@ -145,7 +145,7 @@ func (p *logEventProvider) Start(context.Context) error {
}

func (p *logEventProvider) Close() error {
return p.StopOnce(LogProviderServiceName, func() error {
return p.StopOnce(logProviderServiceName, func() error {
p.threadCtrl.Close()
return nil
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -572,14 +572,14 @@ func (r *logRecoverer) clean(ctx context.Context) {
}
}
r.lock.RUnlock()
lggr := r.lggr.Named("Cleaner")

if len(expired) == 0 {
lggr.Debug("no expired upkeeps")
r.lggr.Debug("no expired upkeeps")
return
}
err := r.tryExpire(ctx, expired...)
if err != nil {
lggr.Warnw("failed to clean visited upkeeps", "err", err)
r.lggr.Warnw("failed to clean visited upkeeps", "err", err)
}
}

Expand All @@ -595,7 +595,6 @@ func (r *logRecoverer) tryExpire(ctx context.Context, ids ...string) error {
if err != nil {
return fmt.Errorf("failed to get states: %w", err)
}
lggr := r.lggr.Named("TryExpire")
start, _ := r.getRecoveryWindow(latestBlock)
r.lock.Lock()
defer r.lock.Unlock()
Expand All @@ -612,7 +611,7 @@ func (r *logRecoverer) tryExpire(ctx context.Context, ids ...string) error {
}
if logBlock := rec.payload.Trigger.LogTriggerExtension.BlockNumber; int64(logBlock) < start {
// we can't recover this log anymore, so we remove it from the visited list
lggr.Debugw("removing expired log: old block", "upkeepID", rec.payload.UpkeepID,
r.lggr.Debugw("removing expired log: old block", "upkeepID", rec.payload.UpkeepID,
"latestBlock", latestBlock, "logBlock", logBlock, "start", start)
r.removePending(rec.payload.WorkID)
delete(r.visited, ids[i])
Expand All @@ -629,7 +628,7 @@ func (r *logRecoverer) tryExpire(ctx context.Context, ids ...string) error {
}

if removed > 0 {
lggr.Debugw("expired upkeeps", "expired", len(ids), "cleaned", removed)
r.lggr.Debugw("expired upkeeps", "expired", len(ids), "cleaned", removed)
}

return nil
Expand Down
19 changes: 8 additions & 11 deletions core/services/ocr2/plugins/ocr2keeper/evm21/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ const (
)

var (
RegistryServiceName = "EvmRegistry"
registryServiceName = "EvmRegistry"

ErrLogReadFailure = fmt.Errorf("failure reading logs")
ErrHeadNotAvailable = fmt.Errorf("head not available")
Expand Down Expand Up @@ -87,7 +87,7 @@ func NewEvmRegistry(
return &EvmRegistry{
ctx: context.Background(),
threadCtrl: utils.NewThreadControl(),
lggr: lggr.Named(RegistryServiceName),
lggr: lggr.Named(registryServiceName),
poller: client.LogPoller(),
addr: addr,
client: client.Client(),
Expand Down Expand Up @@ -159,16 +159,15 @@ func (r *EvmRegistry) Name() string {
}

func (r *EvmRegistry) Start(ctx context.Context) error {
return r.StartOnce(RegistryServiceName, func() error {
return r.StartOnce(registryServiceName, func() error {
if err := r.registerEvents(r.chainID, r.addr); err != nil {
return fmt.Errorf("logPoller error while registering automation events: %w", err)
}

r.threadCtrl.Go(func(ctx context.Context) {
lggr := r.lggr.Named("UpkeepRefreshThread")
err := r.refreshActiveUpkeeps()
if err != nil {
lggr.Errorf("failed to initialize upkeeps", err)
r.lggr.Errorf("failed to initialize upkeeps", err)
}

ticker := time.NewTicker(refreshInterval)
Expand All @@ -179,7 +178,7 @@ func (r *EvmRegistry) Start(ctx context.Context) error {
case <-ticker.C:
err = r.refreshActiveUpkeeps()
if err != nil {
lggr.Errorf("failed to refresh upkeeps", err)
r.lggr.Errorf("failed to refresh upkeeps", err)
}
case <-ctx.Done():
return
Expand All @@ -188,7 +187,6 @@ func (r *EvmRegistry) Start(ctx context.Context) error {
})

r.threadCtrl.Go(func(ctx context.Context) {
lggr := r.lggr.Named("LogPollingThread")
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

Expand All @@ -197,7 +195,7 @@ func (r *EvmRegistry) Start(ctx context.Context) error {
case <-ticker.C:
err := r.pollUpkeepStateLogs()
if err != nil {
lggr.Errorf("failed to poll logs for upkeeps", err)
r.lggr.Errorf("failed to poll logs for upkeeps", err)
}
case <-ctx.Done():
return
Expand All @@ -206,15 +204,14 @@ func (r *EvmRegistry) Start(ctx context.Context) error {
})

r.threadCtrl.Go(func(ctx context.Context) {
lggr := r.lggr.Named("LogProcessingThread")
ch := r.chLog

for {
select {
case l := <-ch:
err := r.processUpkeepStateLog(l)
if err != nil {
lggr.Errorf("failed to process log for upkeep", err)
r.lggr.Errorf("failed to process log for upkeep", err)
}
case <-ctx.Done():
return
Expand All @@ -227,7 +224,7 @@ func (r *EvmRegistry) Start(ctx context.Context) error {
}

func (r *EvmRegistry) Close() error {
return r.StopOnce(RegistryServiceName, func() error {
return r.StopOnce(registryServiceName, func() error {
r.threadCtrl.Close()
return nil
})
Expand Down

0 comments on commit d633bf6

Please sign in to comment.