Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: agent and broker expose OTEL metrics #10264

Merged
merged 5 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions yarn-project/archiver/src/archiver/archiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ export class Archiver implements ArchiveSource {
config.l1Contracts.registryAddress,
archiverStore,
config.archiverPollingIntervalMS ?? 10_000,
new ArchiverInstrumentation(telemetry),
new ArchiverInstrumentation(telemetry, () => archiverStore.estimateSize()),
{ l1StartBlock, l1GenesisTime, epochDuration, slotDuration, ethereumSlotDuration },
);
await archiver.start(blockUntilSynced);
Expand Down Expand Up @@ -271,9 +271,6 @@ export class Archiver implements ArchiveSource {
// the chain locally before we start unwinding stuff. This can be optimized by figuring out
// up to which point we're pruning, and then requesting L2 blocks up to that point only.
await this.handleEpochPrune(provenBlockNumber, currentL1BlockNumber);

const storeSizes = this.store.estimateSize();
this.instrumentation.recordDBMetrics(storeSizes);
Comment on lines -275 to -276
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These stats are now automatically captured using observables.

}
}

Expand Down
8 changes: 3 additions & 5 deletions yarn-project/archiver/src/archiver/instrumentation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
type Gauge,
type Histogram,
LmdbMetrics,
type LmdbStatsCallback,
Metrics,
type TelemetryClient,
type UpDownCounter,
Expand All @@ -23,7 +24,7 @@ export class ArchiverInstrumentation {

private log = createDebugLogger('aztec:archiver:instrumentation');

constructor(private telemetry: TelemetryClient) {
constructor(private telemetry: TelemetryClient, lmdbStats?: LmdbStatsCallback) {
const meter = telemetry.getMeter('Archiver');
this.blockHeight = meter.createGauge(Metrics.ARCHIVER_BLOCK_HEIGHT, {
description: 'The height of the latest block processed by the archiver',
Expand Down Expand Up @@ -72,13 +73,10 @@ export class ArchiverInstrumentation {
name: Metrics.ARCHIVER_DB_NUM_ITEMS,
description: 'Num items in the archiver database',
},
lmdbStats,
);
}

public recordDBMetrics(metrics: { mappingSize: number; numItems: number; actualSize: number }) {
this.dbMetrics.recordDBMetrics(metrics);
}

public isEnabled(): boolean {
return this.telemetry.isEnabled();
}
Expand Down
13 changes: 12 additions & 1 deletion yarn-project/aztec/src/cli/cmds/start_prover_agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,18 @@ export async function startProverAgent(
);
const prover = await buildServerCircuitProver(config, telemetry);
const proofStore = new InlineProofStore();
const agents = times(config.proverAgentCount, () => new ProvingAgent(broker, proofStore, prover));
const agents = times(
config.proverAgentCount,
() =>
new ProvingAgent(
broker,
proofStore,
prover,
telemetry,
config.proverAgentProofTypes,
config.proverAgentPollIntervalMs,
),
);

await Promise.all(agents.map(agent => agent.start()));

Expand Down
7 changes: 6 additions & 1 deletion yarn-project/aztec/src/cli/cmds/start_prover_broker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ import { type NamespacedApiHandlers } from '@aztec/foundation/json-rpc/server';
import { type LogFn } from '@aztec/foundation/log';
import { ProvingJobBrokerSchema, createAndStartProvingBroker } from '@aztec/prover-client/broker';
import { getProverNodeBrokerConfigFromEnv } from '@aztec/prover-node';
import {
createAndStartTelemetryClient,
getConfigEnvVars as getTelemetryClientConfig,
} from '@aztec/telemetry-client/start';

import { extractRelevantOptions } from '../util.js';

Expand All @@ -22,7 +26,8 @@ export async function startProverBroker(
...extractRelevantOptions<ProverBrokerConfig>(options, proverBrokerConfigMappings, 'proverBroker'), // override with command line options
};

const broker = await createAndStartProvingBroker(config);
const client = await createAndStartTelemetryClient(getTelemetryClientConfig());
const broker = await createAndStartProvingBroker(config, client);
services.proverBroker = [broker, ProvingJobBrokerSchema];
signalHandlers.push(() => broker.stop());

Expand Down
8 changes: 3 additions & 5 deletions yarn-project/p2p/src/mem_pools/instrumentation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {
Attributes,
type Histogram,
LmdbMetrics,
type LmdbStatsCallback,
Metrics,
type TelemetryClient,
type UpDownCounter,
Expand Down Expand Up @@ -58,7 +59,7 @@ export class PoolInstrumentation<PoolObject extends Gossipable> {

private defaultAttributes;

constructor(telemetry: TelemetryClient, name: PoolName) {
constructor(telemetry: TelemetryClient, name: PoolName, dbStats?: LmdbStatsCallback) {
const meter = telemetry.getMeter(name);
this.defaultAttributes = { [Attributes.POOL_NAME]: name };

Expand Down Expand Up @@ -98,13 +99,10 @@ export class PoolInstrumentation<PoolObject extends Gossipable> {
name: Metrics.MEMPOOL_DB_NUM_ITEMS,
description: 'Num items in database for the Tx mempool',
},
dbStats,
);
}

public recordDBMetrics(metrics: { mappingSize: number; numItems: number; actualSize: number }) {
this.dbMetrics.recordDBMetrics(metrics);
}

public recordSize(poolObject: PoolObject) {
this.objectSize.record(poolObject.getSize());
}
Expand Down
4 changes: 1 addition & 3 deletions yarn-project/p2p/src/mem_pools/tx_pool/aztec_kv_tx_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ export class AztecKVTxPool implements TxPool {

this.#store = store;
this.#log = log;
this.#metrics = new PoolInstrumentation(telemetry, PoolName.TX_POOL);
this.#metrics = new PoolInstrumentation(telemetry, PoolName.TX_POOL, () => store.estimateSize());
}

public markAsMined(txHashes: TxHash[], blockNumber: number): Promise<void> {
Expand All @@ -53,8 +53,6 @@ export class AztecKVTxPool implements TxPool {
}
this.#metrics.recordRemovedObjects(deleted, 'pending');
this.#metrics.recordAddedObjects(txHashes.length, 'mined');
const storeSizes = this.#store.estimateSize();
this.#metrics.recordDBMetrics(storeSizes);
});
}

Expand Down
10 changes: 9 additions & 1 deletion yarn-project/prover-client/src/prover-client/prover-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,15 @@ export class ProverClient implements EpochProverManager {
const prover = await buildServerCircuitProver(this.config, this.telemetry);
this.agents = times(
this.config.proverAgentCount,
() => new ProvingAgent(this.agentClient!, proofStore, prover, [], this.config.proverAgentPollIntervalMs),
() =>
new ProvingAgent(
this.agentClient!,
proofStore,
prover,
this.telemetry,
[],
this.config.proverAgentPollIntervalMs,
),
);

await Promise.all(this.agents.map(agent => agent.start()));
Expand Down
10 changes: 7 additions & 3 deletions yarn-project/prover-client/src/proving_broker/factory.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
import { type ProverBrokerConfig } from '@aztec/circuit-types';
import { AztecLmdbStore } from '@aztec/kv-store/lmdb';
import { type TelemetryClient } from '@aztec/telemetry-client';

import { ProvingBroker } from './proving_broker.js';
import { InMemoryBrokerDatabase } from './proving_broker_database/memory.js';
import { KVBrokerDatabase } from './proving_broker_database/persisted.js';

export async function createAndStartProvingBroker(config: ProverBrokerConfig): Promise<ProvingBroker> {
export async function createAndStartProvingBroker(
config: ProverBrokerConfig,
client: TelemetryClient,
): Promise<ProvingBroker> {
const database = config.proverBrokerDataDirectory
? new KVBrokerDatabase(AztecLmdbStore.open(config.proverBrokerDataDirectory))
? new KVBrokerDatabase(AztecLmdbStore.open(config.proverBrokerDataDirectory), client)
: new InMemoryBrokerDatabase();

const broker = new ProvingBroker(database, {
const broker = new ProvingBroker(database, client, {
jobTimeoutMs: config.proverBrokerJobTimeoutMs,
maxRetries: config.proverBrokerJobMaxRetries,
timeoutIntervalMs: config.proverBrokerPollIntervalMs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import { makeBaseParityInputs, makeParityPublicInputs } from '@aztec/circuits.js
import { randomBytes } from '@aztec/foundation/crypto';
import { AbortError } from '@aztec/foundation/error';
import { promiseWithResolvers } from '@aztec/foundation/promise';
import { NoopTelemetryClient } from '@aztec/telemetry-client/noop';

import { jest } from '@jest/globals';

Expand Down Expand Up @@ -50,7 +51,7 @@ describe('ProvingAgent', () => {
saveProofOutput: jest.fn(),
};

agent = new ProvingAgent(jobSource, proofDB, prover, [ProvingRequestType.BASE_PARITY]);
agent = new ProvingAgent(jobSource, proofDB, prover, new NoopTelemetryClient(), [ProvingRequestType.BASE_PARITY]);
});

afterEach(async () => {
Expand Down
15 changes: 15 additions & 0 deletions yarn-project/prover-client/src/proving_broker/proving_agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@ import {
} from '@aztec/circuit-types';
import { createDebugLogger } from '@aztec/foundation/log';
import { RunningPromise } from '@aztec/foundation/running-promise';
import { Timer } from '@aztec/foundation/timer';
import { type TelemetryClient } from '@aztec/telemetry-client';

import { type ProofStore } from './proof_store.js';
import { ProvingAgentInstrumentation } from './proving_agent_instrumentation.js';
import { ProvingJobController, ProvingJobControllerStatus } from './proving_job_controller.js';

/**
Expand All @@ -20,6 +23,8 @@ import { ProvingJobController, ProvingJobControllerStatus } from './proving_job_
export class ProvingAgent {
private currentJobController?: ProvingJobController;
private runningPromise: RunningPromise;
private instrumentation: ProvingAgentInstrumentation;
private idleTimer: Timer | undefined;

constructor(
/** The source of proving jobs */
Expand All @@ -28,12 +33,15 @@ export class ProvingAgent {
private proofStore: ProofStore,
/** The prover implementation to defer jobs to */
private circuitProver: ServerCircuitProver,
/** A telemetry client through which to emit metrics */
client: TelemetryClient,
/** Optional list of allowed proof types to build */
private proofAllowList: Array<ProvingRequestType> = [],
/** How long to wait between jobs */
private pollIntervalMs = 1000,
private log = createDebugLogger('aztec:prover-client:proving-agent'),
) {
this.instrumentation = new ProvingAgentInstrumentation(client);
this.runningPromise = new RunningPromise(this.safeWork, this.pollIntervalMs);
}

Expand All @@ -46,6 +54,7 @@ export class ProvingAgent {
}

public start(): void {
this.idleTimer = new Timer();
this.runningPromise.start();
}

Expand Down Expand Up @@ -114,6 +123,11 @@ export class ProvingAgent {
);
}

if (this.idleTimer) {
this.instrumentation.recordIdleTime(this.idleTimer);
}
this.idleTimer = undefined;

this.currentJobController.start();
} catch (err) {
this.log.error(`Error in ProvingAgent: ${String(err)}`);
Expand All @@ -126,6 +140,7 @@ export class ProvingAgent {
err: Error | undefined,
result: ProvingJobResultsMap[T] | undefined,
) => {
this.idleTimer = new Timer();
if (err) {
const retry = err.name === ProvingError.NAME ? (err as ProvingError).retry : false;
this.log.error(`Job id=${jobId} type=${ProvingRequestType[type]} failed err=${err.message} retry=${retry}`, err);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import { type Timer } from '@aztec/foundation/timer';
import { type Histogram, Metrics, type TelemetryClient, ValueType } from '@aztec/telemetry-client';

export class ProvingAgentInstrumentation {
private idleTime: Histogram;

constructor(client: TelemetryClient, name = 'ProvingAgent') {
const meter = client.getMeter(name);

this.idleTime = meter.createHistogram(Metrics.PROVING_AGENT_IDLE, {
description: 'Records how long an agent was idle',
unit: 'ms',
valueType: ValueType.INT,
});
}

recordIdleTime(msOrTimer: Timer | number) {
const duration = typeof msOrTimer === 'number' ? msOrTimer : Math.floor(msOrTimer.ms());
this.idleTime.record(duration);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { type ProofUri, type ProvingJob, type ProvingJobId, ProvingRequestType } from '@aztec/circuit-types';
import { randomBytes } from '@aztec/foundation/crypto';
import { openTmpStore } from '@aztec/kv-store/utils';
import { NoopTelemetryClient } from '@aztec/telemetry-client/noop';

import { jest } from '@jest/globals';

Expand All @@ -17,7 +18,7 @@ describe.each([
() => ({ database: new InMemoryBrokerDatabase(), cleanup: undefined }),
() => {
const store = openTmpStore(true);
const database = new KVBrokerDatabase(store);
const database = new KVBrokerDatabase(store, new NoopTelemetryClient());
const cleanup = () => store.close();
return { database, cleanup };
},
Expand All @@ -35,7 +36,7 @@ describe.each([
maxRetries = 2;
({ database, cleanup } = createDb());

broker = new ProvingBroker(database, {
broker = new ProvingBroker(database, new NoopTelemetryClient(), {
jobTimeoutMs,
timeoutIntervalMs: jobTimeoutMs / 4,
maxRetries,
Expand Down Expand Up @@ -409,7 +410,7 @@ describe.each([
// fake some time passing while the broker restarts
await jest.advanceTimersByTimeAsync(10_000);

broker = new ProvingBroker(database);
broker = new ProvingBroker(database, new NoopTelemetryClient());
await broker.start();

await assertJobStatus(job1.id, 'in-queue');
Expand Down Expand Up @@ -470,7 +471,7 @@ describe.each([
// fake some time passing while the broker restarts
await jest.advanceTimersByTimeAsync(10_000);

broker = new ProvingBroker(database);
broker = new ProvingBroker(database, new NoopTelemetryClient());
await broker.start();

await assertJobStatus(job1.id, 'in-queue');
Expand Down Expand Up @@ -521,7 +522,7 @@ describe.each([
// fake some time passing while the broker restarts
await jest.advanceTimersByTimeAsync(100 * jobTimeoutMs);

broker = new ProvingBroker(database);
broker = new ProvingBroker(database, new NoopTelemetryClient());
await broker.start();
await assertJobStatus(job1.id, 'in-queue');

Expand Down
Loading
Loading