Skip to content

Commit

Permalink
fix: Ensure Indexer Config Updates Are Read in Runner (#384)
Browse files Browse the repository at this point in the history
Runner sets Indexer Config when the thread is started. As a result, it
does not react to updates to that config, such as updates to code. This
is a problem as that means unless runner is restarted, published code
changes won't be used. I've changed it so that the config is read each
iteration of the loop. That way, config updates will be consumed. In the
short term, this can be tuned such as reading every X loops and on every
failure, if need be. In the long term, improved communication between
coordinator and runner can facilitate coordinator communicating to
runner to read the config as opposed to doing so all the time.

In addition, the metrics for block wait duration and overall execution
duration were wrong. I've moved the start time to the correct spot.
  • Loading branch information
darunrs authored Nov 14, 2023
1 parent 3591090 commit 01ede89
Showing 1 changed file with 17 additions and 18 deletions.
35 changes: 17 additions & 18 deletions runner/src/stream-handler/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,32 +80,33 @@ async function blockQueueProducer (workerContext: WorkerContext, streamKey: stri

async function blockQueueConsumer (workerContext: WorkerContext, streamKey: string): Promise<void> {
const indexer = new Indexer();
const indexerConfig = await workerContext.redisClient.getStreamStorage(streamKey);
const indexerName = `${indexerConfig.account_id}/${indexerConfig.function_name}`;
const functions = {
[indexerName]: {
account_id: indexerConfig.account_id,
function_name: indexerConfig.function_name,
code: indexerConfig.code,
schema: indexerConfig.schema,
provisioned: false,
},
};
const isHistorical = workerContext.streamType === 'historical';
let streamMessageId = '';
let indexerName = '';

while (true) {
let streamMessageId = '';
try {
while (workerContext.queue.length === 0) {
await sleep(100);
}
const startTime = performance.now();
const indexerConfig = await workerContext.redisClient.getStreamStorage(streamKey);
indexerName = `${indexerConfig.account_id}/${indexerConfig.function_name}`;
const functions = {
[indexerName]: {
account_id: indexerConfig.account_id,
function_name: indexerConfig.function_name,
code: indexerConfig.code,
schema: indexerConfig.schema,
provisioned: false,
},
};
const blockStartTime = performance.now();
const queueMessage = await workerContext.queue.at(0);
if (queueMessage === undefined) {
continue;
}
const startTime = performance.now();
const blockStartTime = startTime;
const block = queueMessage.block;
const isHistorical = workerContext.streamType === 'historical';
streamMessageId = queueMessage.streamMessageId;

if (block === undefined || block.blockHeight == null) {
Expand All @@ -120,9 +121,7 @@ async function blockQueueConsumer (workerContext: WorkerContext, streamKey: stri

METRICS.EXECUTION_DURATION.labels({ indexer: indexerName, type: workerContext.streamType }).observe(performance.now() - startTime);

if (isHistorical) {
METRICS.LAST_PROCESSED_BLOCK_HEIGHT.labels({ indexer: indexerName, type: workerContext.streamType }).set(block.blockHeight);
}
METRICS.LAST_PROCESSED_BLOCK_HEIGHT.labels({ indexer: indexerName, type: workerContext.streamType }).set(block.blockHeight);

console.log(`Success: ${indexerName}`);
} catch (err) {
Expand Down

0 comments on commit 01ede89

Please sign in to comment.