Skip to content

Commit

Permalink
feat(consensus): Ensure we dont check for an empty slot if we started…
Browse files Browse the repository at this point in the history
… up during the slot
  • Loading branch information
samcm committed Jul 12, 2022
1 parent 5a9696d commit b0cb577
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 63 deletions.
24 changes: 17 additions & 7 deletions pkg/exporter/consensus/beacon/beacon.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ type node struct {

func NewNode(ctx context.Context, log logrus.FieldLogger, ap api.ConsensusClient, client eth2client.Service, broker *nats.EncodedConn) Node {
return &node{
log: log,
log: log.WithField("module", "consensus/beacon"),
api: ap,
client: client,
broker: broker,
Expand Down Expand Up @@ -272,7 +272,7 @@ func (n *node) subscribeToSelf(ctx context.Context) error {
}

func (n *node) subscribeDownstream(ctx context.Context) error {
if err := n.state.OnEpochChanged(ctx, n.handleStateEpochChanged); err != nil {
if err := n.state.OnEpochSlotChanged(ctx, n.handleStateEpochSlotChanged); err != nil {
return err
}

Expand Down Expand Up @@ -319,12 +319,22 @@ func (n *node) handleDownstreamEmptySlot(ctx context.Context, epoch phase0.Epoch
return nil
}

func (n *node) handleStateEpochChanged(ctx context.Context, epoch phase0.Epoch) error {
func (n *node) handleStateEpochSlotChanged(ctx context.Context, epochNumber phase0.Epoch, slot phase0.Slot) error {
n.log.WithFields(logrus.Fields{
"epoch": epoch,
}).Info("Current epoch changed")
"epoch": epochNumber,
"slot": slot,
}).Info("Current epoch/slot changed")

for i := epochNumber; i < epochNumber+1; i++ {
epoch, err := n.state.GetEpoch(ctx, i)
if err != nil {
return err
}

if epoch.HaveProposerDuties() {
continue
}

for i := epoch; i < epoch+1; i++ {
if err := n.fetchEpochProposerDuties(ctx, i); err != nil {
return err
}
Expand Down Expand Up @@ -367,7 +377,7 @@ func (n *node) initializeState(ctx context.Context) error {

n.state = &st

n.log.Info("Beacon state initialized!")
n.log.Info("Beacon state initialized! Ready to serve requests...")

return nil
}
Expand Down
10 changes: 10 additions & 0 deletions pkg/exporter/consensus/beacon/state/epoch.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ type Epoch struct {
EndTime time.Time
Duration time.Duration
bundle BlockTimeCalculatorBundle

haveProposerDuties bool
}

func NewEpoch(epochNumber phase0.Epoch, slotsPerEpoch phase0.Slot, bundle BlockTimeCalculatorBundle) Epoch {
Expand All @@ -34,6 +36,8 @@ func NewEpoch(epochNumber phase0.Epoch, slotsPerEpoch phase0.Slot, bundle BlockT
EndTime: bundle.Genesis.GenesisTime.Add((time.Duration(lastSlot) * bundle.SecondsPerSlot)).Add(bundle.SecondsPerSlot),
Duration: bundle.SecondsPerSlot * time.Duration(slotsPerEpoch),
bundle: bundle,

haveProposerDuties: false,
}

return e
Expand Down Expand Up @@ -81,9 +85,15 @@ func (e *Epoch) SetProposerDuties(duties []*v1.ProposerDuty) error {
}
}

e.haveProposerDuties = true

return nil
}

func (e *Epoch) HaveProposerDuties() bool {
return e.haveProposerDuties
}

func (e *Epoch) GetSlot(slotNumber phase0.Slot) (*Slot, error) {
return e.slots.Get(slotNumber)
}
Expand Down
21 changes: 11 additions & 10 deletions pkg/exporter/consensus/beacon/state/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,37 +6,38 @@ import (
"github.com/attestantio/go-eth2-client/spec/phase0"
)

func (c *Container) handleCallbackError(err error, topic string) {
if err != nil {
c.log.WithError(err).WithField("topic", topic).Error("Receieved error from subscriber callback")
}
}

func (c *Container) publishEpochChanged(ctx context.Context, epoch phase0.Epoch) {
for _, cb := range c.callbacksEpochChanged {
//nolint:errcheck // we dont care if the callback fails
cb(ctx, epoch)
c.handleCallbackError(cb(ctx, epoch), "epochs_changed")
}
}

func (c *Container) publishSlotChanged(ctx context.Context, slot phase0.Slot) {
for _, cb := range c.callbacksSlotChanged {
//nolint:errcheck // we dont care if the callback fails
cb(ctx, slot)
c.handleCallbackError(cb(ctx, slot), "slots_changed")
}
}

func (c *Container) publishEpochSlotChanged(ctx context.Context, epoch phase0.Epoch, slot phase0.Slot) {
for _, cb := range c.callbacksEpochSlotChanged {
//nolint:errcheck // we dont care if the callback fails
cb(ctx, epoch, slot)
c.handleCallbackError(cb(ctx, epoch, slot), "epoch_slots_changed")
}
}

func (c *Container) publishBlockInserted(ctx context.Context, epoch phase0.Epoch, slot Slot) {
for _, cb := range c.callbacksBlockInserted {
//nolint:errcheck // we dont care if the callback fails
cb(ctx, epoch, slot)
c.handleCallbackError(cb(ctx, epoch, slot), "block_inserted")
}
}

func (c *Container) publishEmptySlot(ctx context.Context, epoch phase0.Epoch, slot Slot) {
for _, cb := range c.callbacksEmptySlot {
//nolint:errcheck // we dont care if the callback fails
cb(ctx, epoch, slot)
c.handleCallbackError(cb(ctx, epoch, slot), "empty_slot")
}
}
39 changes: 28 additions & 11 deletions pkg/exporter/consensus/beacon/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type Container struct {

currentEpoch phase0.Epoch
currentSlot phase0.Slot
startingSlot phase0.Slot

callbacksEpochChanged []func(ctx context.Context, epoch phase0.Epoch) error
callbacksSlotChanged []func(ctx context.Context, slot phase0.Slot) error
Expand All @@ -37,13 +38,14 @@ const (
// NewContainer creates a new state container instance
func NewContainer(ctx context.Context, log logrus.FieldLogger, sp *Spec, genesis *v1.Genesis) Container {
return Container{
log: log,
log: log.WithField("sub_module", "state"),
spec: sp,

genesis: genesis,

currentEpoch: 0,
currentSlot: 0,
startingSlot: 0,

epochs: NewEpochs(sp, genesis),
}
Expand Down Expand Up @@ -107,10 +109,6 @@ func (c *Container) tick(ctx context.Context) {
if err := c.hydrateEpochs(ctx); err != nil {
c.log.WithError(err).Error("Failed to hydrate epochs")
}

if err := c.checkForNewCurrentEpochAndSlot(ctx); err != nil {
c.log.WithError(err).Error("Failed to check for new current epoch and slot")
}
}

// AddBeaconBlock adds a beacon block to the state container.
Expand All @@ -128,9 +126,7 @@ func (c *Container) AddBeaconBlock(ctx context.Context, beaconBlock *spec.Versio
epochNumber := c.calculateEpochFromSlot(slotNumber)

if exists := c.epochs.Exists(epochNumber); !exists {
if _, err = c.createEpoch(ctx, epochNumber); err != nil {
return err
}
return fmt.Errorf("epoch %d does not exist", epochNumber)
}

// Get the epoch
Expand Down Expand Up @@ -227,11 +223,17 @@ func (c *Container) SetProposerDuties(ctx context.Context, epochNumber phase0.Ep
}

func (c *Container) createEpoch(ctx context.Context, epochNumber phase0.Epoch) (*Epoch, error) {
if _, err := c.epochs.GetEpoch(epochNumber); err == nil {
return nil, fmt.Errorf("epoch %d already exists", epochNumber)
}

epoch, err := c.epochs.NewInitializedEpoch(epochNumber)
if err != nil {
return nil, err
}

c.log.WithField("epoch", epochNumber).Info("Created new epoch")

return epoch, nil
}

Expand All @@ -248,6 +250,10 @@ func (c *Container) checkForNewCurrentEpochAndSlot(ctx context.Context) error {

epochChanged = true

if err := c.hydrateEpochs(ctx); err != nil {
return err
}

// Notify the listeners of the new epoch.
go c.publishEpochChanged(ctx, epoch)

Expand All @@ -261,16 +267,27 @@ func (c *Container) checkForNewCurrentEpochAndSlot(ctx context.Context) error {
slotChanged := false

if slot != c.currentSlot {
if err := c.checkForEmptySlot(ctx, c.currentSlot); err != nil {
c.log.WithError(err).Error("Failed to check for empty slot")
}
previousSlot := c.currentSlot

c.currentSlot = slot

slotChanged = true

// Notify the listeners of the new slot.
go c.publishSlotChanged(ctx, slot)

// We can't safely check if the previous slot was missed if
// we potentially started up _after_ the slot had started.
// So we'll just not bother checking in that case.
if c.startingSlot == 0 {
if previousSlot != 0 {
c.startingSlot = previousSlot
} else {
if err := c.checkForEmptySlot(ctx, previousSlot); err != nil {
c.log.WithError(err).Error("Failed to check for empty slot")
}
}
}
}

if epochChanged || slotChanged {
Expand Down
59 changes: 24 additions & 35 deletions pkg/exporter/consensus/beacon/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,123 +8,112 @@ import (
"github.com/nats-io/nats.go"
)

func (n *node) handleSubscriberError(err error, topic string) {
if err != nil {
n.log.WithError(err).WithField("topic", topic).Error("Subscriber error")
}
}

// Official Beacon events
func (n *node) OnBlock(ctx context.Context, handler func(ctx context.Context, event *v1.BlockEvent) error) (*nats.Subscription, error) {
return n.broker.Subscribe(topicBlock, func(event *v1.BlockEvent) {
//nolint:errcheck // safe to ignore)
handler(ctx, event)
n.handleSubscriberError(handler(ctx, event), topicBlock)
})
}

func (n *node) OnAttestation(ctx context.Context, handler func(ctx context.Context, event *phase0.Attestation) error) (*nats.Subscription, error) {
return n.broker.Subscribe(topicAttestation, func(event *phase0.Attestation) {
//nolint:errcheck // safe to ignore)
handler(ctx, event)
n.handleSubscriberError(handler(ctx, event), topicAttestation)
})
}

func (n *node) OnChainReOrg(ctx context.Context, handler func(ctx context.Context, event *v1.ChainReorgEvent) error) (*nats.Subscription, error) {
return n.broker.Subscribe(topicChainReorg, func(event *v1.ChainReorgEvent) {
//nolint:errcheck // safe to ignore)
handler(ctx, event)
n.handleSubscriberError(handler(ctx, event), topicChainReorg)
})
}

func (n *node) OnFinalizedCheckpoint(ctx context.Context, handler func(ctx context.Context, event *v1.FinalizedCheckpointEvent) error) (*nats.Subscription, error) {
return n.broker.Subscribe(topicFinalizedCheckpoint, func(event *v1.FinalizedCheckpointEvent) {
//nolint:errcheck // safe to ignore)
handler(ctx, event)
n.handleSubscriberError(handler(ctx, event), topicFinalizedCheckpoint)
})
}

func (n *node) OnHead(ctx context.Context, handler func(ctx context.Context, event *v1.HeadEvent) error) (*nats.Subscription, error) {
return n.broker.Subscribe(topicHead, func(event *v1.HeadEvent) {
//nolint:errcheck // safe to ignore)
handler(ctx, event)
n.handleSubscriberError(handler(ctx, event), topicHead)
})
}

func (n *node) OnVoluntaryExit(ctx context.Context, handler func(ctx context.Context, event *phase0.VoluntaryExit) error) (*nats.Subscription, error) {
return n.broker.Subscribe(topicHead, func(event *phase0.VoluntaryExit) {
//nolint:errcheck // safe to ignore)
handler(ctx, event)
return n.broker.Subscribe(topicVoluntaryExit, func(event *phase0.VoluntaryExit) {
n.handleSubscriberError(handler(ctx, event), topicVoluntaryExit)
})
}

func (n *node) OnEvent(ctx context.Context, handler func(ctx context.Context, event *v1.Event) error) (*nats.Subscription, error) {
return n.broker.Subscribe(topicEvent, func(event *v1.Event) {
//nolint:errcheck // safe to ignore)
handler(ctx, event)
n.handleSubscriberError(handler(ctx, event), topicEvent)
})
}

// Custom Events
func (n *node) OnEpochChanged(ctx context.Context, handler func(ctx context.Context, event *EpochChangedEvent) error) (*nats.Subscription, error) {
return n.broker.Subscribe(topicEpochChanged, func(event *EpochChangedEvent) {
//nolint:errcheck // safe to ignore)
handler(ctx, event)
n.handleSubscriberError(handler(ctx, event), topicEpochChanged)
})
}

func (n *node) OnSlotChanged(ctx context.Context, handler func(ctx context.Context, event *SlotChangedEvent) error) (*nats.Subscription, error) {
return n.broker.Subscribe(topicSlotChanged, func(event *SlotChangedEvent) {
//nolint:errcheck // safe to ignore)
handler(ctx, event)
n.handleSubscriberError(handler(ctx, event), topicSlotChanged)
})
}

func (n *node) OnEpochSlotChanged(ctx context.Context, handler func(ctx context.Context, event *EpochSlotChangedEvent) error) (*nats.Subscription, error) {
return n.broker.Subscribe(topicEpochSlotChanged, func(event *EpochSlotChangedEvent) {
//nolint:errcheck // safe to ignore)
handler(ctx, event)
n.handleSubscriberError(handler(ctx, event), topicEpochSlotChanged)
})
}

func (n *node) OnBlockInserted(ctx context.Context, handler func(ctx context.Context, event *BlockInsertedEvent) error) (*nats.Subscription, error) {
return n.broker.Subscribe(topicBlockInserted, func(event *BlockInsertedEvent) {
//nolint:errcheck // safe to ignore)
handler(ctx, event)
n.handleSubscriberError(handler(ctx, event), topicBlockInserted)
})
}

func (n *node) OnReady(ctx context.Context, handler func(ctx context.Context, event *ReadyEvent) error) (*nats.Subscription, error) {
return n.broker.Subscribe(topicReady, func(event *ReadyEvent) {
//nolint:errcheck // safe to ignore)
handler(ctx, event)
n.handleSubscriberError(handler(ctx, event), topicReady)
})
}

func (n *node) OnSyncStatus(ctx context.Context, handler func(ctx context.Context, event *SyncStatusEvent) error) (*nats.Subscription, error) {
return n.broker.Subscribe(topicSyncStatus, func(event *SyncStatusEvent) {
//nolint:errcheck // safe to ignore)
handler(ctx, event)
n.handleSubscriberError(handler(ctx, event), topicSyncStatus)
})
}

func (n *node) OnNodeVersionUpdated(ctx context.Context, handler func(ctx context.Context, event *NodeVersionUpdatedEvent) error) (*nats.Subscription, error) {
return n.broker.Subscribe(topicNodeVersionUpdated, func(event *NodeVersionUpdatedEvent) {
//nolint:errcheck // safe to ignore)
handler(ctx, event)
n.handleSubscriberError(handler(ctx, event), topicNodeVersionUpdated)
})
}

func (n *node) OnPeersUpdated(ctx context.Context, handler func(ctx context.Context, event *PeersUpdatedEvent) error) (*nats.Subscription, error) {
return n.broker.Subscribe(topicPeersUpdated, func(event *PeersUpdatedEvent) {
//nolint:errcheck // safe to ignore)
handler(ctx, event)
n.handleSubscriberError(handler(ctx, event), topicPeersUpdated)
})
}

func (n *node) OnSpecUpdated(ctx context.Context, handler func(ctx context.Context, event *SpecUpdatedEvent) error) (*nats.Subscription, error) {
return n.broker.Subscribe(topicSpecUpdated, func(event *SpecUpdatedEvent) {
//nolint:errcheck // safe to ignore)
handler(ctx, event)
n.handleSubscriberError(handler(ctx, event), topicSpecUpdated)
})
}

func (n *node) OnEmptySlot(ctx context.Context, handler func(ctx context.Context, event *EmptySlotEvent) error) (*nats.Subscription, error) {
return n.broker.Subscribe(topicEmptySlot, func(event *EmptySlotEvent) {
//nolint:errcheck // safe to ignore)
handler(ctx, event)
n.handleSubscriberError(handler(ctx, event), topicEmptySlot)
})
}

0 comments on commit b0cb577

Please sign in to comment.