diff --git a/packages/api/src/beacon/routes/lodestar.ts b/packages/api/src/beacon/routes/lodestar.ts index 85c79d825933..44771b432176 100644 --- a/packages/api/src/beacon/routes/lodestar.ts +++ b/packages/api/src/beacon/routes/lodestar.ts @@ -81,7 +81,7 @@ export type Api = { /** TODO: description */ getSyncChainsDebugState(): Promise>; /** Dump all items in a gossip queue, by gossipType */ - getGossipQueueItems(gossipType: string): Promise>; + getGossipQueueItems(gossipType: string): Promise>; /** Dump all items in the regen queue */ getRegenQueueItems(): Promise>; /** Dump all items in the block processor queue */ diff --git a/packages/beacon-node/src/api/impl/beacon/pool/index.ts b/packages/beacon-node/src/api/impl/beacon/pool/index.ts index 266e8be17cdd..8edef0a8fe4f 100644 --- a/packages/beacon-node/src/api/impl/beacon/pool/index.ts +++ b/packages/beacon-node/src/api/impl/beacon/pool/index.ts @@ -9,7 +9,7 @@ import {validateBlsToExecutionChange} from "../../../../chain/validation/blsToEx import {validateSyncCommitteeSigOnly} from "../../../../chain/validation/syncCommittee.js"; import {ApiModules} from "../../types.js"; import {AttestationError, GossipAction, SyncCommitteeError} from "../../../../chain/errors/index.js"; -import {validateGossipFnRetryUnknownRoot} from "../../../../network/processor/gossipHandlers.js"; +import {validateGossipFnRetryUnknownRoot} from "../../../../network/gossip/handlers/index.js"; export function getBeaconPoolApi({ chain, diff --git a/packages/beacon-node/src/api/impl/lodestar/index.ts b/packages/beacon-node/src/api/impl/lodestar/index.ts index 7e69053a5841..d779f7d18cdc 100644 --- a/packages/beacon-node/src/api/impl/lodestar/index.ts +++ b/packages/beacon-node/src/api/impl/lodestar/index.ts @@ -60,7 +60,7 @@ export function getLodestarApi({ async getGossipQueueItems(gossipType: GossipType | string) { return { - data: await network.dumpGossipQueue(gossipType as GossipType), + data: await network.dumpGossipQueueItems(gossipType), }; }, diff --git a/packages/beacon-node/src/api/impl/validator/index.ts b/packages/beacon-node/src/api/impl/validator/index.ts index f6b2b4f9f3f2..53441c898a81 100644 --- a/packages/beacon-node/src/api/impl/validator/index.ts +++ b/packages/beacon-node/src/api/impl/validator/index.ts @@ -25,7 +25,7 @@ import {CommitteeSubscription} from "../../../network/subnets/index.js"; import {ApiModules} from "../types.js"; import {RegenCaller} from "../../../chain/regen/index.js"; import {getValidatorStatus} from "../beacon/state/utils.js"; -import {validateGossipFnRetryUnknownRoot} from "../../../network/processor/gossipHandlers.js"; +import {validateGossipFnRetryUnknownRoot} from "../../../network/gossip/handlers/index.js"; import {computeSubnetForCommitteesAtSlot, getPubkeysForIndices} from "./utils.js"; /** diff --git a/packages/beacon-node/src/chain/bls/interface.ts b/packages/beacon-node/src/chain/bls/interface.ts index 2abeca62e103..4ce95a675d01 100644 --- a/packages/beacon-node/src/chain/bls/interface.ts +++ b/packages/beacon-node/src/chain/bls/interface.ts @@ -43,9 +43,4 @@ export interface IBlsVerifier { /** For multithread pool awaits terminating all workers */ close(): Promise; - - /** - * Returns true if BLS worker pool is ready to accept more work jobs. - */ - canAcceptWork(): boolean; } diff --git a/packages/beacon-node/src/chain/bls/multithread/index.ts b/packages/beacon-node/src/chain/bls/multithread/index.ts index ac28cd2dd24a..484c29accf09 100644 --- a/packages/beacon-node/src/chain/bls/multithread/index.ts +++ b/packages/beacon-node/src/chain/bls/multithread/index.ts @@ -56,11 +56,6 @@ const MAX_BUFFERED_SIGS = 32; */ const MAX_BUFFER_WAIT_MS = 100; -/** - * Max concurrent jobs on `canAcceptWork` status - */ -const MAX_JOBS_CAN_ACCEPT_WORK = 512; - type WorkerApi = { verifyManySignatureSets(workReqArr: BlsWorkReq[]): Promise; }; @@ -115,7 +110,6 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier { } | null = null; private blsVerifyAllMultiThread: boolean; private closed = false; - private workersBusy = 0; constructor(options: BlsMultiThreadWorkerPoolOptions, modules: BlsMultiThreadWorkerPoolModules) { const {logger, metrics} = modules; @@ -133,21 +127,10 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier { this.workers = this.createWorkers(implementation, defaultPoolSize); if (metrics) { - metrics.blsThreadPool.queueLength.addCollect(() => { - metrics.blsThreadPool.queueLength.set(this.jobs.length); - metrics.blsThreadPool.workersBusy.set(this.workersBusy); - }); + metrics.blsThreadPool.queueLength.addCollect(() => metrics.blsThreadPool.queueLength.set(this.jobs.length)); } } - canAcceptWork(): boolean { - return ( - this.workersBusy < defaultPoolSize && - // TODO: Should also bound the jobs queue? - this.jobs.length < MAX_JOBS_CAN_ACCEPT_WORK - ); - } - async verifySignatureSets(sets: ISignatureSet[], opts: VerifySignatureOpts = {}): Promise { // Pubkeys are aggregated in the main thread regardless if verified in workers or in main thread this.metrics?.bls.aggregatedPubkeys.inc(getAggregatedPubkeysCount(sets)); @@ -327,7 +310,6 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier { const workerApi = worker.status.workerApi; worker.status = {code: WorkerStatusCode.running, workerApi}; - this.workersBusy++; try { let startedSigSets = 0; @@ -393,7 +375,6 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier { } worker.status = {code: WorkerStatusCode.idle, workerApi}; - this.workersBusy--; // Potentially run a new job setTimeout(this.runJob, 0); diff --git a/packages/beacon-node/src/chain/bls/singleThread.ts b/packages/beacon-node/src/chain/bls/singleThread.ts index 78f3f4bf5200..6895e2225696 100644 --- a/packages/beacon-node/src/chain/bls/singleThread.ts +++ b/packages/beacon-node/src/chain/bls/singleThread.ts @@ -37,9 +37,4 @@ export class BlsSingleThreadVerifier implements IBlsVerifier { async close(): Promise { // nothing to do } - - canAcceptWork(): boolean { - // Since sigs are verified blocking the main thread, there's no mechanism to throttle - return true; - } } diff --git a/packages/beacon-node/src/chain/chain.ts b/packages/beacon-node/src/chain/chain.ts index 599f2d85bc98..99ca83af5e08 100644 --- a/packages/beacon-node/src/chain/chain.ts +++ b/packages/beacon-node/src/chain/chain.ts @@ -34,7 +34,7 @@ import {BeaconClock, LocalClock} from "./clock/index.js"; import {ChainEventEmitter, ChainEvent} from "./emitter.js"; import {IBeaconChain, ProposerPreparationData} from "./interface.js"; import {IChainOptions} from "./options.js"; -import {QueuedStateRegenerator, RegenCaller} from "./regen/index.js"; +import {IStateRegenerator, QueuedStateRegenerator, RegenCaller} from "./regen/index.js"; import {initializeForkChoice} from "./forkChoice/index.js"; import {computeAnchorCheckpoint} from "./initState.js"; import {IBlsVerifier, BlsSingleThreadVerifier, BlsMultiThreadWorkerPool} from "./bls/index.js"; @@ -91,7 +91,7 @@ export class BeaconChain implements IBeaconChain { readonly emitter: ChainEventEmitter; readonly stateCache: StateContextCache; readonly checkpointStateCache: CheckpointStateCache; - readonly regen: QueuedStateRegenerator; + readonly regen: IStateRegenerator; readonly lightClientServer: LightClientServer; readonly reprocessController: ReprocessController; @@ -273,14 +273,6 @@ export class BeaconChain implements IBeaconChain { await this.bls.close(); } - regenCanAcceptWork(): boolean { - return this.regen.canAcceptWork(); - } - - blsThreadPoolCanAcceptWork(): boolean { - return this.bls.canAcceptWork(); - } - validatorSeenAtEpoch(index: ValidatorIndex, epoch: Epoch): boolean { // Caller must check that epoch is not older that current epoch - 1 // else the caches for that epoch may already be pruned. diff --git a/packages/beacon-node/src/chain/interface.ts b/packages/beacon-node/src/chain/interface.ts index aac24fe76aee..d087d2b2a3ce 100644 --- a/packages/beacon-node/src/chain/interface.ts +++ b/packages/beacon-node/src/chain/interface.ts @@ -133,9 +133,6 @@ export interface IBeaconChain { /** Persist bad items to persistInvalidSszObjectsDir dir, for example invalid state, attestations etc. */ persistInvalidSszView(view: TreeView, suffix?: string): void; updateBuilderStatus(clockSlot: Slot): void; - - regenCanAcceptWork(): boolean; - blsThreadPoolCanAcceptWork(): boolean; } export type SSZObjectType = diff --git a/packages/beacon-node/src/chain/regen/queued.ts b/packages/beacon-node/src/chain/regen/queued.ts index b4ee806cfba4..16e2afaa33ea 100644 --- a/packages/beacon-node/src/chain/regen/queued.ts +++ b/packages/beacon-node/src/chain/regen/queued.ts @@ -10,8 +10,6 @@ import {StateRegenerator, RegenModules} from "./regen.js"; import {RegenError, RegenErrorCode} from "./errors.js"; const REGEN_QUEUE_MAX_LEN = 256; -// TODO: Should this constant be lower than above? 256 feels high -const REGEN_CAN_ACCEPT_WORK_THRESHOLD = 16; type QueuedStateRegeneratorModules = RegenModules & { signal: AbortSignal; @@ -48,10 +46,6 @@ export class QueuedStateRegenerator implements IStateRegenerator { this.metrics = modules.metrics; } - canAcceptWork(): boolean { - return this.jobQueue.jobLen < REGEN_CAN_ACCEPT_WORK_THRESHOLD; - } - /** * Get the state to run with `block`. * - State after `block.parentRoot` dialed forward to block.slot diff --git a/packages/beacon-node/src/metrics/metrics/lodestar.ts b/packages/beacon-node/src/metrics/metrics/lodestar.ts index 2c2f0cc825b3..01db193822a2 100644 --- a/packages/beacon-node/src/metrics/metrics/lodestar.ts +++ b/packages/beacon-node/src/metrics/metrics/lodestar.ts @@ -267,26 +267,10 @@ export function createLodestarMetrics( }), gossipValidationQueueConcurrency: register.gauge<"topic">({ name: "lodestar_gossip_validation_queue_concurrency", - help: "Current count of jobs being run on network processor for topic", + help: "Current concurrency of gossip validation queue", labelNames: ["topic"], }), - networkProcessor: { - executeWorkCalls: register.gauge({ - name: "lodestar_network_processor_execute_work_calls_total", - help: "Total calls to network processor execute work fn", - }), - jobsSubmitted: register.histogram({ - name: "lodestar_network_processor_execute_jobs_submitted_total", - help: "Total calls to network processor execute work fn", - buckets: [0, 1, 5, 128], - }), - canNotAcceptWork: register.gauge({ - name: "lodestar_network_processor_can_not_accept_work_total", - help: "Total times network processor can not accept work on executeWork", - }), - }, - discv5: { decodeEnrAttemptCount: register.counter({ name: "lodestar_discv5_decode_enr_attempt_count", @@ -554,10 +538,6 @@ export function createLodestarMetrics( name: "lodestar_bls_thread_pool_queue_length", help: "Count of total block processor queue length", }), - workersBusy: register.gauge({ - name: "lodestar_bls_thread_pool_workers_busy", - help: "Count of current busy workers", - }), totalJobsGroupsStarted: register.gauge({ name: "lodestar_bls_thread_pool_job_groups_started_total", help: "Count of total jobs groups started in bls thread pool, job groups include +1 jobs", diff --git a/packages/beacon-node/src/network/events.ts b/packages/beacon-node/src/network/events.ts index 2111afaa4c04..2973ca60a4e8 100644 --- a/packages/beacon-node/src/network/events.ts +++ b/packages/beacon-node/src/network/events.ts @@ -1,11 +1,9 @@ import {EventEmitter} from "events"; import {PeerId} from "@libp2p/interface-peer-id"; import StrictEventEmitter from "strict-event-emitter-types"; -import {TopicValidatorResult} from "@libp2p/interface-pubsub"; import {phase0} from "@lodestar/types"; import {BlockInput} from "../chain/blocks/types.js"; import {RequestTypedContainer} from "./reqresp/ReqRespBeaconNode.js"; -import {PendingGossipsubMessage} from "./processor/types.js"; export enum NetworkEvent { /** A relevant peer has connected or has been re-STATUS'd */ @@ -16,10 +14,6 @@ export enum NetworkEvent { gossipHeartbeat = "gossipsub.heartbeat", reqRespRequest = "req-resp.request", unknownBlockParent = "unknownBlockParent", - - // Network processor events - pendingGossipsubMessage = "gossip.pendingGossipsubMessage", - gossipMessageValidationResult = "gossip.messageValidationResult", } export type NetworkEvents = { @@ -27,12 +21,6 @@ export type NetworkEvents = { [NetworkEvent.peerDisconnected]: (peer: PeerId) => void; [NetworkEvent.reqRespRequest]: (request: RequestTypedContainer, peer: PeerId) => void; [NetworkEvent.unknownBlockParent]: (blockInput: BlockInput, peerIdStr: string) => void; - [NetworkEvent.pendingGossipsubMessage]: (data: PendingGossipsubMessage) => void; - [NetworkEvent.gossipMessageValidationResult]: ( - msgId: string, - propagationSource: PeerId, - acceptance: TopicValidatorResult - ) => void; }; export type INetworkEventBus = StrictEventEmitter; diff --git a/packages/beacon-node/src/network/gossip/gossipsub.ts b/packages/beacon-node/src/network/gossip/gossipsub.ts index 06740c7e7829..bd66d4952c07 100644 --- a/packages/beacon-node/src/network/gossip/gossipsub.ts +++ b/packages/beacon-node/src/network/gossip/gossipsub.ts @@ -1,5 +1,3 @@ -import {PeerId} from "@libp2p/interface-peer-id"; -import {TopicValidatorResult} from "@libp2p/interface-pubsub"; import {GossipSub, GossipsubEvents} from "@chainsafe/libp2p-gossipsub"; import {PublishOpts, SignaturePolicy, TopicStr} from "@chainsafe/libp2p-gossipsub/types"; import {PeerScore, PeerScoreParams} from "@chainsafe/libp2p-gossipsub/score"; @@ -17,10 +15,19 @@ import {PeersData} from "../peers/peersData.js"; import {ClientKind} from "../peers/client.js"; import {GOSSIP_MAX_SIZE, GOSSIP_MAX_SIZE_BELLATRIX} from "../../constants/network.js"; import {Libp2p} from "../interface.js"; -import {NetworkEvent, NetworkEventBus} from "../events.js"; -import {GossipBeaconNode, GossipTopic, GossipTopicMap, GossipType, GossipTypeMap} from "./interface.js"; +import { + GossipJobQueues, + GossipTopic, + GossipTopicMap, + GossipType, + GossipTypeMap, + ValidatorFnsByType, + GossipHandlers, + GossipBeaconNode, +} from "./interface.js"; import {getGossipSSZType, GossipTopicCache, stringifyGossipTopic, getCoreTopicsAtFork} from "./topic.js"; import {DataTransformSnappy, fastMsgIdFn, msgIdFn, msgIdToStrFn} from "./encoding.js"; +import {createValidatorFnsByType} from "./validation/index.js"; import { computeGossipPeerScoreParams, @@ -41,9 +48,10 @@ export type Eth2GossipsubModules = { libp2p: Libp2p; logger: Logger; metrics: Metrics | null; + signal: AbortSignal; eth2Context: Eth2Context; + gossipHandlers: GossipHandlers; peersData: PeersData; - events: NetworkEventBus; }; export type Eth2GossipsubOpts = { @@ -52,7 +60,6 @@ export type Eth2GossipsubOpts = { gossipsubDLow?: number; gossipsubDHigh?: number; gossipsubAwaitHandler?: boolean; - skipParamsLog?: boolean; }; /** @@ -69,21 +76,23 @@ export type Eth2GossipsubOpts = { * See https://github.com/ethereum/consensus-specs/blob/v1.1.10/specs/phase0/p2p-interface.md#the-gossip-domain-gossipsub */ export class Eth2Gossipsub extends GossipSub implements GossipBeaconNode { + readonly jobQueues: GossipJobQueues; readonly scoreParams: Partial; private readonly config: BeaconConfig; private readonly logger: Logger; private readonly peersData: PeersData; - private readonly events: NetworkEventBus; // Internal caches private readonly gossipTopicCache: GossipTopicCache; + private readonly validatorFnsByType: ValidatorFnsByType; + constructor(opts: Eth2GossipsubOpts, modules: Eth2GossipsubModules) { const {allowPublishToZeroPeers, gossipsubD, gossipsubDLow, gossipsubDHigh} = opts; const gossipTopicCache = new GossipTopicCache(modules.config); const scoreParams = computeGossipPeerScoreParams(modules); - const {config, logger, metrics, peersData, events} = modules; + const {config, logger, metrics, signal, gossipHandlers, peersData} = modules; // Gossipsub parameters defined here: // https://github.com/ethereum/consensus-specs/blob/v1.1.10/specs/phase0/p2p-interface.md#the-gossip-domain-gossipsub @@ -128,21 +137,29 @@ export class Eth2Gossipsub extends GossipSub implements GossipBeaconNode { this.config = config; this.logger = logger; this.peersData = peersData; - this.events = events; this.gossipTopicCache = gossipTopicCache; + // Note: We use the validator functions as handlers. No handler will be registered to gossipsub. + // libp2p-js layer will emit the message to an EventEmitter that won't be listened by anyone. + // TODO: Force to ensure there's a validatorFunction attached to every received topic. + const {validatorFnsByType, jobQueues} = createValidatorFnsByType(gossipHandlers, { + config, + logger, + metrics, + signal, + }); + this.validatorFnsByType = validatorFnsByType; + this.jobQueues = jobQueues; + if (metrics) { metrics.gossipMesh.peersByType.addCollect(() => this.onScrapeLodestarMetrics(metrics)); } this.addEventListener("gossipsub:message", this.onGossipsubMessage.bind(this)); - this.events.on(NetworkEvent.gossipMessageValidationResult, this.onValidationResult.bind(this)); // Having access to this data is CRUCIAL for debugging. While this is a massive log, it must not be deleted. // Scoring issues require this dump + current peer score stats to re-calculate scores. - if (!opts.skipParamsLog) { - this.logger.debug("Gossipsub score params", {params: JSON.stringify(scoreParams)}); - } + this.logger.debug("Gossipsub score params", {params: JSON.stringify(scoreParams)}); } /** @@ -405,19 +422,14 @@ export class Eth2Gossipsub extends GossipSub implements GossipBeaconNode { // Get seenTimestamp before adding the message to the queue or add async delays const seenTimestampSec = Date.now() / 1000; - // Emit message to network processor - this.events.emit(NetworkEvent.pendingGossipsubMessage, { - topic, - msg, - msgId, - propagationSource, - seenTimestampSec, - startProcessUnixSec: null, - }); - } - - private onValidationResult(msgId: string, propagationSource: PeerId, acceptance: TopicValidatorResult): void { - this.reportMessageValidationResult(msgId, propagationSource, acceptance); + // Puts object in queue, validates, then processes + this.validatorFnsByType[topic.type](topic, msg, propagationSource.toString(), seenTimestampSec) + .then((acceptance) => { + this.reportMessageValidationResult(msgId, propagationSource, acceptance); + }) + .catch((e) => { + this.logger.error("Error onGossipsubMessage", {}, e); + }); } } diff --git a/packages/beacon-node/src/network/processor/gossipHandlers.ts b/packages/beacon-node/src/network/gossip/handlers/index.ts similarity index 93% rename from packages/beacon-node/src/network/processor/gossipHandlers.ts rename to packages/beacon-node/src/network/gossip/handlers/index.ts index c46f547f13c9..585b778dbe89 100644 --- a/packages/beacon-node/src/network/processor/gossipHandlers.ts +++ b/packages/beacon-node/src/network/gossip/handlers/index.ts @@ -4,9 +4,9 @@ import {BeaconConfig} from "@lodestar/config"; import {Logger, prettyBytes} from "@lodestar/utils"; import {phase0, Root, Slot, ssz} from "@lodestar/types"; import {ForkName, ForkSeq} from "@lodestar/params"; -import {Metrics} from "../../metrics/index.js"; -import {OpSource} from "../../metrics/validatorMonitor.js"; -import {IBeaconChain} from "../../chain/index.js"; +import {Metrics} from "../../../metrics/index.js"; +import {OpSource} from "../../../metrics/validatorMonitor.js"; +import {IBeaconChain} from "../../../chain/index.js"; import { AttestationError, AttestationErrorCode, @@ -16,8 +16,8 @@ import { GossipAction, GossipActionError, SyncCommitteeError, -} from "../../chain/errors/index.js"; -import {GossipHandlers, GossipType} from "../gossip/interface.js"; +} from "../../../chain/errors/index.js"; +import {GossipHandlers, GossipType} from "../interface.js"; import { validateGossipAggregateAndProof, validateGossipAttestation, @@ -28,14 +28,14 @@ import { validateSyncCommitteeGossipContributionAndProof, validateGossipVoluntaryExit, validateBlsToExecutionChange, -} from "../../chain/validation/index.js"; -import {NetworkEvent, NetworkEventBus} from "../events.js"; -import {PeerAction, PeerRpcScoreStore} from "../peers/index.js"; -import {validateLightClientFinalityUpdate} from "../../chain/validation/lightClientFinalityUpdate.js"; -import {validateLightClientOptimisticUpdate} from "../../chain/validation/lightClientOptimisticUpdate.js"; -import {validateGossipBlobsSidecar} from "../../chain/validation/blobsSidecar.js"; -import {BlockInput, getBlockInput} from "../../chain/blocks/types.js"; -import {AttnetsService} from "../subnets/attnetsService.js"; +} from "../../../chain/validation/index.js"; +import {NetworkEvent, NetworkEventBus} from "../../events.js"; +import {PeerAction, PeerRpcScoreStore} from "../../peers/index.js"; +import {validateLightClientFinalityUpdate} from "../../../chain/validation/lightClientFinalityUpdate.js"; +import {validateLightClientOptimisticUpdate} from "../../../chain/validation/lightClientOptimisticUpdate.js"; +import {validateGossipBlobsSidecar} from "../../../chain/validation/blobsSidecar.js"; +import {BlockInput, getBlockInput} from "../../../chain/blocks/types.js"; +import {AttnetsService} from "../../subnets/attnetsService.js"; /** * Gossip handler options as part of network options @@ -52,13 +52,13 @@ export const defaultGossipHandlerOpts = { dontSendGossipAttestationsToForkchoice: false, }; -export type ValidatorFnsModules = { +type ValidatorFnsModules = { attnetsService: AttnetsService; chain: IBeaconChain; config: BeaconConfig; logger: Logger; metrics: Metrics | null; - events: NetworkEventBus; + networkEventBus: NetworkEventBus; peerRpcScores: PeerRpcScoreStore; }; @@ -79,7 +79,7 @@ const MAX_UNKNOWN_BLOCK_ROOT_RETRIES = 1; * - Ethereum Consensus gossipsub protocol strictly defined a single topic for message */ export function getGossipHandlers(modules: ValidatorFnsModules, options: GossipHandlerOpts): GossipHandlers { - const {attnetsService, chain, config, metrics, events, peerRpcScores, logger} = modules; + const {attnetsService, chain, config, metrics, networkEventBus, peerRpcScores, logger} = modules; async function validateBeaconBlock( blockInput: BlockInput, @@ -109,7 +109,7 @@ export function getGossipHandlers(modules: ValidatorFnsModules, options: GossipH if (e instanceof BlockGossipError) { if (e instanceof BlockGossipError && e.type.code === BlockErrorCode.PARENT_UNKNOWN) { logger.debug("Gossip block has error", {slot, root: blockHex, code: e.type.code}); - events.emit(NetworkEvent.unknownBlockParent, blockInput, peerIdStr); + networkEventBus.emit(NetworkEvent.unknownBlockParent, blockInput, peerIdStr); } } diff --git a/packages/beacon-node/src/network/gossip/index.ts b/packages/beacon-node/src/network/gossip/index.ts index e76bc9279444..07d3d1310a3c 100644 --- a/packages/beacon-node/src/network/gossip/index.ts +++ b/packages/beacon-node/src/network/gossip/index.ts @@ -1,4 +1,4 @@ export {Eth2Gossipsub} from "./gossipsub.js"; -export {getGossipHandlers} from "../processor/gossipHandlers.js"; +export {getGossipHandlers} from "./handlers/index.js"; export {getCoreTopicsAtFork} from "./topic.js"; export * from "./interface.js"; diff --git a/packages/beacon-node/src/network/processor/gossipValidatorFn.ts b/packages/beacon-node/src/network/gossip/validation/index.ts similarity index 60% rename from packages/beacon-node/src/network/processor/gossipValidatorFn.ts rename to packages/beacon-node/src/network/gossip/validation/index.ts index 00b5b3f4a6e9..4bc07d9405ac 100644 --- a/packages/beacon-node/src/network/processor/gossipValidatorFn.ts +++ b/packages/beacon-node/src/network/gossip/validation/index.ts @@ -1,17 +1,52 @@ import {TopicValidatorResult} from "@libp2p/interface-pubsub"; import {ChainForkConfig} from "@lodestar/config"; -import {Logger} from "@lodestar/utils"; -import {Metrics} from "../../metrics/index.js"; -import {getGossipSSZType} from "../gossip/topic.js"; -import {GossipValidatorFn, GossipHandlers, GossipHandlerFn} from "../gossip/interface.js"; -import {GossipActionError, GossipAction} from "../../chain/errors/index.js"; +import {Logger, mapValues} from "@lodestar/utils"; +import {Metrics} from "../../../metrics/index.js"; +import {getGossipSSZType} from "../topic.js"; +import { + GossipJobQueues, + GossipType, + GossipValidatorFn, + ValidatorFnsByType, + GossipHandlers, + GossipHandlerFn, +} from "../interface.js"; +import {GossipActionError, GossipAction} from "../../../chain/errors/index.js"; +import {createValidationQueues} from "./queue.js"; -export type ValidatorFnModules = { +type ValidatorFnModules = { config: ChainForkConfig; logger: Logger; metrics: Metrics | null; }; +/** + * Returns GossipValidatorFn for each GossipType, given GossipHandlerFn indexed by type. + * + * @see getGossipHandlers for reasoning on why GossipHandlerFn are used for gossip validation. + */ +export function createValidatorFnsByType( + gossipHandlers: GossipHandlers, + modules: ValidatorFnModules & {signal: AbortSignal} +): {validatorFnsByType: ValidatorFnsByType; jobQueues: GossipJobQueues} { + const gossipValidatorFns = mapValues(gossipHandlers, (gossipHandler, type) => { + return getGossipValidatorFn(gossipHandler, type, modules); + }); + + const jobQueues = createValidationQueues(gossipValidatorFns, modules.signal, modules.metrics); + + const validatorFnsByType = mapValues( + jobQueues, + (jobQueue): GossipValidatorFn => { + return async function gossipValidatorFnWithQueue(topic, gossipMsg, propagationSource, seenTimestampSec) { + return jobQueue.push(topic, gossipMsg, propagationSource, seenTimestampSec); + }; + } + ); + + return {jobQueues, validatorFnsByType}; +} + /** * Returns a GossipSub validator function from a GossipHandlerFn. GossipHandlerFn may throw GossipActionError if one * or more validation conditions from the consensus-specs#p2p-interface are not satisfied. @@ -26,12 +61,14 @@ export type ValidatorFnModules = { * * @see getGossipHandlers for reasoning on why GossipHandlerFn are used for gossip validation. */ -export function getGossipValidatorFn(gossipHandlers: GossipHandlers, modules: ValidatorFnModules): GossipValidatorFn { +function getGossipValidatorFn( + gossipHandler: GossipHandlers[K], + type: K, + modules: ValidatorFnModules +): GossipValidatorFn { const {logger, metrics} = modules; return async function gossipValidatorFn(topic, msg, propagationSource, seenTimestampSec) { - const type = topic.type; - // Define in scope above try {} to be used in catch {} if object was parsed let gossipObject; try { @@ -44,7 +81,7 @@ export function getGossipValidatorFn(gossipHandlers: GossipHandlers, modules: Va return TopicValidatorResult.Reject; } - await (gossipHandlers[topic.type] as GossipHandlerFn)(gossipObject, topic, propagationSource, seenTimestampSec); + await (gossipHandler as GossipHandlerFn)(gossipObject, topic, propagationSource, seenTimestampSec); metrics?.gossipValidationAccept.inc({topic: type}); diff --git a/packages/beacon-node/src/network/gossip/validation/onAccept.ts b/packages/beacon-node/src/network/gossip/validation/onAccept.ts new file mode 100644 index 000000000000..810e7ccfd64b --- /dev/null +++ b/packages/beacon-node/src/network/gossip/validation/onAccept.ts @@ -0,0 +1,15 @@ +import {ChainForkConfig} from "@lodestar/config"; +import {GossipType, GossipTypeMap, GossipTopicTypeMap} from "../interface.js"; + +export type GetGossipAcceptMetadataFn = ( + config: ChainForkConfig, + object: GossipTypeMap[GossipType], + topic: GossipTopicTypeMap[GossipType] +) => Record; +export type GetGossipAcceptMetadataFns = { + [K in GossipType]: ( + config: ChainForkConfig, + object: GossipTypeMap[K], + topic: GossipTopicTypeMap[K] + ) => Record; +}; diff --git a/packages/beacon-node/src/network/processor/gossipQueues.ts b/packages/beacon-node/src/network/gossip/validation/queue.ts similarity index 61% rename from packages/beacon-node/src/network/processor/gossipQueues.ts rename to packages/beacon-node/src/network/gossip/validation/queue.ts index bd4f85cc1761..fbc43dadfca0 100644 --- a/packages/beacon-node/src/network/processor/gossipQueues.ts +++ b/packages/beacon-node/src/network/gossip/validation/queue.ts @@ -1,89 +1,33 @@ import {mapValues} from "@lodestar/utils"; -import {LinkedList} from "../../util/array.js"; -import {GossipType} from "../gossip/interface.js"; - -enum QueueType { - FIFO = "FIFO", - LIFO = "LIFO", -} +import {Metrics} from "../../../metrics/index.js"; +import {JobItemQueue, JobQueueOpts, QueueType} from "../../../util/queue/index.js"; +import {GossipJobQueues, GossipType, GossipValidatorFn, ResolvedType, ValidatorFnsByType} from "../interface.js"; /** * Numbers from https://github.com/sigp/lighthouse/blob/b34a79dc0b02e04441ba01fd0f304d1e203d877d/beacon_node/network/src/beacon_processor/mod.rs#L69 */ const gossipQueueOpts: { - [K in GossipType]: GossipQueueOpts; + [K in GossipType]: Pick; } = { // validation gossip block asap - [GossipType.beacon_block]: {maxLength: 1024, type: QueueType.FIFO}, + [GossipType.beacon_block]: {maxLength: 1024, type: QueueType.FIFO, noYieldIfOneItem: true}, // TODO DENEB: What's a good queue max given that now blocks are much bigger? - [GossipType.beacon_block_and_blobs_sidecar]: {maxLength: 32, type: QueueType.FIFO}, + [GossipType.beacon_block_and_blobs_sidecar]: {maxLength: 32, type: QueueType.FIFO, noYieldIfOneItem: true}, // lighthoue has aggregate_queue 4096 and unknown_block_aggregate_queue 1024, we use single queue - [GossipType.beacon_aggregate_and_proof]: {maxLength: 5120, type: QueueType.LIFO}, + [GossipType.beacon_aggregate_and_proof]: {maxLength: 5120, type: QueueType.LIFO, maxConcurrency: 16}, // lighthouse has attestation_queue 16384 and unknown_block_attestation_queue 8192, we use single queue - [GossipType.beacon_attestation]: {maxLength: 24576, type: QueueType.LIFO}, + [GossipType.beacon_attestation]: {maxLength: 24576, type: QueueType.LIFO, maxConcurrency: 64}, [GossipType.voluntary_exit]: {maxLength: 4096, type: QueueType.FIFO}, [GossipType.proposer_slashing]: {maxLength: 4096, type: QueueType.FIFO}, [GossipType.attester_slashing]: {maxLength: 4096, type: QueueType.FIFO}, - [GossipType.sync_committee_contribution_and_proof]: {maxLength: 4096, type: QueueType.LIFO}, - [GossipType.sync_committee]: {maxLength: 4096, type: QueueType.LIFO}, + [GossipType.sync_committee_contribution_and_proof]: {maxLength: 4096, type: QueueType.LIFO, maxConcurrency: 16}, + [GossipType.sync_committee]: {maxLength: 4096, type: QueueType.LIFO, maxConcurrency: 64}, [GossipType.light_client_finality_update]: {maxLength: 1024, type: QueueType.FIFO}, [GossipType.light_client_optimistic_update]: {maxLength: 1024, type: QueueType.FIFO}, // lighthouse has bls changes queue set to their max 16384 to handle large spike at capella [GossipType.bls_to_execution_change]: {maxLength: 16384, type: QueueType.FIFO}, }; -type GossipQueueOpts = { - type: QueueType; - maxLength: number; -}; - -export class GossipQueue { - private readonly list = new LinkedList(); - - constructor(private readonly opts: GossipQueueOpts) {} - - get length(): number { - return this.list.length; - } - - clear(): void { - this.list.clear(); - } - - add(item: T): T | null { - let droppedItem: T | null = null; - - if (this.list.length + 1 > this.opts.maxLength) { - // LIFO -> keep latest job, drop oldest, FIFO -> drop latest job - switch (this.opts.type) { - case QueueType.LIFO: - droppedItem = this.list.shift(); - break; - case QueueType.FIFO: - return item; - } - } - - this.list.push(item); - - return droppedItem; - } - - next(): T | null { - // LIFO -> pop() remove last item, FIFO -> shift() remove first item - switch (this.opts.type) { - case QueueType.LIFO: - return this.list.pop(); - case QueueType.FIFO: - return this.list.shift(); - } - } - - getAll(): T[] { - return this.list.toArray(); - } -} - /** * Wraps a GossipValidatorFn with a queue, to limit the processing of gossip objects by type. * @@ -100,8 +44,25 @@ export class GossipQueue { * By topic is too specific, so by type groups all similar objects in the same queue. All in the same won't allow * to customize different queue behaviours per object type (see `gossipQueueOpts`). */ -export function createGossipQueues(): {[K in GossipType]: GossipQueue} { - return mapValues(gossipQueueOpts, (opts) => { - return new GossipQueue(opts); +export function createValidationQueues( + gossipValidatorFns: ValidatorFnsByType, + signal: AbortSignal, + metrics: Metrics | null +): GossipJobQueues { + return mapValues(gossipQueueOpts, (opts, type) => { + const gossipValidatorFn = gossipValidatorFns[type]; + return new JobItemQueue, ResolvedType>( + gossipValidatorFn, + {signal, ...opts}, + metrics + ? { + length: metrics.gossipValidationQueueLength.child({topic: type}), + droppedJobs: metrics.gossipValidationQueueDroppedJobs.child({topic: type}), + jobTime: metrics.gossipValidationQueueJobTime.child({topic: type}), + jobWaitTime: metrics.gossipValidationQueueJobWaitTime.child({topic: type}), + concurrency: metrics.gossipValidationQueueConcurrency.child({topic: type}), + } + : undefined + ); }); } diff --git a/packages/beacon-node/src/network/interface.ts b/packages/beacon-node/src/network/interface.ts index 93ca0cec65c6..2711eef83d4b 100644 --- a/packages/beacon-node/src/network/interface.ts +++ b/packages/beacon-node/src/network/interface.ts @@ -10,11 +10,10 @@ import {PeerScoreStatsDump} from "@chainsafe/libp2p-gossipsub/score"; import {routes} from "@lodestar/api"; import {BlockInput} from "../chain/blocks/types.js"; import {INetworkEventBus} from "./events.js"; -import {GossipBeaconNode, GossipType} from "./gossip/index.js"; +import {GossipBeaconNode} from "./gossip/index.js"; import {PeerAction, PeerScoreStats} from "./peers/index.js"; import {IReqRespBeaconNode} from "./reqresp/ReqRespBeaconNode.js"; import {CommitteeSubscription} from "./subnets/index.js"; -import {PendingGossipsubMessage} from "./processor/types.js"; export type PeerSearchOptions = { supportsProtocols?: string[]; @@ -62,7 +61,7 @@ export interface INetwork { dumpPeer(peerIdStr: string): Promise; dumpPeerScoreStats(): Promise; dumpGossipPeerScoreStats(): Promise; - dumpGossipQueue(gossipType: GossipType): Promise; + dumpGossipQueueItems(gossipType: string): Promise; dumpDiscv5KadValues(): Promise; } diff --git a/packages/beacon-node/src/network/network.ts b/packages/beacon-node/src/network/network.ts index cef0090ae669..f7d8ce534c7c 100644 --- a/packages/beacon-node/src/network/network.ts +++ b/packages/beacon-node/src/network/network.ts @@ -20,6 +20,7 @@ import {ReqRespBeaconNode, ReqRespHandlers, beaconBlocksMaybeBlobsByRange} from import {beaconBlocksMaybeBlobsByRoot} from "./reqresp/beaconBlocksMaybeBlobsByRoot.js"; import { Eth2Gossipsub, + getGossipHandlers, GossipHandlers, GossipTopicTypeMap, GossipType, @@ -36,8 +37,6 @@ import {PeersData} from "./peers/peersData.js"; import {getConnectionsMap, isPublishToZeroPeersError} from "./util.js"; import {Discv5Worker} from "./discv5/index.js"; import {createNodeJsLibp2p} from "./nodejs/util.js"; -import {NetworkProcessor} from "./processor/index.js"; -import {PendingGossipsubMessage} from "./processor/types.js"; // How many changes to batch cleanup const CACHED_BLS_BATCH_CLEANUP_LIMIT = 10; @@ -51,7 +50,6 @@ type NetworkModules = { signal: AbortSignal; peersData: PeersData; networkEventBus: NetworkEventBus; - networkProcessor: NetworkProcessor; metadata: MetadataController; peerRpcScores: PeerRpcScoreStore; reqResp: ReqRespBeaconNode; @@ -86,7 +84,6 @@ export class Network implements INetwork { private readonly opts: NetworkOptions; private readonly peersData: PeersData; - private readonly networkProcessor: NetworkProcessor; private readonly peerManager: PeerManager; private readonly libp2p: Libp2p; private readonly logger: Logger; @@ -109,7 +106,6 @@ export class Network implements INetwork { signal, peersData, networkEventBus, - networkProcessor, metadata, peerRpcScores, reqResp, @@ -127,7 +123,7 @@ export class Network implements INetwork { this.signal = signal; this.peersData = peersData; this.events = networkEventBus; - (this.networkProcessor = networkProcessor), (this.metadata = metadata); + this.metadata = metadata; this.peerRpcScores = peerRpcScores; this.reqResp = reqResp; this.gossip = gossip; @@ -150,8 +146,8 @@ export class Network implements INetwork { peerStoreDir, chain, reqRespHandlers, - signal, gossipHandlers, + signal, }: NetworkInitModules): Promise { const clock = chain.clock; const peersData = new PeersData(); @@ -200,13 +196,16 @@ export class Network implements INetwork { libp2p, logger, metrics, + signal, + gossipHandlers: + gossipHandlers ?? + getGossipHandlers({chain, config, logger, attnetsService, peerRpcScores, networkEventBus, metrics}, opts), eth2Context: { activeValidatorCount: chain.getHeadState().epochCtx.currentShuffling.activeIndices.length, currentSlot: clock.currentSlot, currentEpoch: clock.currentEpoch, }, peersData, - events: networkEventBus, }); const syncnetsService = new SyncnetsService(config, chain, gossip, metadata, logger, metrics, opts); @@ -229,11 +228,6 @@ export class Network implements INetwork { opts ); - const networkProcessor = new NetworkProcessor( - {attnetsService, chain, config, logger, metrics, peerRpcScores, events: networkEventBus, gossipHandlers}, - opts - ); - await libp2p.start(); // Network spec decides version changes based on clock fork, not head fork @@ -266,7 +260,6 @@ export class Network implements INetwork { signal, peersData, networkEventBus, - networkProcessor, metadata, peerRpcScores, reqResp, @@ -418,7 +411,9 @@ export class Network implements INetwork { } // Drop all the gossip validation queues - this.networkProcessor.dropAllJobs(); + for (const jobQueue of Object.values(this.gossip.jobQueues)) { + jobQueue.dropAllJobs(); + } } isSubscribedToGossipCoreTopics(): boolean { @@ -450,6 +445,24 @@ export class Network implements INetwork { })); } + async dumpGossipQueueItems(gossipType: string): Promise { + const jobQueue = this.gossip.jobQueues[gossipType as GossipType]; + if (jobQueue === undefined) { + throw Error(`Unknown gossipType ${gossipType}, known values: ${Object.keys(jobQueue).join(", ")}`); + } + + return jobQueue.getItems().map((item) => { + const [topic, message, propagationSource, seenTimestampSec] = item.args; + return { + topic: topic, + propagationSource, + data: message.data, + addedTimeMs: item.addedTimeMs, + seenTimestampSec, + }; + }); + } + async dumpPeerScoreStats(): Promise { return this.peerRpcScores.dumpPeerScoreStats(); } @@ -462,10 +475,6 @@ export class Network implements INetwork { return (await this.discv5?.kadValues())?.map((enr) => enr.encodeTxt()) ?? []; } - async dumpGossipQueue(gossipType: GossipType): Promise { - return this.networkProcessor.dumpGossipQueue(gossipType); - } - /** * Handle subscriptions through fork transitions, @see FORK_EPOCH_LOOKAHEAD */ diff --git a/packages/beacon-node/src/network/options.ts b/packages/beacon-node/src/network/options.ts index 696bb65f3876..e004d671597c 100644 --- a/packages/beacon-node/src/network/options.ts +++ b/packages/beacon-node/src/network/options.ts @@ -1,16 +1,15 @@ import {generateKeypair, IDiscv5DiscoveryInputOptions, KeypairType, SignableENR} from "@chainsafe/discv5"; import {Eth2GossipsubOpts} from "./gossip/gossipsub.js"; -import {defaultGossipHandlerOpts} from "./processor/gossipHandlers.js"; +import {defaultGossipHandlerOpts, GossipHandlerOpts} from "./gossip/handlers/index.js"; import {PeerManagerOpts} from "./peers/index.js"; import {ReqRespBeaconNodeOpts} from "./reqresp/ReqRespBeaconNode.js"; -import {NetworkProcessorOpts} from "./processor/index.js"; // Since Network is eventually intended to be run in a separate thread, ensure that all options are cloneable using structuredClone export interface NetworkOptions extends PeerManagerOpts, // remove all Functions Omit, - NetworkProcessorOpts, + GossipHandlerOpts, Eth2GossipsubOpts { localMultiaddrs: string[]; bootMultiaddrs?: string[]; diff --git a/packages/beacon-node/src/network/processor/index.ts b/packages/beacon-node/src/network/processor/index.ts deleted file mode 100644 index e1f37f8da531..000000000000 --- a/packages/beacon-node/src/network/processor/index.ts +++ /dev/null @@ -1,161 +0,0 @@ -import {Logger, mapValues} from "@lodestar/utils"; -import {IBeaconChain} from "../../chain/interface.js"; -import {Metrics} from "../../metrics/metrics.js"; -import {NetworkEvent, NetworkEventBus} from "../events.js"; -import {GossipType} from "../gossip/interface.js"; -import {createGossipQueues} from "./gossipQueues.js"; -import {NetworkWorker, NetworkWorkerModules} from "./worker.js"; -import {PendingGossipsubMessage} from "./types.js"; -import {ValidatorFnsModules, GossipHandlerOpts} from "./gossipHandlers.js"; - -export type NetworkProcessorModules = NetworkWorkerModules & - ValidatorFnsModules & { - chain: IBeaconChain; - events: NetworkEventBus; - logger: Logger; - metrics: Metrics | null; - }; - -export type NetworkProcessorOpts = GossipHandlerOpts & { - maxGossipTopicConcurrency?: number; -}; - -const executeGossipWorkOrderObj: Record = { - [GossipType.beacon_block]: true, - [GossipType.beacon_block_and_blobs_sidecar]: true, - [GossipType.beacon_aggregate_and_proof]: true, - [GossipType.beacon_attestation]: true, - [GossipType.voluntary_exit]: true, - [GossipType.proposer_slashing]: true, - [GossipType.attester_slashing]: true, - [GossipType.sync_committee_contribution_and_proof]: true, - [GossipType.sync_committee]: true, - [GossipType.light_client_finality_update]: true, - [GossipType.light_client_optimistic_update]: true, - [GossipType.bls_to_execution_change]: true, -}; -const executeGossipWorkOrder = Object.keys(executeGossipWorkOrderObj) as (keyof typeof executeGossipWorkOrderObj)[]; - -// TODO: Arbitrary constant, check metrics -const MAX_JOBS_SUBMITTED_PER_TICK = 128; - -/** - * Network processor handles the gossip queues and throtles processing to not overload the main thread - * - Decides when to process work and what to process - * - * What triggers execute work? - * - * - When work is submitted - * - When downstream workers become available - * - * ### PendingGossipsubMessage beacon_attestation example - * - * For attestations, processing the message includes the steps: - * 1. Pre shuffling sync validation - * 2. Retrieve shuffling: async + goes into the regen queue and can be expensive - * 3. Pre sig validation sync validation - * 4. Validate BLS signature: async + goes into workers through another manager - * - * The gossip queues should receive "backpressue" from the regen and BLS workers queues. - * Such that enough work is processed to fill either one of the queue. - */ -export class NetworkProcessor { - private readonly worker: NetworkWorker; - private readonly chain: IBeaconChain; - private readonly logger: Logger; - private readonly metrics: Metrics | null; - private readonly gossipQueues = createGossipQueues(); - private readonly gossipTopicConcurrency = mapValues(this.gossipQueues, () => 0); - - constructor(modules: NetworkProcessorModules, private readonly opts: NetworkProcessorOpts) { - const {chain, events, logger, metrics} = modules; - this.chain = chain; - this.metrics = metrics; - this.logger = logger; - this.worker = new NetworkWorker(modules, opts); - - events.on(NetworkEvent.pendingGossipsubMessage, this.onPendingGossipsubMessage.bind(this)); - - if (metrics) { - metrics.gossipValidationQueueLength.addCollect(() => { - for (const topic of executeGossipWorkOrder) { - metrics.gossipValidationQueueLength.set({topic}, this.gossipQueues[topic].length); - metrics.gossipValidationQueueConcurrency.set({topic}, this.gossipTopicConcurrency[topic]); - } - }); - } - - // TODO: Pull new work when available - // this.bls.onAvailable(() => this.executeWork()); - // this.regen.onAvailable(() => this.executeWork()); - } - - dropAllJobs(): void { - for (const topic of executeGossipWorkOrder) { - this.gossipQueues[topic].clear(); - } - } - - dumpGossipQueue(topic: GossipType): PendingGossipsubMessage[] { - const queue = this.gossipQueues[topic]; - if (queue === undefined) { - throw Error(`Unknown gossipType ${topic}, known values: ${Object.keys(this.gossipQueues).join(", ")}`); - } - - return queue.getAll(); - } - - private onPendingGossipsubMessage(data: PendingGossipsubMessage): void { - const droppedJob = this.gossipQueues[data.topic.type].add(data); - if (droppedJob) { - // TODO: Should report the dropped job to gossip? It will be eventually pruned from the mcache - this.metrics?.gossipValidationQueueDroppedJobs.inc({topic: data.topic.type}); - } - - // Tentatively perform work - this.executeWork(); - } - - private executeWork(): void { - // TODO: Maybe de-bounce by timing the last time executeWork was run - - this.metrics?.networkProcessor.executeWorkCalls.inc(); - let jobsSubmitted = 0; - - job_loop: while (jobsSubmitted < MAX_JOBS_SUBMITTED_PER_TICK) { - // Check canAcceptWork before calling queue.next() since it consumes the items - if (!this.chain.blsThreadPoolCanAcceptWork() || !this.chain.regenCanAcceptWork()) { - this.metrics?.networkProcessor.canNotAcceptWork.inc(); - break; - } - - for (const topic of executeGossipWorkOrder) { - if ( - this.opts.maxGossipTopicConcurrency !== undefined && - this.gossipTopicConcurrency[topic] > this.opts.maxGossipTopicConcurrency - ) { - // Reached concurrency limit for topic, continue to next topic - continue; - } - - const item = this.gossipQueues[topic].next(); - if (item) { - this.gossipTopicConcurrency[topic]++; - this.worker - .processPendingGossipsubMessage(item) - .finally(() => this.gossipTopicConcurrency[topic]--) - .catch((e) => this.logger.error("processGossipAttestations must not throw", {}, e)); - - jobsSubmitted++; - // Attempt to find more work, but check canAcceptWork() again and run executeGossipWorkOrder priorization - continue job_loop; - } - } - - // No item of work available on all queues, break off job_loop - break; - } - - this.metrics?.networkProcessor.jobsSubmitted.observe(jobsSubmitted); - } -} diff --git a/packages/beacon-node/src/network/processor/types.ts b/packages/beacon-node/src/network/processor/types.ts deleted file mode 100644 index ebff7e68d437..000000000000 --- a/packages/beacon-node/src/network/processor/types.ts +++ /dev/null @@ -1,17 +0,0 @@ -import {PeerId} from "@libp2p/interface-peer-id"; -import {Message} from "@libp2p/interface-pubsub"; -import {GossipTopic} from "../gossip/index.js"; - -export type GossipAttestationsWork = { - messages: PendingGossipsubMessage[]; -}; - -export type PendingGossipsubMessage = { - topic: GossipTopic; - msg: Message; - msgId: string; - // TODO: Refactor into accepting string (requires gossipsub changes) for easier multi-threading - propagationSource: PeerId; - seenTimestampSec: number; - startProcessUnixSec: number | null; -}; diff --git a/packages/beacon-node/src/network/processor/worker.ts b/packages/beacon-node/src/network/processor/worker.ts deleted file mode 100644 index 64edfbf37077..000000000000 --- a/packages/beacon-node/src/network/processor/worker.ts +++ /dev/null @@ -1,52 +0,0 @@ -import {IBeaconChain} from "../../chain/interface.js"; -import {Metrics} from "../../metrics/metrics.js"; -import {NetworkEvent, NetworkEventBus} from "../events.js"; -import {GossipHandlers, GossipValidatorFn} from "../gossip/interface.js"; -import {getGossipHandlers, GossipHandlerOpts, ValidatorFnsModules} from "./gossipHandlers.js"; -import {getGossipValidatorFn, ValidatorFnModules} from "./gossipValidatorFn.js"; -import {PendingGossipsubMessage} from "./types.js"; - -export type NetworkWorkerModules = ValidatorFnsModules & - ValidatorFnModules & { - chain: IBeaconChain; - events: NetworkEventBus; - metrics: Metrics | null; - // Optionally pass custom GossipHandlers, for testing - gossipHandlers?: GossipHandlers; - }; - -export class NetworkWorker { - private readonly events: NetworkEventBus; - private readonly metrics: Metrics | null; - private readonly gossipValidatorFn: GossipValidatorFn; - - constructor(modules: NetworkWorkerModules, opts: GossipHandlerOpts) { - this.events = modules.events; - this.metrics = modules.metrics; - this.gossipValidatorFn = getGossipValidatorFn(modules.gossipHandlers ?? getGossipHandlers(modules, opts), modules); - } - - async processPendingGossipsubMessage(message: PendingGossipsubMessage): Promise { - message.startProcessUnixSec = Date.now() / 1000; - - const acceptance = await this.gossipValidatorFn( - message.topic, - message.msg, - message.propagationSource.toString(), - message.seenTimestampSec - ); - - if (message.startProcessUnixSec !== null) { - this.metrics?.gossipValidationQueueJobWaitTime.observe( - {topic: message.topic.type}, - message.startProcessUnixSec - message.seenTimestampSec - ); - this.metrics?.gossipValidationQueueJobTime.observe( - {topic: message.topic.type}, - Date.now() / 1000 - message.startProcessUnixSec - ); - } - - this.events.emit(NetworkEvent.gossipMessageValidationResult, message.msgId, message.propagationSource, acceptance); - } -} diff --git a/packages/beacon-node/src/util/queue/itemQueue.ts b/packages/beacon-node/src/util/queue/itemQueue.ts index 46bb2b62f55b..802a9b0e84ec 100644 --- a/packages/beacon-node/src/util/queue/itemQueue.ts +++ b/packages/beacon-node/src/util/queue/itemQueue.ts @@ -41,10 +41,6 @@ export class JobItemQueue { } } - get jobLen(): number { - return this.jobs.length; - } - push(...args: Args): Promise { if (this.opts.signal.aborted) { throw new QueueError({code: QueueErrorCode.QUEUE_ABORTED}); diff --git a/packages/beacon-node/test/e2e/network/gossipsub.test.ts b/packages/beacon-node/test/e2e/network/gossipsub.test.ts index ee1bb71b2a75..ae80c66d9a2b 100644 --- a/packages/beacon-node/test/e2e/network/gossipsub.test.ts +++ b/packages/beacon-node/test/e2e/network/gossipsub.test.ts @@ -5,7 +5,7 @@ import {capella, phase0, ssz, allForks} from "@lodestar/types"; import {sleep} from "@lodestar/utils"; import {computeStartSlotAtEpoch} from "@lodestar/state-transition"; -import {getReqRespHandlers, Network, NetworkInitModules} from "../../../src/network/index.js"; +import {getReqRespHandlers, Network} from "../../../src/network/index.js"; import {defaultNetworkOptions, NetworkOptions} from "../../../src/network/options.js"; import {GossipType, GossipHandlers} from "../../../src/network/gossip/index.js"; @@ -25,7 +25,6 @@ const opts: NetworkOptions = { localMultiaddrs: [], discv5FirstQueryDelayMs: 0, discv5: null, - skipParamsLog: true, }; // Schedule all forks at ALTAIR_FORK_EPOCH to avoid generating the pubkeys cache @@ -87,9 +86,10 @@ describe("gossipsub", function () { const loggerA = testLogger("A"); const loggerB = testLogger("B"); - const modules: Omit = { + const modules = { config: beaconConfig, chain, + db, reqRespHandlers, gossipHandlers, signal: controller.signal, diff --git a/packages/beacon-node/test/unit/chain/validation/block.test.ts b/packages/beacon-node/test/unit/chain/validation/block.test.ts index c3a4326c7056..edcd321a8e88 100644 --- a/packages/beacon-node/test/unit/chain/validation/block.test.ts +++ b/packages/beacon-node/test/unit/chain/validation/block.test.ts @@ -44,7 +44,7 @@ describe("gossip block validation", function () { verifySignature = sinon.stub(); verifySignature.resolves(true); - chain.bls = {verifySignatureSets: verifySignature, close: () => Promise.resolve(), canAcceptWork: () => true}; + chain.bls = {verifySignatureSets: verifySignature, close: () => Promise.resolve()}; forkChoice.getFinalizedCheckpoint.returns({epoch: 0, root: ZERO_HASH, rootHex: ""}); diff --git a/packages/beacon-node/test/unit/network/processorQueues.test.ts b/packages/beacon-node/test/unit/network/processorQueues.test.ts deleted file mode 100644 index 0c272159e51a..000000000000 --- a/packages/beacon-node/test/unit/network/processorQueues.test.ts +++ /dev/null @@ -1,108 +0,0 @@ -import {expect} from "chai"; -import {sleep} from "@lodestar/utils"; - -type ValidateOpts = { - skipAsync1: boolean; - skipAsync2: boolean; -}; - -async function validateTest(job: string, tracker: string[], opts: ValidateOpts): Promise { - tracker.push(`job:${job} step:0`); - - await getStateFromCache(opts.skipAsync1); - tracker.push(`job:${job} step:1`); - - if (!opts.skipAsync2) { - await sleep(0); - } - tracker.push(`job:${job} step:2`); -} - -async function getStateFromCache(retrieveSync: boolean): Promise { - if (retrieveSync) { - return 1; - } else { - await sleep(0); - return 2; - } -} - -describe("event loop with branching async", () => { - const eachAwaitPointHoldsJobs = [ - "job:0 step:0", - "job:1 step:0", - "job:2 step:0", - "job:0 step:1", - "job:1 step:1", - "job:2 step:1", - "job:0 step:2", - "job:1 step:2", - "job:2 step:2", - ]; - - const onlyStartOfStep1HoldsJobs = [ - "job:0 step:0", - "job:1 step:0", - "job:2 step:0", - "job:0 step:1", - "job:0 step:2", - "job:1 step:1", - "job:1 step:2", - "job:2 step:1", - "job:2 step:2", - ]; - - const eachJobCompletesInSequence = [ - "job:0 step:0", - "job:0 step:1", - "job:0 step:2", - "job:1 step:0", - "job:1 step:1", - "job:1 step:2", - "job:2 step:0", - "job:2 step:1", - "job:2 step:2", - ]; - - const testCases: {opts: ValidateOpts; expectedTrackerVoid: string[]; expectedTrackerAwait: string[]}[] = [ - { - opts: {skipAsync1: false, skipAsync2: false}, - expectedTrackerVoid: eachAwaitPointHoldsJobs, - expectedTrackerAwait: eachJobCompletesInSequence, - }, - { - opts: {skipAsync1: true, skipAsync2: false}, - expectedTrackerVoid: eachAwaitPointHoldsJobs, - expectedTrackerAwait: eachJobCompletesInSequence, - }, - { - opts: {skipAsync1: false, skipAsync2: true}, - expectedTrackerVoid: onlyStartOfStep1HoldsJobs, - expectedTrackerAwait: eachJobCompletesInSequence, - }, - { - opts: {skipAsync1: true, skipAsync2: true}, - expectedTrackerVoid: onlyStartOfStep1HoldsJobs, - expectedTrackerAwait: eachJobCompletesInSequence, - }, - ]; - - for (const {opts, expectedTrackerVoid, expectedTrackerAwait} of testCases) { - const jobs: string[] = []; - for (let i = 0; i < 3; i++) jobs.push(String(i)); - - it(`${JSON.stringify(opts)} Promise.all`, async () => { - const tracker: string[] = []; - await Promise.all(jobs.map((job) => validateTest(job, tracker, opts))); - expect(tracker).deep.equals(expectedTrackerVoid); - }); - - it(`${JSON.stringify(opts)} await each`, async () => { - const tracker: string[] = []; - for (const job of jobs) { - await validateTest(job, tracker, opts); - } - expect(tracker).deep.equals(expectedTrackerAwait); - }); - } -}); diff --git a/packages/beacon-node/test/utils/mocks/bls.ts b/packages/beacon-node/test/utils/mocks/bls.ts index 57e84d509fc7..0013a2d49ead 100644 --- a/packages/beacon-node/test/utils/mocks/bls.ts +++ b/packages/beacon-node/test/utils/mocks/bls.ts @@ -10,8 +10,4 @@ export class BlsVerifierMock implements IBlsVerifier { async close(): Promise { // } - - canAcceptWork(): boolean { - return true; - } } diff --git a/packages/beacon-node/test/utils/mocks/chain/chain.ts b/packages/beacon-node/test/utils/mocks/chain/chain.ts index 9446a8eba518..eb5693871084 100644 --- a/packages/beacon-node/test/utils/mocks/chain/chain.ts +++ b/packages/beacon-node/test/utils/mocks/chain/chain.ts @@ -225,14 +225,6 @@ export class MockBeaconChain implements IBeaconChain { async updateBeaconProposerData(): Promise {} updateBuilderStatus(): void {} - - regenCanAcceptWork(): boolean { - return true; - } - - blsThreadPoolCanAcceptWork(): boolean { - return true; - } } const root = ssz.Root.defaultValue() as Uint8Array; diff --git a/packages/cli/src/options/beaconNodeOptions/network.ts b/packages/cli/src/options/beaconNodeOptions/network.ts index 7ed4d916194e..6e9b54e73c17 100644 --- a/packages/cli/src/options/beaconNodeOptions/network.ts +++ b/packages/cli/src/options/beaconNodeOptions/network.ts @@ -23,7 +23,6 @@ export type NetworkArgs = { "network.gossipsubDHigh": number; "network.gossipsubAwaitHandler": boolean; "network.rateLimitMultiplier": number; - "network.maxGossipTopicConcurrency"?: number; /** @deprecated This option is deprecated and should be removed in next major release. */ "network.requestCountPeerLimit": number; @@ -68,7 +67,6 @@ export function parseArgs(args: NetworkArgs): IBeaconNodeOptions["network"] { gossipsubAwaitHandler: args["network.gossipsubAwaitHandler"], mdns: args["mdns"], rateLimitMultiplier: args["network.rateLimitMultiplier"], - maxGossipTopicConcurrency: args["network.maxGossipTopicConcurrency"], }; } @@ -239,10 +237,4 @@ export const options: CliCommandOptions = { defaultDescription: String(defaultOptions.network.rateLimitMultiplier), group: "network", }, - - "network.maxGossipTopicConcurrency": { - type: "number", - hidden: true, - group: "network", - }, }; diff --git a/packages/cli/test/unit/options/beaconNodeOptions.test.ts b/packages/cli/test/unit/options/beaconNodeOptions.test.ts index 080d2436c476..3ce362288f5c 100644 --- a/packages/cli/test/unit/options/beaconNodeOptions.test.ts +++ b/packages/cli/test/unit/options/beaconNodeOptions.test.ts @@ -83,7 +83,6 @@ describe("options / beaconNodeOptions", () => { "network.gossipsubDHigh": 6, "network.gossipsubAwaitHandler": true, "network.rateLimitMultiplier": 1, - "network.maxGossipTopicConcurrency": 64, "sync.isSingleNode": true, "sync.disableProcessAsChainSegment": true, @@ -172,7 +171,6 @@ describe("options / beaconNodeOptions", () => { gossipsubAwaitHandler: true, mdns: false, rateLimitMultiplier: 1, - maxGossipTopicConcurrency: 64, }, sync: { isSingleNode: true,