Skip to content

Commit

Permalink
feat: add proving queue
Browse files Browse the repository at this point in the history
  • Loading branch information
alexghr committed Apr 16, 2024
1 parent 2e30e5c commit 42feed8
Show file tree
Hide file tree
Showing 6 changed files with 416 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import { makeBaseParityInputs, makeParityPublicInputs, makeProof } from '@aztec/circuits.js/testing';

import { type MockProxy, mock } from 'jest-mock-extended';

import { type CircuitProver } from '../prover/interface.js';
import { LocalProvingAgent } from './local-proving-agent.js';
import { MemoryProvingQueue } from './memory-proving-queue.js';
import { type ProvingAgent } from './proving-agent.js';
import { type ProvingQueue } from './proving-queue.js';

describe('LocalProvingAgent', () => {
let queue: ProvingQueue;
let agent: ProvingAgent;
let prover: MockProxy<CircuitProver>;

beforeEach(() => {
prover = mock<CircuitProver>();
queue = new MemoryProvingQueue();
agent = new LocalProvingAgent(prover);
});

beforeEach(() => {
agent.start(queue);
});

afterEach(async () => {
await agent.stop();
});

it('takes jobs from the queue', async () => {
const publicInputs = makeParityPublicInputs();
const proof = makeProof();
prover.getBaseParityProof.mockResolvedValue([publicInputs, proof]);

const inputs = makeBaseParityInputs();
const promise = queue.getBaseParityProof(inputs);

await expect(promise).resolves.toEqual([publicInputs, proof]);

expect(prover.getBaseParityProof).toHaveBeenCalledWith(inputs);
});

it('reports errors', async () => {
const error = new Error('test error');
prover.getBaseParityProof.mockRejectedValue(error);

const inputs = makeBaseParityInputs();
const promise = queue.getBaseParityProof(inputs);

await expect(promise).rejects.toEqual(error);

expect(prover.getBaseParityProof).toHaveBeenCalledWith(inputs);
});

it('continues to process jobs', async () => {
const publicInputs = makeParityPublicInputs();
const proof = makeProof();
prover.getBaseParityProof.mockResolvedValue([publicInputs, proof]);

const inputs = makeBaseParityInputs();
const promise1 = queue.getBaseParityProof(inputs);

await expect(promise1).resolves.toEqual([publicInputs, proof]);

const inputs2 = makeBaseParityInputs();
const promise2 = queue.getBaseParityProof(inputs2);

await expect(promise2).resolves.toEqual([publicInputs, proof]);

expect(prover.getBaseParityProof).toHaveBeenCalledTimes(2);
expect(prover.getBaseParityProof).toHaveBeenCalledWith(inputs);
expect(prover.getBaseParityProof).toHaveBeenCalledWith(inputs2);
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import { createDebugLogger } from '@aztec/foundation/log';
import { RunningPromise } from '@aztec/foundation/running-promise';

import { type CircuitProver } from '../prover/interface.js';
import { type ProvingAgent } from './proving-agent.js';
import { ProvingJobType, type ProvingQueueConsumer, type ProvingRequest, type ProvingResult } from './proving-queue.js';

export class LocalProvingAgent implements ProvingAgent {
private runningPromise?: RunningPromise;

constructor(
/** The prover implementation to defer jobs to */
private prover: CircuitProver,
/** How long to wait between jobs */
private intervalMs = 10,
/** A name for this agent (if there are multiple agents running) */
name = '',
private log = createDebugLogger('aztec:prover-client:proving-agent' + name ? `:${name}` : ''),
) {}

start(queue: ProvingQueueConsumer): void {
this.runningPromise = new RunningPromise(async () => {
this.log.debug('Asking for proving jobs');
const job = await queue.getJob();
if (!job) {
return;
}

try {
this.log.debug(`Processing proving job id=${job.id} type=${ProvingJobType[job.request.type]}`);
await queue.resolveJob(job.id, await this.work(job.request));
} catch (err) {
this.log.error(`Error processing proving job id=${job.id} type=${ProvingJobType[job.request.type]}: ${err}`);
await queue.rejectJob(job.id, err);
}
}, this.intervalMs);

this.runningPromise.start();
}

async stop(): Promise<void> {
await this.runningPromise?.stop();
this.runningPromise = undefined;
}

private work({ type, inputs }: ProvingRequest): Promise<ProvingResult<typeof type>> {
switch (type) {
case ProvingJobType.BASE_PARITY: {
return this.prover.getBaseParityProof(inputs);
}

case ProvingJobType.ROOT_PARITY: {
return this.prover.getRootParityProof(inputs);
}

case ProvingJobType.BASE_ROLLUP: {
return this.prover.getBaseRollupProof(inputs);
}

case ProvingJobType.MERGE_ROLLUP: {
return this.prover.getMergeRollupProof(inputs);
}

case ProvingJobType.ROOT_ROLLUP: {
return this.prover.getRootRollupProof(inputs);
}

default: {
return Promise.reject(new Error(`Invalid proof request type: ${type}`));
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import {
makeBaseParityInputs,
makeBaseRollupInputs,
makeParityPublicInputs,
makeProof,
} from '@aztec/circuits.js/testing';

import { MemoryProvingQueue } from './memory-proving-queue.js';
import { ProvingJobType, type ProvingQueue } from './proving-queue.js';

describe('MemoryProvingQueue', () => {
let queue: ProvingQueue;

beforeEach(() => {
queue = new MemoryProvingQueue();
});

it('accepts multiple proving requests', () => {
expect(queue.getBaseParityProof(makeBaseParityInputs())).toBeInstanceOf(Promise);
expect(queue.getBaseRollupProof(makeBaseRollupInputs())).toBeInstanceOf(Promise);
});

it('returns jobs in order', async () => {
void queue.getBaseParityProof(makeBaseParityInputs());
void queue.getBaseRollupProof(makeBaseRollupInputs());

const job1 = await queue.getJob();
expect(job1?.request.type).toEqual(ProvingJobType.BASE_PARITY);

const job2 = await queue.getJob();
expect(job2?.request.type).toEqual(ProvingJobType.BASE_ROLLUP);
});

it('returns null when no jobs are available', async () => {
await expect(queue.getJob({ timeoutSec: 0 })).resolves.toBeNull();
});

it('notifies of completion', async () => {
const inputs = makeBaseParityInputs();
const promise = queue.getBaseParityProof(inputs);
const job = await queue.getJob();
expect(job?.request.inputs).toEqual(inputs);

const publicInputs = makeParityPublicInputs();
const proof = makeProof();
await queue.resolveJob(job!.id, [publicInputs, proof]);
await expect(promise).resolves.toEqual([publicInputs, proof]);
});

it('notifies of errors', async () => {
const inputs = makeBaseParityInputs();
const promise = queue.getBaseParityProof(inputs);
const job = await queue.getJob();
expect(job?.request.inputs).toEqual(inputs);

const error = new Error('test error');
await queue.rejectJob(job!.id, error);
await expect(promise).rejects.toEqual(error);
});
});
125 changes: 125 additions & 0 deletions yarn-project/prover-client/src/proving-queue/memory-proving-queue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
import {
type BaseOrMergeRollupPublicInputs,
type BaseParityInputs,
type BaseRollupInputs,
type MergeRollupInputs,
type ParityPublicInputs,
type Proof,
type RootParityInputs,
type RootRollupInputs,
type RootRollupPublicInputs,
} from '@aztec/circuits.js';
import { InterruptError } from '@aztec/foundation/error';
import { MemoryFifo } from '@aztec/foundation/fifo';
import { createDebugLogger } from '@aztec/foundation/log';

import {
type ProvingJob,
ProvingJobType,
type ProvingQueue,
type ProvingRequest,
type ProvingResult,
} from './proving-queue.js';

type ProvingJobWithResolvers<T extends ProvingRequest = ProvingRequest> = ProvingJob<T> & {
resolve: (result: ProvingResult<T['type']>) => void;
reject: (err: any) => void;
};

export class MemoryProvingQueue implements ProvingQueue {
private jobId = 0;
private log = createDebugLogger('aztec:prover:proving_queue');
private queue = new MemoryFifo<ProvingJobWithResolvers>();
private jobsInProgress = new Map<string, ProvingJobWithResolvers>();

getBaseParityProof(inputs: BaseParityInputs): Promise<[ParityPublicInputs, Proof]> {
return this.put({ type: ProvingJobType.BASE_PARITY, inputs });
}

getRootParityProof(inputs: RootParityInputs): Promise<[ParityPublicInputs, Proof]> {
return this.put({ type: ProvingJobType.ROOT_PARITY, inputs });
}

getBaseRollupProof(input: BaseRollupInputs): Promise<[BaseOrMergeRollupPublicInputs, Proof]> {
return this.put({ type: ProvingJobType.BASE_ROLLUP, inputs: input });
}

getMergeRollupProof(input: MergeRollupInputs): Promise<[BaseOrMergeRollupPublicInputs, Proof]> {
return this.put({ type: ProvingJobType.MERGE_ROLLUP, inputs: input });
}

getRootRollupProof(input: RootRollupInputs): Promise<[RootRollupPublicInputs, Proof]> {
return this.put({ type: ProvingJobType.ROOT_ROLLUP, inputs: input });
}

async getJob({ timeoutSec = 1 } = {}): Promise<ProvingJob | null> {
try {
const job = await this.queue.get(timeoutSec);
if (!job) {
return null;
}

this.jobsInProgress.set(job.id, job);
return job;
} catch (err) {
if (err instanceof InterruptError) {
return null;
}

throw err;
}
}

resolveJob<T extends ProvingJob>(jobId: T['id'], result: ProvingResult<T['request']['type']>): Promise<void> {
const job = this.jobsInProgress.get(jobId);
if (!job) {
return Promise.reject(new Error('Job not found'));
}

this.jobsInProgress.delete(jobId);
job.resolve(result);
return Promise.resolve();
}

rejectJob<T extends ProvingJob>(jobId: T['id'], err: any): Promise<void> {
const job = this.jobsInProgress.get(jobId);
if (!job) {
return Promise.reject(new Error('Job not found'));
}

this.jobsInProgress.delete(jobId);
job.reject(err);
return Promise.resolve();
}

private put<T extends ProvingRequest>(request: T): Promise<ProvingResult<T['type']>> {
let resolve: ProvingJobWithResolvers<typeof request>['resolve'] | undefined;
let reject: ProvingJobWithResolvers<T>['reject'] | undefined;

const promise = new Promise<ProvingResult<T['type']>>((res, rej) => {
resolve = res;
reject = rej;
});

// ES spec guarantees that resolve and reject are defined
// this `if` makes TypeScript happy
if (!resolve || !reject) {
throw new Error('Promise not created');
}

const item: ProvingJobWithResolvers<T> = {
id: String(this.jobId++),
request,
resolve,
reject,
};

this.log.info(`Adding ${ProvingJobType[request.type]} proving job to queue`);
// TODO (alexg) remove the `any`
if (!this.queue.put(item as any)) {
return Promise.reject(new Error('Failed to submit proving request'));
}

return promise;
}
}
15 changes: 15 additions & 0 deletions yarn-project/prover-client/src/proving-queue/proving-agent.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import { type ProvingQueueConsumer } from './proving-queue.js';

/** An agent that reads proving jobs from the queue, creates the proof and submits back the result */
export interface ProvingAgent {
/**
* Starts the agent to read proving jobs from the queue.
* @param queue - The queue to read proving jobs from.
*/
start(queue: ProvingQueueConsumer): void;

/**
* Stops the agent. Does nothing if the agent is not running.
*/
stop(): Promise<void>;
}
Loading

0 comments on commit 42feed8

Please sign in to comment.