Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bids 2369/total withdrawals stats #2581

Closed
wants to merge 9 commits into from
120 changes: 119 additions & 1 deletion cmd/misc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ var opts = struct {
DataConcurrency uint64
Transformers string
Table string
Columns string
Family string
Key string
ValidatorNameRanges string
Expand All @@ -51,7 +52,7 @@ var opts = struct {

func main() {
configPath := flag.String("config", "config/default.config.yml", "Path to the config file")
flag.StringVar(&opts.Command, "command", "", "command to run, available: updateAPIKey, applyDbSchema, initBigtableSchema, epoch-export, debug-rewards, clear-bigtable, index-old-eth1-blocks, update-aggregation-bits, historic-prices-export, index-missing-blocks, export-epoch-missed-slots, migrate-last-attestation-slot-bigtable, generate-config-from-testnet-stub, export-genesis-validators")
flag.StringVar(&opts.Command, "command", "", "command to run, available: updateAPIKey, applyDbSchema, initBigtableSchema, epoch-export, debug-rewards, clear-bigtable, index-old-eth1-blocks, update-aggregation-bits, historic-prices-export, index-missing-blocks, export-epoch-missed-slots, migrate-last-attestation-slot-bigtable, generate-config-from-testnet-stub, export-genesis-validators, export-stats-totals")
flag.Uint64Var(&opts.StartEpoch, "start-epoch", 0, "start epoch")
flag.Uint64Var(&opts.EndEpoch, "end-epoch", 0, "end epoch")
flag.Uint64Var(&opts.User, "user", 0, "user id")
Expand All @@ -68,6 +69,7 @@ func main() {
flag.Uint64Var(&opts.BatchSize, "data.batchSize", 1000, "Batch size")
flag.StringVar(&opts.Transformers, "transformers", "", "Comma separated list of transformers used by the eth1 indexer")
flag.StringVar(&opts.ValidatorNameRanges, "validator-name-ranges", "https://config.dencun-devnet-8.ethpandaops.io/api/v1/nodes/validator-ranges", "url to or json of validator-ranges (format must be: {'ranges':{'X-Y':'name'}})")
flag.StringVar(&opts.Columns, "columns", "", "Comma separated list of columns that should be affected by the command")
dryRun := flag.String("dry-run", "true", "if 'false' it deletes all rows starting with the key, per default it only logs the rows that would be deleted, but does not really delete them")
versionFlag := flag.Bool("version", false, "Show version and exit")
flag.Parse()
Expand Down Expand Up @@ -321,6 +323,8 @@ func main() {
if err != nil {
logrus.Fatal(err)
}
case "export-stats-totals":
exportStatsTotals(opts.Columns, opts.StartDay, opts.EndDay, opts.DataConcurrency)
default:
utils.LogFatal(nil, fmt.Sprintf("unknown command %s", opts.Command), 0)
}
Expand Down Expand Up @@ -898,3 +902,117 @@ func exportHistoricPrices(dayStart uint64, dayEnd uint64) {

logrus.Info("historic price update run completed")
}

func exportStatsTotals(columns string, dayStart, dayEnd, concurrency uint64) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Example Usage:

go run cmd/misc/main.go --config __gitignore/config.yml --command export-stats-totals --columns withdrawals_total,withdrawals_amount_total --day-start 10 --day-end 12

start := time.Now()
logrus.Infof("exporting stats totals for columns '%v'", columns)

// validate columns input
columnsSlice := strings.Split(columns, ",")
validColumns := []string{
"cl_rewards_gwei_total",
"el_rewards_wei_total",
"mev_rewards_wei_total",
"missed_attestations_total",
"participated_sync_total",
"missed_sync_total",
"orphaned_sync_total",
"withdrawals_total",
"withdrawals_amount_total",
}

OUTER:
for _, c := range columnsSlice {
for _, vc := range validColumns {
if c == vc {
// valid column found, continue to next column from input
continue OUTER
}
}
// no valid column matched, exit with error
utils.LogFatal(nil, "invalid column provided, please use a valid one", 0, map[string]interface{}{
"usedColumn": c,
"validColumns": validColumns,
})
}

// build insert query from input columns
var totalClauses []string
var conflictClauses []string

for _, col := range columnsSlice {
totalClause := fmt.Sprintf("COALESCE(vs1.%s, 0) + COALESCE(vs2.%s, 0)", strings.TrimSuffix(col, "_total"), col)
totalClauses = append(totalClauses, totalClause)

conflictClause := fmt.Sprintf("%s = excluded.%s", col, col)
conflictClauses = append(conflictClauses, conflictClause)
}

insertQuery := fmt.Sprintf(`
INSERT INTO validator_stats (validatorindex, day, %s)
SELECT
vs1.validatorindex,
vs1.day,
%s
FROM validator_stats vs1
LEFT JOIN validator_stats vs2
ON vs2.day = vs1.day - 1 AND vs2.validatorindex = vs1.validatorindex
WHERE vs1.day = $1 AND vs1.validatorindex >= $2 AND vs1.validatorindex < $3
Eisei24 marked this conversation as resolved.
Show resolved Hide resolved
ON CONFLICT (validatorindex, day) DO UPDATE SET %s;`,
strings.Join(columnsSlice, ",\n\t"),
strings.Join(totalClauses, ",\n\t\t"),
strings.Join(conflictClauses, ",\n\t"))

for day := dayStart; day <= dayEnd; day++ {
timeDay := time.Now()
logrus.Infof("exporting total sync and for columns %v for day %v", columns, day)

// get max validator index for day
firstEpoch, _ := utils.GetFirstAndLastEpochForDay(day + 1)
maxValidatorIndex, err := db.BigtableClient.GetMaxValidatorindexForEpoch(firstEpoch)
if err != nil {
utils.LogFatal(err, "error in GetMaxValidatorindexForEpoch: could not get max validator index", 0, map[string]interface{}{
"epoch": firstEpoch,
})
} else if maxValidatorIndex == uint64(0) {
utils.LogFatal(err, "error in GetMaxValidatorindexForEpoch: no validator found", 0, map[string]interface{}{
"epoch": firstEpoch,
})
}

ctx := context.Background()
g, gCtx := errgroup.WithContext(ctx)
g.SetLimit(int(concurrency))

batchSize := 1000

// insert stats totals for each batch of validators
for b := 0; b <= int(maxValidatorIndex); b += batchSize {
start := b
end := b + batchSize // exclusive
if int(maxValidatorIndex) < end {
end = int(maxValidatorIndex)
}

g.Go(func() error {
select {
case <-gCtx.Done():
return gCtx.Err()
default:
}

_, err = db.WriterDb.Exec(insertQuery, day, start, end)
return err
})
}
if err = g.Wait(); err != nil {
utils.LogFatal(err, "error exporting stats totals", 0, map[string]interface{}{
"day": day,
"columns": columns,
})
}
logrus.Infof("finished exporting stats totals for columns '%v for day %v, took %v", columns, day, time.Since(timeDay))
}

logrus.Infof("finished all exporting stats totals for columns '%v' for days %v - %v, took %v", columns, dayStart, dayEnd, time.Since(start))
}
48 changes: 38 additions & 10 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -2377,12 +2377,40 @@ func GetTotalAmountWithdrawn() (sum uint64, count uint64, err error) {
Sum uint64 `db:"sum"`
Count uint64 `db:"count"`
}{}
lastExportedDay, err := GetLastExportedStatisticDay()
if err != nil {
return 0, 0, fmt.Errorf("error getting latest exported statistic day for withdrawals count: %w", err)
}
_, lastEpochOfDay := utils.GetFirstAndLastEpochForDay(lastExportedDay)
cutoffSlot := (lastEpochOfDay * utils.Config.Chain.ClConfig.SlotsPerEpoch) + 1

err = ReaderDb.Get(&res, `
SELECT
COALESCE(sum(w.amount), 0) as sum,
COALESCE(count(*), 0) as count
FROM blocks_withdrawals w
INNER JOIN blocks b ON b.blockroot = w.block_root AND b.status = '1'`)
WITH today AS (
SELECT
COALESCE(SUM(w.amount), 0) as sum,
COUNT(*) as count
FROM blocks_withdrawals w
INNER JOIN blocks b ON b.blockroot = w.block_root AND b.status = '1'
WHERE w.block_slot >= $1
),
stats AS (
SELECT
COALESCE(SUM(withdrawals_amount_total), 0) as sum,
COALESCE(SUM(withdrawals_total), 0) as count
FROM validator_stats
WHERE day = $2
)
SELECT
today.sum + stats.sum as sum,
today.count + stats.count as count
FROM today, stats;`, cutoffSlot, lastExportedDay)
if err != nil {
if err == sql.ErrNoRows {
return 0, 0, nil
}
return 0, 0, fmt.Errorf("error fetching total withdrawal count and amount: %w", err)
}

return res.Sum, res.Count, err
}

Expand Down Expand Up @@ -2648,18 +2676,18 @@ func GetTotalWithdrawalsCount(validators []uint64) (uint64, error) {

err = ReaderDb.Get(&count, `
WITH today AS (
SELECT COUNT(*) as count_today
SELECT COUNT(*) as count
FROM blocks_withdrawals w
INNER JOIN blocks b ON b.blockroot = w.block_root AND b.status = '1'
WHERE w.validatorindex = ANY($1) AND w.block_slot >= $2
),
stats AS (
SELECT COALESCE(SUM(withdrawals), 0) as total_count
SELECT COALESCE(SUM(withdrawals_total), 0) as count
FROM validator_stats
WHERE validatorindex = ANY($1)
WHERE validatorindex = ANY($1) AND day = $3
)
SELECT today.count_today + stats.total_count
FROM today, stats;`, validatorFilter, cutoffSlot)
SELECT today.count + stats.count
FROM today, stats;`, validatorFilter, cutoffSlot, lastExportedDay)
if err != nil {
if err == sql.ErrNoRows {
return 0, nil
Expand Down
13 changes: 13 additions & 0 deletions db/migrations/temp_add_total_withdrawals_stats.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
-- +goose Up
-- +goose StatementBegin
SELECT 'up SQL query - add total withdrawal count and amount columns to stats';
ALTER TABLE validator_stats ADD COLUMN IF NOT EXISTS withdrawals_total INT;
ALTER TABLE validator_stats ADD COLUMN IF NOT EXISTS withdrawals_amount_total BIGINT;
-- +goose StatementEnd

-- +goose Down
-- +goose StatementBegin
SELECT 'down SQL query - remove total withdrawal count and amount columns from stats';
ALTER TABLE validator_stats DROP COLUMN IF EXISTS withdrawals_total;
ALTER TABLE validator_stats DROP COLUMN IF EXISTS withdrawals_amount_total;
-- +goose StatementEnd
10 changes: 10 additions & 0 deletions db/statistics.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,10 @@ func WriteValidatorStatisticsForDay(day uint64) error {

// update mev reward total
data.MEVRewardsWeiTotal = previousDayData.MEVRewardsWeiTotal.Add(data.MEVRewardsWei)

// update withdrawal total
data.WithdrawalsTotal = previousDayData.WithdrawalsTotal + data.Withdrawals
data.WithdrawalsAmountTotal = previousDayData.WithdrawalsAmountTotal + data.WithdrawalsAmount
}

conn, err := WriterDb.Conn(context.Background())
Expand Down Expand Up @@ -233,7 +237,9 @@ func WriteValidatorStatisticsForDay(day uint64) error {
"deposits",
"deposits_amount",
"withdrawals",
"withdrawals_total",
"withdrawals_amount",
"withdrawals_amount_total",
"cl_rewards_gwei",
"cl_rewards_gwei_total",
"el_rewards_wei",
Expand Down Expand Up @@ -269,7 +275,9 @@ func WriteValidatorStatisticsForDay(day uint64) error {
validatorData[i].Deposits,
validatorData[i].DepositsAmount,
validatorData[i].Withdrawals,
validatorData[i].WithdrawalsTotal,
validatorData[i].WithdrawalsAmount,
validatorData[i].WithdrawalsAmountTotal,
validatorData[i].ClRewardsGWei,
validatorData[i].ClRewardsGWeiTotal,
validatorData[i].ElRewardsWei,
Expand Down Expand Up @@ -840,7 +848,9 @@ func gatherStatisticsForDay(day int64) ([]*types.ValidatorStatsTableDbRow, error
COALESCE(deposits, 0) AS deposits,
COALESCE(deposits_amount, 0) AS deposits_amount,
COALESCE(withdrawals, 0) AS withdrawals,
COALESCE(withdrawals_total, 0) AS withdrawals_total,
COALESCE(withdrawals_amount, 0) AS withdrawals_amount,
COALESCE(withdrawals_amount_total, 0) AS withdrawals_amount_total,
COALESCE(cl_rewards_gwei, 0) AS cl_rewards_gwei,
COALESCE(cl_rewards_gwei_total, 0) AS cl_rewards_gwei_total,
COALESCE(el_rewards_wei, 0) AS el_rewards_wei,
Expand Down
10 changes: 5 additions & 5 deletions handlers/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1576,7 +1576,7 @@ func apiValidator(w http.ResponseWriter, r *http.Request) {
WITH today AS (
SELECT
w.validatorindex,
COALESCE(SUM(w.amount), 0) as amount_today
COALESCE(SUM(w.amount), 0) as amount
FROM blocks_withdrawals w
INNER JOIN blocks b ON b.blockroot = w.block_root AND b.status = '1'
WHERE w.validatorindex = ANY($1) AND w.block_slot >= $2
Expand All @@ -1585,15 +1585,15 @@ func apiValidator(w http.ResponseWriter, r *http.Request) {
stats AS (
SELECT
vs.validatorindex,
COALESCE(SUM(vs.withdrawals_amount), 0) as total_amount
COALESCE(SUM(vs.withdrawals_amount_total), 0) as amount
FROM validator_stats vs
WHERE vs.validatorindex = ANY($1)
WHERE vs.validatorindex = ANY($1) AND day = $3
GROUP BY vs.validatorindex
),
withdrawals_summary AS (
SELECT
COALESCE(t.validatorindex, s.validatorindex) as validatorindex,
COALESCE(t.amount_today, 0) + COALESCE(s.total_amount, 0) as total
COALESCE(t.amount, 0) + COALESCE(s.amount, 0) as total
FROM today t
FULL JOIN stats s ON t.validatorindex = s.validatorindex
)
Expand All @@ -1612,7 +1612,7 @@ func apiValidator(w http.ResponseWriter, r *http.Request) {
LEFT JOIN withdrawals_summary ws ON ws.validatorindex = v.validatorindex
WHERE v.validatorindex = ANY($1)
ORDER BY v.validatorindex;
`, pq.Array(queryIndices), cutoffSlot)
`, pq.Array(queryIndices), cutoffSlot, lastExportedDay)
if err != nil {
logger.Warnf("error retrieving validator data from db: %v", err)
sendErrorResponse(w, r.URL.String(), "could not retrieve db results")
Expand Down
6 changes: 4 additions & 2 deletions types/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -688,8 +688,10 @@ type ValidatorStatsTableDbRow struct {
Deposits int64 `db:"deposits"`
DepositsAmount int64 `db:"deposits_amount"`

Withdrawals int64 `db:"withdrawals"`
WithdrawalsAmount int64 `db:"withdrawals_amount"`
Withdrawals int64 `db:"withdrawals"`
WithdrawalsTotal int64 `db:"withdrawals_total"`
WithdrawalsAmount int64 `db:"withdrawals_amount"`
WithdrawalsAmountTotal int64 `db:"withdrawals_amount_total"`

ClRewardsGWei int64 `db:"cl_rewards_gwei"`
ClRewardsGWeiTotal int64 `db:"cl_rewards_gwei_total"`
Expand Down
Loading