Skip to content

Commit

Permalink
aligned named logger
Browse files Browse the repository at this point in the history
  • Loading branch information
amirylm committed Sep 13, 2023
1 parent 6fabc6d commit a707c34
Show file tree
Hide file tree
Showing 7 changed files with 20 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func NewBlockSubscriber(hb httypes.HeadBroadcaster, lp logpoller.LogPoller, lggr
blockHistorySize: blockHistorySize,
blockSize: lookbackDepth,
latestBlock: atomic.Pointer[ocr2keepers.BlockKey]{},
lggr: lggr.Named("BlockSubscriber"),
lggr: lggr.Named(BlockSubscriberServiceName),
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ type logEventBuffer struct {

func newLogEventBuffer(lggr logger.Logger, size, maxBlockLogs, maxUpkeepLogsPerBlock int) *logEventBuffer {
return &logEventBuffer{
lggr: lggr.Named("KeepersRegistry.LogEventBuffer"),
lggr: lggr.Named("LogEventBuffer"),
size: int32(size),
blocks: make([]fetchedBlock, size),
maxBlockLogs: maxBlockLogs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ type logEventProvider struct {
func NewLogProvider(lggr logger.Logger, poller logpoller.LogPoller, packer LogDataPacker, filterStore UpkeepFilterStore, opts LogTriggersOptions) *logEventProvider {
return &logEventProvider{
threadCtrl: utils.NewThreadControl(),
lggr: lggr.Named("KeepersRegistry.LogEventProvider"),
lggr: lggr.Named(LogProviderServiceName),
packer: packer,
buffer: newLogEventBuffer(lggr, int(opts.LookbackBlocks), maxLogsPerBlock, maxLogsPerUpkeepInBlock),
poller: poller,
Expand All @@ -121,13 +121,14 @@ func (p *logEventProvider) Start(context.Context) error {
p.lggr.Infow("starting log event provider", "readInterval", p.opts.ReadInterval, "readMaxBatchSize", readMaxBatchSize, "readers", readerThreads)

for i := 0; i < readerThreads; i++ {
tid := i + 1
p.threadCtrl.Go(func(ctx context.Context) {
p.startReader(ctx, readQ)
p.startReader(ctx, readQ, tid)
})
}

p.threadCtrl.Go(func(ctx context.Context) {
lggr := p.lggr.With("where", "scheduler")
lggr := p.lggr.Named("Scheduler")

p.scheduleReadJobs(ctx, func(ids []*big.Int) {
select {
Expand All @@ -151,7 +152,7 @@ func (p *logEventProvider) Close() error {
}

func (p *logEventProvider) HealthReport() map[string]error {
return map[string]error{LogProviderServiceName: p.Healthy()}
return map[string]error{p.lggr.Name(): p.Healthy()}
}

func (p *logEventProvider) GetLatestPayloads(ctx context.Context) ([]ocr2keepers.UpkeepPayload, error) {
Expand Down Expand Up @@ -251,11 +252,11 @@ func (p *logEventProvider) scheduleReadJobs(pctx context.Context, execute func([
}

// startReader starts a reader that reads logs from the ids coming from readQ.
func (p *logEventProvider) startReader(pctx context.Context, readQ <-chan []*big.Int) {
func (p *logEventProvider) startReader(pctx context.Context, readQ <-chan []*big.Int, tid int) {
ctx, cancel := context.WithCancel(pctx)
defer cancel()

lggr := p.lggr.With("where", "reader")
lggr := p.lggr.Named(fmt.Sprintf("ReaderThread-%d", tid))

for {
select {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func (r *logRecoverer) Close() error {
}

func (r *logRecoverer) HealthReport() map[string]error {
return map[string]error{LogRecovererServiceName: r.Healthy()}
return map[string]error{r.lggr.Name(): r.Healthy()}
}

func (r *logRecoverer) GetProposalData(ctx context.Context, proposal ocr2keepers.CoordinatedBlockProposal) ([]byte, error) {
Expand Down Expand Up @@ -572,7 +572,7 @@ func (r *logRecoverer) clean(ctx context.Context) {
}
}
r.lock.RUnlock()
lggr := r.lggr.With("where", "clean")
lggr := r.lggr.Named("Cleaner")
if len(expired) == 0 {
lggr.Debug("no expired upkeeps")
return
Expand All @@ -595,7 +595,7 @@ func (r *logRecoverer) tryExpire(ctx context.Context, ids ...string) error {
if err != nil {
return fmt.Errorf("failed to get states: %w", err)
}
lggr := r.lggr.With("where", "clean")
lggr := r.lggr.Named("TryExpire")
start, _ := r.getRecoveryWindow(latestBlock)
r.lock.Lock()
defer r.lock.Unlock()
Expand Down
12 changes: 6 additions & 6 deletions core/services/ocr2/plugins/ocr2keeper/evm21/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ const (
)

var (
RegistryServiceName = "AutomationRegistry"
RegistryServiceName = "EvmRegistry"

ErrLogReadFailure = fmt.Errorf("failure reading logs")
ErrHeadNotAvailable = fmt.Errorf("head not available")
Expand Down Expand Up @@ -87,7 +87,7 @@ func NewEvmRegistry(
return &EvmRegistry{
ctx: context.Background(),
threadCtrl: utils.NewThreadControl(),
lggr: lggr.Named("EvmRegistry"),
lggr: lggr.Named(RegistryServiceName),
poller: client.LogPoller(),
addr: addr,
client: client.Client(),
Expand Down Expand Up @@ -165,7 +165,7 @@ func (r *EvmRegistry) Start(ctx context.Context) error {
}

r.threadCtrl.Go(func(ctx context.Context) {
lggr := r.lggr.With("where", "upkeeps_referesh")
lggr := r.lggr.Named("UpkeepRefreshThread")
err := r.refreshActiveUpkeeps()
if err != nil {
lggr.Errorf("failed to initialize upkeeps", err)
Expand All @@ -188,7 +188,7 @@ func (r *EvmRegistry) Start(ctx context.Context) error {
})

r.threadCtrl.Go(func(ctx context.Context) {
lggr := r.lggr.With("where", "logs_polling")
lggr := r.lggr.Named("LogPollingThread")
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

Expand All @@ -206,7 +206,7 @@ func (r *EvmRegistry) Start(ctx context.Context) error {
})

r.threadCtrl.Go(func(ctx context.Context) {
lggr := r.lggr.With("where", "logs_processing")
lggr := r.lggr.Named("LogProcessingThread")
ch := r.chLog

for {
Expand Down Expand Up @@ -234,7 +234,7 @@ func (r *EvmRegistry) Close() error {
}

func (r *EvmRegistry) HealthReport() map[string]error {
return map[string]error{RegistryServiceName: r.Healthy()}
return map[string]error{r.lggr.Name(): r.Healthy()}
}

func (r *EvmRegistry) refreshActiveUpkeeps() error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ type UpkeepPrivilegeConfig struct {

// streamsLookup looks through check upkeep results looking for any that need off chain lookup
func (r *EvmRegistry) streamsLookup(ctx context.Context, checkResults []ocr2keepers.CheckResult) []ocr2keepers.CheckResult {
lggr := r.lggr.With("where", "StreamsLookup")
lggr := r.lggr.Named("StreamsLookup")
lookups := map[int]*StreamsLookup{}
for i, res := range checkResults {
if res.IneligibilityReason != uint8(encoding.UpkeepFailureReasonTargetCheckReverted) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func (u *upkeepStateStore) Close() error {
}

func (u *upkeepStateStore) HealthReport() map[string]error {
return map[string]error{UpkeepStateStoreServiceName: u.Healthy()}
return map[string]error{u.lggr.Name(): u.Healthy()}
}

// SelectByWorkIDs returns the current state of the upkeep for the provided ids.
Expand Down

0 comments on commit a707c34

Please sign in to comment.