Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(v1): use real timestamps for cl bigtable data #2989

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions cmd/clickhouse_integration_test/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,17 +79,17 @@ func main() {

// income details for sync rewards (this currently fails !!!)
logrus.Infof("verifying history for sync rewards")
verifyHistory([]uint64{653162}, uint64(323260), uint64(323261), true, false)
verifyHistory([]uint64{653162}, uint64(323260), uint64(323261), true, true)

// block proposed
logrus.Infof("verifying history for proposer rewards at end of an epoch")
verifyHistory([]uint64{388033}, uint64(323268), uint64(323270), true, false)
verifyHistory([]uint64{388033}, uint64(323268), uint64(323270), true, true)

logrus.Infof("verifying history for proposer rewards at start of an epoch")
verifyHistory([]uint64{1284148}, uint64(323268), uint64(323270), true, false)
verifyHistory([]uint64{1284148}, uint64(323268), uint64(323270), true, true)

logrus.Infof("verifying history for proposer rewards at during an epoch")
verifyHistory([]uint64{1208852}, uint64(323268), uint64(323270), true, false)
verifyHistory([]uint64{1208852}, uint64(323268), uint64(323270), true, true)

// missed attestations
logrus.Infof("verifying history for missed attestations")
Expand All @@ -101,17 +101,17 @@ func main() {

// slashing (5902 was slashed by 792015)
logrus.Infof("verifying history for a slashed validator")
verifyHistory([]uint64{5902}, uint64(314635), uint64(314645), true, false)
verifyHistory([]uint64{5902}, uint64(314635), uint64(314645), true, true)
logrus.Infof("verifying history for a slashing validator")
verifyHistory([]uint64{792015}, uint64(314635), uint64(314645), true, false)
verifyHistory([]uint64{792015}, uint64(314635), uint64(314645), true, true)

// validator during activation
logrus.Infof("verifying history for a validator during activation")
verifyHistory([]uint64{894572}, uint64(266960), uint64(266970), true, false)
verifyHistory([]uint64{894572}, uint64(266960), uint64(266970), true, true)

// validator during exit
logrus.Infof("verifying history for a validator during exit")
verifyHistory([]uint64{1646687}, uint64(323090), uint64(323110), true, false)
verifyHistory([]uint64{1646687}, uint64(323090), uint64(323110), true, true)

}

Expand Down
138 changes: 73 additions & 65 deletions db/bigtable.go
Original file line number Diff line number Diff line change
Expand Up @@ -611,17 +611,17 @@ func (bigtable *Bigtable) SaveAttestationDuties(duties map[types.Slot]map[types.

for attestedSlot, validators := range duties {
for validator, inclusions := range validators {

epoch := utils.EpochOfSlot(uint64(attestedSlot))
bigtable.LastAttestationCacheMux.Lock()
if len(inclusions) == 0 { // for missed attestations we write the max block number which will yield a cell ts of 0
inclusions = append(inclusions, MAX_CL_BLOCK_NUMBER)
if len(inclusions) == 0 { // for missed attestations we write the the attested slot as inclusion slot (attested == included means no attestation duty was performed)
inclusions = append(inclusions, attestedSlot)
}
for _, inclusionSlot := range inclusions {
key := fmt.Sprintf("%s:%s:%s:%s", bigtable.chainId, bigtable.validatorIndexToKey(uint64(validator)), ATTESTATIONS_FAMILY, bigtable.reversedPaddedEpoch(epoch))

mutInclusionSlot := gcp_bigtable.NewMutation()
mutInclusionSlot.Set(ATTESTATIONS_FAMILY, fmt.Sprintf("%d", attestedSlot), gcp_bigtable.Timestamp((MAX_CL_BLOCK_NUMBER-inclusionSlot)*1000), []byte{})
ts := gcp_bigtable.Time(utils.SlotToTime(uint64(inclusionSlot)))
mutInclusionSlot.Set(ATTESTATIONS_FAMILY, fmt.Sprintf("%d", attestedSlot), ts, []byte{})

mutsInclusionSlot.Add(key, mutInclusionSlot)

Expand All @@ -639,10 +639,9 @@ func (bigtable *Bigtable) SaveAttestationDuties(duties map[types.Slot]map[types.
}
mutLastAttestationSlot = gcp_bigtable.NewMutation()
mutLastAttestationSlotCount = 0
logger.Infof("applyied last attestation slot mutations in %v", time.Since(mutStart))
logger.Infof("applied last attestation slot mutations in %v", time.Since(mutStart))
}
}

}
bigtable.LastAttestationCacheMux.Unlock()
}
Expand Down Expand Up @@ -694,9 +693,11 @@ func (bigtable *Bigtable) SaveSyncComitteeDuties(duties map[types.Slot]map[types
for validator, participated := range validators {
mut := gcp_bigtable.NewMutation()
if participated {
mut.Set(SYNC_COMMITTEES_FAMILY, "s", gcp_bigtable.Timestamp((MAX_CL_BLOCK_NUMBER-slot)*1000), []byte{})
ts := gcp_bigtable.Time(utils.SlotToTime(uint64(slot)).Add(time.Second)) // add 1 second to avoid collisions with duties
mut.Set(SYNC_COMMITTEES_FAMILY, "s", ts, []byte{})
} else {
mut.Set(SYNC_COMMITTEES_FAMILY, "s", gcp_bigtable.Timestamp(0), []byte{})
ts := gcp_bigtable.Time(utils.SlotToTime(uint64(slot)))
mut.Set(SYNC_COMMITTEES_FAMILY, "s", ts, []byte{})
}
key := fmt.Sprintf("%s:%s:%s:%s:%s", bigtable.chainId, bigtable.validatorIndexToKey(uint64(validator)), SYNC_COMMITTEES_FAMILY, bigtable.reversedPaddedEpoch(utils.EpochOfSlot(uint64(slot))), bigtable.reversedPaddedSlot(uint64(slot)))

Expand Down Expand Up @@ -747,17 +748,20 @@ func (bigtable *Bigtable) getMaxValidatorindexForEpochV2(epoch uint64) (uint64,

// Clickhouse port: Done
func (bigtable *Bigtable) GetValidatorBalanceHistory(validators []uint64, startEpoch uint64, endEpoch uint64) (map[uint64][]*types.ValidatorBalance, error) {
endEpochTs := utils.EpochToTime(endEpoch)
if utils.Config.ClickHouseEnabled && time.Since(endEpochTs) > utils.Config.ClickhouseDelay { // fetch data from clickhouse instead
logger.Infof("fetching validator balance history from clickhouse for validators %v, epochs %v - %v", validators, startEpoch, endEpoch)
return bigtable.getValidatorBalanceHistoryClickhouse(validators, startEpoch, endEpoch)
} else if endEpoch < bigtable.v2SchemaCutOffEpoch {
// Disable balance fetching from clickhouse until we have a compatible data set available
// endEpochTs := utils.EpochToTime(endEpoch)
// if utils.Config.ClickHouseEnabled && time.Since(endEpochTs) > utils.Config.ClickhouseDelay { // fetch data from clickhouse instead
// logger.Infof("fetching validator balance history from clickhouse for validators %v, epochs %v - %v", validators, startEpoch, endEpoch)
// return bigtable.getValidatorBalanceHistoryClickhouse(validators, startEpoch, endEpoch)
// } else
if endEpoch < bigtable.v2SchemaCutOffEpoch {
return bigtable.getValidatorBalanceHistoryV1(validators, startEpoch, endEpoch)
} else {
return bigtable.getValidatorBalanceHistoryV2(validators, startEpoch, endEpoch)
}
}

//lint:ignore U1000 will be used later on
func (bigtable *Bigtable) getValidatorBalanceHistoryClickhouse(validators []uint64, startEpoch uint64, endEpoch uint64) (map[uint64][]*types.ValidatorBalance, error) {
startEpochTs := utils.EpochToTime(startEpoch)
endEpochTs := utils.EpochToTime(endEpoch)
Expand Down Expand Up @@ -905,7 +909,7 @@ func (bigtable *Bigtable) getValidatorBalanceHistoryV2(validators []uint64, star
return true
}

err := bigtable.tableValidatorsHistory.ReadRows(gCtx, ranges, handleRow, ro)
err := bigtable.tableValidatorsHistory.ReadRows(gCtx, ranges, handleRow, gcp_bigtable.RowFilter(gcp_bigtable.LatestNFilter(1)), ro)
if err != nil {
return err
}
Expand Down Expand Up @@ -1134,7 +1138,6 @@ func (bigtable *Bigtable) getValidatorAttestationHistoryV2(validators []uint64,
attestationsMap := make(map[types.ValidatorIndex]map[types.Slot][]*types.ValidatorAttestation)

for i := 0; i < len(validators); i += batchSize {

upperBound := i + batchSize
if len(validators) < upperBound {
upperBound = len(validators)
Expand Down Expand Up @@ -1165,14 +1168,8 @@ func (bigtable *Bigtable) getValidatorAttestationHistoryV2(validators []uint64,
logger.Errorf("error parsing slot from row key %v: %v", r.Key(), err)
return false
}
inclusionSlot := MAX_CL_BLOCK_NUMBER - uint64(ri.Timestamp)/1000

status := uint64(1)
if inclusionSlot == MAX_CL_BLOCK_NUMBER {
inclusionSlot = 0
status = 0
}

status, inclusionSlot := bigtable.getAttestationStatusAndInclusionSlot(ri.Timestamp, attesterSlot)
resMux.Lock()
if attestationsMap[types.ValidatorIndex(validator)] == nil {
attestationsMap[types.ValidatorIndex(validator)] = make(map[types.Slot][]*types.ValidatorAttestation)
Expand All @@ -1187,7 +1184,6 @@ func (bigtable *Bigtable) getValidatorAttestationHistoryV2(validators []uint64,
Status: status,
})
resMux.Unlock()

}
return true
}, filter)
Expand Down Expand Up @@ -1235,6 +1231,9 @@ func (bigtable *Bigtable) getValidatorAttestationHistoryV2(validators []uint64,
res[uint64(validator)] = make([]*types.ValidatorAttestation, 0)
}
for attesterSlot, att := range attestations {
sort.Slice(att, func(i, j int) bool {
return att[i].InclusionSlot < att[j].InclusionSlot
})
currentAttInfo := att[0]
for _, attInfo := range att {
if orphanedSlotsMap[attInfo.InclusionSlot] {
Expand Down Expand Up @@ -1601,7 +1600,6 @@ func (bigtable *Bigtable) getValidatorMissedAttestationHistoryV2(validators []ui
g.SetLimit(concurrency)

for i := 0; i < len(validators); i += batchSize {

upperBound := i + batchSize
if len(validators) < upperBound {
upperBound = len(validators)
Expand All @@ -1618,7 +1616,7 @@ func (bigtable *Bigtable) getValidatorMissedAttestationHistoryV2(validators []ui

filter := gcp_bigtable.LimitRows(int64(endEpoch-startEpoch+1) * int64(len(vals))) // max is one row per epoch

err = bigtable.tableValidatorsHistory.ReadRows(ctx, ranges, func(r gcp_bigtable.Row) bool {
err := bigtable.tableValidatorsHistory.ReadRows(ctx, ranges, func(r gcp_bigtable.Row) bool {
keySplit := strings.Split(r.Key(), ":")

validator, err := bigtable.validatorKeyToIndex(keySplit[1])
Expand All @@ -1635,12 +1633,7 @@ func (bigtable *Bigtable) getValidatorMissedAttestationHistoryV2(validators []ui
return false
}

inclusionSlot := MAX_CL_BLOCK_NUMBER - uint64(ri.Timestamp)/1000

status := uint64(1)
if inclusionSlot == MAX_CL_BLOCK_NUMBER {
status = 0
}
status, inclusionSlot := bigtable.getAttestationStatusAndInclusionSlot(ri.Timestamp, attesterSlot)

resMux.Lock()
// only if the attestation was not included in another slot we count it as missed
Expand Down Expand Up @@ -1674,6 +1667,27 @@ func (bigtable *Bigtable) getValidatorMissedAttestationHistoryV2(validators []ui
return res, nil
}

// getAttestationStatusAndInclusionSlot decodes an attestation cell timestamp into a
// duty status (0 = missed, 1 = attested) and the slot the attestation was included in.
// Two timestamp encodings coexist in the table: legacy cells encode
// (MAX_CL_BLOCK_NUMBER - inclusionSlot) in milliseconds (always far before 2015),
// while new-schema cells carry the real wall-clock time of the inclusion slot.
func (bigtable *Bigtable) getAttestationStatusAndInclusionSlot(ts gcp_bigtable.Timestamp, attesterSlot uint64) (status uint64, inclusionSlot uint64) {
	legacyCutoff := time.Date(2015, 1, 1, 0, 0, 0, 0, time.UTC)
	if ts.Time().Before(legacyCutoff) {
		// legacy schema: recover the inclusion slot from the encoded timestamp
		inclusionSlot = MAX_CL_BLOCK_NUMBER - uint64(ts)/1000
		if inclusionSlot == MAX_CL_BLOCK_NUMBER {
			// the sentinel value marks an attestation that was never included
			return 0, 0
		}
		return 1, inclusionSlot
	}
	// new schema: the cell timestamp is the real inclusion-slot time; an
	// inclusion slot equal to the attested slot marks a missed attestation
	inclusionSlot = utils.TimeToSlot(uint64(ts.Time().Unix()))
	if inclusionSlot == attesterSlot {
		return 0, inclusionSlot
	}
	return 1, inclusionSlot
}

func (bigtable *Bigtable) getValidatorMissedAttestationHistoryV1(validators []uint64, startEpoch uint64, endEpoch uint64) (map[uint64]map[uint64]bool, error) {
valLen := len(validators)

Expand Down Expand Up @@ -1874,7 +1888,6 @@ func (bigtable *Bigtable) getValidatorSyncDutiesHistoryV2(validators []uint64, s
g.SetLimit(concurrency)

for i := 0; i < len(validators); i += batchSize {

i := i
upperBound := i + batchSize
if len(validators) < upperBound {
Expand Down Expand Up @@ -1906,14 +1919,7 @@ func (bigtable *Bigtable) getValidatorSyncDutiesHistoryV2(validators []uint64, s
slot = MAX_CL_BLOCK_NUMBER - slot

for _, ri := range r[SYNC_COMMITTEES_FAMILY] {

inclusionSlot := MAX_CL_BLOCK_NUMBER - uint64(ri.Timestamp)/1000

status := uint64(1) // 1: participated
if inclusionSlot == MAX_CL_BLOCK_NUMBER {
inclusionSlot = 0
status = 0 // 0: missed
}
status := bigtable.getSyncStatus(ri.Timestamp, slot)

resMux.Lock()
if res[validator] == nil {
Expand All @@ -1929,11 +1935,9 @@ func (bigtable *Bigtable) getValidatorSyncDutiesHistoryV2(validators []uint64, s
}
}
resMux.Unlock()

}
return true
}, gcp_bigtable.RowFilter(filter))

return err
})
}
Expand All @@ -1945,6 +1949,33 @@ func (bigtable *Bigtable) getValidatorSyncDutiesHistoryV2(validators []uint64, s
return res, nil
}

// getSyncStatus decodes a sync-committee cell timestamp into a participation
// status for the given slot (0 = missed, 1 = participated). Legacy cells encode
// (MAX_CL_BLOCK_NUMBER - inclusionSlot) in milliseconds (always far before 2015);
// new-schema cells carry the slot's wall-clock time, with participation encoded
// as the slot time plus one second.
func (bigtable *Bigtable) getSyncStatus(ts gcp_bigtable.Timestamp, slot uint64) uint64 {
	legacyCutoff := time.Date(2015, 1, 1, 0, 0, 0, 0, time.UTC)
	cellTime := ts.Time()
	if cellTime.Before(legacyCutoff) {
		// legacy schema: the sentinel MAX_CL_BLOCK_NUMBER marks a missed duty
		if MAX_CL_BLOCK_NUMBER-uint64(ts)/1000 == MAX_CL_BLOCK_NUMBER {
			return 0
		}
		return 1
	}
	// new schema: compare the cell time against the slot's expected timestamps
	slotTime := utils.SlotToTime(slot)
	switch {
	case cellTime.Equal(slotTime):
		return 0 // exact slot time => missed
	case cellTime.Equal(slotTime.Add(time.Second)):
		return 1 // slot time + 1 sec => participated
	default:
		logger.Errorf("unexpected inclusion slot timestamp, slot %d, slotTs: %v, inclusionSlotTs: %v", slot, slotTime, cellTime)
		return 0
	}
}

func (bigtable *Bigtable) getValidatorSyncDutiesHistoryV1(validators []uint64, startSlot uint64, endSlot uint64) (map[uint64]map[uint64]*types.ValidatorSyncParticipation, error) {
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute*5))
defer cancel()
Expand Down Expand Up @@ -1978,7 +2009,6 @@ func (bigtable *Bigtable) getValidatorSyncDutiesHistoryV1(validators []uint64, s
}

err := bigtable.tableBeaconchain.ReadRows(ctx, ranges, func(r gcp_bigtable.Row) bool {

for _, ri := range r[SYNC_COMMITTEES_FAMILY] {
keySplit := strings.Split(r.Key(), ":")

Expand Down Expand Up @@ -2014,7 +2044,6 @@ func (bigtable *Bigtable) getValidatorSyncDutiesHistoryV1(validators []uint64, s
Status: status,
}
}

}
return true
}, gcp_bigtable.RowFilter(filter))
Expand Down Expand Up @@ -2064,7 +2093,6 @@ func (bigtable *Bigtable) GetValidatorMissedAttestationsCount(validators []uint6
}

func (bigtable *Bigtable) GetValidatorSyncDutiesStatistics(validators []uint64, startEpoch uint64, endEpoch uint64) (map[uint64]*types.ValidatorSyncDutiesStatistic, error) {

data, err := bigtable.GetValidatorSyncDutiesHistory(validators, startEpoch*utils.Config.Chain.ClConfig.SlotsPerEpoch, ((endEpoch+1)*utils.Config.Chain.ClConfig.SlotsPerEpoch)-1)

if err != nil {
Expand Down Expand Up @@ -2163,7 +2191,6 @@ func (bigtable *Bigtable) GetValidatorEffectiveness(validators []uint64, epoch u
}

func (bigtable *Bigtable) GetValidatorBalanceStatistics(validators []uint64, startEpoch, endEpoch uint64) (map[uint64]*types.ValidatorBalanceStatistic, error) {

tmr := time.AfterFunc(REPORT_TIMEOUT, func() {
logger.WithFields(logrus.Fields{
"validatorsCount": len(validators),
Expand All @@ -2184,7 +2211,6 @@ func (bigtable *Bigtable) GetValidatorBalanceStatistics(validators []uint64, sta
batchSize := 10000
// g.SetLimit(1)
for i := 0; i < len(validators); i += batchSize {

upperBound := i + batchSize
if len(validators) < upperBound {
upperBound = len(validators)
Expand Down Expand Up @@ -2248,7 +2274,6 @@ func (bigtable *Bigtable) GetValidatorBalanceStatistics(validators []uint64, sta
}

func (bigtable *Bigtable) SaveValidatorIncomeDetails(epoch uint64, rewards map[uint64]*itypes.ValidatorEpochIncome) error {

start := time.Now()
ts := gcp_bigtable.Time(utils.EpochToTime(epoch))

Expand All @@ -2257,7 +2282,6 @@ func (bigtable *Bigtable) SaveValidatorIncomeDetails(epoch uint64, rewards map[u
muts := types.NewBulkMutations(len(rewards))

for i, rewardDetails := range rewards {

data, err := proto.Marshal(rewardDetails)

if err != nil {
Expand Down Expand Up @@ -2742,21 +2766,6 @@ func (bigtable *Bigtable) DeleteEpoch(epoch uint64) error {
return fmt.Errorf("NOT IMPLEMENTED")
}

// PurgeV2Data deletes v2-schema history rows for a single validator up to
// thresholdEpoch. Currently only the balance history column family is actually
// cleared; the other families are still commented out below.
// NOTE(review): assumes "beaconchain_validators" is the correct v2 history
// table name — confirm before relying on this for full purges.
func (bigtable *Bigtable) PurgeV2Data(validatorIndex, thresholdEpoch uint64) error {
	// create the row ranges for each column family
	vals := []uint64{validatorIndex}
	balanceRange := bigtable.getValidatorsEpochRanges(vals, VALIDATOR_BALANCES_FAMILY, 0, thresholdEpoch)
	err := bigtable.ClearByRowRange("beaconchain_validators", "*", "*", balanceRange, true)
	if err != nil {
		return err
	}
	// TODO: purge the remaining column families as well:
	// attestationHistoryRange := bigtable.getValidatorsEpochRanges(vals, ATTESTATIONS_FAMILY, 0, thresholdEpoch)
	// syncDutiesHistoryRange := bigtable.getValidatorSlotRanges(vals, SYNC_COMMITTEES_FAMILY, 0, thresholdEpoch*utils.Config.Chain.ClConfig.SlotsPerEpoch)
	// incomeRange := bigtable.getValidatorsEpochRanges(vals, INCOME_DETAILS_COLUMN_FAMILY, 0, thresholdEpoch)

	return nil
}

func (bigtable *Bigtable) getValidatorsEpochRanges(validatorIndices []uint64, prefix string, startEpoch uint64, endEpoch uint64) gcp_bigtable.RowRangeList {
if endEpoch > math.MaxInt64 {
endEpoch = 0
Expand Down Expand Up @@ -2812,7 +2821,6 @@ func (bigtable *Bigtable) getValidatorSlotRanges(validatorIndices []uint64, pref
rangeEnd := fmt.Sprintf("%s:%s:%s:%s:%s%s", bigtable.chainId, validatorKey, prefix, bigtable.reversedPaddedEpoch(startEpoch), bigtable.reversedPaddedSlot(startSlot), "\x00")
rangeStart := fmt.Sprintf("%s:%s:%s:%s:%s", bigtable.chainId, validatorKey, prefix, bigtable.reversedPaddedEpoch(endEpoch), bigtable.reversedPaddedSlot(endSlot))
ranges = append(ranges, gcp_bigtable.NewRange(rangeStart, rangeEnd))

}
return ranges
}
Expand Down
Loading
Loading