Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: decompose AttesterStatus #6945

Merged
merged 12 commits into from
Jul 16, 2024
14 changes: 7 additions & 7 deletions packages/beacon-node/src/chain/rewards/attestationsRewards.ts
Original file line number Diff line number Diff line change
Expand Up @@ -140,35 +140,35 @@ function computeTotalAttestationsRewardsAltair(
validatorIds: (ValidatorIndex | string)[] = []
): TotalAttestationsReward[] {
const rewards = [];
const {statuses} = transitionCache;
const {flags} = transitionCache;
const {epochCtx, config} = state;
const validatorIndices = validatorIds
.map((id) => (typeof id === "number" ? id : epochCtx.pubkey2index.get(id)))
.filter((index) => index !== undefined); // Validator indices to include in the result

const inactivityPenaltyDenominator = config.INACTIVITY_SCORE_BIAS * INACTIVITY_PENALTY_QUOTIENT_ALTAIR;

for (let i = 0; i < statuses.length; i++) {
for (let i = 0; i < flags.length; i++) {
if (validatorIndices.length && !validatorIndices.includes(i)) {
continue;
}

const status = statuses[i];
if (!hasMarkers(status.flags, FLAG_ELIGIBLE_ATTESTER)) {
const flag = flags[i];
if (!hasMarkers(flag, FLAG_ELIGIBLE_ATTESTER)) {
continue;
}

const effectiveBalanceIncrement = epochCtx.effectiveBalanceIncrements[i];

const currentRewards = {...defaultAttestationsReward, validatorIndex: i};

if (hasMarkers(status.flags, FLAG_PREV_SOURCE_ATTESTER_UNSLASHED)) {
if (hasMarkers(flag, FLAG_PREV_SOURCE_ATTESTER_UNSLASHED)) {
currentRewards.source = idealRewards[effectiveBalanceIncrement].source;
} else {
currentRewards.source = penalties[effectiveBalanceIncrement].source * -1; // Negative reward to indicate penalty
}

if (hasMarkers(status.flags, FLAG_PREV_TARGET_ATTESTER_UNSLASHED)) {
if (hasMarkers(flag, FLAG_PREV_TARGET_ATTESTER_UNSLASHED)) {
currentRewards.target = idealRewards[effectiveBalanceIncrement].target;
} else {
currentRewards.target = penalties[effectiveBalanceIncrement].target * -1;
Expand All @@ -179,7 +179,7 @@ function computeTotalAttestationsRewardsAltair(
currentRewards.inactivity = Math.floor(inactivityPenaltyNumerator / inactivityPenaltyDenominator) * -1;
}

if (hasMarkers(status.flags, FLAG_PREV_HEAD_ATTESTER_UNSLASHED)) {
if (hasMarkers(flag, FLAG_PREV_HEAD_ATTESTER_UNSLASHED)) {
currentRewards.head = idealRewards[effectiveBalanceIncrement].head;
}

Expand Down
28 changes: 14 additions & 14 deletions packages/beacon-node/src/metrics/validatorMonitor.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import {
computeEpochAtSlot,
AttesterStatus,
parseAttesterFlags,
CachedBeaconStateAllForks,
CachedBeaconStateAltair,
Expand Down Expand Up @@ -39,7 +38,13 @@ export enum OpSource {
export type ValidatorMonitor = {
registerLocalValidator(index: number): void;
registerLocalValidatorInSyncCommittee(index: number, untilEpoch: Epoch): void;
registerValidatorStatuses(currentEpoch: Epoch, statuses: AttesterStatus[], balances?: number[]): void;
registerValidatorStatuses(
currentEpoch: Epoch,
inclusionDelays: number[],
flags: number[],
isActiveCurrEpoch: boolean[],
balances?: number[]
): void;
registerBeaconBlock(src: OpSource, seenTimestampSec: Seconds, block: BeaconBlock): void;
registerBlobSidecar(src: OpSource, seenTimestampSec: Seconds, blob: deneb.BlobSidecar): void;
registerImportedBlock(block: BeaconBlock, data: {proposerBalanceDelta: number}): void;
Expand Down Expand Up @@ -115,12 +120,12 @@ type ValidatorStatus = {
inclusionDistance: number;
};

function statusToSummary(status: AttesterStatus): ValidatorStatus {
const flags = parseAttesterFlags(status.flags);
function statusToSummary(inclusionDelay: number, flag: number, isActiveInCurrentEpoch: boolean): ValidatorStatus {
const flags = parseAttesterFlags(flag);
return {
isSlashed: flags.unslashed,
isActiveInCurrentEpoch: status.active,
isActiveInPreviousEpoch: status.active,
isActiveInCurrentEpoch,
isActiveInPreviousEpoch: isActiveInCurrentEpoch,
wemeetagain marked this conversation as resolved.
Show resolved Hide resolved
// TODO: Implement
currentEpochEffectiveBalance: 0,

Expand All @@ -130,7 +135,7 @@ function statusToSummary(status: AttesterStatus): ValidatorStatus {
isCurrSourceAttester: flags.currSourceAttester,
isCurrTargetAttester: flags.currTargetAttester,
isCurrHeadAttester: flags.currHeadAttester,
inclusionDistance: status.inclusionDelay,
inclusionDistance: inclusionDelay,
};
}

Expand Down Expand Up @@ -287,7 +292,7 @@ export function createValidatorMonitor(
}
},

registerValidatorStatuses(currentEpoch, statuses, balances) {
registerValidatorStatuses(currentEpoch, inclusionDelays, flags, isActiveCurrEpoch, balances) {
// Prevent registering status for the same epoch twice. processEpoch() may be ran more than once for the same epoch.
if (currentEpoch <= lastRegisteredStatusEpoch) {
return;
Expand All @@ -301,12 +306,7 @@ export function createValidatorMonitor(
// - One to account for it being the previous epoch.
// - One to account for the state advancing an epoch whilst generating the validator
// statuses.
const status = statuses[index];
if (status === undefined) {
continue;
}

const summary = statusToSummary(status);
const summary = statusToSummary(inclusionDelays[index], flags[index], isActiveCurrEpoch[index]);

if (summary.isPrevSourceAttester) {
metrics.validatorMonitor.prevEpochOnChainSourceAttesterHit.inc();
Expand Down
107 changes: 78 additions & 29 deletions packages/state-transition/src/cache/epochTransitionCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ import {intDiv} from "@lodestar/utils";
import {EPOCHS_PER_SLASHINGS_VECTOR, FAR_FUTURE_EPOCH, ForkSeq, MAX_EFFECTIVE_BALANCE} from "@lodestar/params";

import {
AttesterStatus,
createAttesterStatus,
hasMarkers,
FLAG_UNSLASHED,
FLAG_ELIGIBLE_ATTESTER,
Expand Down Expand Up @@ -128,7 +126,12 @@ export interface EpochTransitionCache {
* - prev attester flag set
* With a status flag to check this conditions at once we just have to mask with an OR of the conditions.
*/
statuses: AttesterStatus[];

proposerIndices: number[];

inclusionDelays: number[];

flags: number[];

/**
* balances array will be populated by processRewardsAndPenalties() and consumed by processEffectiveBalanceUpdates().
Expand Down Expand Up @@ -161,13 +164,34 @@ export interface EpochTransitionCache {
*/
nextEpochTotalActiveBalanceByIncrement: number;

/**
* Track by validator index if it's active in the current epoch.
* Used in metrics
*/
isActiveCurrEpoch: boolean[];

/**
* Track by validator index if it's active in the next epoch.
* Used in `processEffectiveBalanceUpdates` to save one loop over validators after epoch process.
*/
isActiveNextEpoch: boolean[];
}

// reuse arrays to avoid memory reallocation and gc
// WARNING: this is not async safe
/** WARNING: reused, never gc'd */
const isActivePrevEpoch = new Array<boolean>();
/** WARNING: reused, never gc'd */
const isActiveCurrEpoch = new Array<boolean>();
/** WARNING: reused, never gc'd */
const isActiveNextEpoch = new Array<boolean>();
/** WARNING: reused, never gc'd */
const proposerIndices = new Array<number>();
/** WARNING: reused, never gc'd */
const inclusionDelays = new Array<number>();
/** WARNING: reused, never gc'd */
const flags = new Array<number>();

export function beforeProcessEpoch(
state: CachedBeaconStateAllForks,
opts?: EpochTransitionCacheOpts
Expand All @@ -188,9 +212,6 @@ export function beforeProcessEpoch(
const indicesEligibleForActivation: ValidatorIndex[] = [];
const indicesToEject: ValidatorIndex[] = [];
const nextEpochShufflingActiveValidatorIndices: ValidatorIndex[] = [];
const isActivePrevEpoch: boolean[] = [];
const isActiveNextEpoch: boolean[] = [];
const statuses: AttesterStatus[] = [];

let totalActiveStakeByIncrement = 0;

Expand All @@ -200,21 +221,41 @@ export function beforeProcessEpoch(
const validators = state.validators.getAllReadonlyValues();
const validatorCount = validators.length;

// pre-fill with true (most validators are active)
isActivePrevEpoch.length = validatorCount;
isActiveCurrEpoch.length = validatorCount;
isActiveNextEpoch.length = validatorCount;
isActivePrevEpoch.fill(true);
isActiveCurrEpoch.fill(true);
isActiveNextEpoch.fill(true);

// During the epoch transition, additional data is precomputed to avoid traversing any state a second
// time. Attestations are a big part of this, and each validator has a "status" to represent its
// precomputed participation.
// - proposerIndex: number; // -1 when not included by any proposer
// - inclusionDelay: number;
// - flags: number; // bitfield of AttesterFlags
proposerIndices.length = validatorCount;
inclusionDelays.length = validatorCount;
flags.length = validatorCount;
wemeetagain marked this conversation as resolved.
Show resolved Hide resolved
proposerIndices.fill(-1);
inclusionDelays.fill(0);
flags.fill(0);

// Clone before being mutated in processEffectiveBalanceUpdates
epochCtx.beforeEpochTransition();

const effectiveBalancesByIncrements = epochCtx.effectiveBalanceIncrements;

for (let i = 0; i < validatorCount; i++) {
const validator = validators[i];
const status = createAttesterStatus();

if (validator.slashed) {
if (slashingsEpoch === validator.withdrawableEpoch) {
indicesToSlash.push(i);
}
} else {
status.flags |= FLAG_UNSLASHED;
flags[i] |= FLAG_UNSLASHED;
}

const {activationEpoch, exitEpoch} = validator;
Expand All @@ -223,19 +264,22 @@ export function beforeProcessEpoch(
const isActiveNext = activationEpoch <= nextEpoch && nextEpoch < exitEpoch;
const isActiveNext2 = activationEpoch <= nextEpoch2 && nextEpoch2 < exitEpoch;

isActivePrevEpoch.push(isActivePrev);
if (!isActivePrev) {
isActivePrevEpoch[i] = false;
}

// Both active validators and slashed-but-not-yet-withdrawn validators are eligible to receive penalties.
// This is done to prevent self-slashing from being a way to escape inactivity leaks.
// TODO: Consider using an array of `eligibleValidatorIndices: number[]`
if (isActivePrev || (validator.slashed && prevEpoch + 1 < validator.withdrawableEpoch)) {
eligibleValidatorIndices.push(i);
status.flags |= FLAG_ELIGIBLE_ATTESTER;
flags[i] |= FLAG_ELIGIBLE_ATTESTER;
wemeetagain marked this conversation as resolved.
Show resolved Hide resolved
}

if (isActiveCurr) {
status.active = true;
totalActiveStakeByIncrement += effectiveBalancesByIncrements[i];
} else {
isActiveCurrEpoch[i] = false;
}

// To optimize process_registry_updates():
Expand Down Expand Up @@ -278,16 +322,16 @@ export function beforeProcessEpoch(
//
// Use `else` since indicesEligibleForActivationQueue + indicesEligibleForActivation + indicesToEject are mutually exclusive
else if (
status.active &&
isActiveCurr &&
validator.exitEpoch === FAR_FUTURE_EPOCH &&
validator.effectiveBalance <= config.EJECTION_BALANCE
) {
indicesToEject.push(i);
}

statuses.push(status);

isActiveNextEpoch.push(isActiveNext);
if (!isActiveNext) {
isActiveNextEpoch[i] = false;
}

if (isActiveNext2) {
nextEpochShufflingActiveValidatorIndices.push(i);
Expand All @@ -312,7 +356,9 @@ export function beforeProcessEpoch(
if (forkSeq === ForkSeq.phase0) {
processPendingAttestations(
state as CachedBeaconStatePhase0,
statuses,
proposerIndices,
inclusionDelays,
flags,
(state as CachedBeaconStatePhase0).previousEpochAttestations.getAllReadonly(),
prevEpoch,
FLAG_PREV_SOURCE_ATTESTER,
Expand All @@ -321,7 +367,9 @@ export function beforeProcessEpoch(
);
processPendingAttestations(
state as CachedBeaconStatePhase0,
statuses,
proposerIndices,
inclusionDelays,
flags,
(state as CachedBeaconStatePhase0).currentEpochAttestations.getAllReadonly(),
currentEpoch,
FLAG_CURR_SOURCE_ATTESTER,
Expand All @@ -331,21 +379,19 @@ export function beforeProcessEpoch(
} else {
const previousEpochParticipation = (state as CachedBeaconStateAltair).previousEpochParticipation.getAll();
for (let i = 0; i < previousEpochParticipation.length; i++) {
const status = statuses[i];
// this is required to pass random spec tests in altair
if (isActivePrevEpoch[i]) {
// FLAG_PREV are indexes [0,1,2]
status.flags |= previousEpochParticipation[i];
flags[i] |= previousEpochParticipation[i];
}
}

const currentEpochParticipation = (state as CachedBeaconStateAltair).currentEpochParticipation.getAll();
for (let i = 0; i < currentEpochParticipation.length; i++) {
const status = statuses[i];
// this is required to pass random spec tests in altair
if (status.active) {
if (isActiveCurrEpoch[i]) {
// FLAG_PREV are indexes [3,4,5], so shift by 3
status.flags |= currentEpochParticipation[i] << 3;
flags[i] |= currentEpochParticipation[i] << 3;
}
}
}
Expand All @@ -361,19 +407,19 @@ export function beforeProcessEpoch(
const FLAG_PREV_HEAD_ATTESTER_UNSLASHED = FLAG_PREV_HEAD_ATTESTER | FLAG_UNSLASHED;
const FLAG_CURR_TARGET_UNSLASHED = FLAG_CURR_TARGET_ATTESTER | FLAG_UNSLASHED;

for (let i = 0; i < statuses.length; i++) {
const status = statuses[i];
for (let i = 0; i < validatorCount; i++) {
const effectiveBalanceByIncrement = effectiveBalancesByIncrements[i];
if (hasMarkers(status.flags, FLAG_PREV_SOURCE_ATTESTER_UNSLASHED)) {
const flag = flags[i];
if (hasMarkers(flag, FLAG_PREV_SOURCE_ATTESTER_UNSLASHED)) {
prevSourceUnslStake += effectiveBalanceByIncrement;
}
if (hasMarkers(status.flags, FLAG_PREV_TARGET_ATTESTER_UNSLASHED)) {
if (hasMarkers(flag, FLAG_PREV_TARGET_ATTESTER_UNSLASHED)) {
prevTargetUnslStake += effectiveBalanceByIncrement;
}
if (hasMarkers(status.flags, FLAG_PREV_HEAD_ATTESTER_UNSLASHED)) {
if (hasMarkers(flag, FLAG_PREV_HEAD_ATTESTER_UNSLASHED)) {
prevHeadUnslStake += effectiveBalanceByIncrement;
}
if (hasMarkers(status.flags, FLAG_CURR_TARGET_UNSLASHED)) {
if (hasMarkers(flag, FLAG_CURR_TARGET_UNSLASHED)) {
currTargetUnslStake += effectiveBalanceByIncrement;
}
}
Expand Down Expand Up @@ -421,8 +467,11 @@ export function beforeProcessEpoch(
nextEpochShufflingActiveValidatorIndices,
// to be updated in processEffectiveBalanceUpdates
nextEpochTotalActiveBalanceByIncrement: 0,
isActiveCurrEpoch,
isActiveNextEpoch,
statuses,
proposerIndices,
inclusionDelays,
flags,

// Will be assigned in processRewardsAndPenalties()
balances: undefined,
Expand Down
Loading
Loading