Skip to content

Commit

Permalink
feat: add and use getBlobsV1 to expedite gossip import
Browse files Browse the repository at this point in the history
hookup the getblobs api to get bob and proof data from el

remove unused

fix import

metrics overhault, test, debugging testing, some feeback

fix

add nethermind bug dicussion link

fix
  • Loading branch information
g11tech committed Nov 3, 2024
1 parent 0d1fd9c commit d6044d6
Show file tree
Hide file tree
Showing 15 changed files with 663 additions and 35 deletions.
10 changes: 5 additions & 5 deletions packages/beacon-node/src/api/impl/beacon/blocks/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -216,12 +216,12 @@ export function getBeaconBlockApi({
// specification is very clear that this is the desired behaviour.
//
// i) Publish blobs and block before importing so that network can see them asap
// ii) publish blobs first because
// a) by the times nodes see block, they might decide to pull blobs
// b) they might require more hops to reach recipients in peerDAS kind of setup where
// blobs might need to hop between nodes because of partial subnet subscription
...blobSidecars.map((blobSidecar) => () => network.publishBlobSidecar(blobSidecar)),
// ii) publish block first because
// a) as soon as node sees block they can start processing it while blobs arrive
// b) getting block first allows nodes to use getBlobs from local ELs and save
// import latency and hopefully bandwidth
() => network.publishBeaconBlock(signedBlock) as Promise<unknown>,
...blobSidecars.map((blobSidecar) => () => network.publishBlobSidecar(blobSidecar)),
() =>
// there is no rush to persist block since we published it to gossip anyway
chain.processBlock(blockForImport, {...opts, eagerPersistBlock: false}).catch((e) => {
Expand Down
2 changes: 1 addition & 1 deletion packages/beacon-node/src/chain/blocks/verifyBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ export async function verifyBlocksInEpoch(
} as SegmentExecStatus),

// data availability for the blobs
verifyBlocksDataAvailability(this, blocksInput, opts),
verifyBlocksDataAvailability(this, blocksInput, abortController.signal, opts),

// Run state transition only
// TODO: Ensure it yields to allow flushing to workers and engine API
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import {computeTimeAtSlot} from "@lodestar/state-transition";
import {DataAvailabilityStatus} from "@lodestar/fork-choice";
import {ChainForkConfig} from "@lodestar/config";
import {deneb, UintNum64} from "@lodestar/types";
import {Logger} from "@lodestar/utils";
import {Logger, ErrorAborted} from "@lodestar/utils";
import {BlockError, BlockErrorCode} from "../errors/index.js";
import {validateBlobSidecars} from "../validation/blobSidecar.js";
import {Metrics} from "../../metrics/metrics.js";
Expand All @@ -27,6 +27,7 @@ const BLOB_AVAILABILITY_TIMEOUT = 12_000;
export async function verifyBlocksDataAvailability(
chain: {config: ChainForkConfig; genesisTime: UintNum64; logger: Logger; metrics: Metrics | null},
blocks: BlockInput[],
signal: AbortSignal,
opts: ImportBlockOpts
): Promise<{
dataAvailabilityStatuses: DataAvailabilityStatus[];
Expand All @@ -43,9 +44,12 @@ export async function verifyBlocksDataAvailability(
const availableBlockInputs: BlockInput[] = [];

for (const blockInput of blocks) {
if (signal.aborted) {
throw new ErrorAborted("verifyBlocksDataAvailability");
}
// Validate status of only not yet finalized blocks, we don't need yet to propogate the status
// as it is not used upstream anywhere
const {dataAvailabilityStatus, availableBlockInput} = await maybeValidateBlobs(chain, blockInput, opts);
const {dataAvailabilityStatus, availableBlockInput} = await maybeValidateBlobs(chain, blockInput, signal, opts);
dataAvailabilityStatuses.push(dataAvailabilityStatus);
availableBlockInputs.push(availableBlockInput);
}
Expand All @@ -69,6 +73,7 @@ export async function verifyBlocksDataAvailability(
async function maybeValidateBlobs(
chain: {config: ChainForkConfig; genesisTime: UintNum64; logger: Logger},
blockInput: BlockInput,
signal: AbortSignal,
opts: ImportBlockOpts
): Promise<{dataAvailabilityStatus: DataAvailabilityStatus; availableBlockInput: BlockInput}> {
switch (blockInput.type) {
Expand All @@ -92,7 +97,7 @@ async function maybeValidateBlobs(
const blobsData =
blockInput.type === BlockInputType.availableData
? blockInput.blockData
: await raceWithCutoff(chain, blockInput, blockInput.cachedData.availabilityPromise);
: await raceWithCutoff(chain, blockInput, blockInput.cachedData.availabilityPromise, signal);
const {blobs} = blobsData;

const {blobKzgCommitments} = (block as deneb.SignedBeaconBlock).message.body;
Expand Down Expand Up @@ -122,16 +127,21 @@ async function maybeValidateBlobs(
async function raceWithCutoff<T>(
chain: {config: ChainForkConfig; genesisTime: UintNum64; logger: Logger},
blockInput: BlockInput,
availabilityPromise: Promise<T>
availabilityPromise: Promise<T>,
signal: AbortSignal
): Promise<T> {
const {block} = blockInput;
const blockSlot = block.message.slot;

const cutoffTime = Math.max(
computeTimeAtSlot(chain.config, blockSlot, chain.genesisTime) * 1000 + BLOB_AVAILABILITY_TIMEOUT - Date.now(),
0
);
const cutoffTimeout = new Promise((_resolve, reject) => setTimeout(reject, cutoffTime));
const cutoffTime =
computeTimeAtSlot(chain.config, blockSlot, chain.genesisTime) * 1000 + BLOB_AVAILABILITY_TIMEOUT - Date.now();
const cutoffTimeout =
cutoffTime > 0
? new Promise((_resolve, reject) => {
setTimeout(() => reject(new Error("Timeout exceeded")), cutoffTime);
signal.addEventListener("abort", () => reject(signal.reason));
})
: Promise.reject(new Error("Cutoff time must be greater than 0"));
chain.logger.debug("Racing for blob availabilityPromise", {blockSlot, cutoffTime});

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ export class HttpRpcError extends Error {
/**
* JSON RPC spec errors https://www.jsonrpc.org/specification#response_object
*/
function parseJsonRpcErrorCode(code: number): string {
export function parseJsonRpcErrorCode(code: number): string {
if (code === -32700) return "Parse request error";
if (code === -32600) return "Invalid request object";
if (code === -32601) return "Method not found";
Expand Down
4 changes: 4 additions & 0 deletions packages/beacon-node/src/execution/engine/disabled.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,8 @@ export class ExecutionEngineDisabled implements IExecutionEngine {
getPayloadBodiesByRange(): Promise<never> {
throw Error("Execution engine disabled");
}

getBlobs(): Promise<never> {
throw Error("Execution engine disabled");
}
}
57 changes: 56 additions & 1 deletion packages/beacon-node/src/execution/engine/http.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
import {ExecutionPayload, ExecutionRequests, Root, RootHex, Wei} from "@lodestar/types";
import {BlobAndProof} from "@lodestar/types/deneb";
import {SLOTS_PER_EPOCH, ForkName, ForkSeq} from "@lodestar/params";
import {Logger} from "@lodestar/logger";
import {
ErrorJsonRpcResponse,
HttpRpcError,
IJsonRpcHttpClient,
JsonRpcHttpClientEvent,
parseJsonRpcErrorCode,
ReqOpts,
} from "../../eth1/provider/jsonRpcHttpClient.js";
import {Metrics} from "../../metrics/index.js";
import {JobItemQueue} from "../../util/queue/index.js";
import {EPOCHS_PER_BATCH} from "../../sync/constants.js";
import {numToQuantity} from "../../eth1/provider/utils.js";
import {bytesToData, numToQuantity} from "../../eth1/provider/utils.js";
import {getLodestarClientVersion} from "../../util/metadata.js";
import {
ExecutionPayloadStatus,
Expand All @@ -38,6 +40,7 @@ import {
assertReqSizeLimit,
deserializeExecutionPayloadBody,
serializeExecutionRequests,
deserializeBlobAndProofs,
} from "./types.js";
import {getExecutionEngineState} from "./utils.js";

Expand Down Expand Up @@ -111,6 +114,7 @@ const getPayloadOpts: ReqOpts = {routeId: "getPayload"};
*/
export class ExecutionEngineHttp implements IExecutionEngine {
private logger: Logger;
private lastGetBlobsErrorTime: number = 0;

// The default state is ONLINE, it will be updated to SYNCING once we receive the first payload
// This assumption is better than the OFFLINE state, since we can't be sure if the EL is offline and being offline may trigger some notifications
Expand Down Expand Up @@ -463,6 +467,57 @@ export class ExecutionEngineHttp implements IExecutionEngine {
return response.map(deserializeExecutionPayloadBody);
}

async getBlobs(_fork: ForkName, versionedHashes: VersionedHashes): Promise<(BlobAndProof | null)[]> {
// retry only after a day may be
const GETBLOBS_RETRY_TIMEOUT = 256 * 12;
const timeNow = Date.now() / 1000;
const timeSinceLastFail = timeNow - this.lastGetBlobsErrorTime;
if (timeSinceLastFail < GETBLOBS_RETRY_TIMEOUT) {
// do not try getblobs since it might not be available
this.logger.debug(
`disabled engine_getBlobsV1 api call since last failed < GETBLOBS_RETRY_TIMEOUT=${GETBLOBS_RETRY_TIMEOUT}`,
timeSinceLastFail
);
throw Error(
`engine_getBlobsV1 call recently failed timeSinceLastFail=${timeSinceLastFail} < GETBLOBS_RETRY_TIMEOUT=${GETBLOBS_RETRY_TIMEOUT}`
);
}

const method = "engine_getBlobsV1";
assertReqSizeLimit(versionedHashes.length, 128);
const versionedHashesHex = versionedHashes.map(bytesToData);
let response = await this.rpc
.fetchWithRetries<
EngineApiRpcReturnTypes[typeof method],
EngineApiRpcParamTypes[typeof method]
>({method, params: [versionedHashesHex]})
.catch((e) => {
if (e instanceof ErrorJsonRpcResponse && parseJsonRpcErrorCode(e.response.error.code) === "Method not found") {
this.lastGetBlobsErrorTime = timeNow;
this.logger.debug("disabling engine_getBlobsV1 api call since engine responded with method not availeble", {
retryTimeout: GETBLOBS_RETRY_TIMEOUT,
});
}
throw e;
});

// handle nethermind buggy response
// see: https://discord.com/channels/595666850260713488/1293605631785304088/1298956894274060301
if (
(response as unknown as {blobsAndProofs: EngineApiRpcReturnTypes[typeof method]}).blobsAndProofs !== undefined
) {
response = (response as unknown as {blobsAndProofs: EngineApiRpcReturnTypes[typeof method]}).blobsAndProofs;
}

if (response.length !== versionedHashes.length) {
const error = `Invalid engine_getBlobsV1 response length=${response.length} versionedHashes=${versionedHashes.length}`;
this.logger.error(error);
throw Error(error);
}

return response.map(deserializeBlobAndProofs);
}

private async getClientVersion(clientVersion: ClientVersion): Promise<ClientVersion[]> {
const method = "engine_getClientVersionV1";

Expand Down
4 changes: 3 additions & 1 deletion packages/beacon-node/src/execution/engine/interface.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import {ForkName} from "@lodestar/params";
import {KZGCommitment, Blob, KZGProof} from "@lodestar/types/deneb";
import {KZGCommitment, Blob, KZGProof, BlobAndProof} from "@lodestar/types/deneb";
import {Root, RootHex, capella, Wei, ExecutionPayload, ExecutionRequests} from "@lodestar/types";

import {DATA} from "../../eth1/provider/utils.js";
Expand Down Expand Up @@ -179,4 +179,6 @@ export interface IExecutionEngine {
getPayloadBodiesByHash(fork: ForkName, blockHash: DATA[]): Promise<(ExecutionPayloadBody | null)[]>;

getPayloadBodiesByRange(fork: ForkName, start: number, count: number): Promise<(ExecutionPayloadBody | null)[]>;

getBlobs(fork: ForkName, versionedHashes: VersionedHashes): Promise<(BlobAndProof | null)[]>;
}
7 changes: 7 additions & 0 deletions packages/beacon-node/src/execution/engine/mock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ export class ExecutionEngineMockBackend implements JsonRpcBackend {
engine_getPayloadBodiesByHashV1: this.getPayloadBodiesByHash.bind(this),
engine_getPayloadBodiesByRangeV1: this.getPayloadBodiesByRange.bind(this),
engine_getClientVersionV1: this.getClientVersionV1.bind(this),
engine_getBlobsV1: this.getBlobs.bind(this),
};
}

Expand Down Expand Up @@ -396,6 +397,12 @@ export class ExecutionEngineMockBackend implements JsonRpcBackend {
return [{code: ClientCode.XX, name: "mock", version: "", commit: ""}];
}

private getBlobs(
versionedHashes: EngineApiRpcParamTypes["engine_getBlobsV1"][1]
): EngineApiRpcReturnTypes["engine_getBlobsV1"] {
return versionedHashes.map((_vh) => null);
}

private timestampToFork(timestamp: number): ForkExecution {
if (timestamp > (this.opts.electraForkTimestamp ?? Infinity)) return ForkName.electra;
if (timestamp > (this.opts.denebForkTimestamp ?? Infinity)) return ForkName.deneb;
Expand Down
19 changes: 19 additions & 0 deletions packages/beacon-node/src/execution/engine/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
ForkName,
ForkSeq,
} from "@lodestar/params";
import {BlobAndProof} from "@lodestar/types/deneb";

import {
bytesToData,
Expand Down Expand Up @@ -69,6 +70,8 @@ export type EngineApiRpcParamTypes = {
* Object - Instance of ClientVersion
*/
engine_getClientVersionV1: [ClientVersionRpc];

engine_getBlobsV1: [DATA[]];
};

export type PayloadStatus = {
Expand Down Expand Up @@ -111,6 +114,8 @@ export type EngineApiRpcReturnTypes = {
engine_getPayloadBodiesByRangeV1: (ExecutionPayloadBodyRpc | null)[];

engine_getClientVersionV1: ClientVersionRpc[];

engine_getBlobsV1: (BlobAndProofRpc | null)[];
};

type ExecutionPayloadRpcWithValue = {
Expand Down Expand Up @@ -185,6 +190,11 @@ export type ConsolidationRequestRpc = {
targetPubkey: DATA;
};

export type BlobAndProofRpc = {
blob: DATA;
proof: DATA;
};

export type VersionedHashesRpc = DATA[];

export type PayloadAttributesRpc = {
Expand Down Expand Up @@ -494,6 +504,15 @@ export function serializeExecutionPayloadBody(data: ExecutionPayloadBody | null)
: null;
}

export function deserializeBlobAndProofs(data: BlobAndProofRpc | null): BlobAndProof | null {
return data
? {
blob: dataToBytes(data.blob, BYTES_PER_FIELD_ELEMENT * FIELD_ELEMENTS_PER_BLOB),
proof: dataToBytes(data.proof, 48),
}
: null;
}

export function assertReqSizeLimit(blockHashesReqCount: number, count: number): void {
if (blockHashesReqCount > count) {
throw new Error(`Requested blocks must not be > ${count}`);
Expand Down
Loading

0 comments on commit d6044d6

Please sign in to comment.