From b9e71094969071e25533d91879c745776ca76351 Mon Sep 17 00:00:00 2001 From: Santiago Palladino Date: Fri, 3 Jan 2025 11:18:17 -0300 Subject: [PATCH] feat: Prover node checks txs availability before sending quote (#10965) The prover node is now responsible for fetching txs for proving the epoch, instead of delegating to the prover job. It fetches the txs once an epoch is complete and before sending the quote, and reuses them when creating the job. Fixes #10803 --- .../prover-node/src/job/epoch-proving-job.ts | 21 ++----- .../prover-node/src/prover-node.test.ts | 41 +++++++++++-- yarn-project/prover-node/src/prover-node.ts | 60 +++++++++++++++---- 3 files changed, 89 insertions(+), 33 deletions(-) diff --git a/yarn-project/prover-node/src/job/epoch-proving-job.ts b/yarn-project/prover-node/src/job/epoch-proving-job.ts index 06e5d26fa82..3db7c5ca4d2 100644 --- a/yarn-project/prover-node/src/job/epoch-proving-job.ts +++ b/yarn-project/prover-node/src/job/epoch-proving-job.ts @@ -7,9 +7,7 @@ import { type L2Block, type L2BlockSource, type ProcessedTx, - type ProverCoordination, type Tx, - type TxHash, } from '@aztec/circuit-types'; import { asyncPool } from '@aztec/foundation/async-pool'; import { createLogger } from '@aztec/foundation/log'; @@ -41,12 +39,12 @@ export class EpochProvingJob implements Traceable { private dbProvider: ForkMerkleTreeOperations, private epochNumber: bigint, private blocks: L2Block[], + private txs: Tx[], private prover: EpochProver, private publicProcessorFactory: PublicProcessorFactory, private publisher: L1Publisher, private l2BlockSource: L2BlockSource, private l1ToL2MessageSource: L1ToL2MessageSource, - private coordination: ProverCoordination, private metrics: ProverNodeMetrics, private config: { parallelBlockLimit: number } = { parallelBlockLimit: 32 }, private cleanUp: (job: EpochProvingJob) => Promise = () => Promise.resolve(), @@ -92,10 +90,9 @@ export class EpochProvingJob implements Traceable { await asyncPool(this.config.parallelBlockLimit, this.blocks, async block => { const globalVariables = block.header.globalVariables; - const txHashes = block.body.txEffects.map(tx => tx.txHash); const txCount = block.body.numberOfTxsIncludingPadded; + const txs = this.getTxs(block); const l1ToL2Messages = await this.getL1ToL2Messages(block); - const txs = await this.getTxs(txHashes, block.number); const previousHeader = await this.getBlockHeader(block.number - 1); this.log.verbose(`Starting processing block ${block.number}`, { @@ -162,17 +159,9 @@ export class EpochProvingJob implements Traceable { return this.l2BlockSource.getBlockHeader(blockNumber); } - private async getTxs(txHashes: TxHash[], blockNumber: number): Promise { - const txs = await Promise.all( - txHashes.map(txHash => this.coordination.getTxByHash(txHash).then(tx => [txHash, tx] as const)), - ); - const notFound = txs.filter(([_, tx]) => !tx); - if (notFound.length) { - throw new Error( - `Txs not found for block ${blockNumber}: ${notFound.map(([txHash]) => txHash.toString()).join(', ')}`, - ); - } - return txs.map(([_, tx]) => tx!); + private getTxs(block: L2Block): Tx[] { + const txHashes = block.body.txEffects.map(tx => tx.txHash.toBigInt()); + return this.txs.filter(tx => txHashes.includes(tx.getTxHash().toBigInt())); } private getL1ToL2Messages(block: L2Block) { diff --git a/yarn-project/prover-node/src/prover-node.test.ts b/yarn-project/prover-node/src/prover-node.test.ts index bb73e84bfe1..56581e5e23d 100644 --- a/yarn-project/prover-node/src/prover-node.test.ts +++ b/yarn-project/prover-node/src/prover-node.test.ts @@ -1,4 +1,5 @@ import { + type Body, type EpochProofClaim, EpochProofQuote, EpochProofQuotePayload, @@ -9,6 +10,9 @@ import { type MerkleTreeWriteOperations, P2PClientType, type ProverCoordination, + type Tx, + type TxEffect, + TxHash, WorldStateRunningState, type WorldStateSynchronizer, } from '@aztec/circuit-types'; @@ -44,7 +48,8 @@ describe('prover-node', () => { let l1ToL2MessageSource: MockProxy; let contractDataSource: MockProxy; let worldState: MockProxy; - let coordination: MockProxy | ProverCoordination; + let coordination: ProverCoordination; + let mockCoordination: MockProxy; let quoteProvider: MockProxy; let quoteSigner: MockProxy; let bondManager: MockProxy; @@ -108,7 +113,8 @@ describe('prover-node', () => { l1ToL2MessageSource = mock(); contractDataSource = mock(); worldState = mock(); - coordination = mock(); + mockCoordination = mock(); + coordination = mockCoordination; quoteProvider = mock(); quoteSigner = mock(); bondManager = mock(); @@ -134,10 +140,23 @@ describe('prover-node', () => { // Signer returns an empty signature quoteSigner.sign.mockImplementation(payload => Promise.resolve(new EpochProofQuote(payload, Signature.empty()))); + // We create 3 fake blocks with 1 tx effect each + blocks = times(3, i => + mock({ + number: i + 20, + hash: () => new Fr(i), + body: mock({ txEffects: [mock({ txHash: TxHash.random() } as TxEffect)] }), + }), + ); + // Archiver returns a bunch of fake blocks - blocks = times(3, i => mock({ number: i + 20, hash: () => new Fr(i) })); l2BlockSource.getBlocksForEpoch.mockResolvedValue(blocks); + // Coordination plays along and returns a tx whenever requested + mockCoordination.getTxByHash.mockImplementation(hash => + Promise.resolve(mock({ getTxHash: () => hash, tryGetTxHash: () => hash })), + ); + // A sample claim claim = { epochToProve: 10n, bondProvider: address } as EpochProofClaim; @@ -175,6 +194,12 @@ describe('prover-node', () => { expect(coordination.addEpochProofQuote).not.toHaveBeenCalled(); }); + it('does not send a quote if there is a tx missing from coordinator', async () => { + mockCoordination.getTxByHash.mockResolvedValue(undefined); + await proverNode.handleEpochCompleted(10n); + expect(coordination.addEpochProofQuote).not.toHaveBeenCalled(); + }); + it('does not send a quote on a finished epoch if the provider does not return one', async () => { quoteProvider.getQuote.mockResolvedValue(undefined); await proverNode.handleEpochCompleted(10n); @@ -309,7 +334,7 @@ describe('prover-node', () => { // Things to test // - Another aztec node receives the proof quote via p2p // - The prover node can get the it is missing via p2p, or it has them in it's mempool - describe('Using a p2p coordination', () => { + describe('using a p2p coordination', () => { let bootnode: BootstrapNode; let epochCache: MockProxy; let p2pClient: P2PClient; @@ -346,6 +371,11 @@ describe('prover-node', () => { // Set the p2p client to be the coordination method coordination = p2pClient; + // But still mock getTxByHash + const mockGetTxByHash = (hash: TxHash) => Promise.resolve(mock({ getTxHash: () => hash })); + jest.spyOn(p2pClient, 'getTxByHash').mockImplementation(mockGetTxByHash); + jest.spyOn(otherP2PClient, 'getTxByHash').mockImplementation(mockGetTxByHash); + await Promise.all([p2pClient.start(), otherP2PClient.start()]); // Sleep to enable peer discovery @@ -373,7 +403,7 @@ describe('prover-node', () => { await proverNode.stop(); }); - it('Should send a proof quote via p2p to another node', async () => { + it('should send a proof quote via p2p to another node', async () => { const epochNumber = 10n; epochCache.getEpochAndSlotNow.mockReturnValue({ epoch: epochNumber, @@ -412,6 +442,7 @@ describe('prover-node', () => { protected override doCreateEpochProvingJob( epochNumber: bigint, _blocks: L2Block[], + _txs: Tx[], _publicProcessorFactory: PublicProcessorFactory, cleanUp: (job: EpochProvingJob) => Promise, ): EpochProvingJob { diff --git a/yarn-project/prover-node/src/prover-node.ts b/yarn-project/prover-node/src/prover-node.ts index 5bb23e97378..10a65b3594f 100644 --- a/yarn-project/prover-node/src/prover-node.ts +++ b/yarn-project/prover-node/src/prover-node.ts @@ -10,6 +10,7 @@ import { type ProverCoordination, type ProverNodeApi, type Service, + type Tx, type WorldStateSynchronizer, tryStop, } from '@aztec/circuit-types'; @@ -49,6 +50,7 @@ export class ProverNode implements ClaimsMonitorHandler, EpochMonitorHandler, Pr private latestEpochWeAreProving: bigint | undefined; private jobs: Map = new Map(); + private cachedEpochData: { epochNumber: bigint; blocks: L2Block[]; txs: Tx[] } | undefined = undefined; private options: ProverNodeOptions; private metrics: ProverNodeMetrics; @@ -139,13 +141,12 @@ export class ProverNode implements ClaimsMonitorHandler, EpochMonitorHandler, Pr */ async handleEpochCompleted(epochNumber: bigint): Promise { try { - // Construct a quote for the epoch - const blocks = await this.l2BlockSource.getBlocksForEpoch(epochNumber); - if (blocks.length === 0) { - this.log.info(`No blocks found for epoch ${epochNumber}`); - return; - } + // Gather data for the epoch + const epochData = await this.gatherEpochData(epochNumber); + const { blocks } = epochData; + this.cachedEpochData = { epochNumber, ...epochData }; + // Construct a quote for the epoch const partialQuote = await this.quoteProvider.getQuote(Number(epochNumber), blocks); if (!partialQuote) { this.log.info(`No quote produced for epoch ${epochNumber}`); @@ -256,10 +257,9 @@ export class ProverNode implements ClaimsMonitorHandler, EpochMonitorHandler, Pr } // Gather blocks for this epoch - const blocks = await this.l2BlockSource.getBlocksForEpoch(epochNumber); - if (blocks.length === 0) { - throw new Error(`No blocks found for epoch ${epochNumber}`); - } + const cachedEpochData = this.cachedEpochData?.epochNumber === epochNumber ? this.cachedEpochData : undefined; + const { blocks, txs } = cachedEpochData ?? (await this.gatherEpochData(epochNumber)); + const fromBlock = blocks[0].number; const toBlock = blocks.at(-1)!.number; @@ -279,15 +279,51 @@ export class ProverNode implements ClaimsMonitorHandler, EpochMonitorHandler, Pr return Promise.resolve(); }; - const job = this.doCreateEpochProvingJob(epochNumber, blocks, publicProcessorFactory, cleanUp); + const job = this.doCreateEpochProvingJob(epochNumber, blocks, txs, publicProcessorFactory, cleanUp); this.jobs.set(job.getId(), job); return job; } + @trackSpan('ProverNode.gatherEpochData', epochNumber => ({ [Attributes.EPOCH_NUMBER]: Number(epochNumber) })) + private async gatherEpochData(epochNumber: bigint) { + // Gather blocks for this epoch and their txs + const blocks = await this.gatherBlocks(epochNumber); + const txs = await this.gatherTxs(epochNumber, blocks); + + return { blocks, txs }; + } + + private async gatherBlocks(epochNumber: bigint) { + const blocks = await this.l2BlockSource.getBlocksForEpoch(epochNumber); + if (blocks.length === 0) { + throw new Error(`No blocks found for epoch ${epochNumber}`); + } + return blocks; + } + + private async gatherTxs(epochNumber: bigint, blocks: L2Block[]) { + const txs = await Promise.all( + blocks.flatMap(block => + block.body.txEffects + .map(tx => tx.txHash) + .map(txHash => this.coordination.getTxByHash(txHash).then(tx => [block.number, txHash, tx] as const)), + ), + ); + + const notFound = txs.filter(([_blockNum, _txHash, tx]) => !tx); + if (notFound.length) { + const notFoundList = notFound.map(([blockNum, txHash]) => `${txHash.toString()} (block ${blockNum})`).join(', '); + throw new Error(`Txs not found for epoch ${epochNumber}: ${notFoundList}`); + } + + return txs.map(([_blockNumber, _txHash, tx]) => tx!); + } + /** Extracted for testing purposes. */ protected doCreateEpochProvingJob( epochNumber: bigint, blocks: L2Block[], + txs: Tx[], publicProcessorFactory: PublicProcessorFactory, cleanUp: () => Promise, ) { @@ -295,12 +331,12 @@ export class ProverNode implements ClaimsMonitorHandler, EpochMonitorHandler, Pr this.worldState, epochNumber, blocks, + txs, this.prover.createEpochProver(), publicProcessorFactory, this.publisher, this.l2BlockSource, this.l1ToL2MessageSource, - this.coordination, this.metrics, { parallelBlockLimit: this.options.maxParallelBlocksPerEpoch }, cleanUp,