Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Only preaggregate attestations if there are connected aggregators #5284

Merged
merged 3 commits into from
Mar 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions packages/beacon-node/src/api/impl/beacon/pool/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,12 @@ export function getBeaconPoolApi({
beaconBlockRoot
);

const insertOutcome = chain.attestationPool.add(attestation);
if (network.attnetsService.shouldProcess(subnet, slot)) {
const insertOutcome = chain.attestationPool.add(attestation);
metrics?.opPool.attestationPoolInsertOutcome.inc({insertOutcome});
}
const sentPeers = await network.gossip.publishBeaconAttestation(attestation, subnet);
metrics?.submitUnaggregatedAttestation(seenTimestampSec, indexedAttestation, subnet, sentPeers);
metrics?.opPool.attestationPoolInsertOutcome.inc({insertOutcome});
} catch (e) {
errors.push(e as Error);
logger.error(
Expand Down
4 changes: 4 additions & 0 deletions packages/beacon-node/src/metrics/metrics/lodestar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,10 @@ export function createLodestarMetrics(
help: "Count of unsubscribe_subnets calls",
labelNames: ["subnet", "src"],
}),
aggregatorSlotSubnetCount: register.gauge({
name: "lodestar_attnets_service_aggregator_slot_subnet_total",
help: "Count of aggregator per slot and subnet",
}),
},

syncnetsService: {
Expand Down
3 changes: 2 additions & 1 deletion packages/beacon-node/src/network/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import {INetworkEventBus} from "./events.js";
import {GossipBeaconNode} from "./gossip/index.js";
import {PeerAction, PeerScoreStats} from "./peers/index.js";
import {IReqRespBeaconNode} from "./reqresp/ReqRespBeaconNode.js";
import {CommitteeSubscription} from "./subnets/index.js";
import {AttnetsService, CommitteeSubscription} from "./subnets/index.js";

export type PeerSearchOptions = {
supportsProtocols?: string[];
Expand All @@ -27,6 +27,7 @@ export interface INetwork {

events: INetworkEventBus;
reqResp: IReqRespBeaconNode;
attnetsService: AttnetsService;
gossip: GossipBeaconNode;

getEnr(): Promise<SignableENR | undefined>;
Expand Down
31 changes: 29 additions & 2 deletions packages/beacon-node/src/network/subnets/attnetsService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {
SLOTS_PER_EPOCH,
} from "@lodestar/params";
import {Epoch, Slot, ssz} from "@lodestar/types";
import {Logger, randBetween} from "@lodestar/utils";
import {Logger, MapDef, randBetween} from "@lodestar/utils";
import {shuffle} from "../../util/shuffle.js";
import {ChainEvent, IBeaconChain} from "../../chain/index.js";
import {GossipTopic, GossipType} from "../gossip/index.js";
Expand Down Expand Up @@ -45,6 +45,11 @@ export class AttnetsService implements IAttnetsService {
private subscriptionsCommittee = new SubnetMap();
/** Same as `subscriptionsCommittee` but for long-lived subnets. May overlap with `subscriptionsCommittee` */
private subscriptionsRandom = new SubnetMap();
/**
* Map of an aggregator at a slot and subnet
* Used to determine if we should process an attestation.
*/
private aggregatorSlotSubnet = new MapDef<Slot, Set<number>>(() => new Set());

/**
* A collection of seen validators. These dictate how many random subnets we should be
Expand Down Expand Up @@ -119,6 +124,7 @@ export class AttnetsService implements IAttnetsService {
if (isAggregator) {
// need exact slot here
subnetsToSubscribe.push({subnet, toSlot: slot});
this.aggregatorSlotSubnet.getOrDefault(slot).add(subnet);
}
}

Expand All @@ -141,7 +147,10 @@ export class AttnetsService implements IAttnetsService {
* Check if a subscription is still active before handling a gossip object
*/
shouldProcess(subnet: number, slot: Slot): boolean {
return this.subscriptionsCommittee.isActiveAtSlot(subnet, slot);
if (!this.aggregatorSlotSubnet.has(slot)) {
return false;
}
return this.aggregatorSlotSubnet.getOrDefault(slot).has(subnet);
}

/** Call ONLY ONCE: Two epoch before the fork, re-subscribe all existing random subscriptions to the new fork */
Expand Down Expand Up @@ -184,6 +193,7 @@ export class AttnetsService implements IAttnetsService {
try {
const slot = computeStartSlotAtEpoch(epoch);
this.pruneExpiredKnownValidators(slot);
this.pruneExpiredAggregator(slot);
} catch (e) {
this.logger.error("Error on AttnetsService.onEpoch", {epoch}, e as Error);
}
Expand Down Expand Up @@ -246,6 +256,18 @@ export class AttnetsService implements IAttnetsService {
if (deletedKnownValidators) this.rebalanceRandomSubnets();
}

/**
* No need to track aggregator for past slots.
* @param currentSlot
*/
private pruneExpiredAggregator(currentSlot: Slot): void {
for (const slot of this.aggregatorSlotSubnet.keys()) {
if (currentSlot > slot) {
this.aggregatorSlotSubnet.delete(slot);
}
}
}

/**
* Called when we have new validators or expired validators.
* knownValidators should be updated before this function.
Expand Down Expand Up @@ -350,5 +372,10 @@ export class AttnetsService implements IAttnetsService {
metrics.attnetsService.committeeSubnets.set(this.committeeSubnets.size);
metrics.attnetsService.subscriptionsCommittee.set(this.subscriptionsCommittee.size);
metrics.attnetsService.subscriptionsRandom.set(this.subscriptionsRandom.size);
let aggregatorCount = 0;
for (const subnets of this.aggregatorSlotSubnet.values()) {
aggregatorCount += subnets.size;
}
metrics.attnetsService.aggregatorSlotSubnetCount.set(aggregatorCount);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ import {
} from "@lodestar/params";
import {createBeaconConfig} from "@lodestar/config";
import {BeaconStateAllForks, getCurrentSlot} from "@lodestar/state-transition";
import {MockBeaconChain} from "../../utils/mocks/chain/chain.js";
import {generateState} from "../../utils/state.js";
import {testLogger} from "../../utils/logger.js";
import {MetadataController} from "../../../src/network/metadata.js";
import {Eth2Gossipsub, GossipType} from "../../../src/network/gossip/index.js";
import {AttnetsService, CommitteeSubscription, ShuffleFn} from "../../../src/network/subnets/index.js";
import {ChainEvent, IBeaconChain} from "../../../src/chain/index.js";
import {ZERO_HASH} from "../../../src/constants/index.js";
import {MockBeaconChain} from "../../../utils/mocks/chain/chain.js";
import {generateState} from "../../../utils/state.js";
import {testLogger} from "../../../utils/logger.js";
import {MetadataController} from "../../../../src/network/metadata.js";
import {Eth2Gossipsub, GossipType} from "../../../../src/network/gossip/index.js";
import {AttnetsService, CommitteeSubscription, ShuffleFn} from "../../../../src/network/subnets/index.js";
import {ChainEvent, IBeaconChain} from "../../../../src/chain/index.js";
import {ZERO_HASH} from "../../../../src/constants/index.js";

describe("AttnetsService", function () {
const COMMITTEE_SUBNET_SUBSCRIPTION = 10;
Expand Down Expand Up @@ -193,6 +193,7 @@ describe("AttnetsService", function () {
randomSubnet = COMMITTEE_SUBNET_SUBSCRIPTION;
const aggregatorSubscription: CommitteeSubscription = {...subscription, isAggregator: true};
service.addCommitteeSubscriptions([aggregatorSubscription]);
expect(service.shouldProcess(subscription.subnet, subscription.slot)).to.be.true;
expect(service.getActiveSubnets()).to.be.deep.equal([{subnet: COMMITTEE_SUBNET_SUBSCRIPTION, toSlot: 101}]);
// committee subnet is same to random subnet
expect(gossipStub.subscribeTopic).to.be.calledOnce;
Expand All @@ -202,4 +203,10 @@ describe("AttnetsService", function () {
// don't unsubscribe bc random subnet is still there
expect(gossipStub.unsubscribeTopic).to.be.not.called;
});

it("should not process if no aggregator at dutied slot", () => {
expect(subscription.isAggregator).to.be.false;
service.addCommitteeSubscriptions([subscription]);
expect(service.shouldProcess(subscription.subnet, subscription.slot)).to.be.false;
});
});