Skip to content

Commit

Permalink
Merge pull request #40 from samcm/reduce-duplicate
Browse files Browse the repository at this point in the history
fix(consensus): Reduce duplicate subscriptions
  • Loading branch information
samcm committed Jul 21, 2022
2 parents ea443c7 + 066c532 commit a7fd74a
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 74 deletions.
19 changes: 7 additions & 12 deletions pkg/exporter/consensus/jobs/beacon.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,10 @@ func (b *Beacon) setupSubscriptions(ctx context.Context) error {
return err
}

if _, err := b.beaconNode.OnFinalizedCheckpoint(ctx, b.handleFinalizedCheckpointEvent); err != nil {
return err
}

return nil
}

Expand Down Expand Up @@ -284,26 +288,17 @@ func (b *Beacon) getInitialData(ctx context.Context) {
}
}

func (b *Beacon) HandleEvent(ctx context.Context, event *v1.Event) {
if event.Topic == EventTopicFinalizedCheckpoint {
b.handleFinalizedCheckpointEvent(ctx, event)
}
}

func (b *Beacon) handleChainReorg(ctx context.Context, event *v1.ChainReorgEvent) error {
b.ReOrgs.Inc()
b.ReOrgDepth.Add(float64(event.Depth))

return nil
}

func (b *Beacon) handleFinalizedCheckpointEvent(ctx context.Context, event *v1.Event) {
_, ok := event.Data.(*v1.FinalizedCheckpointEvent)
if !ok {
return
}

func (b *Beacon) handleFinalizedCheckpointEvent(ctx context.Context, event *v1.FinalizedCheckpointEvent) error {
b.updateFinalizedCheckpoint(ctx)

return nil
}

func (b *Beacon) updateFinalizedCheckpoint(ctx context.Context) {
Expand Down
62 changes: 0 additions & 62 deletions pkg/exporter/consensus/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,8 @@ package consensus

import (
"context"
"errors"
"time"

eth2client "github.com/attestantio/go-eth2-client"
v1 "github.com/attestantio/go-eth2-client/api/v1"
"github.com/prometheus/client_golang/prometheus"
"github.com/samcm/ethereum-metrics-exporter/pkg/exporter/consensus/api"
"github.com/samcm/ethereum-metrics-exporter/pkg/exporter/consensus/beacon"
Expand Down Expand Up @@ -144,63 +141,4 @@ func (m *metrics) StartAsync(ctx context.Context) {
m.log.Errorf("Failed to start event metrics: %v", err)
}
}()

go m.subscriptionLoop(ctx)
}

func (m *metrics) subscriptionLoop(ctx context.Context) {
subscribed := false

for {
if subscribed && (time.Since(m.eventMetrics.LastEventTime) > (5 * time.Minute)) {
m.log.
WithField("last_event_time", m.eventMetrics.LastEventTime.Local().String()).
Info("Haven't received any events for 5 minutes, re-subscribing")

subscribed = false
}

if !subscribed && m.client != nil {
if err := m.startSubscriptions(ctx); err != nil {
m.log.Errorf("Failed to subscribe to eth2 node: %v", err)
} else {
subscribed = true
}
}

time.Sleep(60 * time.Second)
}
}

func (m *metrics) startSubscriptions(ctx context.Context) error {
m.log.Info("starting subscriptions")

provider, isProvider := m.client.(eth2client.EventsProvider)
if !isProvider {
return errors.New("client does not implement eth2client.Subscriptions")
}

topics := []string{}

for key, supported := range v1.SupportedEventTopics {
if key == "contribution_and_proof" {
continue
}

if supported {
topics = append(topics, key)
}
}

if err := provider.Events(ctx, topics, func(event *v1.Event) {
m.handleEvent(ctx, event)
}); err != nil {
return err
}

return nil
}

func (m *metrics) handleEvent(ctx context.Context, event *v1.Event) {
m.beaconMetrics.HandleEvent(ctx, event)
}

0 comments on commit a7fd74a

Please sign in to comment.