Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Use throw instead of reject in broker facade #10735

Merged
merged 1 commit into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions yarn-project/prover-client/src/orchestrator/orchestrator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ export class ProvingOrchestrator implements EpochProver {
}

// And build the block header
logger.verbose(`Block ${provingState.globalVariables.blockNumber} completed. Assembling header.`);
logger.verbose(`Block ${blockNumber} completed. Assembling header.`);
await this.buildBlock(provingState, expectedHeader);

// If the proofs were faster than the block building, then we need to try the block root rollup again here
Expand Down Expand Up @@ -1196,10 +1196,13 @@ export class ProvingOrchestrator implements EpochProver {
return await this.prover.getAvmProof(inputs, signal, provingState.epochNumber);
} catch (err) {
if (process.env.AVM_PROVING_STRICT) {
logger.error(`Error thrown when proving AVM circuit with AVM_PROVING_STRICT on`, err);
throw err;
} else {
logger.warn(
`Error thrown when proving AVM circuit, but AVM_PROVING_STRICT is off, so faking AVM proof and carrying on. Error: ${err}.`,
`Error thrown when proving AVM circuit but AVM_PROVING_STRICT is off. Faking AVM proof and carrying on. ${inspect(
err,
)}.`,
);
return {
proof: makeEmptyRecursiveProof(AVM_PROOF_LENGTH_IN_FIELDS),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ export class CachingBrokerFacade implements ServerCircuitProver {
): Promise<ProvingJobResultsMap[T]> {
// first try the cache
let jobEnqueued = false;
let jobRejected = undefined;
try {
const cachedResult = await this.cache.getProvingJobStatus(id);
if (cachedResult.status !== 'not-found') {
Expand All @@ -78,8 +79,7 @@ export class CachingBrokerFacade implements ServerCircuitProver {
this.log.warn(`Cached result type mismatch for job=${id}. Expected=${type} but got=${output.type}`);
}
} else if (cachedResult.status === 'rejected') {
// prefer returning a rejected promises so that we don't trigger the catch block below
return Promise.reject(new Error(cachedResult.reason));
jobRejected = cachedResult.reason ?? 'Job rejected for unknown reason';
} else if (cachedResult.status === 'in-progress' || cachedResult.status === 'in-queue') {
jobEnqueued = true;
} else {
Expand All @@ -89,6 +89,10 @@ export class CachingBrokerFacade implements ServerCircuitProver {
this.log.warn(`Failed to get cached proving job id=${id}: ${err}. Re-running job`);
}

if (jobRejected) {
throw new Error(jobRejected);
}

if (!jobEnqueued) {
try {
const inputsUri = await this.proofStore.saveProofInput(id, type, inputs);
Expand Down Expand Up @@ -142,10 +146,10 @@ export class CachingBrokerFacade implements ServerCircuitProver {
if (output.type === type) {
return output.result as ProvingJobResultsMap[T];
} else {
return Promise.reject(new Error(`Unexpected proof type: ${output.type}. Expected: ${type}`));
throw new Error(`Unexpected proof type: ${output.type}. Expected: ${type}`);
}
} else {
return Promise.reject(new Error(result.reason));
throw new Error(result.reason);
}
} finally {
signal?.removeEventListener('abort', abortFn);
Expand Down
29 changes: 18 additions & 11 deletions yarn-project/prover-node/src/job/epoch-proving-job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,26 +65,32 @@ export class EpochProvingJob {
public async run() {
const epochNumber = Number(this.epochNumber);
const epochSize = this.blocks.length;
const firstBlockNumber = this.blocks[0].number;
this.log.info(`Starting epoch proving job`, { firstBlockNumber, epochSize, epochNumber, uuid: this.uuid });
const [fromBlock, toBlock] = [this.blocks[0].number, this.blocks.at(-1)!.number];
this.log.info(`Starting epoch ${epochNumber} proving job with blocks ${fromBlock} to ${toBlock}`, {
fromBlock,
toBlock,
epochSize,
epochNumber,
uuid: this.uuid,
});
this.state = 'processing';
const timer = new Timer();

const { promise, resolve } = promiseWithResolvers<void>();
this.runPromise = promise;

try {
this.prover.startNewEpoch(epochNumber, firstBlockNumber, epochSize);
this.prover.startNewEpoch(epochNumber, fromBlock, epochSize);

await asyncPool(this.config.parallelBlockLimit, this.blocks, async block => {
const globalVariables = block.header.globalVariables;
const txHashes = block.body.txEffects.map(tx => tx.txHash);
const txCount = block.body.numberOfTxsIncludingPadded;
const l1ToL2Messages = await this.getL1ToL2Messages(block);
const txs = await this.getTxs(txHashes);
const txs = await this.getTxs(txHashes, block.number);
const previousHeader = await this.getBlockHeader(block.number - 1);

this.log.verbose(`Starting block processing`, {
this.log.verbose(`Starting processing block ${block.number}`, {
number: block.number,
blockHash: block.hash().toString(),
lastArchive: block.header.lastArchive.root,
Expand All @@ -104,7 +110,7 @@ export class EpochProvingJob {
const publicProcessor = this.publicProcessorFactory.create(db, previousHeader, globalVariables);
await this.processTxs(publicProcessor, txs, txCount);
await db.close();
this.log.verbose(`Processed all txs for block`, {
this.log.verbose(`Processed all ${txs.length} txs for block ${block.number}`, {
blockNumber: block.number,
blockHash: block.hash().toString(),
uuid: this.uuid,
Expand All @@ -116,17 +122,16 @@ export class EpochProvingJob {

this.state = 'awaiting-prover';
const { publicInputs, proof } = await this.prover.finaliseEpoch();
this.log.info(`Finalised proof for epoch`, { epochNumber, uuid: this.uuid, duration: timer.ms() });
this.log.info(`Finalised proof for epoch ${epochNumber}`, { epochNumber, uuid: this.uuid, duration: timer.ms() });

this.state = 'publishing-proof';
const [fromBlock, toBlock] = [this.blocks[0].number, this.blocks.at(-1)!.number];
await this.publisher.submitEpochProof({ fromBlock, toBlock, epochNumber, publicInputs, proof });
this.log.info(`Submitted proof for epoch`, { epochNumber, uuid: this.uuid });

this.state = 'completed';
this.metrics.recordProvingJob(timer);
} catch (err) {
this.log.error(`Error running epoch prover job`, err, { uuid: this.uuid });
this.log.error(`Error running epoch ${epochNumber} prover job`, err, { uuid: this.uuid, epochNumber });
this.state = 'failed';
} finally {
await this.cleanUp(this);
Expand All @@ -149,13 +154,15 @@ export class EpochProvingJob {
return this.l2BlockSource.getBlockHeader(blockNumber);
}

private async getTxs(txHashes: TxHash[]): Promise<Tx[]> {
private async getTxs(txHashes: TxHash[], blockNumber: number): Promise<Tx[]> {
const txs = await Promise.all(
txHashes.map(txHash => this.coordination.getTxByHash(txHash).then(tx => [txHash, tx] as const)),
);
const notFound = txs.filter(([_, tx]) => !tx);
if (notFound.length) {
throw new Error(`Txs not found: ${notFound.map(([txHash]) => txHash.toString()).join(', ')}`);
throw new Error(
`Txs not found for block ${blockNumber}: ${notFound.map(([txHash]) => txHash.toString()).join(', ')}`,
);
}
return txs.map(([_, tx]) => tx!);
}
Expand Down
Loading