Skip to content

Commit

Permalink
add beacon state and decorated event publisher
Browse files Browse the repository at this point in the history
  • Loading branch information
samcm committed Jun 30, 2022
1 parent 14fc812 commit 883394d
Show file tree
Hide file tree
Showing 17 changed files with 676 additions and 130 deletions.
2 changes: 1 addition & 1 deletion pkg/exporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type DiskUsage struct {
func DefaultConfig() *Config {
return &Config{
Execution: ExecutionNode{
Enabled: true,
Enabled: false,
Name: "execution",
URL: "http://localhost:8545",
Modules: []string{"eth", "net", "web3"},
Expand Down
70 changes: 44 additions & 26 deletions pkg/exporter/consensus/beacon/beacon.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ import (

eth2client "github.com/attestantio/go-eth2-client"
v1 "github.com/attestantio/go-eth2-client/api/v1"
"github.com/davecgh/go-spew/spew"
"github.com/samcm/ethereum-metrics-exporter/pkg/exporter/consensus/api"
"github.com/samcm/ethereum-metrics-exporter/pkg/exporter/consensus/beacon/state"
"github.com/samcm/ethereum-metrics-exporter/pkg/exporter/consensus/event"
"github.com/sirupsen/logrus"
)

Expand All @@ -20,20 +21,19 @@ type Node struct {
// Clients
api api.ConsensusClient
client eth2client.Service
events *event.DecoratedPublisher

// Internal data stores
spec *Spec
genesis *v1.Genesis

// Misc
specFetchedAt time.Time
state *state.Container
}

func NewNode(ctx context.Context, log logrus.FieldLogger, ap api.ConsensusClient, client eth2client.Service) *Node {
func NewNode(ctx context.Context, log logrus.FieldLogger, ap api.ConsensusClient, client eth2client.Service, events *event.DecoratedPublisher) *Node {
return &Node{
log: log,
api: ap,
client: client,
events: events,
}
}

Expand All @@ -42,46 +42,64 @@ func (n *Node) Start(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(time.Second * 5):
case <-time.After(time.Second * 1):
n.tick(ctx)
}
}
}

func (n *Node) tick(ctx context.Context) {
if time.Since(n.specFetchedAt) > 15*time.Minute {
if err := n.fetchSpec(ctx); err != nil {
n.log.Errorf("failed to fetch spec: %v", err)
func (n *Node) StartAsync(ctx context.Context) {
go func() {
if err := n.Start(ctx); err != nil {
n.log.WithError(err).Error("Failed to start beacon node")
}
}()
}

if _, err := n.GetGenesis(ctx); err != nil {
n.log.Errorf("failed to fetch genesis: %v", err)
func (n *Node) tick(ctx context.Context) {
if n.state == nil {
if err := n.InitializeState(ctx); err != nil {
n.log.WithError(err).Error("Failed to initialize state")
}
}
}

func (n *Node) InitializeState(ctx context.Context) error {
n.log.Info("Initializing beacon state")

spec, err := n.GetSpec(ctx)
if err != nil {
return err
}

genesis, err := n.GetGenesis(ctx)
if err != nil {
return err
}

st := state.NewContainer(ctx, n.log, spec, genesis, n.events)

if _, _, err := n.CurrentSlot(ctx); err != nil {
n.log.Errorf("failed to get current slot: %v", err)
if err := st.Init(ctx); err != nil {
return err
}

n.state = &st

return nil
}

func (n *Node) fetchSpec(ctx context.Context) error {
func (n *Node) GetSpec(ctx context.Context) (*state.Spec, error) {
provider, isProvider := n.client.(eth2client.SpecProvider)
if !isProvider {
return errors.New("client does not implement eth2client.SpecProvider")
return nil, errors.New("client does not implement eth2client.SpecProvider")
}

data, err := provider.Spec(ctx)
if err != nil {
return err
return nil, err
}

spec := NewSpec(data)

n.spec = &spec
spec := state.NewSpec(data)

n.specFetchedAt = time.Now()

spew.Dump(spec)

return nil
return &spec, nil
}
42 changes: 0 additions & 42 deletions pkg/exporter/consensus/beacon/epoch.go

This file was deleted.

40 changes: 0 additions & 40 deletions pkg/exporter/consensus/beacon/proposer_duties.go

This file was deleted.

12 changes: 12 additions & 0 deletions pkg/exporter/consensus/beacon/state/block.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package state

import (
"time"

"github.com/attestantio/go-eth2-client/spec"
)

type TimedBlock struct {
Block *spec.VersionedSignedBeaconBlock
SeenAt time.Time
}
51 changes: 51 additions & 0 deletions pkg/exporter/consensus/beacon/state/epoch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package state

import (
"errors"
"time"

v1 "github.com/attestantio/go-eth2-client/api/v1"
"github.com/attestantio/go-eth2-client/spec"
"github.com/attestantio/go-eth2-client/spec/phase0"
)

type Epoch struct {
ProposerDuties ProposerDuties
Blocks MapOfSlotToBlock
}

func NewEpoch(slotsPerEpoch phase0.Slot) Epoch {
e := Epoch{
ProposerDuties: make(ProposerDuties),
Blocks: NewMapOfSlotToBlock(),
}

e.Blocks.InitializeSlots(slotsPerEpoch)

return e
}

func (e *Epoch) SetProposerDuties(duties []*v1.ProposerDuty) {
for _, duty := range duties {
e.ProposerDuties[duty.Slot] = duty
}
}

func (e *Epoch) AddBlock(block *spec.VersionedSignedBeaconBlock) error {
if block == nil {
return errors.New("block is nil")
}

return e.Blocks.AddBlock(&TimedBlock{
Block: block,
SeenAt: time.Now(),
})
}

func (e *Epoch) GetProserDutyAtSlot(slot phase0.Slot) (*v1.ProposerDuty, error) {
if e.ProposerDuties[slot] == nil {
return nil, errors.New("no proposer duty at slot")
}

return e.ProposerDuties[slot], nil
}
75 changes: 75 additions & 0 deletions pkg/exporter/consensus/beacon/state/epochs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package state

import (
"errors"
"sync"

"github.com/attestantio/go-eth2-client/spec/phase0"
)

type Epochs struct {
spec *Spec
state map[phase0.Epoch]*Epoch

mu *sync.Mutex
}

func NewEpochs(spec *Spec) Epochs {
return Epochs{
spec: spec,
state: make(map[phase0.Epoch]*Epoch),

mu: &sync.Mutex{},
}
}

func (e *Epochs) GetEpoch(epoch phase0.Epoch) (*Epoch, error) {
e.mu.Lock()
defer e.mu.Unlock()

if e.state[epoch] == nil {
return nil, errors.New("epoch not found")
}

return e.state[epoch], nil
}

func (e *Epochs) Exists(number phase0.Epoch) bool {
e.mu.Lock()
defer e.mu.Unlock()

_, ok := e.state[number]

return ok
}

func (e *Epochs) NewInitializedEpoch(number phase0.Epoch) error {
e.mu.Lock()
defer e.mu.Unlock()

epoch := NewEpoch(e.spec.SlotsPerEpoch)

return e.AddEpoch(number, &epoch)
}

func (e *Epochs) AddEpoch(number phase0.Epoch, epoch *Epoch) error {
e.mu.Lock()
defer e.mu.Unlock()

if epoch == nil {
return errors.New("epoch is nil")
}

e.state[number] = epoch

return nil
}

func (e *Epochs) RemoveEpoch(number phase0.Epoch) error {
e.mu.Lock()
defer e.mu.Unlock()

delete(e.state, number)

return nil
}
20 changes: 20 additions & 0 deletions pkg/exporter/consensus/beacon/state/proposer_duties.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package state

import (
v1 "github.com/attestantio/go-eth2-client/api/v1"
"github.com/attestantio/go-eth2-client/spec/phase0"
)

type ProposerDuties = map[phase0.Slot]*v1.ProposerDuty

func NewProposerDuties(duties []*v1.ProposerDuty) ProposerDuties {
out := make(ProposerDuties)

for _, duty := range duties {
out[duty.Slot] = duty
}

return out
}

type ProposerDutiesForEpoch = map[phase0.Epoch]ProposerDuties
Loading

0 comments on commit 883394d

Please sign in to comment.