Skip to content

Commit

Permalink
fix: Report worker metrics even when no messages in Stream (#844)
Browse files Browse the repository at this point in the history
We skip reporting metrics if there are no messages in the pre-fetch
queue/Redis Stream. This is especially problematic for `EXECUTOR_UP`, as
we won't increment the metric even though we are processing.

This PR moves the metrics logic so that it is always reported, even when
no messages in the stream.
  • Loading branch information
morgsmccauley authored Jun 28, 2024
1 parent adaadfc commit a73331a
Showing 1 changed file with 16 additions and 14 deletions.
30 changes: 16 additions & 14 deletions runner/src/stream-handler/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,26 @@ async function blockQueueConsumer (workerContext: WorkerContext): Promise<void>
while (true) {
METRICS.EXECUTOR_UP.labels({ indexer: indexerConfig.fullName() }).inc();

const metricsSpan = tracer.startSpan('Record metrics after processing block', {}, context.active());

const unprocessedMessageCount = await workerContext.redisClient.getUnprocessedStreamMessageCount(indexerConfig.redisStreamKey);
METRICS.UNPROCESSED_STREAM_MESSAGES.labels({ indexer: indexerConfig.fullName() }).set(unprocessedMessageCount);

const memoryUsage = process.memoryUsage();
METRICS.HEAP_TOTAL_ALLOCATION.labels({ indexer: indexerConfig.fullName() }).set(memoryUsage.heapTotal / (1024 * 1024));
METRICS.HEAP_USED.labels({ indexer: indexerConfig.fullName() }).set(memoryUsage.heapUsed / (1024 * 1024));
METRICS.PREFETCH_QUEUE_COUNT.labels({ indexer: indexerConfig.fullName() }).set(workerContext.queue.length);

const metricsMessage: WorkerMessage = { type: WorkerMessageType.METRICS, data: await promClient.register.getMetricsAsJSON() };
parentPort?.postMessage(metricsMessage);

metricsSpan.end();

if (workerContext.queue.length === 0) {
await sleep(100);
continue;
}

await tracer.startActiveSpan(`${indexerConfig.fullName()}`, async (parentSpan: Span) => {
parentSpan.setAttribute('indexer', indexerConfig.fullName());
parentSpan.setAttribute('account', indexerConfig.accountId);
Expand Down Expand Up @@ -168,20 +184,6 @@ async function blockQueueConsumer (workerContext: WorkerContext): Promise<void>
await sleep(10000);
sleepSpan.end();
} finally {
const metricsSpan = tracer.startSpan('Record metrics after processing block', {}, context.active());

const unprocessedMessageCount = await workerContext.redisClient.getUnprocessedStreamMessageCount(indexerConfig.redisStreamKey);
METRICS.UNPROCESSED_STREAM_MESSAGES.labels({ indexer: indexerConfig.fullName() }).set(unprocessedMessageCount);

const memoryUsage = process.memoryUsage();
METRICS.HEAP_TOTAL_ALLOCATION.labels({ indexer: indexerConfig.fullName() }).set(memoryUsage.heapTotal / (1024 * 1024));
METRICS.HEAP_USED.labels({ indexer: indexerConfig.fullName() }).set(memoryUsage.heapUsed / (1024 * 1024));
METRICS.PREFETCH_QUEUE_COUNT.labels({ indexer: indexerConfig.fullName() }).set(workerContext.queue.length);

const metricsMessage: WorkerMessage = { type: WorkerMessageType.METRICS, data: await promClient.register.getMetricsAsJSON() };
parentPort?.postMessage(metricsMessage);

metricsSpan.end();
parentSpan.end();
}
});
Expand Down

0 comments on commit a73331a

Please sign in to comment.