Skip to content

Commit

Permalink
feat(v1): implement clickhouse as source for historical consensus lay…
Browse files Browse the repository at this point in the history
…er data (#2974)

chore(v1): implement data cleaning

feat(v1): timestamp balance data

feat(v1): use correct sync committee votes table

feat(v1): fix rewards and penalties for sync committee data
  • Loading branch information
peterbitfly authored Dec 20, 2024
1 parent cd9220f commit 09812aa
Show file tree
Hide file tree
Showing 11 changed files with 893 additions and 436 deletions.
154 changes: 154 additions & 0 deletions cmd/clickhouse_integration_test/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package main

import (
"flag"
"fmt"

"github.com/davecgh/go-spew/spew"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"

"github.com/gobitfly/eth2-beaconchain-explorer/db"
"github.com/gobitfly/eth2-beaconchain-explorer/types"

itypes "github.com/gobitfly/eth-rewards/types"
"github.com/gobitfly/eth2-beaconchain-explorer/utils"
"github.com/gobitfly/eth2-beaconchain-explorer/version"
"github.com/sirupsen/logrus"
)

func main() {
configPath := flag.String("config", "config/default.config.yml", "Path to the config file")

flag.Parse()

cfg := &types.Config{}
err := utils.ReadConfig(cfg, *configPath)
if err != nil {
logrus.Fatalf("error reading config file: %v", err)
}
utils.Config = cfg
logrus.WithFields(logrus.Fields{
"config": *configPath,
"version": version.Version,
"chainName": utils.Config.Chain.ClConfig.ConfigName}).Printf("starting")

db.MustInitDB(&types.DatabaseConfig{
Username: cfg.WriterDatabase.Username,
Password: cfg.WriterDatabase.Password,
Name: cfg.WriterDatabase.Name,
Host: cfg.WriterDatabase.Host,
Port: cfg.WriterDatabase.Port,
MaxOpenConns: cfg.WriterDatabase.MaxOpenConns,
MaxIdleConns: cfg.WriterDatabase.MaxIdleConns,
SSL: cfg.WriterDatabase.SSL,
}, &types.DatabaseConfig{
Username: cfg.ReaderDatabase.Username,
Password: cfg.ReaderDatabase.Password,
Name: cfg.ReaderDatabase.Name,
Host: cfg.ReaderDatabase.Host,
Port: cfg.ReaderDatabase.Port,
MaxOpenConns: cfg.ReaderDatabase.MaxOpenConns,
MaxIdleConns: cfg.ReaderDatabase.MaxIdleConns,
SSL: cfg.ReaderDatabase.SSL,
}, "pgx", "postgres")

db.MustInitClickhouseDB(nil, &types.DatabaseConfig{
Username: cfg.ClickHouse.ReaderDatabase.Username,
Password: cfg.ClickHouse.ReaderDatabase.Password,
Name: cfg.ClickHouse.ReaderDatabase.Name,
Host: cfg.ClickHouse.ReaderDatabase.Host,
Port: cfg.ClickHouse.ReaderDatabase.Port,
MaxOpenConns: cfg.ClickHouse.ReaderDatabase.MaxOpenConns,
MaxIdleConns: cfg.ClickHouse.ReaderDatabase.MaxIdleConns,
SSL: true,
}, "clickhouse", "clickhouse")

bt, err := db.InitBigtable(utils.Config.Bigtable.Project, utils.Config.Bigtable.Instance, fmt.Sprintf("%d", utils.Config.Chain.ClConfig.DepositChainID), utils.Config.RedisCacheEndpoint)
if err != nil {
logrus.Fatalf("error connecting to bigtable: %v", err)
}
db.BigtableClient = bt

utils.Config.ClickhouseDelay = 0

// verification funcs are to be run against mainnet
// normal attestation
logrus.Infof("verifying history for normal attestations")
verifyHistory([]uint64{653161}, uint64(323260), uint64(323270), true, true)

// income details for sync rewards (this currently fails !!!)
logrus.Infof("verifying history for sync rewards")
verifyHistory([]uint64{653162}, uint64(323260), uint64(323261), true, false)

// block proposed
logrus.Infof("verifying history for proposer rewards at end of an epoch")
verifyHistory([]uint64{388033}, uint64(323268), uint64(323270), true, false)

logrus.Infof("verifying history for proposer rewards at start of an epoch")
verifyHistory([]uint64{1284148}, uint64(323268), uint64(323270), true, false)

logrus.Infof("verifying history for proposer rewards at during an epoch")
verifyHistory([]uint64{1208852}, uint64(323268), uint64(323270), true, false)

// missed attestations
logrus.Infof("verifying history for missed attestations")
verifyHistory([]uint64{76040}, uint64(323260), uint64(323270), true, true)

// missed slots
logrus.Infof("verifying history for missed slots")
verifyHistory([]uint64{858473}, uint64(323250), uint64(323260), true, true)

// slashing (5902 was slashed by 792015)
logrus.Infof("verifying history for a slashed validator")
verifyHistory([]uint64{5902}, uint64(314635), uint64(314645), true, false)
logrus.Infof("verifying history for a slashing validator")
verifyHistory([]uint64{792015}, uint64(314635), uint64(314645), true, false)

// validator during activation
logrus.Infof("verifying history for a validator during activation")
verifyHistory([]uint64{894572}, uint64(266960), uint64(266970), true, false)

// validator during exit
logrus.Infof("verifying history for a validator during exit")
verifyHistory([]uint64{1646687}, uint64(323090), uint64(323110), true, false)

}

func verifyHistory(validatorIndices []uint64, epochStart, epochEnd uint64, income, balance bool) {
slotsPerEpoch := utils.Config.Chain.ClConfig.SlotsPerEpoch
logrus.Infof("verifying history for validator indices %v from epoch %d to %d", validatorIndices, epochStart, epochEnd)
if income {
compare(db.BigtableClient.GetValidatorIncomeDetailsHistory, validatorIndices, epochStart, epochEnd)
}
if balance {
compare(db.BigtableClient.GetValidatorBalanceHistory, validatorIndices, epochStart, epochEnd)
}
compare(db.BigtableClient.GetValidatorAttestationHistory, validatorIndices, epochStart, epochEnd)
compare(db.BigtableClient.GetValidatorMissedAttestationHistory, validatorIndices, epochStart, epochEnd)
compare(db.BigtableClient.GetValidatorSyncDutiesHistory, validatorIndices, epochStart*slotsPerEpoch, epochEnd*slotsPerEpoch)
}

type GenericFunc[T any] func([]uint64, uint64, uint64) (T, error)

func compare[T any](compareFunc GenericFunc[T], validatorIndices []uint64, epochStart, epochEnd uint64) {
utils.Config.ClickHouseEnabled = false
bigtableData, err := compareFunc(validatorIndices, epochStart, epochEnd)
if err != nil {
logrus.Fatalf("error getting validator income details history from bigtable: %v", err)
}
utils.Config.ClickHouseEnabled = true
clickhouseData, err := compareFunc(validatorIndices, epochStart, epochEnd)
if err != nil {
logrus.Fatalf("error getting validator income details history from clickhouse: %v", err)
}
diff := cmp.Diff(bigtableData, clickhouseData, cmpopts.IgnoreUnexported(itypes.ValidatorEpochIncome{}))
if diff != "" {
logrus.Infof("bigtable")
spew.Dump(bigtableData)
logrus.Infof("clickhouse")
spew.Dump(clickhouseData)
logrus.Info(diff)
logrus.Fatalf("bigtable and clickhouse data do not match")
}
}
17 changes: 17 additions & 0 deletions cmd/explorer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,23 @@ func main() {
}, "pgx", "postgres")
}()

if utils.Config.ClickHouseEnabled {
wg.Add(1)
go func() {
defer wg.Done()
db.MustInitClickhouseDB(nil, &types.DatabaseConfig{
Username: cfg.ClickHouse.ReaderDatabase.Username,
Password: cfg.ClickHouse.ReaderDatabase.Password,
Name: cfg.ClickHouse.ReaderDatabase.Name,
Host: cfg.ClickHouse.ReaderDatabase.Host,
Port: cfg.ClickHouse.ReaderDatabase.Port,
MaxOpenConns: cfg.ClickHouse.ReaderDatabase.MaxOpenConns,
MaxIdleConns: cfg.ClickHouse.ReaderDatabase.MaxIdleConns,
SSL: true,
}, "clickhouse", "clickhouse")
}()
}

wg.Add(1)
go func() {
defer wg.Done()
Expand Down
18 changes: 0 additions & 18 deletions cmd/migrations/bigtable/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,31 +132,13 @@ func main() {
}
return nil
})
g.Go(func() error {
err := db.BigtableClient.SaveProposalAssignments(epoch, data.ValidatorAssignmentes.ProposerAssignments)
if err != nil {
return fmt.Errorf("error exporting proposal assignments to bigtable for epoch %v: %w", epoch, err)
}
return nil
})
g.Go(func() error {
err := db.BigtableClient.SaveAttestationDuties(data.AttestationDuties)
if err != nil {
return fmt.Errorf("error exporting attestations to bigtable for epoch %v: %w", epoch, err)
}
return nil
})
g.Go(func() error {
for _, blocks := range data.Blocks {
for _, block := range blocks {
err := db.BigtableClient.SaveProposal(block)
if err != nil {
return fmt.Errorf("error exporting proposals to bigtable for slot %v: %w", block.Slot, err)
}
}
}
return nil
})
g.Go(func() error {
err := db.BigtableClient.SaveSyncComitteeDuties(data.SyncDuties)
if err != nil {
Expand Down
Loading

0 comments on commit 09812aa

Please sign in to comment.