Skip to content

Commit

Permalink
add proposer duty fetching
Browse files Browse the repository at this point in the history
  • Loading branch information
samcm committed Jul 1, 2022
1 parent c207ed9 commit 4f4ecdc
Show file tree
Hide file tree
Showing 9 changed files with 194 additions and 89 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
ethereum-metrics-exporter
ethereum-metrics-exporter
test_config.yaml
2 changes: 1 addition & 1 deletion pkg/exporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type DiskUsage struct {
func DefaultConfig() *Config {
return &Config{
Execution: ExecutionNode{
Enabled: false,
Enabled: true,
Name: "execution",
URL: "http://localhost:8545",
Modules: []string{"eth", "net", "web3"},
Expand Down
59 changes: 59 additions & 0 deletions pkg/exporter/consensus/beacon/beacon.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

eth2client "github.com/attestantio/go-eth2-client"
v1 "github.com/attestantio/go-eth2-client/api/v1"
"github.com/attestantio/go-eth2-client/spec/phase0"
"github.com/samcm/ethereum-metrics-exporter/pkg/exporter/consensus/api"
"github.com/samcm/ethereum-metrics-exporter/pkg/exporter/consensus/beacon/state"
"github.com/samcm/ethereum-metrics-exporter/pkg/exporter/consensus/event"
Expand All @@ -26,6 +27,8 @@ type Node struct {
// Internal data stores
genesis *v1.Genesis
state *state.Container

currentEpoch phase0.Epoch
}

func NewNode(ctx context.Context, log logrus.FieldLogger, ap api.ConsensusClient, client eth2client.Service, events *event.DecoratedPublisher) *Node {
Expand All @@ -34,6 +37,8 @@ func NewNode(ctx context.Context, log logrus.FieldLogger, ap api.ConsensusClient
api: ap,
client: client,
events: events,

currentEpoch: phase0.Epoch(0),
}
}

Expand Down Expand Up @@ -61,7 +66,47 @@ func (n *Node) tick(ctx context.Context) {
if err := n.InitializeState(ctx); err != nil {
n.log.WithError(err).Error("Failed to initialize state")
}
} else {
if err := n.checkForNewEpoch(ctx); err != nil {
n.log.WithError(err).Error("Failed to check for new epoch")
}
}
}

func (n *Node) checkForNewEpoch(ctx context.Context) error {
currentEpoch, err := n.state.CurrentEpoch()
if err != nil {
return err
}

if n.currentEpoch != currentEpoch {
n.log.WithField("epoch", currentEpoch).Info("Epoch changed")

for i := currentEpoch; i < currentEpoch+1; i++ {
if err := n.fetchEpochProposerDuties(ctx, phase0.Epoch(i)); err != nil {
return err
}
}

n.currentEpoch = currentEpoch
}

return nil
}

func (n *Node) fetchEpochProposerDuties(ctx context.Context, epoch phase0.Epoch) error {
n.log.WithField("epoch", epoch).Info("Fetching proposer duties")

duties, err := n.GetProserDuties(ctx, epoch)
if err != nil {
return err
}

if err := n.state.SetProposerDuties(ctx, epoch, duties); err != nil {
return err
}

return nil
}

func (n *Node) InitializeState(ctx context.Context) error {
Expand Down Expand Up @@ -103,3 +148,17 @@ func (n *Node) GetSpec(ctx context.Context) (*state.Spec, error) {

return &spec, nil
}

func (n *Node) GetProserDuties(ctx context.Context, epoch phase0.Epoch) ([]*v1.ProposerDuty, error) {
provider, isProvider := n.client.(eth2client.ProposerDutiesProvider)
if !isProvider {
return nil, errors.New("client does not implement eth2client.ProposerDutiesProvider")
}

duties, err := provider.ProposerDuties(ctx, epoch, nil)
if err != nil {
return nil, err
}

return duties, nil
}
61 changes: 32 additions & 29 deletions pkg/exporter/consensus/beacon/state/epoch.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,62 +10,65 @@ import (
)

type Epoch struct {
ProposerDuties ProposerDuties
Blocks MapOfSlotToBlock
Number phase0.Epoch
Start phase0.Slot
End phase0.Slot
bundle BlockTimeCalculatorBundle
slots Slots
Number phase0.Epoch
FirstSlot phase0.Slot
LastSlot phase0.Slot
StartTime time.Time
EndTime time.Time
Duration time.Duration
bundle BlockTimeCalculatorBundle
}

func NewEpoch(epochNumber phase0.Epoch, slotsPerEpoch phase0.Slot, bundle BlockTimeCalculatorBundle) Epoch {
start := uint64(epochNumber) * uint64(slotsPerEpoch)
end := (start + uint64(slotsPerEpoch)) - 1
firstSlot := uint64(epochNumber) * uint64(slotsPerEpoch)
lastSlot := (firstSlot + uint64(slotsPerEpoch)) - 1

e := Epoch{
Number: epochNumber,
ProposerDuties: make(ProposerDuties),
Blocks: NewMapOfSlotToBlock(bundle),
Start: phase0.Slot(start),
End: phase0.Slot(end),
bundle: bundle,
slots: NewSlots(bundle),

Number: epochNumber,
FirstSlot: phase0.Slot(firstSlot),
LastSlot: phase0.Slot(lastSlot),
StartTime: bundle.Genesis.GenesisTime.Add(time.Duration(firstSlot) * bundle.SecondsPerSlot),
EndTime: bundle.Genesis.GenesisTime.Add((time.Duration(lastSlot) * bundle.SecondsPerSlot)).Add(bundle.SecondsPerSlot),
Duration: bundle.SecondsPerSlot * time.Duration(slotsPerEpoch),
bundle: bundle,
}

e.InitializeSlots(epochNumber, slotsPerEpoch)

return e
}

func (e *Epoch) SetProposerDuties(duties []*v1.ProposerDuty) {
for _, duty := range duties {
e.ProposerDuties[duty.Slot] = duty
}
}

func (e *Epoch) AddBlock(block *spec.VersionedSignedBeaconBlock) error {
if block == nil {
return errors.New("block is nil")
}

return e.Blocks.AddBlock(&TimedBlock{
return e.slots.AddBlock(&TimedBlock{
Block: block,
SeenAt: time.Now(),
})
}

func (e *Epoch) GetProserDutyAtSlot(slot phase0.Slot) (*v1.ProposerDuty, error) {
if e.ProposerDuties[slot] == nil {
return nil, errors.New("no proposer duty at slot")
}
func (e *Epoch) GetSlotProposer(slot phase0.Slot) (*v1.ProposerDuty, error) {
return e.slots.GetProposerDuty(slot)
}

func (e *Epoch) SetProposerDuties(duties []*v1.ProposerDuty) error {
return e.slots.AddProposerDuties(duties)
}

return e.ProposerDuties[slot], nil
func (e *Epoch) HasProposerDuties() bool {
return len(e.slots.proposerDuties) > 0
}

func (e *Epoch) InitializeSlots(epoch phase0.Epoch, slots phase0.Slot) {
start := uint64(e.Start)
end := uint64(e.End)
start := uint64(e.FirstSlot)
end := uint64(e.LastSlot)

for i := start; i <= end; i++ {
e.Blocks.AddEmptySlot(phase0.Slot(i))
e.slots.AddEmptySlot(phase0.Slot(i))
}
}
8 changes: 6 additions & 2 deletions pkg/exporter/consensus/beacon/state/epochs.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,14 @@ func (e *Epochs) Exists(number phase0.Epoch) bool {
return ok
}

func (e *Epochs) NewInitializedEpoch(number phase0.Epoch) error {
func (e *Epochs) NewInitializedEpoch(number phase0.Epoch) (*Epoch, error) {
epoch := NewEpoch(number, e.spec.SlotsPerEpoch, e.bundle)

return e.AddEpoch(number, &epoch)
if err := e.AddEpoch(number, &epoch); err != nil {
return nil, err
}

return &epoch, nil
}

func (e *Epochs) AddEpoch(number phase0.Epoch, epoch *Epoch) error {
Expand Down
20 changes: 0 additions & 20 deletions pkg/exporter/consensus/beacon/state/proposer_duties.go

This file was deleted.

51 changes: 38 additions & 13 deletions pkg/exporter/consensus/beacon/state/slot.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,27 @@ import (
"sync"
"time"

v1 "github.com/attestantio/go-eth2-client/api/v1"
"github.com/attestantio/go-eth2-client/spec/phase0"
)

type MapOfSlotToBlock struct {
blocks map[phase0.Slot]*TimedBlock
bundle BlockTimeCalculatorBundle
mu *sync.Mutex
type Slots struct {
blocks map[phase0.Slot]*TimedBlock
proposerDuties map[phase0.Slot]*v1.ProposerDuty
bundle BlockTimeCalculatorBundle
mu *sync.Mutex
}

func NewMapOfSlotToBlock(bundle BlockTimeCalculatorBundle) MapOfSlotToBlock {
return MapOfSlotToBlock{
blocks: make(map[phase0.Slot]*TimedBlock),
mu: &sync.Mutex{},
bundle: bundle,
func NewSlots(bundle BlockTimeCalculatorBundle) Slots {
return Slots{
blocks: make(map[phase0.Slot]*TimedBlock),
proposerDuties: make(map[phase0.Slot]*v1.ProposerDuty),
mu: &sync.Mutex{},
bundle: bundle,
}
}

func (m *MapOfSlotToBlock) GetBlockAtSlot(slot phase0.Slot) (*TimedBlock, error) {
func (m *Slots) GetBlockAtSlot(slot phase0.Slot) (*TimedBlock, error) {
m.mu.Lock()
defer m.mu.Unlock()

Expand All @@ -33,7 +36,7 @@ func (m *MapOfSlotToBlock) GetBlockAtSlot(slot phase0.Slot) (*TimedBlock, error)
return m.blocks[slot], nil
}

func (m *MapOfSlotToBlock) AddBlock(timedBlock *TimedBlock) error {
func (m *Slots) AddBlock(timedBlock *TimedBlock) error {
m.mu.Lock()
defer m.mu.Unlock()

Expand All @@ -55,14 +58,14 @@ func (m *MapOfSlotToBlock) AddBlock(timedBlock *TimedBlock) error {
return nil
}

func (m *MapOfSlotToBlock) AddEmptySlot(slot phase0.Slot) {
func (m *Slots) AddEmptySlot(slot phase0.Slot) {
m.mu.Lock()
defer m.mu.Unlock()

m.blocks[slot] = nil
}

func (m *MapOfSlotToBlock) GetSlotProposerDelay(slot phase0.Slot) (time.Duration, error) {
func (m *Slots) GetSlotProposerDelay(slot phase0.Slot) (time.Duration, error) {
block, err := m.GetBlockAtSlot(slot)
if err != nil {
return 0, err
Expand All @@ -75,3 +78,25 @@ func (m *MapOfSlotToBlock) GetSlotProposerDelay(slot phase0.Slot) (time.Duration

return delay, nil
}

func (m *Slots) GetProposerDuty(slot phase0.Slot) (*v1.ProposerDuty, error) {
m.mu.Lock()
defer m.mu.Unlock()

if m.proposerDuties[slot] == nil {
return nil, errors.New("no proposer duty at slot")
}

return m.proposerDuties[slot], nil
}

func (m *Slots) AddProposerDuties(proposerDuties []*v1.ProposerDuty) error {
m.mu.Lock()
defer m.mu.Unlock()

for _, proposerDuty := range proposerDuties {
m.proposerDuties[proposerDuty.Slot] = proposerDuty
}

return nil
}
Loading

0 comments on commit 4f4ecdc

Please sign in to comment.