Skip to content

Commit

Permalink
add: attestation table and removed ProcessInclusionDelays() and Proce…
Browse files Browse the repository at this point in the history
…ssAttestations() that are using GetValidatorFromCommitteeIndex (where data from NewEpochData was causing memory leak)
  • Loading branch information
riccardo-gnosis committed Sep 24, 2024
1 parent 21d68fa commit f345406
Show file tree
Hide file tree
Showing 12 changed files with 190 additions and 15 deletions.
18 changes: 9 additions & 9 deletions pkg/analyzer/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,9 @@ func TestAltairRewards(t *testing.T) {
rewards.AttestationReward,
phase0.Gwei(12322))

assert.Equal(t,
rewards.AttSlot,
phase0.Slot(6565698))
// assert.Equal(t,
// rewards.AttSlot,
// phase0.Slot(6565698))
assert.Equal(t,
rewards.BaseReward,
phase0.Gwei(14816))
Expand Down Expand Up @@ -206,9 +206,9 @@ func TestAltairRewards(t *testing.T) {
rewards.AttestationReward,
phase0.Gwei(12322))

assert.Equal(t,
rewards.AttSlot,
phase0.Slot(6565704))
// assert.Equal(t,
// rewards.AttSlot,
// phase0.Slot(6565704))
assert.Equal(t,
rewards.BaseReward,
phase0.Gwei(14816))
Expand Down Expand Up @@ -260,9 +260,9 @@ func TestAltairNegativeRewards(t *testing.T) {
rewards.AttestationReward,
phase0.Gwei(12122))

assert.Equal(t,
rewards.AttSlot,
phase0.Slot(6565786))
// assert.Equal(t,
// rewards.AttSlot,
// phase0.Slot(6565786))
assert.Equal(t,
rewards.BaseReward,
phase0.Gwei(14816))
Expand Down
9 changes: 8 additions & 1 deletion pkg/analyzer/process_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,18 @@ func (s *ChainAnalyzer) ProcessBlock(slot phase0.Slot) {

block := s.downloadCache.BlockHistory.Wait(SlotTo[uint64](slot))

err := s.dbClient.PersistBlocks([]spec.AgnosticBlock{*block})
agnosticBlock := []spec.AgnosticBlock{*block}

err := s.dbClient.PersistBlocks(agnosticBlock)
if err != nil {
log.Errorf("error persisting blocks: %s", err.Error())
}

errAtt := s.dbClient.PersistAttestations(agnosticBlock)
if errAtt != nil {
log.Errorf("error persisting attestations: %s", errAtt.Error())
}

var withdrawals []spec.Withdrawal
for _, item := range block.ExecutionPayload.Withdrawals {
withdrawals = append(withdrawals, spec.Withdrawal{
Expand Down
8 changes: 8 additions & 0 deletions pkg/analyzer/reorg.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ func (s *ChainAnalyzer) AdvanceFinalized(newFinalizedSlot phase0.Slot) {

s.dbClient.DeleteBlockMetrics(phase0.Slot(slot))
log.Infof("rewriting metrics for slot %d", slot)

s.dbClient.DeleteAttestationsMetrics(phase0.Slot(slot))
log.Infof("rewriting attestatiins for slot %d", slot)

// write slot metrics
s.ProcessBlock(phase0.Slot(slot))
}
Expand Down Expand Up @@ -94,6 +98,10 @@ func (s *ChainAnalyzer) HandleReorg(newReorg v1.ChainReorgEvent) {
}
s.dbClient.DeleteBlockMetrics(i)
log.Infof("rewriting metrics for slot %d", i)

s.dbClient.DeleteAttestationsMetrics(i)
log.Infof("rewriting attestatiins for slot %d", i)

// write slot metrics
s.ProcessBlock(i)
} else {
Expand Down
125 changes: 125 additions & 0 deletions pkg/db/attestations.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package db

import (
"github.com/ClickHouse/ch-go/proto"
"github.com/attestantio/go-eth2-client/spec/phase0"
"github.com/migalabs/goteth/pkg/spec"
)

var (
attestationsTable = "t_attestations"
insertAttestationQuery = `
INSERT INTO %s (
f_timestamp,
f_epoch,
f_slot,
f_attestation_index,
f_attestation_slot,
f_attestation_beacon_block_root,
f_attestation_source_epoch,
f_attestation_source_root,
f_attestation_target_epoch,
f_attestation_target_root)
VALUES`

deleteAttestationQuery = `
DELETE FROM %s
WHERE f_slot = $1;
`
)

func attestationInput(attestations []spec.Attestation) proto.Input {
// one object per column
var (
f_timestamp proto.ColUInt64
f_epoch proto.ColUInt64
f_slot proto.ColUInt64
f_attestation_index proto.ColUInt64
f_attestation_slot proto.ColUInt64
f_attestation_beacon_block_root proto.ColStr
f_attestation_source_epoch proto.ColUInt64
f_attestation_source_root proto.ColStr
f_attestation_target_epoch proto.ColUInt64
f_attestation_target_root proto.ColStr
)

for _, attestation := range attestations {

if attestation.Attestation != nil && attestation.Attestation.Data != nil {

f_timestamp.Append(uint64(attestation.Timestamp))
f_epoch.Append(uint64(attestation.Slot / spec.SlotsPerEpoch))
f_slot.Append(uint64(attestation.Slot))

f_attestation_index.Append(uint64(attestation.Attestation.Data.Index))
f_attestation_slot.Append(uint64(attestation.Attestation.Data.Slot))
f_attestation_beacon_block_root.Append(attestation.Attestation.Data.BeaconBlockRoot.String())

if attestation.Attestation.Data.Source != nil {
f_attestation_source_epoch.Append(uint64(attestation.Attestation.Data.Source.Epoch))
f_attestation_source_root.Append(attestation.Attestation.Data.Source.Root.String())
}

if attestation.Attestation.Data.Target != nil {
f_attestation_target_epoch.Append(uint64(attestation.Attestation.Data.Target.Epoch))
f_attestation_target_root.Append(attestation.Attestation.Data.Target.Root.String())
}
}
}

return proto.Input{
{Name: "f_attestation_slot", Data: f_attestation_slot},
{Name: "f_timestamp", Data: f_timestamp},
{Name: "f_epoch", Data: f_epoch},
{Name: "f_slot", Data: f_slot},
{Name: "f_attestation_index", Data: f_attestation_index},
{Name: "f_attestation_beacon_block_root", Data: f_attestation_beacon_block_root},
{Name: "f_attestation_source_epoch", Data: f_attestation_source_epoch},
{Name: "f_attestation_source_root", Data: f_attestation_source_root},
{Name: "f_attestation_target_epoch", Data: f_attestation_target_epoch},
{Name: "f_attestation_target_root", Data: f_attestation_target_root},
}
}

func (s *DBService) DeleteAttestationsMetrics(slot phase0.Slot) error {

err := s.Delete(DeletableObject{
query: deleteAttestationQuery,
table: attestationsTable,
args: []any{slot},
})
if err != nil {
return err
}
return nil
}

func (p *DBService) PersistAttestations(data []spec.AgnosticBlock) error {

attestations := make([]spec.Attestation, 0)
for _, block := range data {
for _, attestation := range block.Attestations {
att := spec.Attestation{}
att.Slot = phase0.Slot(block.Slot)
att.Timestamp = block.ExecutionPayload.Timestamp
att.Attestation = attestation
attestations = append(attestations, att)
}
}

persistObj := PersistableObject[spec.Attestation]{
input: attestationInput,
table: attestationsTable,
query: insertAttestationQuery,
}

for _, att := range attestations {
persistObj.Append(att)
}

err := p.Persist(persistObj.ExportPersist())
if err != nil {
log.Errorf("error persisting attestations: %s", err.Error())
}
return err
}
3 changes: 2 additions & 1 deletion pkg/db/migrations/000001_init_schema.down.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
DROP TABLE IF EXISTS t_block_metrics;
DROP TABLE IF EXISTS t_attestations;
DROP TABLE IF EXISTS t_epoch_metrics_summary;
DROP TABLE IF EXISTS t_pool_summary;
DROP TABLE IF EXISTS t_proposer_duties;
Expand All @@ -12,4 +13,4 @@ DROP TABLE IF EXISTS t_finalized_checkpoint;
DROP TABLE IF EXISTS t_genesis;
DROP TABLE IF EXISTS t_orphans;
DROP TABLE IF EXISTS t_eth2_pubkeys;
DROP TABLE IF EXISTS t_head_events;
DROP TABLE IF EXISTS t_head_events;
14 changes: 14 additions & 0 deletions pkg/db/migrations/000001_init_schema.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,20 @@ CREATE TABLE IF NOT EXISTS t_block_metrics(
ENGINE = ReplacingMergeTree()
ORDER BY (f_slot);

CREATE TABLE IF NOT EXISTS t_attestations(
f_timestamp UInt64,
f_epoch UInt64,
f_slot UInt64,
f_attestation_index UInt64,
f_attestation_slot UInt64,
f_attestation_beacon_block_root TEXT,
f_attestation_source_epoch UInt64,
f_attestation_source_root TEXT,
f_attestation_target_epoch UInt64,
f_attestation_target_root TEXT)
ENGINE = ReplacingMergeTree()
ORDER BY (f_attestation_slot);

CREATE TABLE IF NOT EXISTS t_orphans(
f_timestamp UInt64,
f_epoch UInt64,
Expand Down
1 change: 1 addition & 0 deletions pkg/db/prometheus_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func (r *DBService) initMonitorMetrics() {
blobEventsTable,
blockRewardsTable,
blocksTable,
attestationsTable,
epochsTable,
finalizedTable,
genesisTable,
Expand Down
1 change: 1 addition & 0 deletions pkg/db/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ type Input[T any] func(t T) proto.Input

type PersistableObject[
T spec.AgnosticBlock |
spec.Attestation |
spec.Epoch |
api.FinalizedCheckpointEvent |
int64 |
Expand Down
16 changes: 16 additions & 0 deletions pkg/spec/attestation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package spec

import (
"github.com/attestantio/go-eth2-client/spec/phase0"
)

// TODO: review
type Attestation struct {
Slot phase0.Slot
Timestamp uint64
Attestation *phase0.Attestation
}

func (f Attestation) Type() ModelType {
return AttestationModel
}
1 change: 1 addition & 0 deletions pkg/spec/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ const (
ReorgModel
FinalizedCheckpointModel
HeadEventModel
AttestationModel
)

type ValidatorStatus int8
Expand Down
4 changes: 2 additions & 2 deletions pkg/spec/metrics/state_altair.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,12 @@ func (p *AltairMetrics) PreProcessBundle() {

if !p.baseMetrics.PrevState.EmptyStateRoot() && !p.baseMetrics.CurrentState.EmptyStateRoot() {
// block rewards
p.ProcessAttestations()
// p.ProcessAttestations()
// p.ProcessInclusionDelays()
p.ProcessSlashings()
p.ProcessSyncAggregates()

p.GetMaxFlagIndexDeltas()
p.ProcessInclusionDelays()
p.GetMaxSyncComReward()
}
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/spec/metrics/state_deneb.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,13 @@ func (p *DenebMetrics) PreProcessBundle() {

if !p.baseMetrics.PrevState.EmptyStateRoot() && !p.baseMetrics.CurrentState.EmptyStateRoot() {
// block rewards
p.ProcessAttestations()
// p.ProcessAttestations()
// p.ProcessInclusionDelays()

p.ProcessSlashings()
p.ProcessSyncAggregates()

p.GetMaxFlagIndexDeltas()
p.ProcessInclusionDelays()
p.GetMaxSyncComReward()
}
}
Expand Down

0 comments on commit f345406

Please sign in to comment.