From 01ede89deb5b50b27d056979769b9dc0d2798efc Mon Sep 17 00:00:00 2001 From: Darun Seethammagari Date: Mon, 13 Nov 2023 16:58:02 -0800 Subject: [PATCH] fix: Ensure Indexer Config Updates Are Read in Runner (#384) Runner sets Indexer Config when the thread is started. As a result, it does not react to updates to that config, such as updates to code. This is a problem as that means unless runner is restarted, published code changes won't be used. I've changed it so that the config is read each iteration of the loop. That way, config updates will be consumed. In the short term, this can be tuned such as reading every X loops and on every failure, if need be. In the long term, improved communication between coordinator and runner can facilitate coordinator communicating to runner to read the config as opposed to doing so all the time. In addition, the metrics for block wait duration and overall execution duration were wrong. I've moved the start time to the correct spot. --- runner/src/stream-handler/worker.ts | 35 ++++++++++++++--------------- 1 file changed, 17 insertions(+), 18 deletions(-) diff --git a/runner/src/stream-handler/worker.ts b/runner/src/stream-handler/worker.ts index 785c755a9..ed1d1061e 100644 --- a/runner/src/stream-handler/worker.ts +++ b/runner/src/stream-handler/worker.ts @@ -80,32 +80,33 @@ async function blockQueueProducer (workerContext: WorkerContext, streamKey: stri async function blockQueueConsumer (workerContext: WorkerContext, streamKey: string): Promise { const indexer = new Indexer(); - const indexerConfig = await workerContext.redisClient.getStreamStorage(streamKey); - const indexerName = `${indexerConfig.account_id}/${indexerConfig.function_name}`; - const functions = { - [indexerName]: { - account_id: indexerConfig.account_id, - function_name: indexerConfig.function_name, - code: indexerConfig.code, - schema: indexerConfig.schema, - provisioned: false, - }, - }; + const isHistorical = workerContext.streamType === 'historical'; + let streamMessageId = ''; + let indexerName = ''; while (true) { - let streamMessageId = ''; try { while (workerContext.queue.length === 0) { await sleep(100); } + const startTime = performance.now(); + const indexerConfig = await workerContext.redisClient.getStreamStorage(streamKey); + indexerName = `${indexerConfig.account_id}/${indexerConfig.function_name}`; + const functions = { + [indexerName]: { + account_id: indexerConfig.account_id, + function_name: indexerConfig.function_name, + code: indexerConfig.code, + schema: indexerConfig.schema, + provisioned: false, + }, + }; + const blockStartTime = performance.now(); const queueMessage = await workerContext.queue.at(0); if (queueMessage === undefined) { continue; } - const startTime = performance.now(); - const blockStartTime = startTime; const block = queueMessage.block; - const isHistorical = workerContext.streamType === 'historical'; streamMessageId = queueMessage.streamMessageId; if (block === undefined || block.blockHeight == null) { @@ -120,9 +121,7 @@ async function blockQueueConsumer (workerContext: WorkerContext, streamKey: stri METRICS.EXECUTION_DURATION.labels({ indexer: indexerName, type: workerContext.streamType }).observe(performance.now() - startTime); - if (isHistorical) { - METRICS.LAST_PROCESSED_BLOCK_HEIGHT.labels({ indexer: indexerName, type: workerContext.streamType }).set(block.blockHeight); - } + METRICS.LAST_PROCESSED_BLOCK_HEIGHT.labels({ indexer: indexerName, type: workerContext.streamType }).set(block.blockHeight); console.log(`Success: ${indexerName}`); } catch (err) {