Skip to content

Commit

Permalink
simplify bootstrap process
Browse files Browse the repository at this point in the history
  • Loading branch information
samcm committed Jul 12, 2022
1 parent b0cb577 commit 7628720
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 71 deletions.
46 changes: 22 additions & 24 deletions pkg/exporter/consensus/beacon/beacon.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ func NewNode(ctx context.Context, log logrus.FieldLogger, ap api.ConsensusClient
}

func (n *node) Start(ctx context.Context) error {
if err := n.bootstrap(ctx); err != nil {
return err
}

s := gocron.NewScheduler(time.Local)

if _, err := s.Every("15s").Do(func() {
Expand All @@ -136,12 +140,6 @@ func (n *node) Start(ctx context.Context) error {
return err
}

if _, err := s.Every("1s").Do(func() {
n.tick(ctx)
}); err != nil {
return err
}

s.StartAsync()

return nil
Expand Down Expand Up @@ -193,27 +191,27 @@ func (n *node) GetGenesis(ctx context.Context) (*v1.Genesis, error) {
return n.genesis, nil
}

func (n *node) tick(ctx context.Context) {
if n.state == nil {
if err := n.initializeState(ctx); err != nil {
n.log.WithError(err).Error("Failed to initialize state")
}

if err := n.subscribeDownstream(ctx); err != nil {
n.log.WithError(err).Error("Failed to subscribe to downstream")
}
func (n *node) bootstrap(ctx context.Context) error {
if err := n.initializeState(ctx); err != nil {
return err
}

if err := n.subscribeToSelf(ctx); err != nil {
n.log.WithError(err).Error("Failed to subscribe to self")
}
if err := n.subscribeDownstream(ctx); err != nil {
return err
}

if err := n.publishReady(ctx); err != nil {
n.log.WithError(err).Error("Failed to publish ready")
}
if err := n.subscribeToSelf(ctx); err != nil {
return err
}

//nolint:errcheck // we dont care if this errors out since it runs indefinitely in a goroutine
go n.ensureBeaconSubscription(ctx)
if err := n.publishReady(ctx); err != nil {
return err
}

//nolint:errcheck // we dont care if this errors out since it runs indefinitely in a goroutine
go n.ensureBeaconSubscription(ctx)

return nil
}

func (n *node) fetchSyncStatus(ctx context.Context) error {
Expand Down Expand Up @@ -323,7 +321,7 @@ func (n *node) handleStateEpochSlotChanged(ctx context.Context, epochNumber phas
n.log.WithFields(logrus.Fields{
"epoch": epochNumber,
"slot": slot,
}).Info("Current epoch/slot changed")
}).Debug("Current epoch/slot changed")

for i := epochNumber; i < epochNumber+1; i++ {
epoch, err := n.state.GetEpoch(ctx, i)
Expand Down
16 changes: 7 additions & 9 deletions pkg/exporter/consensus/beacon/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ type Container struct {

currentEpoch phase0.Epoch
currentSlot phase0.Slot
startingSlot phase0.Slot

startedAt time.Time

callbacksEpochChanged []func(ctx context.Context, epoch phase0.Epoch) error
callbacksSlotChanged []func(ctx context.Context, slot phase0.Slot) error
Expand All @@ -45,7 +46,8 @@ func NewContainer(ctx context.Context, log logrus.FieldLogger, sp *Spec, genesis

currentEpoch: 0,
currentSlot: 0,
startingSlot: 0,

startedAt: time.Now(),

epochs: NewEpochs(sp, genesis),
}
Expand Down Expand Up @@ -279,13 +281,9 @@ func (c *Container) checkForNewCurrentEpochAndSlot(ctx context.Context) error {
// We can't safely check if the previous slot was missed if
// we potentially started up _after_ the slot had started.
// So we'll just not bother checking in that case.
if c.startingSlot == 0 {
if previousSlot != 0 {
c.startingSlot = previousSlot
} else {
if err := c.checkForEmptySlot(ctx, previousSlot); err != nil {
c.log.WithError(err).Error("Failed to check for empty slot")
}
if time.Since(c.startedAt) > (c.spec.SecondsPerSlot * 2) {
if err := c.checkForEmptySlot(ctx, previousSlot); err != nil {
return err
}
}
}
Expand Down
81 changes: 43 additions & 38 deletions pkg/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,20 @@ func (e *exporter) Config(ctx context.Context) *Config {
}

func (e *exporter) Serve(ctx context.Context, port int) error {
e.log.
WithField("consensus_url", e.config.Consensus.URL).
WithField("execution_url", e.execution.URL()).
Info(fmt.Sprintf("Starting metrics server on :%v", port))

http.Handle("/metrics", promhttp.Handler())

go func() {
err := http.ListenAndServe(fmt.Sprintf(":%v", port), nil)
if err != nil {
e.log.Fatal(err)
}
}()

if e.config.Execution.Enabled {
e.log.WithField("execution_url", e.execution.URL()).Info("Starting execution metrics...")

Expand All @@ -140,56 +154,47 @@ func (e *exporter) Serve(ctx context.Context, port int) error {
}

if e.config.Pair.Enabled && e.config.Execution.Enabled && e.config.Consensus.Enabled {
go func() {
if err := e.ensureConsensusClients(ctx); err != nil {
e.log.Fatal(err)
}
if err := e.ensureConsensusClients(ctx); err != nil {
e.log.Fatal(err)
}

if _, err := e.beacon.OnReady(ctx, func(ctx context.Context, event *beacon.ReadyEvent) error {
e.pairMetrics.StartAsync(ctx)
return nil
}); err != nil {
e.log.WithError(err).Error("Failed to subscribe to beacon node ready event")
}
if _, err := e.beacon.OnReady(ctx, func(ctx context.Context, event *beacon.ReadyEvent) error {
e.pairMetrics.StartAsync(ctx)
return nil
}); err != nil {
e.log.WithError(err).Error("Failed to subscribe to beacon node ready event")
}

if err := e.startPairExporter(ctx); err != nil {
e.log.WithError(err).Error("failed to start pair metrics")
if err := e.startPairExporter(ctx); err != nil {
e.log.WithError(err).Error("failed to start pair metrics")

e.log.Fatal(err)
}
}()
e.log.Fatal(err)
}
}

if e.config.Consensus.Enabled {
go func() {
if err := e.startConsensusExporter(ctx); err != nil {
e.log.WithError(err).Error("failed to start consensus")

e.log.Fatal(err)
}
if err := e.ensureConsensusClients(ctx); err != nil {
e.log.Fatal(err)
}

if _, err := e.beacon.OnReady(ctx, func(ctx context.Context, event *beacon.ReadyEvent) error {
e.consensus.StartAsync(ctx)
if err := e.startConsensusExporter(ctx); err != nil {
e.log.WithError(err).Error("failed to start consensus")

return nil
}); err != nil {
e.log.WithError(err).Error("Failed to subscribe to beacon node ready event")
}
e.log.Fatal(err)
}

e.beacon.StartAsync(ctx)
}()
}
if _, err := e.beacon.OnReady(ctx, func(ctx context.Context, event *beacon.ReadyEvent) error {
e.consensus.StartAsync(ctx)

e.log.
WithField("consensus_url", e.config.Consensus.URL).
WithField("execution_url", e.execution.URL()).
Info(fmt.Sprintf("Starting metrics server on :%v", port))

http.Handle("/metrics", promhttp.Handler())
return nil
}); err != nil {
e.log.WithError(err).Error("Failed to subscribe to beacon node ready event")
}

err := http.ListenAndServe(fmt.Sprintf(":%v", port), nil)
e.beacon.StartAsync(ctx)
}

return err
return nil
}

func (e *exporter) bootstrapConsensusClients(ctx context.Context) error {
Expand Down

0 comments on commit 7628720

Please sign in to comment.