Skip to content

Commit

Permalink
linting & startup dep refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
samcm committed Jul 8, 2022
1 parent 3743801 commit ec818d2
Show file tree
Hide file tree
Showing 24 changed files with 332 additions and 247 deletions.
49 changes: 45 additions & 4 deletions pkg/exporter/consensus/beacon/beacon.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,34 +20,59 @@ import (

type Node interface {
// Lifecycle
// Start starts the node.
Start(ctx context.Context) error
// StartAsync starts the node asynchronously.
StartAsync(ctx context.Context)

// Getters
// GetEpoch returns the epoch for the given epoch.
GetEpoch(ctx context.Context, epoch phase0.Epoch) (*state.Epoch, error)
// GetSlot returns the slot for the given slot.
GetSlot(ctx context.Context, slot phase0.Slot) (*state.Slot, error)
// GetSpec returns the spec for the node.
GetSpec(ctx context.Context) (*state.Spec, error)
// GetSyncState returns the sync state for the node.
GetSyncState(ctx context.Context) (*v1.SyncState, error)
// GetGenesis returns the genesis for the node.
GetGenesis(ctx context.Context) (*v1.Genesis, error)

// Subscriptions
// - Proxied Beacon events
// OnEvent is called when a beacon event is received.
OnEvent(ctx context.Context, handler func(ctx context.Context, ev *v1.Event) error) (*nats.Subscription, error)
// OnBlock is called when a block is received.
OnBlock(ctx context.Context, handler func(ctx context.Context, ev *v1.BlockEvent) error) (*nats.Subscription, error)
// OnAttestation is called when an attestation is received.
OnAttestation(ctx context.Context, handler func(ctx context.Context, ev *phase0.Attestation) error) (*nats.Subscription, error)
// OnFinalizedCheckpoint is called when a finalized checkpoint is received.
OnFinalizedCheckpoint(ctx context.Context, handler func(ctx context.Context, ev *v1.FinalizedCheckpointEvent) error) (*nats.Subscription, error)
// OnHead is called when the head is received.
OnHead(ctx context.Context, handler func(ctx context.Context, ev *v1.HeadEvent) error) (*nats.Subscription, error)
// OnChainReOrg is called when a chain reorg is received.
OnChainReOrg(ctx context.Context, handler func(ctx context.Context, ev *v1.ChainReorgEvent) error) (*nats.Subscription, error)
// OnVoluntaryExit is called when a voluntary exit is received.
OnVoluntaryExit(ctx context.Context, handler func(ctx context.Context, ev *phase0.VoluntaryExit) error) (*nats.Subscription, error)

// - Custom events
// OnReady is called when the node is ready.
OnReady(ctx context.Context, handler func(ctx context.Context, event *ReadyEvent) error) (*nats.Subscription, error)
// OnEpochChanged is called when the current epoch changes.
OnEpochChanged(ctx context.Context, handler func(ctx context.Context, event *EpochChangedEvent) error) (*nats.Subscription, error)
// OnSlotChanged is called when the current slot changes.
OnSlotChanged(ctx context.Context, handler func(ctx context.Context, event *SlotChangedEvent) error) (*nats.Subscription, error)
// OnEpochSlotChanged is called when the current epoch or slot changes.
OnEpochSlotChanged(ctx context.Context, handler func(ctx context.Context, event *EpochSlotChangedEvent) error) (*nats.Subscription, error)
// OnBlockInserted is called when a block is inserted.
OnBlockInserted(ctx context.Context, handler func(ctx context.Context, event *BlockInsertedEvent) error) (*nats.Subscription, error)
// OnSyncStatus is called when the sync status changes.
OnSyncStatus(ctx context.Context, handler func(ctx context.Context, event *SyncStatusEvent) error) (*nats.Subscription, error)
// OnNodeVersionUpdated is called when the node version is updated.
OnNodeVersionUpdated(ctx context.Context, handler func(ctx context.Context, event *NodeVersionUpdatedEvent) error) (*nats.Subscription, error)
// OnPeersUpdated is called when the peers are updated.
OnPeersUpdated(ctx context.Context, handler func(ctx context.Context, event *PeersUpdatedEvent) error) (*nats.Subscription, error)
// OnSpecUpdated is called when the spec is updated.
OnSpecUpdated(ctx context.Context, handler func(ctx context.Context, event *SpecUpdatedEvent) error) (*nats.Subscription, error)
}

// Node represents an Ethereum beacon node. It computes values based on the spec.
Expand Down Expand Up @@ -150,6 +175,10 @@ func (n *node) GetSyncState(ctx context.Context) (*v1.SyncState, error) {
return n.syncing, nil
}

func (n *node) GetGenesis(ctx context.Context) (*v1.Genesis, error) {
return n.genesis, nil
}

func (n *node) tick(ctx context.Context) {
if n.state == nil {
if err := n.initializeState(ctx); err != nil {
Expand All @@ -164,12 +193,12 @@ func (n *node) tick(ctx context.Context) {
n.log.WithError(err).Error("Failed to subscribe to self")
}

//nolint:errcheck // we dont care if this errors out since it runs indefinitely in a goroutine
go n.ensureBeaconSubscription(ctx)

if err := n.publishReady(ctx); err != nil {
n.log.WithError(err).Error("Failed to publish ready")
}

//nolint:errcheck // we dont care if this errors out since it runs indefinitely in a goroutine
go n.ensureBeaconSubscription(ctx)
}
}

Expand Down Expand Up @@ -275,6 +304,12 @@ func (n *node) handleStateEpochChanged(ctx context.Context, epoch phase0.Epoch)
}
}

// Delete old epochs
previousEpoch := epoch - 5
if err := n.state.DeleteEpoch(ctx, previousEpoch); err != nil {
return err
}

return nil
}

Expand All @@ -299,7 +334,7 @@ func (n *node) initializeState(ctx context.Context) error {
return err
}

genesis, err := n.GetGenesis(ctx)
genesis, err := n.fetchGenesis(ctx)
if err != nil {
return err
}
Expand All @@ -312,6 +347,8 @@ func (n *node) initializeState(ctx context.Context) error {

n.state = &st

n.log.Info("Beacon state initialized!")

return nil
}

Expand All @@ -328,6 +365,10 @@ func (n *node) getSpec(ctx context.Context) (*state.Spec, error) {

sp := state.NewSpec(data)

if err := n.publishSpecUpdated(ctx, &sp); err != nil {
return nil, err
}

return &sp, nil
}

Expand Down
6 changes: 6 additions & 0 deletions pkg/exporter/consensus/beacon/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
v1 "github.com/attestantio/go-eth2-client/api/v1"
"github.com/attestantio/go-eth2-client/spec/phase0"
"github.com/samcm/ethereum-metrics-exporter/pkg/exporter/consensus/api/types"
"github.com/samcm/ethereum-metrics-exporter/pkg/exporter/consensus/beacon/state"
)

const (
Expand All @@ -16,6 +17,7 @@ const (
topicSyncStatus = "sync_status"
topicNodeVersionUpdated = "node_version_updated"
topicPeersUpdated = "peers_updated"
topicSpecUpdated = "spec_updated"

// Official beacon events that are proxied
topicAttestation = "attestation"
Expand Down Expand Up @@ -59,3 +61,7 @@ type NodeVersionUpdatedEvent struct {
type PeersUpdatedEvent struct {
Peers types.Peers
}

type SpecUpdatedEvent struct {
Spec *state.Spec
}
2 changes: 1 addition & 1 deletion pkg/exporter/consensus/beacon/genesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
v1 "github.com/attestantio/go-eth2-client/api/v1"
)

func (n *node) GetGenesis(ctx context.Context) (*v1.Genesis, error) {
func (n *node) fetchGenesis(ctx context.Context) (*v1.Genesis, error) {
provider, isProvider := n.client.(eth2client.GenesisProvider)
if !isProvider {
return nil, errors.New("client does not implement eth2client.GenesisProvider")
Expand Down
11 changes: 9 additions & 2 deletions pkg/exporter/consensus/beacon/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
v1 "github.com/attestantio/go-eth2-client/api/v1"
"github.com/attestantio/go-eth2-client/spec/phase0"
"github.com/samcm/ethereum-metrics-exporter/pkg/exporter/consensus/api/types"
"github.com/samcm/ethereum-metrics-exporter/pkg/exporter/consensus/beacon/state"
)

// Official beacon events that are proxied
Expand Down Expand Up @@ -67,9 +68,9 @@ func (n *node) publishBlockInserted(ctx context.Context, slot phase0.Slot) error
})
}

func (n *node) publishSyncStatus(ctx context.Context, state *v1.SyncState) error {
func (n *node) publishSyncStatus(ctx context.Context, st *v1.SyncState) error {
return n.broker.Publish(topicSyncStatus, &SyncStatusEvent{
State: state,
State: st,
})
}

Expand All @@ -84,3 +85,9 @@ func (n *node) publishPeersUpdated(ctx context.Context, peers types.Peers) error
Peers: peers,
})
}

func (n *node) publishSpecUpdated(ctx context.Context, spec *state.Spec) error {
return n.broker.Publish(topicSpecUpdated, &SpecUpdatedEvent{
Spec: spec,
})
}
1 change: 1 addition & 0 deletions pkg/exporter/consensus/beacon/state/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/attestantio/go-eth2-client/spec"
)

// TimedBlock is a block with a timestamp.
type TimedBlock struct {
Block *spec.VersionedSignedBeaconBlock
SeenAt time.Time
Expand Down
10 changes: 6 additions & 4 deletions pkg/exporter/consensus/beacon/state/epoch.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ func NewEpoch(epochNumber phase0.Epoch, slotsPerEpoch phase0.Slot, bundle BlockT
bundle: bundle,
}

e.InitializeSlots()

return e
}

Expand Down Expand Up @@ -90,13 +88,17 @@ func (e *Epoch) GetSlot(slotNumber phase0.Slot) (*Slot, error) {
return e.slots.Get(slotNumber)
}

func (e *Epoch) InitializeSlots() {
func (e *Epoch) InitializeSlots() error {
start := uint64(e.FirstSlot)
end := uint64(e.LastSlot)

for i := start; i <= end; i++ {
slot := NewSlot(phase0.Slot(i), e.bundle)

e.slots.Add(&slot)
if err := e.slots.Add(&slot); err != nil {
return err
}
}

return nil
}
4 changes: 4 additions & 0 deletions pkg/exporter/consensus/beacon/state/epochs.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ func (e *Epochs) Exists(number phase0.Epoch) bool {
func (e *Epochs) NewInitializedEpoch(number phase0.Epoch) (*Epoch, error) {
epoch := NewEpoch(number, e.spec.SlotsPerEpoch, e.bundle)

if err := epoch.InitializeSlots(); err != nil {
return nil, err
}

if err := e.AddEpoch(number, &epoch); err != nil {
return nil, err
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/exporter/consensus/beacon/state/fork_epoch.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,22 @@ import (
"github.com/attestantio/go-eth2-client/spec/phase0"
)

// ForkEpoch is a beacon fork that activates at a specific epoch.
type ForkEpoch struct {
Epoch phase0.Epoch
Name string
}

// Active returns true if the fork is active at the given slot.
func (f *ForkEpoch) Active(slot, slotsPerEpoch phase0.Slot) bool {
return phase0.Epoch(int(slot)/int(slotsPerEpoch)) > f.Epoch
}

// ForkEpochs is a list of forks that activate at specific epochs.
type ForkEpochs []ForkEpoch

func (f *ForkEpochs) Active(slot phase0.Slot, slotsPerEpoch phase0.Slot) []ForkEpoch {
// Active returns a list of forks that are active at the given slot.
func (f *ForkEpochs) Active(slot, slotsPerEpoch phase0.Slot) []ForkEpoch {
activated := []ForkEpoch{}

for _, fork := range *f {
Expand All @@ -29,6 +33,7 @@ func (f *ForkEpochs) Active(slot phase0.Slot, slotsPerEpoch phase0.Slot) []ForkE
return activated
}

// CurrentFork returns the current fork at the given slot.
func (f *ForkEpochs) Scheduled(slot, slotsPerEpoch phase0.Slot) []ForkEpoch {
scheduled := []ForkEpoch{}

Expand All @@ -41,6 +46,7 @@ func (f *ForkEpochs) Scheduled(slot, slotsPerEpoch phase0.Slot) []ForkEpoch {
return scheduled
}

// CurrentFork returns the current fork at the given slot.
func (f *ForkEpochs) CurrentFork(slot, slotsPerEpoch phase0.Slot) (ForkEpoch, error) {
largest := ForkEpoch{
Epoch: 0,
Expand Down
8 changes: 8 additions & 0 deletions pkg/exporter/consensus/beacon/state/slot.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/attestantio/go-eth2-client/spec/phase0"
)

// Slot is a slot in the beacon chain.
type Slot struct {
block *TimedBlock
proposerDuty *v1.ProposerDuty
Expand All @@ -17,6 +18,7 @@ type Slot struct {
mu *sync.Mutex
}

// NewSlot returns a new slot.
func NewSlot(number phase0.Slot, bundle BlockTimeCalculatorBundle) Slot {
return Slot{
block: nil,
Expand All @@ -27,10 +29,12 @@ func NewSlot(number phase0.Slot, bundle BlockTimeCalculatorBundle) Slot {
}
}

// Number returns the slot number.
func (m *Slot) Number() phase0.Slot {
return m.number
}

// Block returns the block for the slot (if it exists).
func (m *Slot) Block() (*TimedBlock, error) {
m.mu.Lock()
defer m.mu.Unlock()
Expand All @@ -42,6 +46,7 @@ func (m *Slot) Block() (*TimedBlock, error) {
return m.block, nil
}

// AddBlock adds a block to the slot.
func (m *Slot) AddBlock(timedBlock *TimedBlock) error {
m.mu.Lock()
defer m.mu.Unlock()
Expand All @@ -68,6 +73,7 @@ func (m *Slot) AddBlock(timedBlock *TimedBlock) error {
return nil
}

// ProposerDelay calculates the amount of time it took for the proposer to publish the block.
func (m *Slot) ProposerDelay() (time.Duration, error) {
if m.block == nil {
return 0, errors.New("block does not exist")
Expand All @@ -80,6 +86,7 @@ func (m *Slot) ProposerDelay() (time.Duration, error) {
return delay, nil
}

// ProposerDuty returns the proposer duty for the slot (if it exists).
func (m *Slot) ProposerDuty() (*v1.ProposerDuty, error) {
if m.proposerDuty == nil {
return nil, errors.New("proposer duty does not exist")
Expand All @@ -88,6 +95,7 @@ func (m *Slot) ProposerDuty() (*v1.ProposerDuty, error) {
return m.proposerDuty, nil
}

// SetProposerDuty sets the proposer duty for the slot.
func (m *Slot) SetProposerDuty(proposerDuty *v1.ProposerDuty) error {
if proposerDuty.Slot != m.number {
return errors.New("proposer duty slot does not match slot")
Expand Down
5 changes: 5 additions & 0 deletions pkg/exporter/consensus/beacon/state/slots.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ import (
"github.com/attestantio/go-eth2-client/spec/phase0"
)

// Slots is a collection of slots.
type Slots struct {
state map[phase0.Slot]*Slot
bundle BlockTimeCalculatorBundle
mu *sync.Mutex
}

// NewSlots returns a new slots instance.
func NewSlots(bundle BlockTimeCalculatorBundle) Slots {
return Slots{
state: make(map[phase0.Slot]*Slot),
Expand All @@ -21,6 +23,7 @@ func NewSlots(bundle BlockTimeCalculatorBundle) Slots {
}
}

// Add adds a slot to the collection.
func (m *Slots) Add(slot *Slot) error {
m.mu.Lock()
defer m.mu.Unlock()
Expand All @@ -34,6 +37,7 @@ func (m *Slots) Add(slot *Slot) error {
return nil
}

// Get returns a slot from the collection.
func (m *Slots) Get(slot phase0.Slot) (*Slot, error) {
m.mu.Lock()
defer m.mu.Unlock()
Expand All @@ -45,6 +49,7 @@ func (m *Slots) Get(slot phase0.Slot) (*Slot, error) {
return m.state[slot], nil
}

// Delete deletes a slot from the collection.
func (m *Slots) Delete(slot phase0.Slot) error {
m.mu.Lock()
defer m.mu.Unlock()
Expand Down
2 changes: 2 additions & 0 deletions pkg/exporter/consensus/beacon/state/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/spf13/cast"
)

// Spec represents the state of the spec.
type Spec struct {
PresetBase string
ConfigName string
Expand Down Expand Up @@ -44,6 +45,7 @@ type Spec struct {
ForkEpochs ForkEpochs
}

// NewSpec creates a new spec instance.
func NewSpec(data map[string]interface{}) Spec {
spec := Spec{
ForkEpochs: ForkEpochs{},
Expand Down
Loading

0 comments on commit ec818d2

Please sign in to comment.