Skip to content

Commit

Permalink
Only preaggregate attestations if there are connected aggregators (#5284
Browse files Browse the repository at this point in the history
)

* Only process attestations if there are connected aggregators

* Track aggregator slot subnet count

* Fix deneb sim test
  • Loading branch information
twoeths committed Mar 29, 2023
1 parent 1db18b9 commit 97f5360
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 13 deletions.
6 changes: 4 additions & 2 deletions packages/beacon-node/src/api/impl/beacon/pool/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,12 @@ export function getBeaconPoolApi({
beaconBlockRoot
);

const insertOutcome = chain.attestationPool.add(attestation);
if (network.attnetsService.shouldProcess(subnet, slot)) {
const insertOutcome = chain.attestationPool.add(attestation);
metrics?.opPool.attestationPoolInsertOutcome.inc({insertOutcome});
}
const sentPeers = await network.gossip.publishBeaconAttestation(attestation, subnet);
metrics?.submitUnaggregatedAttestation(seenTimestampSec, indexedAttestation, subnet, sentPeers);
metrics?.opPool.attestationPoolInsertOutcome.inc({insertOutcome});
} catch (e) {
errors.push(e as Error);
logger.error(
Expand Down
4 changes: 4 additions & 0 deletions packages/beacon-node/src/metrics/metrics/lodestar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,10 @@ export function createLodestarMetrics(
help: "Count of unsubscribe_subnets calls",
labelNames: ["subnet", "src"],
}),
aggregatorSlotSubnetCount: register.gauge({
name: "lodestar_attnets_service_aggregator_slot_subnet_total",
help: "Count of aggregator per slot and subnet",
}),
},

syncnetsService: {
Expand Down
3 changes: 2 additions & 1 deletion packages/beacon-node/src/network/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import {INetworkEventBus} from "./events.js";
import {GossipBeaconNode, GossipType} from "./gossip/index.js";
import {PeerAction, PeerScoreStats} from "./peers/index.js";
import {IReqRespBeaconNode} from "./reqresp/ReqRespBeaconNode.js";
import {CommitteeSubscription} from "./subnets/index.js";
import {PendingGossipsubMessage} from "./processor/types.js";
import {AttnetsService, CommitteeSubscription} from "./subnets/index.js";

export type PeerSearchOptions = {
supportsProtocols?: string[];
Expand All @@ -28,6 +28,7 @@ export interface INetwork {

events: INetworkEventBus;
reqResp: IReqRespBeaconNode;
attnetsService: AttnetsService;
gossip: GossipBeaconNode;

getEnr(): Promise<SignableENR | undefined>;
Expand Down
31 changes: 29 additions & 2 deletions packages/beacon-node/src/network/subnets/attnetsService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {
SLOTS_PER_EPOCH,
} from "@lodestar/params";
import {Epoch, Slot, ssz} from "@lodestar/types";
import {Logger, randBetween} from "@lodestar/utils";
import {Logger, MapDef, randBetween} from "@lodestar/utils";
import {shuffle} from "../../util/shuffle.js";
import {ChainEvent, IBeaconChain} from "../../chain/index.js";
import {GossipTopic, GossipType} from "../gossip/index.js";
Expand Down Expand Up @@ -45,6 +45,11 @@ export class AttnetsService implements IAttnetsService {
private subscriptionsCommittee = new SubnetMap();
/** Same as `subscriptionsCommittee` but for long-lived subnets. May overlap with `subscriptionsCommittee` */
private subscriptionsRandom = new SubnetMap();
/**
* Map of an aggregator at a slot and subnet
* Used to determine if we should process an attestation.
*/
private aggregatorSlotSubnet = new MapDef<Slot, Set<number>>(() => new Set());

/**
* A collection of seen validators. These dictate how many random subnets we should be
Expand Down Expand Up @@ -119,6 +124,7 @@ export class AttnetsService implements IAttnetsService {
if (isAggregator) {
// need exact slot here
subnetsToSubscribe.push({subnet, toSlot: slot});
this.aggregatorSlotSubnet.getOrDefault(slot).add(subnet);
}
}

Expand All @@ -141,7 +147,10 @@ export class AttnetsService implements IAttnetsService {
* Check if a subscription is still active before handling a gossip object
*/
shouldProcess(subnet: number, slot: Slot): boolean {
return this.subscriptionsCommittee.isActiveAtSlot(subnet, slot);
if (!this.aggregatorSlotSubnet.has(slot)) {
return false;
}
return this.aggregatorSlotSubnet.getOrDefault(slot).has(subnet);
}

/** Call ONLY ONCE: Two epoch before the fork, re-subscribe all existing random subscriptions to the new fork */
Expand Down Expand Up @@ -184,6 +193,7 @@ export class AttnetsService implements IAttnetsService {
try {
const slot = computeStartSlotAtEpoch(epoch);
this.pruneExpiredKnownValidators(slot);
this.pruneExpiredAggregator(slot);
} catch (e) {
this.logger.error("Error on AttnetsService.onEpoch", {epoch}, e as Error);
}
Expand Down Expand Up @@ -246,6 +256,18 @@ export class AttnetsService implements IAttnetsService {
if (deletedKnownValidators) this.rebalanceRandomSubnets();
}

/**
* No need to track aggregator for past slots.
* @param currentSlot
*/
private pruneExpiredAggregator(currentSlot: Slot): void {
for (const slot of this.aggregatorSlotSubnet.keys()) {
if (currentSlot > slot) {
this.aggregatorSlotSubnet.delete(slot);
}
}
}

/**
* Called when we have new validators or expired validators.
* knownValidators should be updated before this function.
Expand Down Expand Up @@ -350,5 +372,10 @@ export class AttnetsService implements IAttnetsService {
metrics.attnetsService.committeeSubnets.set(this.committeeSubnets.size);
metrics.attnetsService.subscriptionsCommittee.set(this.subscriptionsCommittee.size);
metrics.attnetsService.subscriptionsRandom.set(this.subscriptionsRandom.size);
let aggregatorCount = 0;
for (const subnets of this.aggregatorSlotSubnet.values()) {
aggregatorCount += subnets.size;
}
metrics.attnetsService.aggregatorSlotSubnetCount.set(aggregatorCount);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ import {
} from "@lodestar/params";
import {createBeaconConfig} from "@lodestar/config";
import {BeaconStateAllForks, getCurrentSlot} from "@lodestar/state-transition";
import {MockBeaconChain} from "../../utils/mocks/chain/chain.js";
import {generateState} from "../../utils/state.js";
import {testLogger} from "../../utils/logger.js";
import {MetadataController} from "../../../src/network/metadata.js";
import {Eth2Gossipsub, GossipType} from "../../../src/network/gossip/index.js";
import {AttnetsService, CommitteeSubscription, ShuffleFn} from "../../../src/network/subnets/index.js";
import {ChainEvent, IBeaconChain} from "../../../src/chain/index.js";
import {ZERO_HASH} from "../../../src/constants/index.js";
import {MockBeaconChain} from "../../../utils/mocks/chain/chain.js";
import {generateState} from "../../../utils/state.js";
import {testLogger} from "../../../utils/logger.js";
import {MetadataController} from "../../../../src/network/metadata.js";
import {Eth2Gossipsub, GossipType} from "../../../../src/network/gossip/index.js";
import {AttnetsService, CommitteeSubscription, ShuffleFn} from "../../../../src/network/subnets/index.js";
import {ChainEvent, IBeaconChain} from "../../../../src/chain/index.js";
import {ZERO_HASH} from "../../../../src/constants/index.js";

describe("AttnetsService", function () {
const COMMITTEE_SUBNET_SUBSCRIPTION = 10;
Expand Down Expand Up @@ -193,6 +193,7 @@ describe("AttnetsService", function () {
randomSubnet = COMMITTEE_SUBNET_SUBSCRIPTION;
const aggregatorSubscription: CommitteeSubscription = {...subscription, isAggregator: true};
service.addCommitteeSubscriptions([aggregatorSubscription]);
expect(service.shouldProcess(subscription.subnet, subscription.slot)).to.be.true;
expect(service.getActiveSubnets()).to.be.deep.equal([{subnet: COMMITTEE_SUBNET_SUBSCRIPTION, toSlot: 101}]);
// committee subnet is same to random subnet
expect(gossipStub.subscribeTopic).to.be.calledOnce;
Expand All @@ -202,4 +203,10 @@ describe("AttnetsService", function () {
// don't unsubscribe bc random subnet is still there
expect(gossipStub.unsubscribeTopic).to.be.not.called;
});

it("should not process if no aggregator at dutied slot", () => {
expect(subscription.isAggregator).to.be.false;
service.addCommitteeSubscriptions([subscription]);
expect(service.shouldProcess(subscription.subnet, subscription.slot)).to.be.false;
});
});

0 comments on commit 97f5360

Please sign in to comment.