Skip to content

Commit

Permalink
Limit preaggregating attestations (#5256)
Browse files Browse the repository at this point in the history
* Limit preaggregating attestations

* preaggregateSlotDistance hidden cli param

* Log debug if error adding SyncCommitteeMessage to pool

* SyncCommitteeMessagePool: return instead of throw error

* Add SyncCommitteeMesssage insertOutcome metric

* Update prune() method header

Co-authored-by: Cayman <caymannava@gmail.com>

---------

Co-authored-by: Cayman <caymannava@gmail.com>
  • Loading branch information
twoeths and wemeetagain committed Mar 29, 2023
1 parent c39c6e6 commit 1db18b9
Show file tree
Hide file tree
Showing 12 changed files with 89 additions and 14 deletions.
2 changes: 1 addition & 1 deletion packages/beacon-node/src/api/impl/beacon/pool/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ export function getBeaconPoolApi({
}

errors.push(e as Error);
logger.error(
logger.debug(
`Error on submitPoolSyncCommitteeSignatures [${i}]`,
{slot: signature.slot, validatorIndex: signature.validatorIndex},
e as Error
Expand Down
12 changes: 10 additions & 2 deletions packages/beacon-node/src/chain/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,9 @@ export class BeaconChain implements IBeaconChain {
readonly reprocessController: ReprocessController;

// Ops pool
readonly attestationPool = new AttestationPool();
readonly attestationPool: AttestationPool;
readonly aggregatedAttestationPool = new AggregatedAttestationPool();
readonly syncCommitteeMessagePool = new SyncCommitteeMessagePool();
readonly syncCommitteeMessagePool: SyncCommitteeMessagePool;
readonly syncContributionAndProofPool = new SyncContributionAndProofPool();
readonly opPool = new OpPool();

Expand Down Expand Up @@ -185,6 +185,14 @@ export class BeaconChain implements IBeaconChain {

if (!clock) clock = new LocalClock({config, emitter, genesisTime: this.genesisTime, signal});

const preAggregateCutOffTime = (2 / 3) * this.config.SECONDS_PER_SLOT;
this.attestationPool = new AttestationPool(clock, preAggregateCutOffTime, this.opts?.preaggregateSlotDistance);
this.syncCommitteeMessagePool = new SyncCommitteeMessagePool(
clock,
preAggregateCutOffTime,
this.opts?.preaggregateSlotDistance
);

this.seenAggregatedAttestations = new SeenAggregatedAttestations(metrics);
this.seenContributionAndProof = new SeenContributionAndProof(metrics);

Expand Down
22 changes: 18 additions & 4 deletions packages/beacon-node/src/chain/opPools/attestationPool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {PointFormat, Signature} from "@chainsafe/bls/types";
import bls from "@chainsafe/bls";
import {BitArray, toHexString} from "@chainsafe/ssz";
import {MapDef} from "@lodestar/utils";
import {BeaconClock} from "../clock/interface.js";
import {InsertOutcome, OpPoolError, OpPoolErrorCode} from "./types.js";
import {pruneBySlot, signatureFromBytesNoCheck} from "./utils.js";

Expand Down Expand Up @@ -60,6 +61,12 @@ export class AttestationPool {
);
private lowestPermissibleSlot = 0;

constructor(
private readonly clock: BeaconClock,
private readonly cutOffSecFromSlot: number,
private readonly preaggregateSlotDistance = 0
) {}

/** Returns current count of pre-aggregated attestations with unique data */
getAttestationCount(): number {
let attestationCount = 0;
Expand All @@ -77,7 +84,8 @@ export class AttestationPool {
* `SignedAggregateAndProof`.
*
* If the attestation is too old (low slot) to be included in the pool it is simply dropped
* and no error is returned.
* and no error is returned. Also if it's at clock slot but come to the pool later than 2/3
* of slot time, it's dropped too since it's not helpful for the validator anymore
*
* Expects the attestation to be fully validated:
* - Valid signature
Expand All @@ -94,6 +102,11 @@ export class AttestationPool {
return InsertOutcome.Old;
}

// Reject attestations in the current slot but come to this pool very late
if (this.clock.secFromSlot(slot) > this.cutOffSecFromSlot) {
return InsertOutcome.Late;
}

// Limit object per slot
const aggregateByRoot = this.attestationByRootBySlot.getOrDefault(slot);
if (aggregateByRoot.size >= MAX_ATTESTATIONS_PER_SLOT) {
Expand Down Expand Up @@ -130,12 +143,13 @@ export class AttestationPool {
}

/**
* Removes any attestations with a slot lower than `current_slot` and bars any future
* attestations with a slot lower than `current_slot - SLOTS_RETAINED`.
* Removes any attestations with a slot lower than `current_slot - preaggregateSlotDistance`.
* By default, not interested in attestations in old slots, we only preaggregate attestations for the current slot.
*/
prune(clockSlot: Slot): void {
pruneBySlot(this.attestationByRootBySlot, clockSlot, SLOTS_RETAINED);
this.lowestPermissibleSlot = Math.max(clockSlot - SLOTS_RETAINED, 0);
// by default preaggregateSlotDistance is 0, i.e only accept attestations in the same clock slot.
this.lowestPermissibleSlot = Math.max(clockSlot - this.preaggregateSlotDistance, 0);
}

/**
Expand Down
17 changes: 15 additions & 2 deletions packages/beacon-node/src/chain/opPools/syncCommitteeMessagePool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {SYNC_COMMITTEE_SIZE, SYNC_COMMITTEE_SUBNET_COUNT} from "@lodestar/params
import {altair, Root, Slot, SubcommitteeIndex} from "@lodestar/types";
import {BitArray, toHexString} from "@chainsafe/ssz";
import {MapDef} from "@lodestar/utils";
import {BeaconClock} from "../clock/interface.js";
import {InsertOutcome, OpPoolError, OpPoolErrorCode} from "./types.js";
import {pruneBySlot, signatureFromBytesNoCheck} from "./utils.js";

Expand Down Expand Up @@ -44,6 +45,12 @@ export class SyncCommitteeMessagePool {
>(() => new MapDef<Subnet, Map<BlockRootHex, ContributionFast>>(() => new Map<BlockRootHex, ContributionFast>()));
private lowestPermissibleSlot = 0;

constructor(
private readonly clock: BeaconClock,
private readonly cutOffSecFromSlot: number,
private readonly preaggregateSlotDistance = 0
) {}

/** Returns current count of unique ContributionFast by block root and subnet */
get size(): number {
let count = 0;
Expand All @@ -63,7 +70,12 @@ export class SyncCommitteeMessagePool {

// Reject if too old.
if (slot < lowestPermissibleSlot) {
throw new OpPoolError({code: OpPoolErrorCode.SLOT_TOO_LOW, slot, lowestPermissibleSlot});
return InsertOutcome.Old;
}

// validator gets SyncCommitteeContribution at 2/3 of slot, it's no use to preaggregate later than that time
if (this.clock.secFromSlot(slot) > this.cutOffSecFromSlot) {
return InsertOutcome.Late;
}

// Limit object per slot
Expand Down Expand Up @@ -106,7 +118,8 @@ export class SyncCommitteeMessagePool {
*/
prune(clockSlot: Slot): void {
pruneBySlot(this.contributionsByRootBySubnetBySlot, clockSlot, SLOTS_RETAINED);
this.lowestPermissibleSlot = Math.max(clockSlot - SLOTS_RETAINED, 0);
// by default preaggregateSlotDistance is 0, i.e only accept SyncCommitteeMessage in the same clock slot.
this.lowestPermissibleSlot = Math.max(clockSlot - this.preaggregateSlotDistance, 0);
}
}

Expand Down
2 changes: 2 additions & 0 deletions packages/beacon-node/src/chain/opPools/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ export enum InsertOutcome {
AlreadyKnown = "AlreadyKnown",
/** Not existing in the pool but it's too old to add. No changes were made. */
Old = "Old",
/** Attestation comes to the pool at > 2/3 of slot. No changes were made */
Late = "Late",
/** The data is know, and the new participants have been added to the aggregated signature */
Aggregated = "Aggregated",
/** The data is not better than the existing data*/
Expand Down
8 changes: 8 additions & 0 deletions packages/beacon-node/src/chain/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {ForkChoiceOpts} from "./forkChoice/index.js";
import {LightClientServerOpts} from "./lightClient/index.js";

export type IChainOptions = BlockProcessOpts &
PoolOpts &
ForkChoiceOpts &
ArchiverOpts &
LightClientServerOpts & {
Expand Down Expand Up @@ -47,6 +48,13 @@ export type BlockProcessOpts = {
emitPayloadAttributes?: boolean;
};

export type PoolOpts = {
/**
* Only preaggregate attestation/sync committee message since clockSlot - preaggregateSlotDistance
*/
preaggregateSlotDistance?: number;
};

export const defaultChainOptions: IChainOptions = {
blsVerifyAllMainThread: false,
blsVerifyAllMultiThread: false,
Expand Down
5 changes: 5 additions & 0 deletions packages/beacon-node/src/metrics/metrics/lodestar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -808,6 +808,11 @@ export function createLodestarMetrics(
name: "lodestar_oppool_sync_committee_message_pool_size",
help: "Current size of the SyncCommitteeMessagePool unique by slot subnet and block root",
}),
syncCommitteeMessagePoolInsertOutcome: register.counter<"insertOutcome">({
name: "lodestar_oppool_sync_committee_message_insert_outcome_total",
help: "Total number of InsertOutcome as a result of adding a SyncCommitteeMessage to pool",
labelNames: ["insertOutcome"],
}),
syncContributionAndProofPoolSize: register.gauge({
name: "lodestar_oppool_sync_contribution_and_proof_pool_pool_size",
help: "Current size of the SyncContributionAndProofPool unique by slot subnet and block root",
Expand Down
5 changes: 3 additions & 2 deletions packages/beacon-node/src/network/processor/gossipHandlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -347,9 +347,10 @@ export function getGossipHandlers(modules: ValidatorFnsModules, options: GossipH
// Handler

try {
chain.syncCommitteeMessagePool.add(subnet, syncCommittee, indexInSubcommittee);
const insertOutcome = chain.syncCommitteeMessagePool.add(subnet, syncCommittee, indexInSubcommittee);
metrics?.opPool.syncCommitteeMessagePoolInsertOutcome.inc({insertOutcome});
} catch (e) {
logger.error("Error adding to syncCommittee pool", {subnet}, e as Error);
logger.debug("Error adding to syncCommittee pool", {subnet}, e as Error);
}
},

Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
import {expect} from "chai";
import sinon, {SinonStubbedInstance} from "sinon";
import bls from "@chainsafe/bls";
import {altair} from "@lodestar/types";
import {toHexString} from "@chainsafe/ssz";
import {SyncCommitteeMessagePool} from "../../../../src/chain/opPools/index.js";
import {LocalClock} from "../../../../src/chain/clock/LocalClock.js";

describe("chain / opPools / SyncCommitteeMessagePool", function () {
const sandbox = sinon.createSandbox();
let cache: SyncCommitteeMessagePool;
const subcommitteeIndex = 2;
const indexInSubcommittee = 3;
const beaconBlockRoot = Buffer.alloc(32, 1);
const slot = 10;
let syncCommittee: altair.SyncCommitteeMessage;
let clockStub: SinonStubbedInstance<LocalClock>;
const cutOffTime = 1;

before("Init BLS", async () => {
const sk = bls.SecretKey.fromBytes(Buffer.alloc(32, 1));
Expand All @@ -23,11 +28,17 @@ describe("chain / opPools / SyncCommitteeMessagePool", function () {
});

beforeEach(() => {
cache = new SyncCommitteeMessagePool();
clockStub = sandbox.createStubInstance(LocalClock);
cache = new SyncCommitteeMessagePool(clockStub, cutOffTime);
cache.add(subcommitteeIndex, syncCommittee, indexInSubcommittee);
});

afterEach(function () {
sandbox.restore();
});

it("should preaggregate SyncCommitteeContribution", () => {
clockStub.secFromSlot.returns(0);
let contribution = cache.getContribution(subcommitteeIndex, syncCommittee.slot, syncCommittee.beaconBlockRoot);
expect(contribution).to.be.not.null;
const newSecretKey = bls.SecretKey.fromBytes(Buffer.alloc(32, 2));
Expand Down
6 changes: 4 additions & 2 deletions packages/beacon-node/test/utils/mocks/chain/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,9 @@ export class MockBeaconChain implements IBeaconChain {
reprocessController: ReprocessController;

// Ops pool
readonly attestationPool = new AttestationPool();
readonly attestationPool: AttestationPool;
readonly aggregatedAttestationPool = new AggregatedAttestationPool();
readonly syncCommitteeMessagePool = new SyncCommitteeMessagePool();
readonly syncCommitteeMessagePool: SyncCommitteeMessagePool;
readonly syncContributionAndProofPool = new SyncContributionAndProofPool();
readonly opPool = new OpPool();

Expand Down Expand Up @@ -126,6 +126,8 @@ export class MockBeaconChain implements IBeaconChain {
emitter: this.emitter,
signal: this.abortController.signal,
});
this.attestationPool = new AttestationPool(this.clock, (2 / 3) * this.config.SECONDS_PER_SLOT);
this.syncCommitteeMessagePool = new SyncCommitteeMessagePool(this.clock, (2 / 3) * this.config.SECONDS_PER_SLOT);
this.forkChoice = mockForkChoice();
this.stateCache = new StateContextCache({});
this.checkpointStateCache = new CheckpointStateCache({});
Expand Down
9 changes: 9 additions & 0 deletions packages/cli/src/options/beaconNodeOptions/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ export type ChainArgs = {
// "chain.persistInvalidSszObjectsDir": string;
"chain.proposerBoostEnabled": boolean;
"chain.disableImportExecutionFcU": boolean;
"chain.preaggregateSlotDistance": number;
"chain.computeUnrealized": boolean;
"chain.assertCorrectProgressiveBalances": boolean;
"chain.maxSkipSlots": number;
Expand All @@ -31,6 +32,7 @@ export function parseArgs(args: ChainArgs): IBeaconNodeOptions["chain"] {
persistInvalidSszObjectsDir: undefined as any,
proposerBoostEnabled: args["chain.proposerBoostEnabled"],
disableImportExecutionFcU: args["chain.disableImportExecutionFcU"],
preaggregateSlotDistance: args["chain.preaggregateSlotDistance"],
computeUnrealized: args["chain.computeUnrealized"],
assertCorrectProgressiveBalances: args["chain.assertCorrectProgressiveBalances"],
maxSkipSlots: args["chain.maxSkipSlots"],
Expand Down Expand Up @@ -104,6 +106,13 @@ Will double processing times. Use only for debugging purposes.",
group: "chain",
},

"chain.preaggregateSlotDistance": {
hidden: true,
type: "number",
description: "Only preaggregate attestations or sync committee message since clockSlot - preaggregateSlotDistance",
group: "chain",
},

"chain.computeUnrealized": {
hidden: true,
type: "boolean",
Expand Down
2 changes: 2 additions & 0 deletions packages/cli/test/unit/options/beaconNodeOptions.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ describe("options / beaconNodeOptions", () => {
"chain.persistInvalidSszObjects": true,
"chain.proposerBoostEnabled": false,
"chain.disableImportExecutionFcU": false,
"chain.preaggregateSlotDistance": 1,
"chain.computeUnrealized": true,
suggestedFeeRecipient: "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
"chain.assertCorrectProgressiveBalances": true,
Expand Down Expand Up @@ -110,6 +111,7 @@ describe("options / beaconNodeOptions", () => {
persistInvalidSszObjects: true,
proposerBoostEnabled: false,
disableImportExecutionFcU: false,
preaggregateSlotDistance: 1,
computeUnrealized: true,
safeSlotsToImportOptimistically: 256,
suggestedFeeRecipient: "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
Expand Down

0 comments on commit 1db18b9

Please sign in to comment.