Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add and track blob source for metrics #6628

Merged
merged 6 commits into from
Apr 26, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion packages/beacon-node/src/api/impl/beacon/blocks/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {computeEpochAtSlot, computeTimeAtSlot, reconstructFullBlockOrContents} f
import {SLOTS_PER_HISTORICAL_ROOT} from "@lodestar/params";
import {sleep, toHex} from "@lodestar/utils";
import {allForks, deneb, isSignedBlockContents, ProducedBlockSource} from "@lodestar/types";
import {BlockSource, getBlockInput, ImportBlockOpts, BlockInput} from "../../../../chain/blocks/types.js";
import {BlockSource, getBlockInput, ImportBlockOpts, BlockInput, BlobsSource} from "../../../../chain/blocks/types.js";
import {promiseAllMaybeAsync} from "../../../../util/promises.js";
import {isOptimisticBlock} from "../../../../util/forkChoice.js";
import {computeBlobSidecars} from "../../../../util/blobs.js";
Expand Down Expand Up @@ -52,6 +52,7 @@ export function getBeaconBlockApi({
signedBlock,
BlockSource.api,
blobSidecars,
BlobsSource.api,
// don't bundle any bytes for block and blobs
null,
blobSidecars.map(() => null)
Expand Down
7 changes: 5 additions & 2 deletions packages/beacon-node/src/chain/blocks/importBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,18 @@ export async function importBlock(
this.logger.verbose("Added block to forkchoice and state cache", {slot: blockSlot, root: blockRootHex});

// We want to import block asap so call all event handler in the next event loop
setTimeout(() => {
setTimeout(async () => {
this.emitter.emit(routes.events.EventType.block, {
block: blockRootHex,
slot: blockSlot,
executionOptimistic: blockSummary != null && isOptimisticBlock(blockSummary),
});

if (blockInput.type === BlockInputType.postDeneb) {
g11tech marked this conversation as resolved.
Show resolved Hide resolved
for (const blobSidecar of blockInput.blobs) {
const {blobsSource, blobs} = blockInput;

this.metrics?.importBlock.blobBySource.inc({blobsSource});
for (const blobSidecar of blobs) {
const {index, kzgCommitment} = blobSidecar;
this.emitter.emit(routes.events.EventType.blobSidecar, {
blockRoot: blockRootHex,
Expand Down
4 changes: 2 additions & 2 deletions packages/beacon-node/src/chain/blocks/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ export async function processBlocks(

// Fully verify a block to be imported immediately after. Does not produce any side-effects besides adding intermediate
// states in the state cache through regen.
const {postStates, dataAvailabilityStatuses, proposerBalanceDeltas, segmentExecStatus} =
const {postStates, dataAvailabilityStatuses, proposerBalanceDeltas, segmentExecStatus, availableBlockInputs} =
await verifyBlocksInEpoch.call(this, parentBlock, relevantBlocks, opts);

// If segmentExecStatus has lvhForkchoice then, the entire segment should be invalid
Expand All @@ -81,7 +81,7 @@ export async function processBlocks(
}

const {executionStatuses} = segmentExecStatus;
const fullyVerifiedBlocks = relevantBlocks.map(
const fullyVerifiedBlocks = availableBlockInputs.map(
(block, i): FullyVerifiedBlock => ({
blockInput: block,
postState: postStates[i],
Expand Down
15 changes: 13 additions & 2 deletions packages/beacon-node/src/chain/blocks/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,21 @@ export enum BlockSource {
byRoot = "req_resp_by_root",
}

/** Enum to represent where blobs come from */
export enum BlobsSource {
gossip = "gossip",
api = "api",
byRange = "req_resp_by_range",
byRoot = "req_resp_by_root",
}

export enum GossipedInputType {
block = "block",
blob = "blob",
}

export type BlobsCache = Map<number, {blobSidecar: deneb.BlobSidecar; blobBytes: Uint8Array | null}>;
export type BlockInputBlobs = {blobs: deneb.BlobSidecars; blobsBytes: (Uint8Array | null)[]};
export type BlockInputBlobs = {blobs: deneb.BlobSidecars; blobsBytes: (Uint8Array | null)[]; blobsSource: BlobsSource};
type CachedBlobs = {
blobsCache: BlobsCache;
availabilityPromise: Promise<BlockInputBlobs>;
Expand All @@ -34,6 +42,7 @@ type CachedBlobs = {
export type BlockInput = {block: allForks.SignedBeaconBlock; source: BlockSource; blockBytes: Uint8Array | null} & (
| {type: BlockInputType.preDeneb}
| ({type: BlockInputType.postDeneb} & BlockInputBlobs)
// the blobsSource here is added to BlockInputBlobs when availability is resolved
| ({type: BlockInputType.blobsPromise} & CachedBlobs)
);
export type NullBlockInput = {block: null; blockRootHex: RootHex; blockInputPromise: Promise<BlockInput>} & CachedBlobs;
Expand Down Expand Up @@ -69,6 +78,7 @@ export const getBlockInput = {
block: allForks.SignedBeaconBlock,
source: BlockSource,
blobs: deneb.BlobSidecars,
blobsSource: BlobsSource,
blockBytes: Uint8Array | null,
blobsBytes: (Uint8Array | null)[]
): BlockInput {
Expand All @@ -80,6 +90,7 @@ export const getBlockInput = {
block,
source,
blobs,
blobsSource,
blockBytes,
blobsBytes,
};
Expand Down Expand Up @@ -109,7 +120,7 @@ export const getBlockInput = {
},
};

export function getBlockInputBlobs(blobsCache: BlobsCache): BlockInputBlobs {
export function getBlockInputBlobs(blobsCache: BlobsCache): Omit<BlockInputBlobs, "blobsSource"> {
const blobs = [];
const blobsBytes = [];

Expand Down
5 changes: 3 additions & 2 deletions packages/beacon-node/src/chain/blocks/verifyBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ export async function verifyBlocksInEpoch(
proposerBalanceDeltas: number[];
segmentExecStatus: SegmentExecStatus;
dataAvailabilityStatuses: DataAvailableStatus[];
availableBlockInputs: BlockInput[];
}> {
const blocks = blocksInput.map(({block}) => block);
if (blocks.length === 0) {
Expand Down Expand Up @@ -92,7 +93,7 @@ export async function verifyBlocksInEpoch(
// batch all I/O operations to reduce overhead
const [
segmentExecStatus,
{dataAvailabilityStatuses, availableTime},
{dataAvailabilityStatuses, availableTime, availableBlockInputs},
{postStates, proposerBalanceDeltas, verifyStateTime},
{verifySignaturesTime},
] = await Promise.all([
Expand Down Expand Up @@ -190,7 +191,7 @@ export async function verifyBlocksInEpoch(
}
}

return {postStates, dataAvailabilityStatuses, proposerBalanceDeltas, segmentExecStatus};
return {postStates, dataAvailabilityStatuses, proposerBalanceDeltas, segmentExecStatus, availableBlockInputs};
} finally {
abortController.abort();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,14 @@ import {Logger} from "@lodestar/utils";
import {BlockError, BlockErrorCode} from "../errors/index.js";
import {validateBlobSidecars} from "../validation/blobSidecar.js";
import {Metrics} from "../../metrics/metrics.js";
import {BlockInput, BlockInputType, ImportBlockOpts, BlobSidecarValidation} from "./types.js";
import {
BlockInput,
BlockInputType,
ImportBlockOpts,
BlobSidecarValidation,
getBlockInput,
BlobsSource,
} from "./types.js";

// we can now wait for full 12 seconds because unavailable block sync will try pulling
// the blobs from the network anyway after 500ms of seeing the block
Expand All @@ -27,19 +34,26 @@ export async function verifyBlocksDataAvailability(
chain: {config: ChainForkConfig; genesisTime: UintNum64; logger: Logger; metrics: Metrics | null},
blocks: BlockInput[],
opts: ImportBlockOpts
): Promise<{dataAvailabilityStatuses: DataAvailableStatus[]; availableTime: number}> {
): Promise<{
dataAvailabilityStatuses: DataAvailableStatus[];
availableTime: number;
availableBlockInputs: BlockInput[];
}> {
if (blocks.length === 0) {
throw Error("Empty partiallyVerifiedBlocks");
}

const dataAvailabilityStatuses: DataAvailableStatus[] = [];
const seenTime = opts.seenTimestampSec !== undefined ? opts.seenTimestampSec * 1000 : Date.now();

const availableBlockInputs: BlockInput[] = [];

for (const blockInput of blocks) {
// Validate status of only not yet finalized blocks, we don't need yet to propogate the status
// as it is not used upstream anywhere
const dataAvailabilityStatus = await maybeValidateBlobs(chain, blockInput, opts);
const {dataAvailabilityStatus, availableBlockInput} = await maybeValidateBlobs(chain, blockInput, opts);
dataAvailabilityStatuses.push(dataAvailabilityStatus);
availableBlockInputs.push(availableBlockInput);
}

const availableTime = blocks[blocks.length - 1].type === BlockInputType.blobsPromise ? Date.now() : seenTime;
Expand All @@ -55,21 +69,21 @@ export async function verifyBlocksDataAvailability(
});
}

return {dataAvailabilityStatuses, availableTime};
return {dataAvailabilityStatuses, availableTime, availableBlockInputs};
}

async function maybeValidateBlobs(
chain: {config: ChainForkConfig; genesisTime: UintNum64; logger: Logger},
blockInput: BlockInput,
opts: ImportBlockOpts
): Promise<DataAvailableStatus> {
): Promise<{dataAvailabilityStatus: DataAvailableStatus; availableBlockInput: BlockInput}> {
switch (blockInput.type) {
case BlockInputType.preDeneb:
return DataAvailableStatus.preDeneb;
return {dataAvailabilityStatus: DataAvailableStatus.preDeneb, availableBlockInput: blockInput};

case BlockInputType.postDeneb:
if (opts.validBlobSidecars === BlobSidecarValidation.Full) {
return DataAvailableStatus.available;
return {dataAvailabilityStatus: DataAvailableStatus.available, availableBlockInput: blockInput};
}

// eslint-disable-next-line no-fallthrough
Expand All @@ -82,7 +96,7 @@ async function maybeValidateBlobs(
blockInput.type === BlockInputType.postDeneb
? blockInput
: await raceWithCutoff(chain, blockInput, blockInput.availabilityPromise);
const {blobs} = blobsData;
const {blobs, blobsBytes} = blobsData;

const {blobKzgCommitments} = (block as deneb.SignedBeaconBlock).message.body;
const beaconBlockRoot = chain.config.getForkTypes(blockSlot).BeaconBlock.hashTreeRoot(block.message);
Expand All @@ -92,7 +106,16 @@ async function maybeValidateBlobs(
const skipProofsCheck = opts.validBlobSidecars === BlobSidecarValidation.Individual;
validateBlobSidecars(blockSlot, beaconBlockRoot, blobKzgCommitments, blobs, {skipProofsCheck});

return DataAvailableStatus.available;
const availableBlockInput = getBlockInput.postDeneb(
chain.config,
blockInput.block,
blockInput.source,
blobs,
BlobsSource.gossip,
g11tech marked this conversation as resolved.
Show resolved Hide resolved
blockInput.blockBytes,
blobsBytes
);
return {dataAvailabilityStatus: DataAvailableStatus.available, availableBlockInput: availableBlockInput};
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
BlobsCache,
GossipedInputType,
getBlockInputBlobs,
BlobsSource,
} from "../blocks/types.js";
import {Metrics} from "../../metrics/index.js";

Expand Down Expand Up @@ -135,14 +136,15 @@ export class SeenGossipBlockInput {

if (blobKzgCommitments.length === blobsCache.size) {
const allBlobs = getBlockInputBlobs(blobsCache);
resolveAvailability(allBlobs);
resolveAvailability({...allBlobs, blobsSource: BlobsSource.gossip});
metrics?.syncUnknownBlock.resolveAvailabilitySource.inc({source: BlockInputAvailabilitySource.GOSSIP});
const {blobs, blobsBytes} = allBlobs;
const blockInput = getBlockInput.postDeneb(
config,
signedBlock,
BlockSource.gossip,
blobs,
BlobsSource.gossip,
blockBytes ?? null,
blobsBytes
);
Expand Down
7 changes: 6 additions & 1 deletion packages/beacon-node/src/metrics/metrics/lodestar.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import {EpochTransitionStep, StateCloneSource, StateHashTreeRootSource} from "@lodestar/state-transition";
import {allForks} from "@lodestar/types";
import {BlockSource} from "../../chain/blocks/types.js";
import {BlockSource, BlobsSource} from "../../chain/blocks/types.js";
import {JobQueueItemType} from "../../chain/bls/index.js";
import {BlockErrorCode} from "../../chain/errors/index.js";
import {InsertOutcome} from "../../chain/opPools/types.js";
Expand Down Expand Up @@ -800,6 +800,11 @@ export function createLodestarMetrics(
help: "Total number of imported blocks by source",
labelNames: ["source"],
}),
blobBySource: register.gauge<{blobsSource: BlobsSource}>({
g11tech marked this conversation as resolved.
Show resolved Hide resolved
name: "lodestar_import_blob_by_source_total",
help: "Total number of imported blobs by source",
labelNames: ["blobsSource"],
}),
},
engineNotifyNewPayloadResult: register.gauge<{result: ExecutionPayloadStatus}>({
name: "lodestar_execution_engine_notify_new_payload_result_total",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import {deneb, Epoch, phase0, allForks, Slot} from "@lodestar/types";
import {ForkSeq} from "@lodestar/params";
import {computeEpochAtSlot} from "@lodestar/state-transition";

import {BlockInput, BlockSource, getBlockInput} from "../../chain/blocks/types.js";
import {BlobsSource, BlockInput, BlockSource, getBlockInput} from "../../chain/blocks/types.js";
import {PeerIdStr} from "../../util/peerId.js";
import {INetwork, WithBytes} from "../interface.js";

Expand Down Expand Up @@ -43,7 +43,7 @@ export async function beaconBlocksMaybeBlobsByRange(
network.sendBlobSidecarsByRange(peerId, request),
]);

return matchBlockWithBlobs(config, allBlocks, allBlobSidecars, endSlot, BlockSource.byRange);
return matchBlockWithBlobs(config, allBlocks, allBlobSidecars, endSlot, BlockSource.byRange, BlobsSource.byRange);
}

// Post Deneb but old blobs
Expand All @@ -58,7 +58,8 @@ export function matchBlockWithBlobs(
allBlocks: WithBytes<allForks.SignedBeaconBlock>[],
allBlobSidecars: deneb.BlobSidecar[],
endSlot: Slot,
blockSource: BlockSource
blockSource: BlockSource,
blobsSource: BlobsSource
): BlockInput[] {
const blockInputs: BlockInput[] = [];
let blobSideCarIndex = 0;
Expand Down Expand Up @@ -101,6 +102,7 @@ export function matchBlockWithBlobs(
block.data,
blockSource,
blobSidecars,
blobsSource,
null,
Array.from({length: blobKzgCommitmentsLen}, () => null)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
getBlockInputBlobs,
getBlockInput,
NullBlockInput,
BlobsSource,
} from "../../chain/blocks/types.js";
import {PeerIdStr} from "../../util/peerId.js";
import {INetwork} from "../interface.js";
Expand Down Expand Up @@ -47,7 +48,7 @@ export async function beaconBlocksMaybeBlobsByRoot(

// The last arg is to provide slot to which all blobs should be exausted in matching
// and here it should be infinity since all bobs should match
return matchBlockWithBlobs(config, allBlocks, allBlobSidecars, Infinity, BlockSource.byRoot);
return matchBlockWithBlobs(config, allBlocks, allBlobSidecars, Infinity, BlockSource.byRoot, BlobsSource.byRoot);
}

export async function unavailableBeaconBlobsByRoot(
Expand Down Expand Up @@ -104,7 +105,7 @@ export async function unavailableBeaconBlobsByRoot(
throw Error(`Not all blobs fetched missingBlobs=${blobKzgCommitmentsLen - blobs.length}`);
}

resolveAvailability(allBlobs);
resolveAvailability({...allBlobs, blobsSource: BlobsSource.byRoot});
metrics?.syncUnknownBlock.resolveAvailabilitySource.inc({source: BlockInputAvailabilitySource.UNKNOWN_SYNC});
return getBlockInput.postDeneb(config, block, BlockSource.byRoot, blobs, blockBytes, blobsBytes);
return getBlockInput.postDeneb(config, block, BlockSource.byRoot, blobs, BlobsSource.byRoot, blockBytes, blobsBytes);
}
11 changes: 9 additions & 2 deletions packages/beacon-node/test/spec/presets/fork_choice.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import {
AttestationImportOpt,
BlockSource,
BlobSidecarValidation,
BlobsSource,
} from "../../../src/chain/blocks/types.js";
import {ZERO_HASH_HEX} from "../../../src/constants/constants.js";
import {PowMergeBlock} from "../../../src/eth1/interface.js";
Expand Down Expand Up @@ -209,9 +210,15 @@ const forkChoiceTest =
};
});

blockImport = getBlockInput.postDeneb(config, signedBlock, BlockSource.gossip, blobSidecars, null, [
blockImport = getBlockInput.postDeneb(
config,
signedBlock,
BlockSource.gossip,
blobSidecars,
BlobsSource.gossip,
null,
]);
[null]
);
} else {
blockImport = getBlockInput.preDeneb(config, signedBlock, BlockSource.gossip, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import {ssz, deneb} from "@lodestar/types";
import {createBeaconConfig, createChainForkConfig, defaultChainConfig} from "@lodestar/config";

import {beaconBlocksMaybeBlobsByRange} from "../../../src/network/reqresp/index.js";
import {BlockInputType, BlockSource} from "../../../src/chain/blocks/types.js";
import {BlockInputType, BlockSource, BlobsSource} from "../../../src/chain/blocks/types.js";
import {initCKZG, loadEthereumTrustedSetup} from "../../../src/util/kzg.js";
import {INetwork} from "../../../src/network/interface.js";
import {ZERO_HASH} from "../../../src/constants/constants.js";
Expand Down Expand Up @@ -104,6 +104,7 @@ describe("beaconBlocksMaybeBlobsByRange", () => {
block,
source: BlockSource.byRange,
blobs,
blobsSource: BlobsSource.byRange,
blockBytes: null,
blobsBytes: blobs.map(() => null),
};
Expand Down
Loading