From a2eb9b2483f413f32e6253d1355daf0c9f062281 Mon Sep 17 00:00:00 2001 From: Pavel Tiunov Date: Sat, 29 Feb 2020 19:31:45 -0800 Subject: [PATCH] fix: Redis query queue locking redesign Usage of time as sorting active queries list leads to random flipping of active keys. Such behavior leads to plural execution of single query in queue under load. Replace active keys time sorting with unique `processingId` locks for queries. Fixes #459 --- .../orchestrator/LocalQueueDriver.js | 68 ++++-- .../orchestrator/QueryOrchestrator.js | 2 +- .../orchestrator/QueryQueue.js | 226 ++++++++++-------- .../orchestrator/RedisFactory.js | 19 +- .../orchestrator/RedisPool.js | 16 +- .../orchestrator/RedisQueueDriver.js | 90 +++++-- .../test/QueryQueue.test.js | 13 +- 7 files changed, 291 insertions(+), 143 deletions(-) diff --git a/packages/cubejs-query-orchestrator/orchestrator/LocalQueueDriver.js b/packages/cubejs-query-orchestrator/orchestrator/LocalQueueDriver.js index 3f68ca69f7d27..112ae30b08747 100644 --- a/packages/cubejs-query-orchestrator/orchestrator/LocalQueueDriver.js +++ b/packages/cubejs-query-orchestrator/orchestrator/LocalQueueDriver.js @@ -15,6 +15,9 @@ class LocalQueueDriverConnection { this.toProcess = driver.toProcess; this.recent = driver.recent; this.active = driver.active; + this.heartBeat = driver.heartBeat; + this.processingCounter = driver.processingCounter; + this.processingLocks = driver.processingLocks; } getResultPromise(resultListKey) { @@ -30,12 +33,16 @@ class LocalQueueDriverConnection { async getResultBlocking(queryKey) { const resultListKey = this.resultListKey(queryKey); + if (!this.queryDef[this.redisHash(queryKey)] && !this.resultPromises[resultListKey]) { + return null; + } const timeoutPromise = (timeout) => new Promise((resolve) => setTimeout(() => resolve(null), timeout)); const res = await Promise.race([ this.getResultPromise(resultListKey), timeoutPromise(this.continueWaitTimeout * 1000), ]); + if (res) { delete this.resultPromises[resultListKey]; } @@ -98,28 +105,33 @@ class LocalQueueDriverConnection { const key = this.redisHash(queryKey); const query = this.queryDef[key]; delete this.active[key]; + delete this.heartBeat[key]; delete this.toProcess[key]; delete this.recent[key]; delete this.queryDef[key]; + delete this.processingLocks[key]; return [query]; } - setResultAndRemoveQuery(queryKey, executionResult) { + async setResultAndRemoveQuery(queryKey, executionResult, processingId) { const key = this.redisHash(queryKey); + if (this.processingLocks[key] !== processingId) { + return false; + } const promise = this.getResultPromise(this.resultListKey(queryKey)); delete this.active[key]; + delete this.heartBeat[key]; delete this.toProcess[key]; delete this.recent[key]; delete this.queryDef[key]; + delete this.processingLocks[key]; promise.resolve(executionResult); + return true; } - removeQuery(queryKey) { - const key = this.redisHash(queryKey); - delete this.active[key]; - delete this.toProcess[key]; - delete this.recent[key]; - delete this.queryDef[key]; + getNextProcessingId() { + this.processingCounter.counter = this.processingCounter.counter ? this.processingCounter.counter + 1 : 1; + return this.processingCounter.counter; } getOrphanedQueries() { @@ -127,7 +139,7 @@ class LocalQueueDriverConnection { } getStalledQueries() { - return this.queueArray(this.active, new Date().getTime() - this.heartBeatTimeout * 1000); + return this.queueArray(this.heartBeat, new Date().getTime() - this.heartBeatTimeout * 1000); } async getQueryStageState(onlyKeys) { @@ -140,24 +152,43 @@ class LocalQueueDriverConnection { updateHeartBeat(queryKey) { const key = this.redisHash(queryKey); - if (this.active[key]) { - this.active[key] = { key, order: new Date().getTime() }; + if (this.heartBeat[key]) { + this.heartBeat[key] = { key, order: new Date().getTime() }; } } - retrieveForProcessing(queryKey) { + retrieveForProcessing(queryKey, processingId) { const key = this.redisHash(queryKey); + let lockAcquired = false; + if (!this.processingLocks[key]) { + this.processingLocks[key] = processingId; + lockAcquired = true; + } let added = 0; if (Object.keys(this.active).length < this.concurrency && !this.active[key]) { - this.active[key] = { key, order: new Date().getTime() }; + this.active[key] = { key, order: processingId }; added = 1; } - return [added, null, this.queueArray(this.active), Object.keys(this.toProcess).length]; // TODO nulls + this.heartBeat[key] = { key, order: new Date().getTime() }; + return [ + added, null, this.queueArray(this.active), Object.keys(this.toProcess).length, this.queryDef[key], lockAcquired + ]; // TODO nulls } - async optimisticQueryUpdate(queryKey, toUpdate) { + freeProcessingLock(queryKey, processingId) { const key = this.redisHash(queryKey); + if (this.processingLocks[key] === processingId) { + delete this.processingLocks[key]; + } + } + + async optimisticQueryUpdate(queryKey, toUpdate, processingId) { + const key = this.redisHash(queryKey); + if (this.processingLocks[key] !== processingId) { + return false; + } this.queryDef[key] = { ...this.queryDef[key], ...toUpdate }; + return true; } release() { @@ -182,6 +213,9 @@ const queryDef = {}; const toProcess = {}; const recent = {}; const active = {}; +const heartBeat = {}; +const processingCounters = {}; +const processingLocks = {}; class LocalQueueDriver extends BaseQueueDriver { constructor(options) { @@ -193,12 +227,18 @@ class LocalQueueDriver extends BaseQueueDriver { toProcess[options.redisQueuePrefix] = toProcess[options.redisQueuePrefix] || {}; recent[options.redisQueuePrefix] = recent[options.redisQueuePrefix] || {}; active[options.redisQueuePrefix] = active[options.redisQueuePrefix] || {}; + heartBeat[options.redisQueuePrefix] = heartBeat[options.redisQueuePrefix] || {}; + processingCounters[options.redisQueuePrefix] = processingCounters[options.redisQueuePrefix] || {}; + processingLocks[options.redisQueuePrefix] = processingLocks[options.redisQueuePrefix] || {}; this.results = results[options.redisQueuePrefix]; this.resultPromises = resultPromises[options.redisQueuePrefix]; this.queryDef = queryDef[options.redisQueuePrefix]; this.toProcess = toProcess[options.redisQueuePrefix]; this.recent = recent[options.redisQueuePrefix]; this.active = active[options.redisQueuePrefix]; + this.heartBeat = heartBeat[options.redisQueuePrefix]; + this.processingCounter = processingCounters[options.redisQueuePrefix]; + this.processingLocks = processingLocks[options.redisQueuePrefix]; } createConnection() { diff --git a/packages/cubejs-query-orchestrator/orchestrator/QueryOrchestrator.js b/packages/cubejs-query-orchestrator/orchestrator/QueryOrchestrator.js index 7b0c9c30d8129..eda9d291d26ee 100644 --- a/packages/cubejs-query-orchestrator/orchestrator/QueryOrchestrator.js +++ b/packages/cubejs-query-orchestrator/orchestrator/QueryOrchestrator.js @@ -9,7 +9,6 @@ class QueryOrchestrator { this.redisPrefix = redisPrefix; this.driverFactory = driverFactory; this.logger = logger; - const redisPool = new RedisPool(); const { externalDriverFactory } = options; const cacheAndQueueDriver = options.cacheAndQueueDriver || process.env.CUBEJS_CACHE_AND_QUEUE_DRIVER || ( process.env.NODE_ENV === 'production' || process.env.REDIS_URL ? 'redis' : 'memory' @@ -17,6 +16,7 @@ class QueryOrchestrator { if (cacheAndQueueDriver !== 'redis' && cacheAndQueueDriver !== 'memory') { throw new Error(`Only 'redis' or 'memory' are supported for cacheAndQueueDriver option`); } + const redisPool = cacheAndQueueDriver === 'redis' ? new RedisPool() : undefined; this.redisPool = redisPool; this.queryCache = new QueryCache( diff --git a/packages/cubejs-query-orchestrator/orchestrator/QueryQueue.js b/packages/cubejs-query-orchestrator/orchestrator/QueryQueue.js index 9f112686f74f0..d944a91cd125d 100644 --- a/packages/cubejs-query-orchestrator/orchestrator/QueryQueue.js +++ b/packages/cubejs-query-orchestrator/orchestrator/QueryQueue.js @@ -62,7 +62,7 @@ class QueryQueue { }); } - await this.reconcileQueue(redisClient); + await this.reconcileQueue(); const queryDef = await redisClient.getQueryDef(queryKey); const [active, toProcess] = await redisClient.getQueryStageState(true); @@ -101,34 +101,55 @@ class QueryQueue { } } - async reconcileQueue(redisClient) { - const toCancel = ( - await redisClient.getStalledQueries() - ).concat( - await redisClient.getOrphanedQueries() - ); + async reconcileQueue() { + if (!this.reconcilePromise) { + this.reconcileAgain = false; + this.reconcilePromise = this.reconcileQueueImpl().then(() => { + this.reconcilePromise = null; + if (this.reconcileAgain) { + return this.reconcileQueue(); + } + return null; + }); + } else { + this.reconcileAgain = true; + } + return this.reconcilePromise; + } - await Promise.all(toCancel.map(async queryKey => { - const [query] = await redisClient.getQueryAndRemove(queryKey); - if (query) { - this.logger('Removing orphaned query', { - queryKey: query.queryKey, - queuePrefix: this.redisQueuePrefix, - requestId: query.requestId - }); - await this.sendCancelMessageFn(query); - } - })); + async reconcileQueueImpl() { + const redisClient = await this.queueDriver.createConnection(); + try { + const toCancel = ( + await redisClient.getStalledQueries() + ).concat( + await redisClient.getOrphanedQueries() + ); - const active = await redisClient.getActiveQueries(); - const toProcess = await redisClient.getToProcessQueries(); - await Promise.all( - R.pipe( - R.filter(p => active.indexOf(p) === -1), - R.take(this.concurrency), - R.map(this.sendProcessMessageFn) - )(toProcess) - ); + await Promise.all(toCancel.map(async queryKey => { + const [query] = await redisClient.getQueryAndRemove(queryKey); + if (query) { + this.logger('Removing orphaned query', { + queryKey: query.queryKey, + queuePrefix: this.redisQueuePrefix, + requestId: query.requestId + }); + await this.sendCancelMessageFn(query); + } + })); + + const active = await redisClient.getActiveQueries(); + const toProcess = await redisClient.getToProcessQueries(); + await Promise.all( + R.pipe( + R.filter(p => active.indexOf(p) === -1), + R.take(this.concurrency), + R.map(this.sendProcessMessageFn) + )(toProcess) + ); + } finally { + this.queueDriver.release(redisClient); + } } queryTimeout(promise) { @@ -186,93 +207,104 @@ class QueryQueue { async processQuery(queryKey) { const redisClient = await this.queueDriver.createConnection(); try { + const processingId = await redisClient.getNextProcessingId(); // eslint-disable-next-line no-unused-vars - const [insertedCount, removedCount, activeKeys, queueSize] = - await redisClient.retrieveForProcessing(queryKey); - if (insertedCount && activeKeys.indexOf(this.redisHash(queryKey)) !== -1) { - let query = await redisClient.getQueryDef(queryKey); - if (query) { - let executionResult; - const startQueryTime = (new Date()).getTime(); - const timeInQueue = (new Date()).getTime() - query.addedToQueueTime; - this.logger('Performing query', { + const [insertedCount, removedCount, activeKeys, queueSize, query, processingLockAcquired] = + await redisClient.retrieveForProcessing(queryKey, processingId); + if (query && insertedCount && activeKeys.indexOf(this.redisHash(queryKey)) !== -1 && processingLockAcquired) { + let executionResult; + const startQueryTime = (new Date()).getTime(); + const timeInQueue = (new Date()).getTime() - query.addedToQueueTime; + this.logger('Performing query', { + processingId, + queueSize, + queryKey: query.queryKey, + queuePrefix: this.redisQueuePrefix, + requestId: query.requestId, + timeInQueue + }); + await redisClient.optimisticQueryUpdate(queryKey, { startQueryTime }, processingId); + + const heartBeatTimer = setInterval( + () => redisClient.updateHeartBeat(queryKey), + this.heartBeatInterval * 1000 + ); + try { + executionResult = { + result: await this.queryTimeout( + this.queryHandlers[query.queryHandler]( + query.query, + async (cancelHandler) => { + try { + return redisClient.optimisticQueryUpdate(queryKey, { cancelHandler }, processingId); + } catch (e) { + this.logger(`Error while query update`, { + queryKey, + error: e.stack || e, + queuePrefix: this.redisQueuePrefix, + requestId: query.requestId + }); + } + return null; + } + ) + ) + }; + this.logger('Performing query completed', { + processingId, queueSize, + duration: ((new Date()).getTime() - startQueryTime), queryKey: query.queryKey, queuePrefix: this.redisQueuePrefix, requestId: query.requestId, timeInQueue }); - await redisClient.optimisticQueryUpdate(queryKey, { startQueryTime }); - - const heartBeatTimer = setInterval( - () => redisClient.updateHeartBeat(queryKey), - this.heartBeatInterval * 1000 - ); - try { - executionResult = { - result: await this.queryTimeout( - this.queryHandlers[query.queryHandler]( - query.query, - async (cancelHandler) => { - try { - return redisClient.optimisticQueryUpdate(queryKey, { cancelHandler }); - } catch (e) { - this.logger(`Error while query update`, { - queryKey, - error: e.stack || e, - queuePrefix: this.redisQueuePrefix, - requestId: query.requestId - }); - } - return null; - } - ) - ) - }; - this.logger('Performing query completed', { - queueSize, - duration: ((new Date()).getTime() - startQueryTime), - queryKey: query.queryKey, - queuePrefix: this.redisQueuePrefix, - requestId: query.requestId, - timeInQueue - }); - } catch (e) { - executionResult = { - error: (e.message || e).toString() // TODO error handling - }; - this.logger('Error while querying', { + } catch (e) { + executionResult = { + error: (e.message || e).toString() // TODO error handling + }; + this.logger('Error while querying', { + processingId, + queryKey: query.queryKey, + error: (e.stack || e).toString(), + queuePrefix: this.redisQueuePrefix, + requestId: query.requestId + }); + if (e instanceof TimeoutError) { + this.logger('Cancelling query due to timeout', { + processingId, queryKey: query.queryKey, - error: (e.stack || e).toString(), queuePrefix: this.redisQueuePrefix, requestId: query.requestId }); - if (e instanceof TimeoutError) { - query = await redisClient.getQueryDef(queryKey); - if (query) { - this.logger('Cancelling query due to timeout', { - queryKey: query.queryKey, - queuePrefix: this.redisQueuePrefix, - requestId: query.requestId - }); - await this.sendCancelMessageFn(query); - } - } + await this.sendCancelMessageFn(query); } + } - clearInterval(heartBeatTimer); + clearInterval(heartBeatTimer); - await redisClient.setResultAndRemoveQuery(queryKey, executionResult); - } else { - this.logger('Query cancelled in-flight', { - queueSize, - queryKey, - queuePrefix: this.redisQueuePrefix + if (!(await redisClient.setResultAndRemoveQuery(queryKey, executionResult, processingId))) { + this.logger('Orphaned execution result', { + processingId, + warn: `Result for query was not set due to processing lock wasn't acquired`, + queryKey: query.queryKey, + queuePrefix: this.redisQueuePrefix, + requestId: query.requestId }); - await redisClient.removeQuery(queryKey); } - await this.reconcileQueue(redisClient); + await this.reconcileQueue(); + } else { + this.logger('Skip processing', { + processingId, + queryKey, + queuePrefix: this.redisQueuePrefix, + processingLockAcquired, + query, + insertedCount, + activeKeys + }); + await redisClient.freeProcessingLock(queryKey, processingId); } } catch (e) { this.logger('Queue storage error', { diff --git a/packages/cubejs-query-orchestrator/orchestrator/RedisFactory.js b/packages/cubejs-query-orchestrator/orchestrator/RedisFactory.js index fe6c39879a17f..63131f649a7bb 100644 --- a/packages/cubejs-query-orchestrator/orchestrator/RedisFactory.js +++ b/packages/cubejs-query-orchestrator/orchestrator/RedisFactory.js @@ -4,7 +4,7 @@ const { promisify } = require('util'); module.exports = function createRedisClient(url) { redis.Multi.prototype.execAsync = promisify(redis.Multi.prototype.exec); - let options = { + const options = { url, }; @@ -18,7 +18,22 @@ module.exports = function createRedisClient(url) { const client = redis.createClient(options); - ['brpop', 'del', 'get', 'hget', 'rpop', 'set', 'zadd', 'zrange', 'zrangebyscore', 'keys'].forEach( + [ + 'brpop', + 'del', + 'get', + 'hget', + 'rpop', + 'set', + 'zadd', + 'zrange', + 'zrangebyscore', + 'keys', + 'watch', + 'incr', + 'decr', + 'lpush' + ].forEach( k => { client[`${k}Async`] = promisify(client[k]); } diff --git a/packages/cubejs-query-orchestrator/orchestrator/RedisPool.js b/packages/cubejs-query-orchestrator/orchestrator/RedisPool.js index 22d7c073164e3..8c28b4f9d67a9 100644 --- a/packages/cubejs-query-orchestrator/orchestrator/RedisPool.js +++ b/packages/cubejs-query-orchestrator/orchestrator/RedisPool.js @@ -7,7 +7,13 @@ class RedisPool { const max = (typeof poolMax !== 'undefined') ? poolMax : parseInt(process.env.CUBEJS_REDIS_POOL_MAX, 10) || 1000; const create = createClient || (() => createRedisClient(process.env.REDIS_URL)); const destroy = destroyClient || (client => client.end(true)); - const opts = { min, max, acquireTimeoutMillis: 5000, idleTimeoutMillis: 5000 } + const opts = { + min, + max, + acquireTimeoutMillis: 5000, + idleTimeoutMillis: 5000, + evictionRunIntervalMillis: 5000 + }; if (max > 0) { this.pool = genericPool.createPool({ create, destroy }, opts); } else { @@ -18,7 +24,7 @@ class RedisPool { async getClient() { if (this.pool) { - return await this.pool.acquire(); + return this.pool.acquire(); } else { return this.create(); } @@ -27,10 +33,8 @@ class RedisPool { release(client) { if (this.pool) { this.pool.release(client); - } else { - if (client) { - client.quit(); - } + } else if (client) { + client.quit(); } } diff --git a/packages/cubejs-query-orchestrator/orchestrator/RedisQueueDriver.js b/packages/cubejs-query-orchestrator/orchestrator/RedisQueueDriver.js index 5a258ede160a9..687052e2ec985 100644 --- a/packages/cubejs-query-orchestrator/orchestrator/RedisQueueDriver.js +++ b/packages/cubejs-query-orchestrator/orchestrator/RedisQueueDriver.js @@ -15,7 +15,14 @@ class RedisQueueDriverConnection { async getResultBlocking(queryKey) { const resultListKey = this.resultListKey(queryKey); + if (!(await this.redisClient.hgetAsync([this.queriesDefKey(), this.redisHash(queryKey)]))) { + return this.getResult(queryKey); + } const result = await this.redisClient.brpopAsync([resultListKey, this.continueWaitTimeout]); + if (result) { + await this.redisClient.lpushAsync([resultListKey, result[1]]); + await this.redisClient.rpopAsync(resultListKey); + } return result && JSON.parse(result[1]); } @@ -58,29 +65,30 @@ class RedisQueueDriverConnection { const [query, ...restResult] = await this.redisClient.multi() .hget([this.queriesDefKey(), this.redisHash(queryKey)]) .zrem([this.activeRedisKey(), this.redisHash(queryKey)]) + .zrem([this.heartBeatRedisKey(), this.redisHash(queryKey)]) .zrem([this.toProcessRedisKey(), this.redisHash(queryKey)]) .zrem([this.recentRedisKey(), this.redisHash(queryKey)]) .hdel([this.queriesDefKey(), this.redisHash(queryKey)]) + .del(this.queryProcessingLockKey(queryKey)) .execAsync(); return [JSON.parse(query), ...restResult]; } - setResultAndRemoveQuery(queryKey, executionResult) { - return this.redisClient.multi() - .lpush([this.resultListKey(queryKey), JSON.stringify(executionResult)]) - .zrem([this.activeRedisKey(), this.redisHash(queryKey)]) - .zrem([this.toProcessRedisKey(), this.redisHash(queryKey)]) - .zrem([this.recentRedisKey(), this.redisHash(queryKey)]) - .hdel([this.queriesDefKey(), this.redisHash(queryKey)]) - .execAsync(); - } + async setResultAndRemoveQuery(queryKey, executionResult, processingId) { + await this.redisClient.watchAsync(this.queryProcessingLockKey(queryKey)); + const currentProcessId = await this.redisClient.getAsync(this.queryProcessingLockKey(queryKey)); + if (processingId !== currentProcessId) { + return false; + } - removeQuery(queryKey) { return this.redisClient.multi() + .lpush([this.resultListKey(queryKey), JSON.stringify(executionResult)]) .zrem([this.activeRedisKey(), this.redisHash(queryKey)]) + .zrem([this.heartBeatRedisKey(), this.redisHash(queryKey)]) .zrem([this.toProcessRedisKey(), this.redisHash(queryKey)]) .zrem([this.recentRedisKey(), this.redisHash(queryKey)]) .hdel([this.queriesDefKey(), this.redisHash(queryKey)]) + .del(this.queryProcessingLockKey(queryKey)) .execAsync(); } @@ -92,7 +100,7 @@ class RedisQueueDriverConnection { getStalledQueries() { return this.redisClient.zrangebyscoreAsync( - [this.activeRedisKey(), 0, (new Date().getTime() - this.heartBeatTimeout * 1000)] + [this.heartBeatRedisKey(), 0, (new Date().getTime() - this.heartBeatTimeout * 1000)] ); } @@ -113,23 +121,49 @@ class RedisQueueDriverConnection { } updateHeartBeat(queryKey) { - return this.redisClient.zaddAsync([this.activeRedisKey(), new Date().getTime(), this.redisHash(queryKey)]); - } - - retrieveForProcessing(queryKey) { - return this.redisClient.multi() - .zadd([this.activeRedisKey(), 'NX', new Date().getTime(), this.redisHash(queryKey)]) - .zremrangebyrank([this.activeRedisKey(), this.concurrency, -1]) - .zrange([this.activeRedisKey(), 0, this.concurrency - 1]) - .zcard(this.toProcessRedisKey()) - .execAsync(); + return this.redisClient.zaddAsync([this.heartBeatRedisKey(), new Date().getTime(), this.redisHash(queryKey)]); + } + + async getNextProcessingId() { + const id = await this.redisClient.incrAsync(this.processingIdKey()); + return id && id.toString(); + } + + async retrieveForProcessing(queryKey, processingId) { + const [insertedCount, removedCount, activeKeys, queueSize, queryDef, processingLockAcquired] = + await this.redisClient.multi() + .zadd([this.activeRedisKey(), 'NX', processingId, this.redisHash(queryKey)]) + .zremrangebyrank([this.activeRedisKey(), this.concurrency, -1]) + .zrange([this.activeRedisKey(), 0, this.concurrency - 1]) + .zcard(this.toProcessRedisKey()) + .hget(([this.queriesDefKey(), this.redisHash(queryKey)])) + .set(this.queryProcessingLockKey(queryKey), processingId, 'NX') + .zadd([this.heartBeatRedisKey(), 'NX', new Date().getTime(), this.redisHash(queryKey)]) + .execAsync(); + return [insertedCount, removedCount, activeKeys, queueSize, JSON.parse(queryDef), processingLockAcquired]; + } + + async freeProcessingLock(queryKey, processingId) { + const lockKey = this.queryProcessingLockKey(queryKey); + await this.redisClient.watchAsync(lockKey); + const currentProcessId = await this.redisClient.getAsync(lockKey); + if (currentProcessId === processingId) { + await this.redisClient.multi() + .del(lockKey) + .execAsync(); + } } - async optimisticQueryUpdate(queryKey, toUpdate) { + async optimisticQueryUpdate(queryKey, toUpdate, processingId) { let query = await this.getQueryDef(queryKey); for (let i = 0; i < 10; i++) { if (query) { // eslint-disable-next-line no-await-in-loop + await this.redisClient.watchAsync(this.queryProcessingLockKey(queryKey)); + const currentProcessId = await this.redisClient.getAsync(this.queryProcessingLockKey(queryKey)); + if (currentProcessId !== processingId) { + return false; + } let [beforeUpdate] = await this.redisClient .multi() .hget([this.queriesDefKey(), this.redisHash(queryKey)]) @@ -161,6 +195,10 @@ class RedisQueueDriverConnection { return this.queueRedisKey('ACTIVE'); } + heartBeatRedisKey() { + return this.queueRedisKey('HEART_BEAT'); + } + queryRedisKey(queryKey, suffix) { return `${this.redisQueuePrefix}_${this.redisHash(queryKey)}_${suffix}`; } @@ -173,10 +211,18 @@ class RedisQueueDriverConnection { return this.queueRedisKey('QUERIES'); } + processingIdKey() { + return this.queueRedisKey('PROCESSING_COUNTER'); + } + resultListKey(queryKey) { return this.queryRedisKey(queryKey, 'RESULT'); } + queryProcessingLockKey(queryKey) { + return this.queryRedisKey(queryKey, 'LOCK'); + } + redisHash(queryKey) { return this.driver.redisHash(queryKey); } diff --git a/packages/cubejs-query-orchestrator/test/QueryQueue.test.js b/packages/cubejs-query-orchestrator/test/QueryQueue.test.js index 2957e5e1f3b75..e4154cfdf6fb9 100644 --- a/packages/cubejs-query-orchestrator/test/QueryQueue.test.js +++ b/packages/cubejs-query-orchestrator/test/QueryQueue.test.js @@ -40,6 +40,14 @@ const QueryQueueTest = (name, options) => { expect(result).toBe('select * from bar'); }); + test('instant double wait resolve', async () => { + const results = await Promise.all([ + queue.executeInQueue('delay', `instant`, { delay: 400, result: '2' }), + queue.executeInQueue('delay', `instant`, { delay: 400, result: '2' }) + ]); + expect(results).toStrictEqual(['20', '20']); + }); + test('priority', async () => { delayCount = 0; const result = await Promise.all([ @@ -50,6 +58,7 @@ const QueryQueueTest = (name, options) => { expect(parseInt(result.find(f => f[0] === '3'), 10) % 10).toBeLessThan(2); }); + test('timeout', async () => { delayCount = 0; const query = ['select * from 2']; @@ -57,6 +66,7 @@ const QueryQueueTest = (name, options) => { for (let i = 0; i < 5; i++) { try { await queue.executeInQueue('delay', query, { delay: 3000, result: '1' }); + console.log(`Delay ${i}`); } catch (e) { if (e.message === 'Continue wait') { // eslint-disable-next-line no-continue @@ -66,9 +76,10 @@ const QueryQueueTest = (name, options) => { break; } } - expect(errorString.indexOf('timeout')).not.toEqual(-1); + expect(errorString).toEqual(expect.stringContaining('timeout')); }); + test('stage reporting', async () => { delayCount = 0; const resultPromise = queue.executeInQueue('delay', '1', { delay: 200, result: '1' }, 0, { stageQueryKey: '1' });