From 764bcc8d0b7e26dabbdc4b6a8d84c9f9f09543a4 Mon Sep 17 00:00:00 2001 From: twoeths Date: Thu, 16 May 2024 21:09:28 +0300 Subject: [PATCH] fix: batch validation for electra attestations (#6788) --- .../src/api/impl/beacon/pool/index.ts | 6 +- .../src/chain/opPools/attestationPool.ts | 29 ++-- .../chain/seenCache/seenAttestationData.ts | 24 ++-- .../src/chain/validation/aggregateAndProof.ts | 2 +- .../src/chain/validation/attestation.ts | 135 +++++++++++------- .../src/network/processor/gossipHandlers.ts | 12 +- .../network/processor/gossipQueues/index.ts | 8 +- .../network/processor/gossipQueues/indexed.ts | 1 + packages/beacon-node/src/util/sszBytes.ts | 32 ++++- .../perf/chain/validation/attestation.test.ts | 6 +- .../attestation/validateAttestation.test.ts | 26 ++-- .../test/unit/util/sszBytes.test.ts | 6 +- .../validator/src/services/attestation.ts | 4 +- .../validator/src/services/validatorStore.ts | 2 +- 14 files changed, 181 insertions(+), 112 deletions(-) diff --git a/packages/beacon-node/src/api/impl/beacon/pool/index.ts b/packages/beacon-node/src/api/impl/beacon/pool/index.ts index 50936fe407b..01ebee30fc7 100644 --- a/packages/beacon-node/src/api/impl/beacon/pool/index.ts +++ b/packages/beacon-node/src/api/impl/beacon/pool/index.ts @@ -1,5 +1,5 @@ import {routes, ServerApi} from "@lodestar/api"; -import {Epoch, ssz} from "@lodestar/types"; +import {CommitteeIndex, Epoch, isElectraAttestation, ssz} from "@lodestar/types"; import {ForkName, SYNC_COMMITTEE_SUBNET_SIZE} from "@lodestar/params"; import {validateApiAttestation} from "../../../../chain/validation/index.js"; import {validateApiAttesterSlashing} from "../../../../chain/validation/attesterSlashing.js"; @@ -64,7 +64,7 @@ export function getBeaconPoolApi({ // when a validator is configured with multiple beacon node urls, this attestation data may come from another beacon node // and the block hasn't been in our forkchoice since we haven't seen / processing that block // see https://github.com/ChainSafe/lodestar/issues/5098 - const {indexedAttestation, subnet, attDataRootHex} = await validateGossipFnRetryUnknownRoot( + const {indexedAttestation, subnet, attDataRootHex, committeeIndex} = await validateGossipFnRetryUnknownRoot( validateFn, network, chain, @@ -73,7 +73,7 @@ export function getBeaconPoolApi({ ); if (network.shouldAggregate(subnet, slot)) { - const insertOutcome = chain.attestationPool.add(attestation, attDataRootHex); + const insertOutcome = chain.attestationPool.add(committeeIndex, attestation, attDataRootHex); metrics?.opPool.attestationPoolInsertOutcome.inc({insertOutcome}); } diff --git a/packages/beacon-node/src/chain/opPools/attestationPool.ts b/packages/beacon-node/src/chain/opPools/attestationPool.ts index 7d6cca30056..b647f284349 100644 --- a/packages/beacon-node/src/chain/opPools/attestationPool.ts +++ b/packages/beacon-node/src/chain/opPools/attestationPool.ts @@ -62,9 +62,10 @@ type CommitteeIndex = number; * receives and it can be triggered manually. */ export class AttestationPool { - private readonly attestationByRootBySlot = new MapDef>>( - () => new Map>() - ); + private readonly aggregateByIndexByRootBySlot = new MapDef< + Slot, + Map> + >(() => new Map>()); private lowestPermissibleSlot = 0; constructor( @@ -76,8 +77,10 @@ export class AttestationPool { /** Returns current count of pre-aggregated attestations with unique data */ getAttestationCount(): number { let attestationCount = 0; - for (const attestationByRoot of this.attestationByRootBySlot.values()) { - attestationCount += attestationByRoot.size; + for (const attestationByIndexByRoot of this.aggregateByIndexByRootBySlot.values()) { + for (const attestationByIndex of attestationByIndexByRoot.values()) { + attestationCount += attestationByIndex.size; + } } return attestationCount; } @@ -99,7 +102,7 @@ export class AttestationPool { * - Valid committeeIndex * - Valid data */ - add(attestation: allForks.Attestation, attDataRootHex: RootHex): InsertOutcome { + add(committeeIndex: CommitteeIndex, attestation: allForks.Attestation, attDataRootHex: RootHex): InsertOutcome { const slot = attestation.data.slot; const lowestPermissibleSlot = this.lowestPermissibleSlot; @@ -114,15 +117,11 @@ export class AttestationPool { } // Limit object per slot - const aggregateByRoot = this.attestationByRootBySlot.getOrDefault(slot); + const aggregateByRoot = this.aggregateByIndexByRootBySlot.getOrDefault(slot); if (aggregateByRoot.size >= MAX_ATTESTATIONS_PER_SLOT) { throw new OpPoolError({code: OpPoolErrorCode.REACHED_MAX_PER_SLOT}); } - const committeeIndex = isElectraAttestation(attestation) - ? // this attestation is added to pool after validation - attestation.committeeBits.getSingleTrueBit() - : attestation.data.index; // this should not happen because attestation should be validated before reaching this assert.notNull(committeeIndex, "Committee index should not be null in attestation pool"); @@ -147,7 +146,7 @@ export class AttestationPool { * For validator API to get an aggregate */ getAggregate(slot: Slot, committeeIndex: CommitteeIndex, dataRootHex: RootHex): allForks.Attestation | null { - const aggregate = this.attestationByRootBySlot.get(slot)?.get(dataRootHex)?.get(committeeIndex); + const aggregate = this.aggregateByIndexByRootBySlot.get(slot)?.get(dataRootHex)?.get(committeeIndex); if (!aggregate) { // TODO: Add metric for missing aggregates return null; @@ -161,7 +160,7 @@ export class AttestationPool { * By default, not interested in attestations in old slots, we only preaggregate attestations for the current slot. */ prune(clockSlot: Slot): void { - pruneBySlot(this.attestationByRootBySlot, clockSlot, SLOTS_RETAINED); + pruneBySlot(this.aggregateByIndexByRootBySlot, clockSlot, SLOTS_RETAINED); // by default preaggregateSlotDistance is 0, i.e only accept attestations in the same clock slot. this.lowestPermissibleSlot = Math.max(clockSlot - this.preaggregateSlotDistance, 0); } @@ -175,8 +174,8 @@ export class AttestationPool { const aggregateByRoots = bySlot === undefined - ? Array.from(this.attestationByRootBySlot.values()) - : [this.attestationByRootBySlot.get(bySlot)]; + ? Array.from(this.aggregateByIndexByRootBySlot.values()) + : [this.aggregateByIndexByRootBySlot.get(bySlot)]; for (const aggregateByRoot of aggregateByRoots) { if (aggregateByRoot) { diff --git a/packages/beacon-node/src/chain/seenCache/seenAttestationData.ts b/packages/beacon-node/src/chain/seenCache/seenAttestationData.ts index 9312f3b517a..17343e386fd 100644 --- a/packages/beacon-node/src/chain/seenCache/seenAttestationData.ts +++ b/packages/beacon-node/src/chain/seenCache/seenAttestationData.ts @@ -1,12 +1,16 @@ -import {phase0, RootHex, Slot} from "@lodestar/types"; +import {BitArray} from "@chainsafe/ssz"; +import {CommitteeIndex, phase0, RootHex, Slot} from "@lodestar/types"; import {MapDef} from "@lodestar/utils"; import {Metrics} from "../../metrics/metrics.js"; -import {AttDataBase64} from "../../util/sszBytes.js"; +import {SeenAttDataKey} from "../../util/sszBytes.js"; import {InsertOutcome} from "../opPools/types.js"; export type AttestationDataCacheEntry = { // part of shuffling data, so this does not take memory - committeeIndices: Uint32Array; + committeeValidatorIndices: Uint32Array; + // undefined for phase0 Attestation + committeeBits?: BitArray; + committeeIndex: CommitteeIndex; // IndexedAttestationData signing root, 32 bytes signingRoot: Uint8Array; // to be consumed by forkchoice and oppool @@ -38,12 +42,14 @@ const DEFAULT_MAX_CACHE_SIZE_PER_SLOT = 200; const DEFAULT_CACHE_SLOT_DISTANCE = 2; /** + * Cached seen AttestationData to improve gossip validation. For Electra, this still take into account attestationIndex + * even through it is moved outside of AttestationData. * As of April 2023, validating gossip attestation takes ~12% of cpu time for a node subscribing to all subnets on mainnet. * Having this cache help saves a lot of cpu time since most of the gossip attestations are on the same slot. */ export class SeenAttestationDatas { - private cacheEntryByAttDataBase64BySlot = new MapDef>( - () => new Map() + private cacheEntryByAttDataBase64BySlot = new MapDef>( + () => new Map() ); private lowestPermissibleSlot = 0; @@ -57,14 +63,14 @@ export class SeenAttestationDatas { } // TODO: Move InsertOutcome type definition to a common place - add(slot: Slot, attDataBase64: AttDataBase64, cacheEntry: AttestationDataCacheEntry): InsertOutcome { + add(slot: Slot, attDataKey: SeenAttDataKey, cacheEntry: AttestationDataCacheEntry): InsertOutcome { if (slot < this.lowestPermissibleSlot) { this.metrics?.seenCache.attestationData.reject.inc({reason: RejectReason.too_old}); return InsertOutcome.Old; } const cacheEntryByAttDataBase64 = this.cacheEntryByAttDataBase64BySlot.getOrDefault(slot); - if (cacheEntryByAttDataBase64.has(attDataBase64)) { + if (cacheEntryByAttDataBase64.has(attDataKey)) { this.metrics?.seenCache.attestationData.reject.inc({reason: RejectReason.already_known}); return InsertOutcome.AlreadyKnown; } @@ -74,11 +80,11 @@ export class SeenAttestationDatas { return InsertOutcome.ReachLimit; } - cacheEntryByAttDataBase64.set(attDataBase64, cacheEntry); + cacheEntryByAttDataBase64.set(attDataKey, cacheEntry); return InsertOutcome.NewData; } - get(slot: Slot, attDataBase64: AttDataBase64): AttestationDataCacheEntry | null { + get(slot: Slot, attDataBase64: SeenAttDataKey): AttestationDataCacheEntry | null { const cacheEntryByAttDataBase64 = this.cacheEntryByAttDataBase64BySlot.get(slot); const cacheEntry = cacheEntryByAttDataBase64?.get(attDataBase64); if (cacheEntry) { diff --git a/packages/beacon-node/src/chain/validation/aggregateAndProof.ts b/packages/beacon-node/src/chain/validation/aggregateAndProof.ts index a1d7135716c..91776fbdd55 100644 --- a/packages/beacon-node/src/chain/validation/aggregateAndProof.ts +++ b/packages/beacon-node/src/chain/validation/aggregateAndProof.ts @@ -168,7 +168,7 @@ async function validateAggregateAndProof( // [REJECT] The committee index is within the expected range // -- i.e. data.index < get_committee_count_per_slot(state, data.target.epoch) const committeeIndices = cachedAttData - ? cachedAttData.committeeIndices + ? cachedAttData.committeeValidatorIndices : getCommitteeIndices(shuffling, attSlot, attIndex); // [REJECT] The number of aggregation bits matches the committee size diff --git a/packages/beacon-node/src/chain/validation/attestation.ts b/packages/beacon-node/src/chain/validation/attestation.ts index 87e8c2a6b2b..0141eaafbb0 100644 --- a/packages/beacon-node/src/chain/validation/attestation.ts +++ b/packages/beacon-node/src/chain/validation/attestation.ts @@ -1,5 +1,16 @@ -import {toHexString} from "@chainsafe/ssz"; -import {phase0, Epoch, Root, Slot, RootHex, ssz, allForks, electra} from "@lodestar/types"; +import {BitArray, toHexString} from "@chainsafe/ssz"; +import { + phase0, + Epoch, + Root, + Slot, + RootHex, + ssz, + allForks, + electra, + isElectraAttestation, + CommitteeIndex, +} from "@lodestar/types"; import {ProtoBlock} from "@lodestar/fork-choice"; import {ATTESTATION_SUBNET_COUNT, SLOTS_PER_EPOCH, ForkName, ForkSeq, DOMAIN_BEACON_ATTESTER} from "@lodestar/params"; import { @@ -17,10 +28,9 @@ import {AttestationError, AttestationErrorCode, GossipAction} from "../errors/in import {MAXIMUM_GOSSIP_CLOCK_DISPARITY_SEC} from "../../constants/index.js"; import {RegenCaller} from "../regen/index.js"; import { - AttDataBase64, + SeenAttDataKey, getAggregationBitsFromAttestationSerialized, - getAttDataBase64FromAttestationSerialized, - getCommitteeBitsFromAttestationSerialized, + getSeenAttDataKey, getSignatureFromAttestationSerialized, } from "../../util/sszBytes.js"; import {AttestationDataCacheEntry} from "../seenCache/seenAttestationData.js"; @@ -39,12 +49,13 @@ export type AttestationValidationResult = { indexedAttestation: allForks.IndexedAttestation; subnet: number; attDataRootHex: RootHex; + committeeIndex: CommitteeIndex; }; export type AttestationOrBytes = ApiAttestation | GossipAttestation; /** attestation from api */ -export type ApiAttestation = {attestation: phase0.Attestation; serializedData: null}; +export type ApiAttestation = {attestation: allForks.Attestation; serializedData: null}; /** attestation from gossip */ export type GossipAttestation = { @@ -52,7 +63,9 @@ export type GossipAttestation = { serializedData: Uint8Array; // available in NetworkProcessor since we check for unknown block root attestations attSlot: Slot; - attDataBase64?: string | null; + // for old LIFO linear gossip queue we don't have attDataBase64 + // for indexed gossip queue we have attDataBase64 + seenAttestationKey?: SeenAttDataKey | null; }; export type Step0Result = AttestationValidationResult & { @@ -83,7 +96,7 @@ export async function validateGossipAttestation( export async function validateGossipAttestationsSameAttData( fork: ForkName, chain: IBeaconChain, - attestationOrBytesArr: AttestationOrBytes[], + attestationOrBytesArr: GossipAttestation[], subnet: number, // for unit test, consumers do not need to pass this step0ValidationFn = validateGossipAttestationNoSignatureCheck @@ -251,15 +264,16 @@ async function validateGossipAttestationNoSignatureCheck( let attestationOrCache: | {attestation: allForks.Attestation; cache: null} | {attestation: null; cache: AttestationDataCacheEntry; serializedData: Uint8Array}; - let attDataBase64: AttDataBase64 | null = null; + let attDataKey: SeenAttDataKey | null = null; if (attestationOrBytes.serializedData) { // gossip const attSlot = attestationOrBytes.attSlot; - // for old LIFO linear gossip queue we don't have attDataBase64 - // for indexed gossip queue we have attDataBase64 - attDataBase64 = - attestationOrBytes.attDataBase64 ?? getAttDataBase64FromAttestationSerialized(attestationOrBytes.serializedData); - const cachedAttData = attDataBase64 !== null ? chain.seenAttestationDatas.get(attSlot, attDataBase64) : null; + attDataKey = + // we always have seenAttestationKey from the IndexedGossipQueue, getSeenAttDataKey() just for backward + // compatible in case beaconAttestationBatchValidation is false + // TODO: remove beaconAttestationBatchValidation flag since the batch attestation is stable + attestationOrBytes.seenAttestationKey ?? getSeenAttDataKey(ForkSeq[fork], attestationOrBytes.serializedData); + const cachedAttData = attDataKey !== null ? chain.seenAttestationDatas.get(attSlot, attDataKey) : null; if (cachedAttData === null) { const attestation = sszDeserializeAttestation(fork, attestationOrBytes.serializedData); // only deserialize on the first AttestationData that's not cached @@ -269,7 +283,7 @@ async function validateGossipAttestationNoSignatureCheck( } } else { // api - attDataBase64 = null; + attDataKey = null; attestationOrCache = {attestation: attestationOrBytes.attestation, cache: null}; } @@ -280,29 +294,33 @@ async function validateGossipAttestationNoSignatureCheck( const attEpoch = computeEpochAtSlot(attSlot); const attTarget = attData.target; const targetEpoch = attTarget.epoch; + let committeeIndex; + if (attestationOrCache.attestation) { + if (isElectraAttestation(attestationOrCache.attestation)) { + // api or first time validation of a gossip attestation + const {committeeBits} = attestationOrCache.attestation; + // throw in both in case of undefined and null + if (committeeBits == null) { + throw new AttestationError(GossipAction.REJECT, {code: AttestationErrorCode.INVALID_SERIALIZED_BYTES}); + } - let attIndex; - if (ForkSeq[fork] >= ForkSeq.electra) { - const committeeBits = attestationOrCache.attestation - ? (attestationOrCache.attestation as electra.Attestation).committeeBits - : getCommitteeBitsFromAttestationSerialized(attestationOrCache.serializedData); - - if (committeeBits === null) { - throw new AttestationError(GossipAction.REJECT, {code: AttestationErrorCode.INVALID_SERIALIZED_BYTES}); - } - - attIndex = committeeBits.getSingleTrueBit(); - // [REJECT] len(committee_indices) == 1, where committee_indices = get_committee_indices(aggregate) - if (attIndex === null) { - throw new AttestationError(GossipAction.REJECT, {code: AttestationErrorCode.NOT_EXACTLY_ONE_COMMITTEE_BIT_SET}); - } + committeeIndex = committeeBits.getSingleTrueBit(); + // [REJECT] len(committee_indices) == 1, where committee_indices = get_committee_indices(aggregate) + if (committeeIndex === null) { + throw new AttestationError(GossipAction.REJECT, {code: AttestationErrorCode.NOT_EXACTLY_ONE_COMMITTEE_BIT_SET}); + } - // [REJECT] aggregate.data.index == 0 - if (attData.index !== 0) { - throw new AttestationError(GossipAction.REJECT, {code: AttestationErrorCode.NON_ZERO_ATTESTATION_DATA_INDEX}); + // [REJECT] aggregate.data.index == 0 + if (attData.index !== 0) { + throw new AttestationError(GossipAction.REJECT, {code: AttestationErrorCode.NON_ZERO_ATTESTATION_DATA_INDEX}); + } + } else { + // phase0 attestation + committeeIndex = attData.index; } } else { - attIndex = attData.index; + // found a seen AttestationData + committeeIndex = attestationOrCache.cache.committeeIndex; } chain.metrics?.gossipAttestation.attestationSlotToClockSlot.observe( @@ -343,11 +361,11 @@ async function validateGossipAttestationNoSignatureCheck( }); } - let committeeIndices: Uint32Array; + let committeeValidatorIndices: Uint32Array; let getSigningRoot: () => Uint8Array; let expectedSubnet: number; if (attestationOrCache.cache) { - committeeIndices = attestationOrCache.cache.committeeIndices; + committeeValidatorIndices = attestationOrCache.cache.committeeValidatorIndices; const signingRoot = attestationOrCache.cache.signingRoot; getSigningRoot = () => signingRoot; expectedSubnet = attestationOrCache.cache.subnet; @@ -389,17 +407,17 @@ async function validateGossipAttestationNoSignatureCheck( // [REJECT] The committee index is within the expected range // -- i.e. data.index < get_committee_count_per_slot(state, data.target.epoch) - committeeIndices = getCommitteeIndices(shuffling, attSlot, attIndex); + committeeValidatorIndices = getCommitteeIndices(shuffling, attSlot, committeeIndex); getSigningRoot = () => getAttestationDataSigningRoot(chain.config, attData); - expectedSubnet = computeSubnetForSlot(shuffling, attSlot, attIndex); + expectedSubnet = computeSubnetForSlot(shuffling, attSlot, committeeIndex); } - const validatorIndex = committeeIndices[bitIndex]; + const validatorIndex = committeeValidatorIndices[bitIndex]; // [REJECT] The number of aggregation bits matches the committee size // -- i.e. len(attestation.aggregation_bits) == len(get_beacon_committee(state, data.slot, data.index)). // > TODO: Is this necessary? Lighthouse does not do this check. - if (aggregationBits.bitLen !== committeeIndices.length) { + if (aggregationBits.bitLen !== committeeValidatorIndices.length) { throw new AttestationError(GossipAction.REJECT, { code: AttestationErrorCode.WRONG_NUMBER_OF_AGGREGATION_BITS, }); @@ -445,6 +463,7 @@ async function validateGossipAttestationNoSignatureCheck( }); } + let committeeBits: BitArray | undefined = undefined; if (attestationOrCache.cache) { // there could be up to 6% of cpu time to compute signing root if we don't clone the signature set signatureSet = createSingleSignatureSetFromComponents( @@ -453,6 +472,7 @@ async function validateGossipAttestationNoSignatureCheck( signature ); attDataRootHex = attestationOrCache.cache.attDataRootHex; + committeeBits = attestationOrCache.cache.committeeBits; } else { signatureSet = createSingleSignatureSetFromComponents( chain.index2pubkey[validatorIndex], @@ -462,9 +482,15 @@ async function validateGossipAttestationNoSignatureCheck( // add cached attestation data before verifying signature attDataRootHex = toHexString(ssz.phase0.AttestationData.hashTreeRoot(attData)); - if (attDataBase64) { - chain.seenAttestationDatas.add(attSlot, attDataBase64, { - committeeIndices, + // if attestation is phase0 the committeeBits is undefined anyway + committeeBits = isElectraAttestation(attestationOrCache.attestation) + ? attestationOrCache.attestation.committeeBits.clone() + : undefined; + if (attDataKey) { + chain.seenAttestationDatas.add(attSlot, attDataKey, { + committeeValidatorIndices, + committeeBits, + committeeIndex, signingRoot: signatureSet.signingRoot, subnet: expectedSubnet, // precompute this to be used in forkchoice @@ -486,14 +512,21 @@ async function validateGossipAttestationNoSignatureCheck( ? (indexedAttestationContent as electra.IndexedAttestation) : (indexedAttestationContent as phase0.IndexedAttestation); - const attestation: allForks.Attestation = attestationOrCache.attestation - ? attestationOrCache.attestation - : { - aggregationBits, - data: attData, - signature, - }; - return {attestation, indexedAttestation, subnet: expectedSubnet, attDataRootHex, signatureSet, validatorIndex}; + const attestation: allForks.Attestation = attestationOrCache.attestation ?? { + aggregationBits, + data: attData, + committeeBits, + signature, + }; + return { + attestation, + indexedAttestation, + subnet: expectedSubnet, + attDataRootHex, + signatureSet, + validatorIndex, + committeeIndex, + }; } /** diff --git a/packages/beacon-node/src/network/processor/gossipHandlers.ts b/packages/beacon-node/src/network/processor/gossipHandlers.ts index 4672a4db825..6fd41a56976 100644 --- a/packages/beacon-node/src/network/processor/gossipHandlers.ts +++ b/packages/beacon-node/src/network/processor/gossipHandlers.ts @@ -38,8 +38,8 @@ import { AggregateAndProofValidationResult, validateGossipAttestationsSameAttData, validateGossipAttestation, - AttestationOrBytes, AttestationValidationResult, + GossipAttestation, } from "../../chain/validation/index.js"; import {NetworkEvent, NetworkEventBus} from "../events.js"; import {PeerAction} from "../peers/index.js"; @@ -488,14 +488,14 @@ function getDefaultHandlers(modules: ValidatorFnsModules, options: GossipHandler } // Handler - const {indexedAttestation, attDataRootHex, attestation} = validationResult; + const {indexedAttestation, attDataRootHex, attestation, committeeIndex} = validationResult; metrics?.registerGossipUnaggregatedAttestation(seenTimestampSec, indexedAttestation); try { // Node may be subscribe to extra subnets (long-lived random subnets). For those, validate the messages // but don't add to attestation pool, to save CPU and RAM if (aggregatorTracker.shouldAggregate(subnet, indexedAttestation.data.slot)) { - const insertOutcome = chain.attestationPool.add(attestation, attDataRootHex); + const insertOutcome = chain.attestationPool.add(committeeIndex, attestation, attDataRootHex); metrics?.opPool.attestationPoolInsertOutcome.inc({insertOutcome}); } } catch (e) { @@ -680,7 +680,7 @@ function getBatchHandlers(modules: ValidatorFnsModules, options: GossipHandlerOp serializedData: param.gossipData.serializedData, attSlot: param.gossipData.msgSlot, attDataBase64: param.gossipData.indexed, - })) as AttestationOrBytes[]; + })) as GossipAttestation[]; const {results: validationResults, batchableBls} = await validateGossipAttestationsSameAttData( fork, chain, @@ -696,14 +696,14 @@ function getBatchHandlers(modules: ValidatorFnsModules, options: GossipHandlerOp results.push(null); // Handler - const {indexedAttestation, attDataRootHex, attestation} = validationResult.result; + const {indexedAttestation, attDataRootHex, attestation, committeeIndex} = validationResult.result; metrics?.registerGossipUnaggregatedAttestation(gossipHandlerParams[i].seenTimestampSec, indexedAttestation); try { // Node may be subscribe to extra subnets (long-lived random subnets). For those, validate the messages // but don't add to attestation pool, to save CPU and RAM if (aggregatorTracker.shouldAggregate(subnet, indexedAttestation.data.slot)) { - const insertOutcome = chain.attestationPool.add(attestation, attDataRootHex); + const insertOutcome = chain.attestationPool.add(committeeIndex, attestation, attDataRootHex); metrics?.opPool.attestationPoolInsertOutcome.inc({insertOutcome}); } } catch (e) { diff --git a/packages/beacon-node/src/network/processor/gossipQueues/index.ts b/packages/beacon-node/src/network/processor/gossipQueues/index.ts index 366b23b3067..b38ee74279c 100644 --- a/packages/beacon-node/src/network/processor/gossipQueues/index.ts +++ b/packages/beacon-node/src/network/processor/gossipQueues/index.ts @@ -1,7 +1,8 @@ import {mapValues} from "@lodestar/utils"; +import {ForkSeq} from "@lodestar/params"; import {GossipType} from "../../gossip/interface.js"; import {PendingGossipsubMessage} from "../types.js"; -import {getAttDataBase64FromAttestationSerialized} from "../../../util/sszBytes.js"; +import {getSeenAttDataKey} from "../../../util/sszBytes.js"; import {LinearGossipQueue} from "./linear.js"; import { DropType, @@ -86,7 +87,10 @@ const indexedGossipQueueOpts: { } = { [GossipType.beacon_attestation]: { maxLength: 24576, - indexFn: (item: PendingGossipsubMessage) => getAttDataBase64FromAttestationSerialized(item.msg.data), + indexFn: (item: PendingGossipsubMessage) => { + const {topic, msg} = item; + return getSeenAttDataKey(ForkSeq[topic.fork], msg.data); + }, minChunkSize: MIN_SIGNATURE_SETS_TO_BATCH_VERIFY, maxChunkSize: MAX_GOSSIP_ATTESTATION_BATCH_SIZE, }, diff --git a/packages/beacon-node/src/network/processor/gossipQueues/indexed.ts b/packages/beacon-node/src/network/processor/gossipQueues/indexed.ts index 4e29a52173f..8edba7dfaad 100644 --- a/packages/beacon-node/src/network/processor/gossipQueues/indexed.ts +++ b/packages/beacon-node/src/network/processor/gossipQueues/indexed.ts @@ -84,6 +84,7 @@ export class IndexedGossipQueueMinSize= ForkSeq.electra ? getSeenAttDataKeyElectra(data) : getSeenAttDataKeyPhase0(data); +} + +/** + * Extract attestation data + committeeBits base64 from electra attestation serialized bytes. + * Return null if data is not long enough to extract attestation data. + */ +export function getSeenAttDataKeyElectra(electraAttestationBytes: Uint8Array): AttDataCommitteeBitsBase64 | null { + const startIndex = VARIABLE_FIELD_OFFSET; + const seenKeyLength = ATTESTATION_DATA_SIZE + COMMITTEE_BITS_SIZE; + + if (electraAttestationBytes.length < startIndex + seenKeyLength) { + return null; + } + + return Buffer.from(electraAttestationBytes.subarray(startIndex, startIndex + seenKeyLength)).toString("base64"); +} + +/** + * Extract attestation data base64 from phase0 attestation serialized bytes. * Return null if data is not long enough to extract attestation data. */ -export function getAttDataBase64FromAttestationSerialized(data: Uint8Array): AttDataBase64 | null { +export function getSeenAttDataKeyPhase0(data: Uint8Array): AttDataBase64 | null { if (data.length < VARIABLE_FIELD_OFFSET + ATTESTATION_DATA_SIZE) { return null; } // base64 is a bit efficient than hex - return Buffer.from(data.slice(VARIABLE_FIELD_OFFSET, VARIABLE_FIELD_OFFSET + ATTESTATION_DATA_SIZE)).toString( + return Buffer.from(data.subarray(VARIABLE_FIELD_OFFSET, VARIABLE_FIELD_OFFSET + ATTESTATION_DATA_SIZE)).toString( "base64" ); } diff --git a/packages/beacon-node/test/perf/chain/validation/attestation.test.ts b/packages/beacon-node/test/perf/chain/validation/attestation.test.ts index 5fce9a34250..8f462609c06 100644 --- a/packages/beacon-node/test/perf/chain/validation/attestation.test.ts +++ b/packages/beacon-node/test/perf/chain/validation/attestation.test.ts @@ -5,7 +5,7 @@ import {ssz} from "@lodestar/types"; import {generateTestCachedBeaconStateOnlyValidators} from "../../../../../state-transition/test/perf/util.js"; import {validateAttestation, validateGossipAttestationsSameAttData} from "../../../../src/chain/validation/index.js"; import {getAttestationValidData} from "../../../utils/validationData/attestation.js"; -import {getAttDataBase64FromAttestationSerialized} from "../../../../src/util/sszBytes.js"; +import {getSeenAttDataKeyPhase0} from "../../../../src/util/sszBytes.js"; describe("validate gossip attestation", () => { setBenchOpts({ @@ -42,7 +42,7 @@ describe("validate gossip attestation", () => { attestation: null, serializedData, attSlot, - attDataBase64: getAttDataBase64FromAttestationSerialized(serializedData), + seenAttestationDataKey: getSeenAttDataKeyPhase0(serializedData), }, subnet0 ); @@ -67,7 +67,7 @@ describe("validate gossip attestation", () => { attestation: null, serializedData, attSlot, - attDataBase64: getAttDataBase64FromAttestationSerialized(serializedData), + attDataBase64: getSeenAttDataKeyPhase0(serializedData), }; }); diff --git a/packages/beacon-node/test/unit/chain/validation/attestation/validateAttestation.test.ts b/packages/beacon-node/test/unit/chain/validation/attestation/validateAttestation.test.ts index 56aab699f4f..c7d9dc775c8 100644 --- a/packages/beacon-node/test/unit/chain/validation/attestation/validateAttestation.test.ts +++ b/packages/beacon-node/test/unit/chain/validation/attestation/validateAttestation.test.ts @@ -12,7 +12,7 @@ import { validateApiAttestation, validateAttestation, } from "../../../../../src/chain/validation/index.js"; -import {getAttDataBase64FromAttestationSerialized} from "../../../../../src/util/sszBytes.js"; +import {getSeenAttDataKeyPhase0} from "../../../../../src/util/sszBytes.js"; import {memoOnce} from "../../../../utils/cache.js"; import {expectRejectedWithLodestarError} from "../../../../utils/errors.js"; import {AttestationValidDataOpts, getAttestationValidData} from "../../../../utils/validationData/attestation.js"; @@ -52,7 +52,7 @@ describe("validateAttestation", () => { const {chain, subnet} = getValidData(); await expectGossipError( chain, - {attestation: null, serializedData: Buffer.alloc(0), attSlot: 0, attDataBase64: "invalid"}, + {attestation: null, serializedData: Buffer.alloc(0), attSlot: 0, seenAttestationDataKey: "invalid"}, subnet, GossipErrorCode.INVALID_SERIALIZED_BYTES_ERROR_CODE ); @@ -72,7 +72,7 @@ describe("validateAttestation", () => { attestation: null, serializedData, attSlot: attestation.data.slot, - attDataBase64: getAttDataBase64FromAttestationSerialized(serializedData), + seenAttestationDataKey: getSeenAttDataKeyPhase0(serializedData), }, subnet, AttestationErrorCode.BAD_TARGET_EPOCH @@ -91,7 +91,7 @@ describe("validateAttestation", () => { attestation: null, serializedData, attSlot: attestation.data.slot, - attDataBase64: getAttDataBase64FromAttestationSerialized(serializedData), + seenAttestationDataKey: getSeenAttDataKeyPhase0(serializedData), }, subnet, AttestationErrorCode.PAST_SLOT @@ -110,7 +110,7 @@ describe("validateAttestation", () => { attestation: null, serializedData, attSlot: attestation.data.slot, - attDataBase64: getAttDataBase64FromAttestationSerialized(serializedData), + seenAttestationDataKey: getSeenAttDataKeyPhase0(serializedData), }, subnet, AttestationErrorCode.FUTURE_SLOT @@ -135,7 +135,7 @@ describe("validateAttestation", () => { attestation: null, serializedData, attSlot: attestation.data.slot, - attDataBase64: getAttDataBase64FromAttestationSerialized(serializedData), + seenAttestationDataKey: getSeenAttDataKeyPhase0(serializedData), }, subnet, AttestationErrorCode.NOT_EXACTLY_ONE_AGGREGATION_BIT_SET @@ -155,7 +155,7 @@ describe("validateAttestation", () => { attestation: null, serializedData, attSlot: attestation.data.slot, - attDataBase64: getAttDataBase64FromAttestationSerialized(serializedData), + seenAttestationDataKey: getSeenAttDataKeyPhase0(serializedData), }, subnet, AttestationErrorCode.NOT_EXACTLY_ONE_AGGREGATION_BIT_SET @@ -179,7 +179,7 @@ describe("validateAttestation", () => { attestation: null, serializedData, attSlot: attestation.data.slot, - attDataBase64: getAttDataBase64FromAttestationSerialized(serializedData), + seenAttestationDataKey: getSeenAttDataKeyPhase0(serializedData), }, subnet, AttestationErrorCode.UNKNOWN_OR_PREFINALIZED_BEACON_BLOCK_ROOT @@ -199,7 +199,7 @@ describe("validateAttestation", () => { attestation: null, serializedData, attSlot: attestation.data.slot, - attDataBase64: getAttDataBase64FromAttestationSerialized(serializedData), + seenAttestationDataKey: getSeenAttDataKeyPhase0(serializedData), }, subnet, AttestationErrorCode.INVALID_TARGET_ROOT @@ -226,7 +226,7 @@ describe("validateAttestation", () => { attestation: null, serializedData, attSlot: attestation.data.slot, - attDataBase64: getAttDataBase64FromAttestationSerialized(serializedData), + seenAttestationDataKey: getSeenAttDataKeyPhase0(serializedData), }, subnet, AttestationErrorCode.WRONG_NUMBER_OF_AGGREGATION_BITS @@ -245,7 +245,7 @@ describe("validateAttestation", () => { attestation: null, serializedData, attSlot: attestation.data.slot, - attDataBase64: getAttDataBase64FromAttestationSerialized(serializedData), + seenAttestationDataKey: getSeenAttDataKeyPhase0(serializedData), }, invalidSubnet, AttestationErrorCode.INVALID_SUBNET_ID @@ -265,7 +265,7 @@ describe("validateAttestation", () => { attestation: null, serializedData, attSlot: attestation.data.slot, - attDataBase64: getAttDataBase64FromAttestationSerialized(serializedData), + seenAttestationDataKey: getSeenAttDataKeyPhase0(serializedData), }, subnet, AttestationErrorCode.ATTESTATION_ALREADY_KNOWN @@ -287,7 +287,7 @@ describe("validateAttestation", () => { attestation: null, serializedData, attSlot: attestation.data.slot, - attDataBase64: getAttDataBase64FromAttestationSerialized(serializedData), + seenAttestationDataKey: getSeenAttDataKeyPhase0(serializedData), }, subnet, AttestationErrorCode.INVALID_SIGNATURE diff --git a/packages/beacon-node/test/unit/util/sszBytes.test.ts b/packages/beacon-node/test/unit/util/sszBytes.test.ts index d0dc150cd79..8c277300d83 100644 --- a/packages/beacon-node/test/unit/util/sszBytes.test.ts +++ b/packages/beacon-node/test/unit/util/sszBytes.test.ts @@ -4,7 +4,7 @@ import {allForks, deneb, Epoch, isElectraAttestation, phase0, RootHex, Slot, ssz import {fromHex, toHex} from "@lodestar/utils"; import {ForkName, MAX_COMMITTEES_PER_SLOT} from "@lodestar/params"; import { - getAttDataBase64FromAttestationSerialized, + getSeenAttDataKeyPhase0, getAttDataBase64FromSignedAggregateAndProofSerialized, getAggregationBitsFromAttestationSerialized as getAggregationBitsFromAttestationSerialized, getBlockRootFromAttestationSerialized, @@ -62,7 +62,7 @@ describe("attestation SSZ serialized picking", () => { } const attDataBase64 = ssz.phase0.AttestationData.serialize(attestation.data); - expect(getAttDataBase64FromAttestationSerialized(bytes)).toBe(Buffer.from(attDataBase64).toString("base64")); + expect(getSeenAttDataKeyPhase0(bytes)).toBe(Buffer.from(attDataBase64).toString("base64")); }); } @@ -83,7 +83,7 @@ describe("attestation SSZ serialized picking", () => { it("getAttDataBase64FromAttestationSerialized - invalid data", () => { const invalidAttDataBase64DataSizes = [0, 4, 100, 128, 131]; for (const size of invalidAttDataBase64DataSizes) { - expect(getAttDataBase64FromAttestationSerialized(Buffer.alloc(size))).toBeNull(); + expect(getSeenAttDataKeyPhase0(Buffer.alloc(size))).toBeNull(); } }); diff --git a/packages/validator/src/services/attestation.ts b/packages/validator/src/services/attestation.ts index 15693776746..5ba487d7d9f 100644 --- a/packages/validator/src/services/attestation.ts +++ b/packages/validator/src/services/attestation.ts @@ -1,5 +1,5 @@ import {toHexString} from "@chainsafe/ssz"; -import {allForks, BLSSignature, phase0, Slot, ssz} from "@lodestar/types"; +import {allForks, BLSSignature, electra, isElectraAttestation, phase0, Slot, ssz} from "@lodestar/types"; import {computeEpochAtSlot, isAggregatorFromCommitteeLength} from "@lodestar/state-transition"; import {sleep} from "@lodestar/utils"; import {Api, ApiError, routes} from "@lodestar/api"; @@ -184,7 +184,7 @@ export class AttestationService { attestationNoCommittee: phase0.AttestationData, duties: AttDutyAndProof[] ): Promise { - const signedAttestations: phase0.Attestation[] = []; + const signedAttestations: allForks.Attestation[] = []; const headRootHex = toHexString(attestationNoCommittee.beaconBlockRoot); const currentEpoch = computeEpochAtSlot(slot); const isAfterElectra = currentEpoch >= this.config.ELECTRA_FORK_EPOCH; diff --git a/packages/validator/src/services/validatorStore.ts b/packages/validator/src/services/validatorStore.ts index 04923cbddf3..f36032004aa 100644 --- a/packages/validator/src/services/validatorStore.ts +++ b/packages/validator/src/services/validatorStore.ts @@ -531,7 +531,7 @@ export class ValidatorStore { data: attestationData, committeeBits: BitArray.fromSingleBit(MAX_COMMITTEES_PER_SLOT, duty.committeeIndex), signature: await this.getSignature(duty.pubkey, signingRoot, signingSlot, signableMessage), - } as electra.Attestation; + }; } else { return { aggregationBits: BitArray.fromSingleBit(duty.committeeLength, duty.validatorCommitteeIndex),