From f345406896335ec039e0ebdc5385510249031e48 Mon Sep 17 00:00:00 2001 From: riccardo <106812074+riccardo-gnosis@users.noreply.github.com> Date: Tue, 24 Sep 2024 13:10:24 +0200 Subject: [PATCH] add: attestation table and removed ProcessInclusionDelays() and ProcessAttestations() that are using GetValidatorFromCommitteeIndex (where data from NewEpochData was causing memory leak) --- pkg/analyzer/metrics_test.go | 18 +-- pkg/analyzer/process_block.go | 9 +- pkg/analyzer/reorg.go | 8 ++ pkg/db/attestations.go | 125 ++++++++++++++++++ pkg/db/migrations/000001_init_schema.down.sql | 3 +- pkg/db/migrations/000001_init_schema.up.sql | 14 ++ pkg/db/prometheus_metrics.go | 1 + pkg/db/service.go | 1 + pkg/spec/attestation.go | 16 +++ pkg/spec/constants.go | 1 + pkg/spec/metrics/state_altair.go | 4 +- pkg/spec/metrics/state_deneb.go | 5 +- 12 files changed, 190 insertions(+), 15 deletions(-) create mode 100644 pkg/db/attestations.go create mode 100644 pkg/spec/attestation.go diff --git a/pkg/analyzer/metrics_test.go b/pkg/analyzer/metrics_test.go index 237e82d4..9f8b4594 100644 --- a/pkg/analyzer/metrics_test.go +++ b/pkg/analyzer/metrics_test.go @@ -167,9 +167,9 @@ func TestAltairRewards(t *testing.T) { rewards.AttestationReward, phase0.Gwei(12322)) - assert.Equal(t, - rewards.AttSlot, - phase0.Slot(6565698)) + // assert.Equal(t, + // rewards.AttSlot, + // phase0.Slot(6565698)) assert.Equal(t, rewards.BaseReward, phase0.Gwei(14816)) @@ -206,9 +206,9 @@ func TestAltairRewards(t *testing.T) { rewards.AttestationReward, phase0.Gwei(12322)) - assert.Equal(t, - rewards.AttSlot, - phase0.Slot(6565704)) + // assert.Equal(t, + // rewards.AttSlot, + // phase0.Slot(6565704)) assert.Equal(t, rewards.BaseReward, phase0.Gwei(14816)) @@ -260,9 +260,9 @@ func TestAltairNegativeRewards(t *testing.T) { rewards.AttestationReward, phase0.Gwei(12122)) - assert.Equal(t, - rewards.AttSlot, - phase0.Slot(6565786)) + // assert.Equal(t, + // rewards.AttSlot, + // phase0.Slot(6565786)) assert.Equal(t, rewards.BaseReward, phase0.Gwei(14816)) diff --git a/pkg/analyzer/process_block.go b/pkg/analyzer/process_block.go index 8efd84e8..29f1bfc2 100644 --- a/pkg/analyzer/process_block.go +++ b/pkg/analyzer/process_block.go @@ -20,11 +20,18 @@ func (s *ChainAnalyzer) ProcessBlock(slot phase0.Slot) { block := s.downloadCache.BlockHistory.Wait(SlotTo[uint64](slot)) - err := s.dbClient.PersistBlocks([]spec.AgnosticBlock{*block}) + agnosticBlock := []spec.AgnosticBlock{*block} + + err := s.dbClient.PersistBlocks(agnosticBlock) if err != nil { log.Errorf("error persisting blocks: %s", err.Error()) } + errAtt := s.dbClient.PersistAttestations(agnosticBlock) + if errAtt != nil { + log.Errorf("error persisting attestations: %s", errAtt.Error()) + } + var withdrawals []spec.Withdrawal for _, item := range block.ExecutionPayload.Withdrawals { withdrawals = append(withdrawals, spec.Withdrawal{ diff --git a/pkg/analyzer/reorg.go b/pkg/analyzer/reorg.go index 352e054e..78297196 100644 --- a/pkg/analyzer/reorg.go +++ b/pkg/analyzer/reorg.go @@ -53,6 +53,10 @@ func (s *ChainAnalyzer) AdvanceFinalized(newFinalizedSlot phase0.Slot) { s.dbClient.DeleteBlockMetrics(phase0.Slot(slot)) log.Infof("rewriting metrics for slot %d", slot) + + s.dbClient.DeleteAttestationsMetrics(phase0.Slot(slot)) + log.Infof("rewriting attestatiins for slot %d", slot) + // write slot metrics s.ProcessBlock(phase0.Slot(slot)) } @@ -94,6 +98,10 @@ func (s *ChainAnalyzer) HandleReorg(newReorg v1.ChainReorgEvent) { } s.dbClient.DeleteBlockMetrics(i) log.Infof("rewriting metrics for slot %d", i) + + s.dbClient.DeleteAttestationsMetrics(i) + log.Infof("rewriting attestatiins for slot %d", i) + // write slot metrics s.ProcessBlock(i) } else { diff --git a/pkg/db/attestations.go b/pkg/db/attestations.go new file mode 100644 index 00000000..7795b408 --- /dev/null +++ b/pkg/db/attestations.go @@ -0,0 +1,125 @@ +package db + +import ( + "github.com/ClickHouse/ch-go/proto" + "github.com/attestantio/go-eth2-client/spec/phase0" + "github.com/migalabs/goteth/pkg/spec" +) + +var ( + attestationsTable = "t_attestations" + insertAttestationQuery = ` + INSERT INTO %s ( + f_timestamp, + f_epoch, + f_slot, + f_attestation_index, + f_attestation_slot, + f_attestation_beacon_block_root, + f_attestation_source_epoch, + f_attestation_source_root, + f_attestation_target_epoch, + f_attestation_target_root) + VALUES` + + deleteAttestationQuery = ` + DELETE FROM %s + WHERE f_slot = $1; +` +) + +func attestationInput(attestations []spec.Attestation) proto.Input { + // one object per column + var ( + f_timestamp proto.ColUInt64 + f_epoch proto.ColUInt64 + f_slot proto.ColUInt64 + f_attestation_index proto.ColUInt64 + f_attestation_slot proto.ColUInt64 + f_attestation_beacon_block_root proto.ColStr + f_attestation_source_epoch proto.ColUInt64 + f_attestation_source_root proto.ColStr + f_attestation_target_epoch proto.ColUInt64 + f_attestation_target_root proto.ColStr + ) + + for _, attestation := range attestations { + + if attestation.Attestation != nil && attestation.Attestation.Data != nil { + + f_timestamp.Append(uint64(attestation.Timestamp)) + f_epoch.Append(uint64(attestation.Slot / spec.SlotsPerEpoch)) + f_slot.Append(uint64(attestation.Slot)) + + f_attestation_index.Append(uint64(attestation.Attestation.Data.Index)) + f_attestation_slot.Append(uint64(attestation.Attestation.Data.Slot)) + f_attestation_beacon_block_root.Append(attestation.Attestation.Data.BeaconBlockRoot.String()) + + if attestation.Attestation.Data.Source != nil { + f_attestation_source_epoch.Append(uint64(attestation.Attestation.Data.Source.Epoch)) + f_attestation_source_root.Append(attestation.Attestation.Data.Source.Root.String()) + } + + if attestation.Attestation.Data.Target != nil { + f_attestation_target_epoch.Append(uint64(attestation.Attestation.Data.Target.Epoch)) + f_attestation_target_root.Append(attestation.Attestation.Data.Target.Root.String()) + } + } + } + + return proto.Input{ + {Name: "f_attestation_slot", Data: f_attestation_slot}, + {Name: "f_timestamp", Data: f_timestamp}, + {Name: "f_epoch", Data: f_epoch}, + {Name: "f_slot", Data: f_slot}, + {Name: "f_attestation_index", Data: f_attestation_index}, + {Name: "f_attestation_beacon_block_root", Data: f_attestation_beacon_block_root}, + {Name: "f_attestation_source_epoch", Data: f_attestation_source_epoch}, + {Name: "f_attestation_source_root", Data: f_attestation_source_root}, + {Name: "f_attestation_target_epoch", Data: f_attestation_target_epoch}, + {Name: "f_attestation_target_root", Data: f_attestation_target_root}, + } +} + +func (s *DBService) DeleteAttestationsMetrics(slot phase0.Slot) error { + + err := s.Delete(DeletableObject{ + query: deleteAttestationQuery, + table: attestationsTable, + args: []any{slot}, + }) + if err != nil { + return err + } + return nil +} + +func (p *DBService) PersistAttestations(data []spec.AgnosticBlock) error { + + attestations := make([]spec.Attestation, 0) + for _, block := range data { + for _, attestation := range block.Attestations { + att := spec.Attestation{} + att.Slot = phase0.Slot(block.Slot) + att.Timestamp = block.ExecutionPayload.Timestamp + att.Attestation = attestation + attestations = append(attestations, att) + } + } + + persistObj := PersistableObject[spec.Attestation]{ + input: attestationInput, + table: attestationsTable, + query: insertAttestationQuery, + } + + for _, att := range attestations { + persistObj.Append(att) + } + + err := p.Persist(persistObj.ExportPersist()) + if err != nil { + log.Errorf("error persisting attestations: %s", err.Error()) + } + return err +} diff --git a/pkg/db/migrations/000001_init_schema.down.sql b/pkg/db/migrations/000001_init_schema.down.sql index a1714d39..dd21d665 100644 --- a/pkg/db/migrations/000001_init_schema.down.sql +++ b/pkg/db/migrations/000001_init_schema.down.sql @@ -1,4 +1,5 @@ DROP TABLE IF EXISTS t_block_metrics; +DROP TABLE IF EXISTS t_attestations; DROP TABLE IF EXISTS t_epoch_metrics_summary; DROP TABLE IF EXISTS t_pool_summary; DROP TABLE IF EXISTS t_proposer_duties; @@ -12,4 +13,4 @@ DROP TABLE IF EXISTS t_finalized_checkpoint; DROP TABLE IF EXISTS t_genesis; DROP TABLE IF EXISTS t_orphans; DROP TABLE IF EXISTS t_eth2_pubkeys; -DROP TABLE IF EXISTS t_head_events; \ No newline at end of file +DROP TABLE IF EXISTS t_head_events; diff --git a/pkg/db/migrations/000001_init_schema.up.sql b/pkg/db/migrations/000001_init_schema.up.sql index 790d4486..8ac8d195 100644 --- a/pkg/db/migrations/000001_init_schema.up.sql +++ b/pkg/db/migrations/000001_init_schema.up.sql @@ -27,6 +27,20 @@ CREATE TABLE IF NOT EXISTS t_block_metrics( ENGINE = ReplacingMergeTree() ORDER BY (f_slot); +CREATE TABLE IF NOT EXISTS t_attestations( + f_timestamp UInt64, + f_epoch UInt64, + f_slot UInt64, + f_attestation_index UInt64, + f_attestation_slot UInt64, + f_attestation_beacon_block_root TEXT, + f_attestation_source_epoch UInt64, + f_attestation_source_root TEXT, + f_attestation_target_epoch UInt64, + f_attestation_target_root TEXT) + ENGINE = ReplacingMergeTree() + ORDER BY (f_attestation_slot); + CREATE TABLE IF NOT EXISTS t_orphans( f_timestamp UInt64, f_epoch UInt64, diff --git a/pkg/db/prometheus_metrics.go b/pkg/db/prometheus_metrics.go index a09e93db..b119dfc8 100644 --- a/pkg/db/prometheus_metrics.go +++ b/pkg/db/prometheus_metrics.go @@ -79,6 +79,7 @@ func (r *DBService) initMonitorMetrics() { blobEventsTable, blockRewardsTable, blocksTable, + attestationsTable, epochsTable, finalizedTable, genesisTable, diff --git a/pkg/db/service.go b/pkg/db/service.go index 1632a952..71ccf2bd 100644 --- a/pkg/db/service.go +++ b/pkg/db/service.go @@ -141,6 +141,7 @@ type Input[T any] func(t T) proto.Input type PersistableObject[ T spec.AgnosticBlock | + spec.Attestation | spec.Epoch | api.FinalizedCheckpointEvent | int64 | diff --git a/pkg/spec/attestation.go b/pkg/spec/attestation.go new file mode 100644 index 00000000..924e7bac --- /dev/null +++ b/pkg/spec/attestation.go @@ -0,0 +1,16 @@ +package spec + +import ( + "github.com/attestantio/go-eth2-client/spec/phase0" +) + +// TODO: review +type Attestation struct { + Slot phase0.Slot + Timestamp uint64 + Attestation *phase0.Attestation +} + +func (f Attestation) Type() ModelType { + return AttestationModel +} diff --git a/pkg/spec/constants.go b/pkg/spec/constants.go index 16e69f9e..27c140c9 100644 --- a/pkg/spec/constants.go +++ b/pkg/spec/constants.go @@ -89,6 +89,7 @@ const ( ReorgModel FinalizedCheckpointModel HeadEventModel + AttestationModel ) type ValidatorStatus int8 diff --git a/pkg/spec/metrics/state_altair.go b/pkg/spec/metrics/state_altair.go index f6579c78..62c8e8c9 100644 --- a/pkg/spec/metrics/state_altair.go +++ b/pkg/spec/metrics/state_altair.go @@ -45,12 +45,12 @@ func (p *AltairMetrics) PreProcessBundle() { if !p.baseMetrics.PrevState.EmptyStateRoot() && !p.baseMetrics.CurrentState.EmptyStateRoot() { // block rewards - p.ProcessAttestations() + // p.ProcessAttestations() + // p.ProcessInclusionDelays() p.ProcessSlashings() p.ProcessSyncAggregates() p.GetMaxFlagIndexDeltas() - p.ProcessInclusionDelays() p.GetMaxSyncComReward() } } diff --git a/pkg/spec/metrics/state_deneb.go b/pkg/spec/metrics/state_deneb.go index 0d93ea29..0f16f326 100644 --- a/pkg/spec/metrics/state_deneb.go +++ b/pkg/spec/metrics/state_deneb.go @@ -42,12 +42,13 @@ func (p *DenebMetrics) PreProcessBundle() { if !p.baseMetrics.PrevState.EmptyStateRoot() && !p.baseMetrics.CurrentState.EmptyStateRoot() { // block rewards - p.ProcessAttestations() + // p.ProcessAttestations() + // p.ProcessInclusionDelays() + p.ProcessSlashings() p.ProcessSyncAggregates() p.GetMaxFlagIndexDeltas() - p.ProcessInclusionDelays() p.GetMaxSyncComReward() } }