diff --git a/yarn-project/archiver/src/archiver/archiver.ts b/yarn-project/archiver/src/archiver/archiver.ts index 94c664d0f11..7b3b7ebbf2b 100644 --- a/yarn-project/archiver/src/archiver/archiver.ts +++ b/yarn-project/archiver/src/archiver/archiver.ts @@ -176,7 +176,7 @@ export class Archiver implements ArchiveSource { config.l1Contracts.registryAddress, archiverStore, config.archiverPollingIntervalMS ?? 10_000, - new ArchiverInstrumentation(telemetry), + new ArchiverInstrumentation(telemetry, () => archiverStore.estimateSize()), { l1StartBlock, l1GenesisTime, epochDuration, slotDuration, ethereumSlotDuration }, ); await archiver.start(blockUntilSynced); @@ -271,9 +271,6 @@ export class Archiver implements ArchiveSource { // the chain locally before we start unwinding stuff. This can be optimized by figuring out // up to which point we're pruning, and then requesting L2 blocks up to that point only. await this.handleEpochPrune(provenBlockNumber, currentL1BlockNumber); - - const storeSizes = this.store.estimateSize(); - this.instrumentation.recordDBMetrics(storeSizes); } } diff --git a/yarn-project/archiver/src/archiver/instrumentation.ts b/yarn-project/archiver/src/archiver/instrumentation.ts index 1d6343b8f9d..7c44a9a4618 100644 --- a/yarn-project/archiver/src/archiver/instrumentation.ts +++ b/yarn-project/archiver/src/archiver/instrumentation.ts @@ -5,6 +5,7 @@ import { type Gauge, type Histogram, LmdbMetrics, + type LmdbStatsCallback, Metrics, type TelemetryClient, type UpDownCounter, @@ -23,7 +24,7 @@ export class ArchiverInstrumentation { private log = createDebugLogger('aztec:archiver:instrumentation'); - constructor(private telemetry: TelemetryClient) { + constructor(private telemetry: TelemetryClient, lmdbStats?: LmdbStatsCallback) { const meter = telemetry.getMeter('Archiver'); this.blockHeight = meter.createGauge(Metrics.ARCHIVER_BLOCK_HEIGHT, { description: 'The height of the latest block processed by the archiver', @@ -72,13 +73,10 @@ export class ArchiverInstrumentation { name: Metrics.ARCHIVER_DB_NUM_ITEMS, description: 'Num items in the archiver database', }, + lmdbStats, ); } - public recordDBMetrics(metrics: { mappingSize: number; numItems: number; actualSize: number }) { - this.dbMetrics.recordDBMetrics(metrics); - } - public isEnabled(): boolean { return this.telemetry.isEnabled(); } diff --git a/yarn-project/aztec/src/cli/cmds/start_prover_agent.ts b/yarn-project/aztec/src/cli/cmds/start_prover_agent.ts index 3ae24df0ad9..df382eb6251 100644 --- a/yarn-project/aztec/src/cli/cmds/start_prover_agent.ts +++ b/yarn-project/aztec/src/cli/cmds/start_prover_agent.ts @@ -40,7 +40,18 @@ export async function startProverAgent( ); const prover = await buildServerCircuitProver(config, telemetry); const proofStore = new InlineProofStore(); - const agents = times(config.proverAgentCount, () => new ProvingAgent(broker, proofStore, prover)); + const agents = times( + config.proverAgentCount, + () => + new ProvingAgent( + broker, + proofStore, + prover, + telemetry, + config.proverAgentProofTypes, + config.proverAgentPollIntervalMs, + ), + ); await Promise.all(agents.map(agent => agent.start())); diff --git a/yarn-project/aztec/src/cli/cmds/start_prover_broker.ts b/yarn-project/aztec/src/cli/cmds/start_prover_broker.ts index 197d48971c9..ce5ef637ff6 100644 --- a/yarn-project/aztec/src/cli/cmds/start_prover_broker.ts +++ b/yarn-project/aztec/src/cli/cmds/start_prover_broker.ts @@ -3,6 +3,10 @@ import { type NamespacedApiHandlers } from '@aztec/foundation/json-rpc/server'; import { type LogFn } from '@aztec/foundation/log'; import { ProvingJobBrokerSchema, createAndStartProvingBroker } from '@aztec/prover-client/broker'; import { getProverNodeBrokerConfigFromEnv } from '@aztec/prover-node'; +import { + createAndStartTelemetryClient, + getConfigEnvVars as getTelemetryClientConfig, +} from '@aztec/telemetry-client/start'; import { extractRelevantOptions } from '../util.js'; @@ -22,7 +26,8 @@ export async function startProverBroker( ...extractRelevantOptions(options, proverBrokerConfigMappings, 'proverBroker'), // override with command line options }; - const broker = await createAndStartProvingBroker(config); + const client = await createAndStartTelemetryClient(getTelemetryClientConfig()); + const broker = await createAndStartProvingBroker(config, client); services.proverBroker = [broker, ProvingJobBrokerSchema]; signalHandlers.push(() => broker.stop()); diff --git a/yarn-project/p2p/src/mem_pools/instrumentation.ts b/yarn-project/p2p/src/mem_pools/instrumentation.ts index e4271029ba2..d80b2f69d55 100644 --- a/yarn-project/p2p/src/mem_pools/instrumentation.ts +++ b/yarn-project/p2p/src/mem_pools/instrumentation.ts @@ -3,6 +3,7 @@ import { Attributes, type Histogram, LmdbMetrics, + type LmdbStatsCallback, Metrics, type TelemetryClient, type UpDownCounter, @@ -58,7 +59,7 @@ export class PoolInstrumentation { private defaultAttributes; - constructor(telemetry: TelemetryClient, name: PoolName) { + constructor(telemetry: TelemetryClient, name: PoolName, dbStats?: LmdbStatsCallback) { const meter = telemetry.getMeter(name); this.defaultAttributes = { [Attributes.POOL_NAME]: name }; @@ -98,13 +99,10 @@ export class PoolInstrumentation { name: Metrics.MEMPOOL_DB_NUM_ITEMS, description: 'Num items in database for the Tx mempool', }, + dbStats, ); } - public recordDBMetrics(metrics: { mappingSize: number; numItems: number; actualSize: number }) { - this.dbMetrics.recordDBMetrics(metrics); - } - public recordSize(poolObject: PoolObject) { this.objectSize.record(poolObject.getSize()); } diff --git a/yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.ts b/yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.ts index 865fbd8fdf2..18ba3c5fc1d 100644 --- a/yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.ts +++ b/yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.ts @@ -37,7 +37,7 @@ export class AztecKVTxPool implements TxPool { this.#store = store; this.#log = log; - this.#metrics = new PoolInstrumentation(telemetry, PoolName.TX_POOL); + this.#metrics = new PoolInstrumentation(telemetry, PoolName.TX_POOL, () => store.estimateSize()); } public markAsMined(txHashes: TxHash[], blockNumber: number): Promise { @@ -53,8 +53,6 @@ export class AztecKVTxPool implements TxPool { } this.#metrics.recordRemovedObjects(deleted, 'pending'); this.#metrics.recordAddedObjects(txHashes.length, 'mined'); - const storeSizes = this.#store.estimateSize(); - this.#metrics.recordDBMetrics(storeSizes); }); } diff --git a/yarn-project/prover-client/src/prover-client/prover-client.ts b/yarn-project/prover-client/src/prover-client/prover-client.ts index 3cc5b9aa32b..d41e3ad8851 100644 --- a/yarn-project/prover-client/src/prover-client/prover-client.ts +++ b/yarn-project/prover-client/src/prover-client/prover-client.ts @@ -137,7 +137,15 @@ export class ProverClient implements EpochProverManager { const prover = await buildServerCircuitProver(this.config, this.telemetry); this.agents = times( this.config.proverAgentCount, - () => new ProvingAgent(this.agentClient!, proofStore, prover, [], this.config.proverAgentPollIntervalMs), + () => + new ProvingAgent( + this.agentClient!, + proofStore, + prover, + this.telemetry, + [], + this.config.proverAgentPollIntervalMs, + ), ); await Promise.all(this.agents.map(agent => agent.start())); diff --git a/yarn-project/prover-client/src/proving_broker/factory.ts b/yarn-project/prover-client/src/proving_broker/factory.ts index 02a5fcb314b..67295fb6011 100644 --- a/yarn-project/prover-client/src/proving_broker/factory.ts +++ b/yarn-project/prover-client/src/proving_broker/factory.ts @@ -1,16 +1,20 @@ import { type ProverBrokerConfig } from '@aztec/circuit-types'; import { AztecLmdbStore } from '@aztec/kv-store/lmdb'; +import { type TelemetryClient } from '@aztec/telemetry-client'; import { ProvingBroker } from './proving_broker.js'; import { InMemoryBrokerDatabase } from './proving_broker_database/memory.js'; import { KVBrokerDatabase } from './proving_broker_database/persisted.js'; -export async function createAndStartProvingBroker(config: ProverBrokerConfig): Promise { +export async function createAndStartProvingBroker( + config: ProverBrokerConfig, + client: TelemetryClient, +): Promise { const database = config.proverBrokerDataDirectory - ? new KVBrokerDatabase(AztecLmdbStore.open(config.proverBrokerDataDirectory)) + ? new KVBrokerDatabase(AztecLmdbStore.open(config.proverBrokerDataDirectory), client) : new InMemoryBrokerDatabase(); - const broker = new ProvingBroker(database, { + const broker = new ProvingBroker(database, client, { jobTimeoutMs: config.proverBrokerJobTimeoutMs, maxRetries: config.proverBrokerJobMaxRetries, timeoutIntervalMs: config.proverBrokerPollIntervalMs, diff --git a/yarn-project/prover-client/src/proving_broker/proving_agent.test.ts b/yarn-project/prover-client/src/proving_broker/proving_agent.test.ts index cc49057ab6d..5a33598a31d 100644 --- a/yarn-project/prover-client/src/proving_broker/proving_agent.test.ts +++ b/yarn-project/prover-client/src/proving_broker/proving_agent.test.ts @@ -19,6 +19,7 @@ import { makeBaseParityInputs, makeParityPublicInputs } from '@aztec/circuits.js import { randomBytes } from '@aztec/foundation/crypto'; import { AbortError } from '@aztec/foundation/error'; import { promiseWithResolvers } from '@aztec/foundation/promise'; +import { NoopTelemetryClient } from '@aztec/telemetry-client/noop'; import { jest } from '@jest/globals'; @@ -50,7 +51,7 @@ describe('ProvingAgent', () => { saveProofOutput: jest.fn(), }; - agent = new ProvingAgent(jobSource, proofDB, prover, [ProvingRequestType.BASE_PARITY]); + agent = new ProvingAgent(jobSource, proofDB, prover, new NoopTelemetryClient(), [ProvingRequestType.BASE_PARITY]); }); afterEach(async () => { diff --git a/yarn-project/prover-client/src/proving_broker/proving_agent.ts b/yarn-project/prover-client/src/proving_broker/proving_agent.ts index 6d17c8176b5..333ac91a4a9 100644 --- a/yarn-project/prover-client/src/proving_broker/proving_agent.ts +++ b/yarn-project/prover-client/src/proving_broker/proving_agent.ts @@ -10,8 +10,11 @@ import { } from '@aztec/circuit-types'; import { createDebugLogger } from '@aztec/foundation/log'; import { RunningPromise } from '@aztec/foundation/running-promise'; +import { Timer } from '@aztec/foundation/timer'; +import { type TelemetryClient } from '@aztec/telemetry-client'; import { type ProofStore } from './proof_store.js'; +import { ProvingAgentInstrumentation } from './proving_agent_instrumentation.js'; import { ProvingJobController, ProvingJobControllerStatus } from './proving_job_controller.js'; /** @@ -20,6 +23,8 @@ import { ProvingJobController, ProvingJobControllerStatus } from './proving_job_ export class ProvingAgent { private currentJobController?: ProvingJobController; private runningPromise: RunningPromise; + private instrumentation: ProvingAgentInstrumentation; + private idleTimer: Timer | undefined; constructor( /** The source of proving jobs */ @@ -28,12 +33,15 @@ export class ProvingAgent { private proofStore: ProofStore, /** The prover implementation to defer jobs to */ private circuitProver: ServerCircuitProver, + /** A telemetry client through which to emit metrics */ + client: TelemetryClient, /** Optional list of allowed proof types to build */ private proofAllowList: Array = [], /** How long to wait between jobs */ private pollIntervalMs = 1000, private log = createDebugLogger('aztec:prover-client:proving-agent'), ) { + this.instrumentation = new ProvingAgentInstrumentation(client); this.runningPromise = new RunningPromise(this.safeWork, this.pollIntervalMs); } @@ -46,6 +54,7 @@ export class ProvingAgent { } public start(): void { + this.idleTimer = new Timer(); this.runningPromise.start(); } @@ -114,6 +123,11 @@ export class ProvingAgent { ); } + if (this.idleTimer) { + this.instrumentation.recordIdleTime(this.idleTimer); + } + this.idleTimer = undefined; + this.currentJobController.start(); } catch (err) { this.log.error(`Error in ProvingAgent: ${String(err)}`); @@ -126,6 +140,7 @@ export class ProvingAgent { err: Error | undefined, result: ProvingJobResultsMap[T] | undefined, ) => { + this.idleTimer = new Timer(); if (err) { const retry = err.name === ProvingError.NAME ? (err as ProvingError).retry : false; this.log.error(`Job id=${jobId} type=${ProvingRequestType[type]} failed err=${err.message} retry=${retry}`, err); diff --git a/yarn-project/prover-client/src/proving_broker/proving_agent_instrumentation.ts b/yarn-project/prover-client/src/proving_broker/proving_agent_instrumentation.ts new file mode 100644 index 00000000000..573b71f2e93 --- /dev/null +++ b/yarn-project/prover-client/src/proving_broker/proving_agent_instrumentation.ts @@ -0,0 +1,21 @@ +import { type Timer } from '@aztec/foundation/timer'; +import { type Histogram, Metrics, type TelemetryClient, ValueType } from '@aztec/telemetry-client'; + +export class ProvingAgentInstrumentation { + private idleTime: Histogram; + + constructor(client: TelemetryClient, name = 'ProvingAgent') { + const meter = client.getMeter(name); + + this.idleTime = meter.createHistogram(Metrics.PROVING_AGENT_IDLE, { + description: 'Records how long an agent was idle', + unit: 'ms', + valueType: ValueType.INT, + }); + } + + recordIdleTime(msOrTimer: Timer | number) { + const duration = typeof msOrTimer === 'number' ? msOrTimer : Math.floor(msOrTimer.ms()); + this.idleTime.record(duration); + } +} diff --git a/yarn-project/prover-client/src/proving_broker/proving_broker.test.ts b/yarn-project/prover-client/src/proving_broker/proving_broker.test.ts index 543843a6e15..76eef870b21 100644 --- a/yarn-project/prover-client/src/proving_broker/proving_broker.test.ts +++ b/yarn-project/prover-client/src/proving_broker/proving_broker.test.ts @@ -1,6 +1,7 @@ import { type ProofUri, type ProvingJob, type ProvingJobId, ProvingRequestType } from '@aztec/circuit-types'; import { randomBytes } from '@aztec/foundation/crypto'; import { openTmpStore } from '@aztec/kv-store/utils'; +import { NoopTelemetryClient } from '@aztec/telemetry-client/noop'; import { jest } from '@jest/globals'; @@ -17,7 +18,7 @@ describe.each([ () => ({ database: new InMemoryBrokerDatabase(), cleanup: undefined }), () => { const store = openTmpStore(true); - const database = new KVBrokerDatabase(store); + const database = new KVBrokerDatabase(store, new NoopTelemetryClient()); const cleanup = () => store.close(); return { database, cleanup }; }, @@ -35,7 +36,7 @@ describe.each([ maxRetries = 2; ({ database, cleanup } = createDb()); - broker = new ProvingBroker(database, { + broker = new ProvingBroker(database, new NoopTelemetryClient(), { jobTimeoutMs, timeoutIntervalMs: jobTimeoutMs / 4, maxRetries, @@ -409,7 +410,7 @@ describe.each([ // fake some time passing while the broker restarts await jest.advanceTimersByTimeAsync(10_000); - broker = new ProvingBroker(database); + broker = new ProvingBroker(database, new NoopTelemetryClient()); await broker.start(); await assertJobStatus(job1.id, 'in-queue'); @@ -470,7 +471,7 @@ describe.each([ // fake some time passing while the broker restarts await jest.advanceTimersByTimeAsync(10_000); - broker = new ProvingBroker(database); + broker = new ProvingBroker(database, new NoopTelemetryClient()); await broker.start(); await assertJobStatus(job1.id, 'in-queue'); @@ -521,7 +522,7 @@ describe.each([ // fake some time passing while the broker restarts await jest.advanceTimersByTimeAsync(100 * jobTimeoutMs); - broker = new ProvingBroker(database); + broker = new ProvingBroker(database, new NoopTelemetryClient()); await broker.start(); await assertJobStatus(job1.id, 'in-queue'); diff --git a/yarn-project/prover-client/src/proving_broker/proving_broker.ts b/yarn-project/prover-client/src/proving_broker/proving_broker.ts index 62667821ec7..1c73b62b84a 100644 --- a/yarn-project/prover-client/src/proving_broker/proving_broker.ts +++ b/yarn-project/prover-client/src/proving_broker/proving_broker.ts @@ -12,10 +12,13 @@ import { import { createDebugLogger } from '@aztec/foundation/log'; import { type PromiseWithResolvers, RunningPromise, promiseWithResolvers } from '@aztec/foundation/promise'; import { PriorityMemoryQueue } from '@aztec/foundation/queue'; +import { Timer } from '@aztec/foundation/timer'; +import { type TelemetryClient } from '@aztec/telemetry-client'; import assert from 'assert'; import { type ProvingBrokerDatabase } from './proving_broker_database.js'; +import { type MonitorCallback, ProvingBrokerInstrumentation } from './proving_broker_instrumentation.js'; type InProgressMetadata = { id: ProvingJobId; @@ -58,6 +61,9 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer { // as above, but for results private resultsCache = new Map(); + // tracks when each job was enqueued + private enqueuedAt = new Map(); + // keeps track of which jobs are currently being processed // in the event of a crash this information is lost, but that's ok // the next time the broker starts it will recreate jobsCache and still @@ -75,18 +81,37 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer { private jobTimeoutMs: number; private maxRetries: number; + private instrumentation: ProvingBrokerInstrumentation; + public constructor( private database: ProvingBrokerDatabase, - { jobTimeoutMs = 30, timeoutIntervalMs = 10, maxRetries = 3 }: ProofRequestBrokerConfig = {}, + client: TelemetryClient, + { jobTimeoutMs = 30_000, timeoutIntervalMs = 10_000, maxRetries = 3 }: ProofRequestBrokerConfig = {}, private logger = createDebugLogger('aztec:prover-client:proving-broker'), ) { + this.instrumentation = new ProvingBrokerInstrumentation(client); this.timeoutPromise = new RunningPromise(this.timeoutCheck, timeoutIntervalMs); this.jobTimeoutMs = jobTimeoutMs; this.maxRetries = maxRetries; } - // eslint-disable-next-line require-await - public async start(): Promise { + private measureQueueDepth: MonitorCallback = (type: ProvingRequestType) => { + return this.queues[type].length(); + }; + + private countActiveJobs: MonitorCallback = (type: ProvingRequestType) => { + let count = 0; + for (const { id } of this.inProgress.values()) { + const job = this.jobsCache.get(id); + if (job?.type === type) { + count++; + } + } + + return count; + }; + + public start(): Promise { for (const [item, result] of this.database.allProvingJobs()) { this.logger.info(`Restoring proving job id=${item.id} settled=${!!result}`); @@ -103,6 +128,11 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer { } this.timeoutPromise.start(); + + this.instrumentation.monitorQueueDepth(this.measureQueueDepth); + this.instrumentation.monitorActiveJobs(this.countActiveJobs); + + return Promise.resolve(); } public stop(): Promise { @@ -187,6 +217,10 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer { startedAt: time, lastUpdatedAt: time, }); + const enqueuedAt = this.enqueuedAt.get(job.id); + if (enqueuedAt) { + this.instrumentation.recordJobWait(job.type, enqueuedAt); + } return { job, time }; } @@ -216,6 +250,7 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer { this.logger.info(`Retrying proving job id=${id} type=${ProvingRequestType[item.type]} retry=${retries + 1}`); this.retries.set(id, retries + 1); this.enqueueJobInternal(item); + this.instrumentation.incRetriedJobs(item.type); return; } @@ -228,6 +263,11 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer { const result: ProvingJobSettledResult = { status: 'rejected', reason: String(err) }; this.resultsCache.set(id, result); this.promises.get(id)!.resolve(result); + this.instrumentation.incRejectedJobs(item.type); + if (info) { + const duration = this.timeSource() - info.startedAt; + this.instrumentation.recordJobDuration(item.type, duration * 1000); + } } reportProvingJobProgress( @@ -303,6 +343,7 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer { const result: ProvingJobSettledResult = { status: 'fulfilled', value }; this.resultsCache.set(id, result); this.promises.get(id)!.resolve(result); + this.instrumentation.incResolvedJobs(item.type); } private timeoutCheck = () => { @@ -320,6 +361,7 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer { this.logger.warn(`Proving job id=${id} timed out. Adding it back to the queue.`); this.inProgress.delete(id); this.enqueueJobInternal(item); + this.instrumentation.incTimedOutJobs(item.type); } } }; @@ -329,6 +371,7 @@ export class ProvingBroker implements ProvingJobProducer, ProvingJobConsumer { this.promises.set(job.id, promiseWithResolvers()); } this.queues[job.type].put(job); + this.enqueuedAt.set(job.id, new Timer()); this.logger.debug(`Enqueued new proving job id=${job.id}`); } } diff --git a/yarn-project/prover-client/src/proving_broker/proving_broker_database/persisted.ts b/yarn-project/prover-client/src/proving_broker/proving_broker_database/persisted.ts index 909b2d6e4e1..61ca5232015 100644 --- a/yarn-project/prover-client/src/proving_broker/proving_broker_database/persisted.ts +++ b/yarn-project/prover-client/src/proving_broker/proving_broker_database/persisted.ts @@ -1,14 +1,29 @@ import { type ProofUri, ProvingJob, type ProvingJobId, ProvingJobSettledResult } from '@aztec/circuit-types'; import { jsonParseWithSchema, jsonStringify } from '@aztec/foundation/json-rpc'; import { type AztecKVStore, type AztecMap } from '@aztec/kv-store'; +import { LmdbMetrics, Metrics, type TelemetryClient } from '@aztec/telemetry-client'; import { type ProvingBrokerDatabase } from '../proving_broker_database.js'; export class KVBrokerDatabase implements ProvingBrokerDatabase { private jobs: AztecMap; private jobResults: AztecMap; - - constructor(private store: AztecKVStore) { + private metrics: LmdbMetrics; + + constructor(private store: AztecKVStore, client: TelemetryClient) { + this.metrics = new LmdbMetrics( + client.getMeter('KVBrokerDatabase'), + { + name: Metrics.PROVING_QUEUE_DB_MAP_SIZE, + description: 'Database map size for the proving broker', + }, + { + name: Metrics.PROVING_QUEUE_DB_USED_SIZE, + description: 'Database used size for the proving broker', + }, + { name: Metrics.PROVING_QUEUE_DB_NUM_ITEMS, description: 'Number of items in the broker database' }, + () => store.estimateSize(), + ); this.jobs = store.openMap('proving_jobs'); this.jobResults = store.openMap('proving_job_results'); } diff --git a/yarn-project/prover-client/src/proving_broker/proving_broker_instrumentation.ts b/yarn-project/prover-client/src/proving_broker/proving_broker_instrumentation.ts new file mode 100644 index 00000000000..2379bdd8a32 --- /dev/null +++ b/yarn-project/prover-client/src/proving_broker/proving_broker_instrumentation.ts @@ -0,0 +1,130 @@ +import { ProvingRequestType } from '@aztec/circuit-types'; +import { type Timer } from '@aztec/foundation/timer'; +import { + Attributes, + type Histogram, + Metrics, + type ObservableGauge, + type ObservableResult, + type TelemetryClient, + type UpDownCounter, + ValueType, + millisecondBuckets, +} from '@aztec/telemetry-client'; + +export type MonitorCallback = (proofType: ProvingRequestType) => number; + +export class ProvingBrokerInstrumentation { + private queueSize: ObservableGauge; + private activeJobs: ObservableGauge; + private resolvedJobs: UpDownCounter; + private rejectedJobs: UpDownCounter; + private timedOutJobs: UpDownCounter; + private jobWait: Histogram; + private jobDuration: Histogram; + private retriedJobs: UpDownCounter; + + constructor(client: TelemetryClient, name = 'ProvingBroker') { + const meter = client.getMeter(name); + + this.queueSize = meter.createObservableGauge(Metrics.PROVING_QUEUE_SIZE, { + valueType: ValueType.INT, + }); + + this.activeJobs = meter.createObservableGauge(Metrics.PROVING_QUEUE_ACTIVE_JOBS, { + valueType: ValueType.INT, + }); + + this.resolvedJobs = meter.createUpDownCounter(Metrics.PROVING_QUEUE_RESOLVED_JOBS, { + valueType: ValueType.INT, + }); + + this.rejectedJobs = meter.createUpDownCounter(Metrics.PROVING_QUEUE_REJECTED_JOBS, { + valueType: ValueType.INT, + }); + + this.retriedJobs = meter.createUpDownCounter(Metrics.PROVING_QUEUE_RETRIED_JOBS, { + valueType: ValueType.INT, + }); + + this.timedOutJobs = meter.createUpDownCounter(Metrics.PROVING_QUEUE_TIMED_OUT_JOBS, { + valueType: ValueType.INT, + }); + + this.jobWait = meter.createHistogram(Metrics.PROVING_QUEUE_JOB_WAIT, { + description: 'Records how long a job sits in the queue', + unit: 'ms', + valueType: ValueType.INT, + advice: { + explicitBucketBoundaries: millisecondBuckets(1), // 10ms -> ~327s + }, + }); + + this.jobDuration = meter.createHistogram(Metrics.PROVING_QUEUE_JOB_DURATION, { + description: 'Records how long a job takes to complete', + unit: 'ms', + valueType: ValueType.INT, + advice: { + explicitBucketBoundaries: millisecondBuckets(1), // 10ms -> ~327s + }, + }); + } + + monitorQueueDepth(fn: MonitorCallback) { + this.queueSize.addCallback(obs => this.observe(obs, fn)); + } + + monitorActiveJobs(fn: MonitorCallback) { + this.activeJobs.addCallback(obs => this.observe(obs, fn)); + } + + incResolvedJobs(proofType: ProvingRequestType) { + this.resolvedJobs.add(1, { + [Attributes.PROVING_JOB_TYPE]: ProvingRequestType[proofType], + }); + } + + incRejectedJobs(proofType: ProvingRequestType) { + this.rejectedJobs.add(1, { + [Attributes.PROVING_JOB_TYPE]: ProvingRequestType[proofType], + }); + } + + incRetriedJobs(proofType: ProvingRequestType) { + this.retriedJobs.add(1, { + [Attributes.PROVING_JOB_TYPE]: ProvingRequestType[proofType], + }); + } + + incTimedOutJobs(proofType: ProvingRequestType) { + this.timedOutJobs.add(1, { + [Attributes.PROVING_JOB_TYPE]: ProvingRequestType[proofType], + }); + } + + recordJobWait(proofType: ProvingRequestType, msOrTimer: Timer | number) { + const duration = typeof msOrTimer === 'number' ? msOrTimer : Math.floor(msOrTimer.ms()); + this.jobWait.record(duration, { + [Attributes.PROVING_JOB_TYPE]: ProvingRequestType[proofType], + }); + } + + recordJobDuration(proofType: ProvingRequestType, msOrTimer: Timer | number) { + const duration = typeof msOrTimer === 'number' ? msOrTimer : Math.floor(msOrTimer.ms()); + this.jobDuration.record(duration, { + [Attributes.PROVING_JOB_TYPE]: ProvingRequestType[proofType], + }); + } + + private observe(obs: ObservableResult, fn: MonitorCallback) { + for (const proofType of Object.values(ProvingRequestType)) { + // a type predicate for TypeScript to recognize that we're only iterating over enum values + if (typeof proofType !== 'number') { + continue; + } + obs.observe(fn(proofType), { + [Attributes.PROVING_JOB_TYPE]: ProvingRequestType[proofType], + }); + } + } +} diff --git a/yarn-project/prover-client/src/test/mock_prover.ts b/yarn-project/prover-client/src/test/mock_prover.ts index c0ea23c2643..30a26cd7838 100644 --- a/yarn-project/prover-client/src/test/mock_prover.ts +++ b/yarn-project/prover-client/src/test/mock_prover.ts @@ -43,6 +43,7 @@ import { makeRootRollupPublicInputs, } from '@aztec/circuits.js/testing'; import { times } from '@aztec/foundation/collection'; +import { NoopTelemetryClient } from '@aztec/telemetry-client/noop'; import { InlineProofStore, type ProofStore } from '../proving_broker/proof_store.js'; import { ProvingAgent } from '../proving_broker/proving_agent.js'; @@ -50,7 +51,7 @@ import { ProvingBroker } from '../proving_broker/proving_broker.js'; import { InMemoryBrokerDatabase } from '../proving_broker/proving_broker_database/memory.js'; export class TestBroker implements ProvingJobProducer { - private broker = new ProvingBroker(new InMemoryBrokerDatabase()); + private broker = new ProvingBroker(new InMemoryBrokerDatabase(), new NoopTelemetryClient()); private agents: ProvingAgent[]; constructor( @@ -58,7 +59,7 @@ export class TestBroker implements ProvingJobProducer { prover: ServerCircuitProver, private proofStore: ProofStore = new InlineProofStore(), ) { - this.agents = times(agentCount, () => new ProvingAgent(this.broker, proofStore, prover)); + this.agents = times(agentCount, () => new ProvingAgent(this.broker, proofStore, prover, new NoopTelemetryClient())); } public async start() { diff --git a/yarn-project/prover-node/src/factory.ts b/yarn-project/prover-node/src/factory.ts index 7190d81ee66..8d2db37c623 100644 --- a/yarn-project/prover-node/src/factory.ts +++ b/yarn-project/prover-node/src/factory.ts @@ -47,7 +47,7 @@ export async function createProverNode( const worldStateSynchronizer = await createWorldStateSynchronizer(worldStateConfig, archiver, telemetry); await worldStateSynchronizer.start(); - const broker = deps.broker ?? (await createAndStartProvingBroker(config)); + const broker = deps.broker ?? (await createAndStartProvingBroker(config, telemetry)); const prover = await createProverClient(config, worldStateSynchronizer, broker, telemetry); // REFACTOR: Move publisher out of sequencer package and into an L1-related package diff --git a/yarn-project/telemetry-client/src/lmdb_metrics.ts b/yarn-project/telemetry-client/src/lmdb_metrics.ts index c8efc91a801..a8e70662d65 100644 --- a/yarn-project/telemetry-client/src/lmdb_metrics.ts +++ b/yarn-project/telemetry-client/src/lmdb_metrics.ts @@ -1,38 +1,47 @@ -import { type Gauge, type Meter, type Metrics, ValueType } from './telemetry.js'; +import { type BatchObservableResult, type Meter, type Metrics, type ObservableGauge, ValueType } from './telemetry.js'; export type LmdbMetricDescriptor = { name: Metrics; description: string; }; +export type LmdbStatsCallback = () => { mappingSize: number; numItems: number; actualSize: number }; + export class LmdbMetrics { - private dbMapSize: Gauge; - private dbUsedSize: Gauge; - private dbNumItems: Gauge; + private dbMapSize: ObservableGauge; + private dbUsedSize: ObservableGauge; + private dbNumItems: ObservableGauge; constructor( meter: Meter, dbMapSizeDescriptor: LmdbMetricDescriptor, dbUsedSizeDescriptor: LmdbMetricDescriptor, dbNumItemsDescriptor: LmdbMetricDescriptor, + private getStats?: LmdbStatsCallback, ) { - this.dbMapSize = meter.createGauge(dbMapSizeDescriptor.name, { + this.dbMapSize = meter.createObservableGauge(dbMapSizeDescriptor.name, { description: dbMapSizeDescriptor.description, valueType: ValueType.INT, }); - this.dbUsedSize = meter.createGauge(dbUsedSizeDescriptor.name, { + this.dbUsedSize = meter.createObservableGauge(dbUsedSizeDescriptor.name, { description: dbUsedSizeDescriptor.description, valueType: ValueType.INT, }); - this.dbNumItems = meter.createGauge(dbNumItemsDescriptor.name, { + this.dbNumItems = meter.createObservableGauge(dbNumItemsDescriptor.name, { description: dbNumItemsDescriptor.description, valueType: ValueType.INT, }); - } - public recordDBMetrics(metrics: { mappingSize: number; numItems: number; actualSize: number }) { - this.dbMapSize.record(metrics.mappingSize); - this.dbNumItems.record(metrics.actualSize); - this.dbUsedSize.record(metrics.actualSize); + meter.addBatchObservableCallback(this.recordDBMetrics, [this.dbMapSize, this.dbUsedSize, this.dbNumItems]); } + + private recordDBMetrics = (observable: BatchObservableResult) => { + if (!this.getStats) { + return; + } + const metrics = this.getStats(); + observable.observe(this.dbMapSize, metrics.mappingSize); + observable.observe(this.dbNumItems, metrics.numItems); + observable.observe(this.dbUsedSize, metrics.actualSize); + }; } diff --git a/yarn-project/telemetry-client/src/metrics.ts b/yarn-project/telemetry-client/src/metrics.ts index 853ce0bb58f..d737e6dd863 100644 --- a/yarn-project/telemetry-client/src/metrics.ts +++ b/yarn-project/telemetry-client/src/metrics.ts @@ -79,6 +79,18 @@ export const PROVING_ORCHESTRATOR_BASE_ROLLUP_INPUTS_DURATION = export const PROVING_QUEUE_JOB_SIZE = 'aztec.proving_queue.job_size'; export const PROVING_QUEUE_SIZE = 'aztec.proving_queue.size'; +export const PROVING_QUEUE_ACTIVE_JOBS = 'aztec.proving_queue.active_jobs'; +export const PROVING_QUEUE_RESOLVED_JOBS = 'aztec.proving_queue.resolved_jobs'; +export const PROVING_QUEUE_REJECTED_JOBS = 'aztec.proving_queue.rejected_jobs'; +export const PROVING_QUEUE_RETRIED_JOBS = 'aztec.proving_queue.retried_jobs'; +export const PROVING_QUEUE_TIMED_OUT_JOBS = 'aztec.proving_queue.timed_out_jobs'; +export const PROVING_QUEUE_JOB_WAIT = 'aztec.proving_queue.job_wait'; +export const PROVING_QUEUE_JOB_DURATION = 'aztec.proving_queue.job_duration'; +export const PROVING_QUEUE_DB_NUM_ITEMS = 'aztec.proving_queue.db.num_items'; +export const PROVING_QUEUE_DB_MAP_SIZE = 'aztec.proving_queue.db.map_size'; +export const PROVING_QUEUE_DB_USED_SIZE = 'aztec.proving_queue.db.used_size'; + +export const PROVING_AGENT_IDLE = 'aztec.proving_queue.agent.idle'; export const PROVER_NODE_JOB_DURATION = 'aztec.prover_node.job_duration'; diff --git a/yarn-project/telemetry-client/src/telemetry.ts b/yarn-project/telemetry-client/src/telemetry.ts index a481690f155..60e55b8b1c6 100644 --- a/yarn-project/telemetry-client/src/telemetry.ts +++ b/yarn-project/telemetry-client/src/telemetry.ts @@ -1,9 +1,13 @@ import { type AttributeValue, + type BatchObservableCallback, type MetricOptions, + type Observable, + type BatchObservableResult as OtelBatchObservableResult, type Gauge as OtelGauge, type Histogram as OtelHistogram, type ObservableGauge as OtelObservableGauge, + type ObservableResult as OtelObservableResult, type ObservableUpDownCounter as OtelObservableUpDownCounter, type UpDownCounter as OtelUpDownCounter, type Span, @@ -31,6 +35,8 @@ export type Histogram = OtelHistogram; export type UpDownCounter = OtelUpDownCounter; export type ObservableGauge = OtelObservableGauge; export type ObservableUpDownCounter = OtelObservableUpDownCounter; +export type ObservableResult = OtelObservableResult; +export type BatchObservableResult = OtelBatchObservableResult; export { Tracer }; @@ -53,6 +59,16 @@ export interface Meter { */ createObservableGauge(name: Metrics, options?: MetricOptions): ObservableGauge; + addBatchObservableCallback( + callback: BatchObservableCallback, + observables: Observable[], + ): void; + + removeBatchObservableCallback( + callback: BatchObservableCallback, + observables: Observable[], + ): void; + /** * Creates a new histogram instrument. A histogram is a metric that samples observations (usually things like request durations or response sizes) and counts them in configurable buckets. * @param name - The name of the histogram