From a73331a70a6ae65e826a9c26515ca37991e01128 Mon Sep 17 00:00:00 2001
From: Morgan McCauley
Date: Fri, 28 Jun 2024 12:49:41 +1200
Subject: [PATCH] fix: Report worker metrics even when no messages in Stream (#844)

We currently skip reporting metrics when there are no messages in the
pre-fetch queue/Redis Stream. This is especially problematic for
`EXECUTOR_UP`, as we won't increment the metric even though we are still
processing. This PR moves the metrics logic so that metrics are always
reported, even when there are no messages in the stream.
---
 runner/src/stream-handler/worker.ts | 30 ++++++++++++++++--------------
 1 file changed, 16 insertions(+), 14 deletions(-)

diff --git a/runner/src/stream-handler/worker.ts b/runner/src/stream-handler/worker.ts
index 6c0759bb..d9e77184 100644
--- a/runner/src/stream-handler/worker.ts
+++ b/runner/src/stream-handler/worker.ts
@@ -103,10 +103,26 @@ async function blockQueueConsumer (workerContext: WorkerContext): Promise<void>
   while (true) {
     METRICS.EXECUTOR_UP.labels({ indexer: indexerConfig.fullName() }).inc();
 
+    const metricsSpan = tracer.startSpan('Record metrics after processing block', {}, context.active());
+
+    const unprocessedMessageCount = await workerContext.redisClient.getUnprocessedStreamMessageCount(indexerConfig.redisStreamKey);
+    METRICS.UNPROCESSED_STREAM_MESSAGES.labels({ indexer: indexerConfig.fullName() }).set(unprocessedMessageCount);
+
+    const memoryUsage = process.memoryUsage();
+    METRICS.HEAP_TOTAL_ALLOCATION.labels({ indexer: indexerConfig.fullName() }).set(memoryUsage.heapTotal / (1024 * 1024));
+    METRICS.HEAP_USED.labels({ indexer: indexerConfig.fullName() }).set(memoryUsage.heapUsed / (1024 * 1024));
+    METRICS.PREFETCH_QUEUE_COUNT.labels({ indexer: indexerConfig.fullName() }).set(workerContext.queue.length);
+
+    const metricsMessage: WorkerMessage = { type: WorkerMessageType.METRICS, data: await promClient.register.getMetricsAsJSON() };
+    parentPort?.postMessage(metricsMessage);
+
+    metricsSpan.end();
+
     if (workerContext.queue.length === 0) {
       await sleep(100);
       continue;
     }
+
     await tracer.startActiveSpan(`${indexerConfig.fullName()}`, async (parentSpan: Span) => {
       parentSpan.setAttribute('indexer', indexerConfig.fullName());
       parentSpan.setAttribute('account', indexerConfig.accountId);
@@ -168,20 +184,6 @@ async function blockQueueConsumer (workerContext: WorkerContext): Promise<void>
         await sleep(10000);
         sleepSpan.end();
       } finally {
-        const metricsSpan = tracer.startSpan('Record metrics after processing block', {}, context.active());
-
-        const unprocessedMessageCount = await workerContext.redisClient.getUnprocessedStreamMessageCount(indexerConfig.redisStreamKey);
-        METRICS.UNPROCESSED_STREAM_MESSAGES.labels({ indexer: indexerConfig.fullName() }).set(unprocessedMessageCount);
-
-        const memoryUsage = process.memoryUsage();
-        METRICS.HEAP_TOTAL_ALLOCATION.labels({ indexer: indexerConfig.fullName() }).set(memoryUsage.heapTotal / (1024 * 1024));
-        METRICS.HEAP_USED.labels({ indexer: indexerConfig.fullName() }).set(memoryUsage.heapUsed / (1024 * 1024));
-        METRICS.PREFETCH_QUEUE_COUNT.labels({ indexer: indexerConfig.fullName() }).set(workerContext.queue.length);
-
-        const metricsMessage: WorkerMessage = { type: WorkerMessageType.METRICS, data: await promClient.register.getMetricsAsJSON() };
-        parentPort?.postMessage(metricsMessage);
-
-        metricsSpan.end();
         parentSpan.end();
       }
     });
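
Illustrative sketch (not part of the patch): the essence of the change is that metric reporting now runs at the top of every loop iteration, before the empty-queue early `continue`, instead of inside the per-block `finally` that only executes when a message was pulled from the queue. Below is a minimal, self-contained TypeScript outline of that loop shape; `ConsumerDeps`, `reportMetrics`, and `processNextBlock` are hypothetical stand-ins for the worker's real spans, Redis calls, and Prometheus gauges, not names from the codebase.

    import { setTimeout as sleep } from 'node:timers/promises';

    // Hypothetical stand-ins for the worker's real dependencies; only the
    // control flow mirrors the diff above.
    interface ConsumerDeps {
      queue: unknown[];                      // pre-fetch queue (cf. workerContext.queue)
      reportMetrics: () => Promise<void>;    // cf. the metrics block now at the top of the loop
      processNextBlock: () => Promise<void>; // cf. the tracer.startActiveSpan(...) body
    }

    async function blockQueueConsumerShape (deps: ConsumerDeps): Promise<void> {
      while (true) {
        // Metrics are reported on every iteration, so gauges such as
        // EXECUTOR_UP keep moving even while the stream is empty.
        await deps.reportMetrics();

        if (deps.queue.length === 0) {
          await sleep(100); // nothing queued yet; poll again shortly
          continue;
        }

        await deps.processNextBlock();
      }
    }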