diff --git a/packages/cubejs-query-orchestrator/orchestrator/LocalQueueDriver.js b/packages/cubejs-query-orchestrator/orchestrator/LocalQueueDriver.js index 3f68ca69f7d27..112ae30b08747 100644 --- a/packages/cubejs-query-orchestrator/orchestrator/LocalQueueDriver.js +++ b/packages/cubejs-query-orchestrator/orchestrator/LocalQueueDriver.js @@ -15,6 +15,9 @@ class LocalQueueDriverConnection { this.toProcess = driver.toProcess; this.recent = driver.recent; this.active = driver.active; + this.heartBeat = driver.heartBeat; + this.processingCounter = driver.processingCounter; + this.processingLocks = driver.processingLocks; } getResultPromise(resultListKey) { @@ -30,12 +33,16 @@ class LocalQueueDriverConnection { async getResultBlocking(queryKey) { const resultListKey = this.resultListKey(queryKey); + if (!this.queryDef[this.redisHash(queryKey)] && !this.resultPromises[resultListKey]) { + return null; + } const timeoutPromise = (timeout) => new Promise((resolve) => setTimeout(() => resolve(null), timeout)); const res = await Promise.race([ this.getResultPromise(resultListKey), timeoutPromise(this.continueWaitTimeout * 1000), ]); + if (res) { delete this.resultPromises[resultListKey]; } @@ -98,28 +105,33 @@ class LocalQueueDriverConnection { const key = this.redisHash(queryKey); const query = this.queryDef[key]; delete this.active[key]; + delete this.heartBeat[key]; delete this.toProcess[key]; delete this.recent[key]; delete this.queryDef[key]; + delete this.processingLocks[key]; return [query]; } - setResultAndRemoveQuery(queryKey, executionResult) { + async setResultAndRemoveQuery(queryKey, executionResult, processingId) { const key = this.redisHash(queryKey); + if (this.processingLocks[key] !== processingId) { + return false; + } const promise = this.getResultPromise(this.resultListKey(queryKey)); delete this.active[key]; + delete this.heartBeat[key]; delete this.toProcess[key]; delete this.recent[key]; delete this.queryDef[key]; + delete this.processingLocks[key]; promise.resolve(executionResult); + return true; } - removeQuery(queryKey) { - const key = this.redisHash(queryKey); - delete this.active[key]; - delete this.toProcess[key]; - delete this.recent[key]; - delete this.queryDef[key]; + getNextProcessingId() { + this.processingCounter.counter = this.processingCounter.counter ? this.processingCounter.counter + 1 : 1; + return this.processingCounter.counter; } getOrphanedQueries() { @@ -127,7 +139,7 @@ class LocalQueueDriverConnection { } getStalledQueries() { - return this.queueArray(this.active, new Date().getTime() - this.heartBeatTimeout * 1000); + return this.queueArray(this.heartBeat, new Date().getTime() - this.heartBeatTimeout * 1000); } async getQueryStageState(onlyKeys) { @@ -140,24 +152,43 @@ class LocalQueueDriverConnection { updateHeartBeat(queryKey) { const key = this.redisHash(queryKey); - if (this.active[key]) { - this.active[key] = { key, order: new Date().getTime() }; + if (this.heartBeat[key]) { + this.heartBeat[key] = { key, order: new Date().getTime() }; } } - retrieveForProcessing(queryKey) { + retrieveForProcessing(queryKey, processingId) { const key = this.redisHash(queryKey); + let lockAcquired = false; + if (!this.processingLocks[key]) { + this.processingLocks[key] = processingId; + lockAcquired = true; + } let added = 0; if (Object.keys(this.active).length < this.concurrency && !this.active[key]) { - this.active[key] = { key, order: new Date().getTime() }; + this.active[key] = { key, order: processingId }; added = 1; } - return [added, null, this.queueArray(this.active), Object.keys(this.toProcess).length]; // TODO nulls + this.heartBeat[key] = { key, order: new Date().getTime() }; + return [ + added, null, this.queueArray(this.active), Object.keys(this.toProcess).length, this.queryDef[key], lockAcquired + ]; // TODO nulls } - async optimisticQueryUpdate(queryKey, toUpdate) { + freeProcessingLock(queryKey, processingId) { const key = this.redisHash(queryKey); + if (this.processingLocks[key] === processingId) { + delete this.processingLocks[key]; + } + } + + async optimisticQueryUpdate(queryKey, toUpdate, processingId) { + const key = this.redisHash(queryKey); + if (this.processingLocks[key] !== processingId) { + return false; + } this.queryDef[key] = { ...this.queryDef[key], ...toUpdate }; + return true; } release() { @@ -182,6 +213,9 @@ const queryDef = {}; const toProcess = {}; const recent = {}; const active = {}; +const heartBeat = {}; +const processingCounters = {}; +const processingLocks = {}; class LocalQueueDriver extends BaseQueueDriver { constructor(options) { @@ -193,12 +227,18 @@ class LocalQueueDriver extends BaseQueueDriver { toProcess[options.redisQueuePrefix] = toProcess[options.redisQueuePrefix] || {}; recent[options.redisQueuePrefix] = recent[options.redisQueuePrefix] || {}; active[options.redisQueuePrefix] = active[options.redisQueuePrefix] || {}; + heartBeat[options.redisQueuePrefix] = heartBeat[options.redisQueuePrefix] || {}; + processingCounters[options.redisQueuePrefix] = processingCounters[options.redisQueuePrefix] || {}; + processingLocks[options.redisQueuePrefix] = processingLocks[options.redisQueuePrefix] || {}; this.results = results[options.redisQueuePrefix]; this.resultPromises = resultPromises[options.redisQueuePrefix]; this.queryDef = queryDef[options.redisQueuePrefix]; this.toProcess = toProcess[options.redisQueuePrefix]; this.recent = recent[options.redisQueuePrefix]; this.active = active[options.redisQueuePrefix]; + this.heartBeat = heartBeat[options.redisQueuePrefix]; + this.processingCounter = processingCounters[options.redisQueuePrefix]; + this.processingLocks = processingLocks[options.redisQueuePrefix]; } createConnection() { diff --git a/packages/cubejs-query-orchestrator/orchestrator/QueryOrchestrator.js b/packages/cubejs-query-orchestrator/orchestrator/QueryOrchestrator.js index 7b0c9c30d8129..eda9d291d26ee 100644 --- a/packages/cubejs-query-orchestrator/orchestrator/QueryOrchestrator.js +++ b/packages/cubejs-query-orchestrator/orchestrator/QueryOrchestrator.js @@ -9,7 +9,6 @@ class QueryOrchestrator { this.redisPrefix = redisPrefix; this.driverFactory = driverFactory; this.logger = logger; - const redisPool = new RedisPool(); const { externalDriverFactory } = options; const cacheAndQueueDriver = options.cacheAndQueueDriver || process.env.CUBEJS_CACHE_AND_QUEUE_DRIVER || ( process.env.NODE_ENV === 'production' || process.env.REDIS_URL ? 'redis' : 'memory' @@ -17,6 +16,7 @@ class QueryOrchestrator { if (cacheAndQueueDriver !== 'redis' && cacheAndQueueDriver !== 'memory') { throw new Error(`Only 'redis' or 'memory' are supported for cacheAndQueueDriver option`); } + const redisPool = cacheAndQueueDriver === 'redis' ? new RedisPool() : undefined; this.redisPool = redisPool; this.queryCache = new QueryCache( diff --git a/packages/cubejs-query-orchestrator/orchestrator/QueryQueue.js b/packages/cubejs-query-orchestrator/orchestrator/QueryQueue.js index 9f112686f74f0..d944a91cd125d 100644 --- a/packages/cubejs-query-orchestrator/orchestrator/QueryQueue.js +++ b/packages/cubejs-query-orchestrator/orchestrator/QueryQueue.js @@ -62,7 +62,7 @@ class QueryQueue { }); } - await this.reconcileQueue(redisClient); + await this.reconcileQueue(); const queryDef = await redisClient.getQueryDef(queryKey); const [active, toProcess] = await redisClient.getQueryStageState(true); @@ -101,34 +101,55 @@ class QueryQueue { } } - async reconcileQueue(redisClient) { - const toCancel = ( - await redisClient.getStalledQueries() - ).concat( - await redisClient.getOrphanedQueries() - ); + async reconcileQueue() { + if (!this.reconcilePromise) { + this.reconcileAgain = false; + this.reconcilePromise = this.reconcileQueueImpl().then(() => { + this.reconcilePromise = null; + if (this.reconcileAgain) { + return this.reconcileQueue(); + } + return null; + }); + } else { + this.reconcileAgain = true; + } + return this.reconcilePromise; + } - await Promise.all(toCancel.map(async queryKey => { - const [query] = await redisClient.getQueryAndRemove(queryKey); - if (query) { - this.logger('Removing orphaned query', { - queryKey: query.queryKey, - queuePrefix: this.redisQueuePrefix, - requestId: query.requestId - }); - await this.sendCancelMessageFn(query); - } - })); + async reconcileQueueImpl() { + const redisClient = await this.queueDriver.createConnection(); + try { + const toCancel = ( + await redisClient.getStalledQueries() + ).concat( + await redisClient.getOrphanedQueries() + ); - const active = await redisClient.getActiveQueries(); - const toProcess = await redisClient.getToProcessQueries(); - await Promise.all( - R.pipe( - R.filter(p => active.indexOf(p) === -1), - R.take(this.concurrency), - R.map(this.sendProcessMessageFn) - )(toProcess) - ); + await Promise.all(toCancel.map(async queryKey => { + const [query] = await redisClient.getQueryAndRemove(queryKey); + if (query) { + this.logger('Removing orphaned query', { + queryKey: query.queryKey, + queuePrefix: this.redisQueuePrefix, + requestId: query.requestId + }); + await this.sendCancelMessageFn(query); + } + })); + + const active = await redisClient.getActiveQueries(); + const toProcess = await redisClient.getToProcessQueries(); + await Promise.all( + R.pipe( + R.filter(p => active.indexOf(p) === -1), + R.take(this.concurrency), + R.map(this.sendProcessMessageFn) + )(toProcess) + ); + } finally { + this.queueDriver.release(redisClient); + } } queryTimeout(promise) { @@ -186,93 +207,104 @@ class QueryQueue { async processQuery(queryKey) { const redisClient = await this.queueDriver.createConnection(); try { + const processingId = await redisClient.getNextProcessingId(); // eslint-disable-next-line no-unused-vars - const [insertedCount, removedCount, activeKeys, queueSize] = - await redisClient.retrieveForProcessing(queryKey); - if (insertedCount && activeKeys.indexOf(this.redisHash(queryKey)) !== -1) { - let query = await redisClient.getQueryDef(queryKey); - if (query) { - let executionResult; - const startQueryTime = (new Date()).getTime(); - const timeInQueue = (new Date()).getTime() - query.addedToQueueTime; - this.logger('Performing query', { + const [insertedCount, removedCount, activeKeys, queueSize, query, processingLockAcquired] = + await redisClient.retrieveForProcessing(queryKey, processingId); + if (query && insertedCount && activeKeys.indexOf(this.redisHash(queryKey)) !== -1 && processingLockAcquired) { + let executionResult; + const startQueryTime = (new Date()).getTime(); + const timeInQueue = (new Date()).getTime() - query.addedToQueueTime; + this.logger('Performing query', { + processingId, + queueSize, + queryKey: query.queryKey, + queuePrefix: this.redisQueuePrefix, + requestId: query.requestId, + timeInQueue + }); + await redisClient.optimisticQueryUpdate(queryKey, { startQueryTime }, processingId); + + const heartBeatTimer = setInterval( + () => redisClient.updateHeartBeat(queryKey), + this.heartBeatInterval * 1000 + ); + try { + executionResult = { + result: await this.queryTimeout( + this.queryHandlers[query.queryHandler]( + query.query, + async (cancelHandler) => { + try { + return redisClient.optimisticQueryUpdate(queryKey, { cancelHandler }, processingId); + } catch (e) { + this.logger(`Error while query update`, { + queryKey, + error: e.stack || e, + queuePrefix: this.redisQueuePrefix, + requestId: query.requestId + }); + } + return null; + } + ) + ) + }; + this.logger('Performing query completed', { + processingId, queueSize, + duration: ((new Date()).getTime() - startQueryTime), queryKey: query.queryKey, queuePrefix: this.redisQueuePrefix, requestId: query.requestId, timeInQueue }); - await redisClient.optimisticQueryUpdate(queryKey, { startQueryTime }); - - const heartBeatTimer = setInterval( - () => redisClient.updateHeartBeat(queryKey), - this.heartBeatInterval * 1000 - ); - try { - executionResult = { - result: await this.queryTimeout( - this.queryHandlers[query.queryHandler]( - query.query, - async (cancelHandler) => { - try { - return redisClient.optimisticQueryUpdate(queryKey, { cancelHandler }); - } catch (e) { - this.logger(`Error while query update`, { - queryKey, - error: e.stack || e, - queuePrefix: this.redisQueuePrefix, - requestId: query.requestId - }); - } - return null; - } - ) - ) - }; - this.logger('Performing query completed', { - queueSize, - duration: ((new Date()).getTime() - startQueryTime), - queryKey: query.queryKey, - queuePrefix: this.redisQueuePrefix, - requestId: query.requestId, - timeInQueue - }); - } catch (e) { - executionResult = { - error: (e.message || e).toString() // TODO error handling - }; - this.logger('Error while querying', { + } catch (e) { + executionResult = { + error: (e.message || e).toString() // TODO error handling + }; + this.logger('Error while querying', { + processingId, + queryKey: query.queryKey, + error: (e.stack || e).toString(), + queuePrefix: this.redisQueuePrefix, + requestId: query.requestId + }); + if (e instanceof TimeoutError) { + this.logger('Cancelling query due to timeout', { + processingId, queryKey: query.queryKey, - error: (e.stack || e).toString(), queuePrefix: this.redisQueuePrefix, requestId: query.requestId }); - if (e instanceof TimeoutError) { - query = await redisClient.getQueryDef(queryKey); - if (query) { - this.logger('Cancelling query due to timeout', { - queryKey: query.queryKey, - queuePrefix: this.redisQueuePrefix, - requestId: query.requestId - }); - await this.sendCancelMessageFn(query); - } - } + await this.sendCancelMessageFn(query); } + } - clearInterval(heartBeatTimer); + clearInterval(heartBeatTimer); - await redisClient.setResultAndRemoveQuery(queryKey, executionResult); - } else { - this.logger('Query cancelled in-flight', { - queueSize, - queryKey, - queuePrefix: this.redisQueuePrefix + if (!(await redisClient.setResultAndRemoveQuery(queryKey, executionResult, processingId))) { + this.logger('Orphaned execution result', { + processingId, + warn: `Result for query was not set due to processing lock wasn't acquired`, + queryKey: query.queryKey, + queuePrefix: this.redisQueuePrefix, + requestId: query.requestId }); - await redisClient.removeQuery(queryKey); } - await this.reconcileQueue(redisClient); + await this.reconcileQueue(); + } else { + this.logger('Skip processing', { + processingId, + queryKey, + queuePrefix: this.redisQueuePrefix, + processingLockAcquired, + query, + insertedCount, + activeKeys + }); + await redisClient.freeProcessingLock(queryKey, processingId); } } catch (e) { this.logger('Queue storage error', { diff --git a/packages/cubejs-query-orchestrator/orchestrator/RedisFactory.js b/packages/cubejs-query-orchestrator/orchestrator/RedisFactory.js index fe6c39879a17f..63131f649a7bb 100644 --- a/packages/cubejs-query-orchestrator/orchestrator/RedisFactory.js +++ b/packages/cubejs-query-orchestrator/orchestrator/RedisFactory.js @@ -4,7 +4,7 @@ const { promisify } = require('util'); module.exports = function createRedisClient(url) { redis.Multi.prototype.execAsync = promisify(redis.Multi.prototype.exec); - let options = { + const options = { url, }; @@ -18,7 +18,22 @@ module.exports = function createRedisClient(url) { const client = redis.createClient(options); - ['brpop', 'del', 'get', 'hget', 'rpop', 'set', 'zadd', 'zrange', 'zrangebyscore', 'keys'].forEach( + [ + 'brpop', + 'del', + 'get', + 'hget', + 'rpop', + 'set', + 'zadd', + 'zrange', + 'zrangebyscore', + 'keys', + 'watch', + 'incr', + 'decr', + 'lpush' + ].forEach( k => { client[`${k}Async`] = promisify(client[k]); } diff --git a/packages/cubejs-query-orchestrator/orchestrator/RedisPool.js b/packages/cubejs-query-orchestrator/orchestrator/RedisPool.js index 22d7c073164e3..8c28b4f9d67a9 100644 --- a/packages/cubejs-query-orchestrator/orchestrator/RedisPool.js +++ b/packages/cubejs-query-orchestrator/orchestrator/RedisPool.js @@ -7,7 +7,13 @@ class RedisPool { const max = (typeof poolMax !== 'undefined') ? poolMax : parseInt(process.env.CUBEJS_REDIS_POOL_MAX, 10) || 1000; const create = createClient || (() => createRedisClient(process.env.REDIS_URL)); const destroy = destroyClient || (client => client.end(true)); - const opts = { min, max, acquireTimeoutMillis: 5000, idleTimeoutMillis: 5000 } + const opts = { + min, + max, + acquireTimeoutMillis: 5000, + idleTimeoutMillis: 5000, + evictionRunIntervalMillis: 5000 + }; if (max > 0) { this.pool = genericPool.createPool({ create, destroy }, opts); } else { @@ -18,7 +24,7 @@ class RedisPool { async getClient() { if (this.pool) { - return await this.pool.acquire(); + return this.pool.acquire(); } else { return this.create(); } @@ -27,10 +33,8 @@ class RedisPool { release(client) { if (this.pool) { this.pool.release(client); - } else { - if (client) { - client.quit(); - } + } else if (client) { + client.quit(); } } diff --git a/packages/cubejs-query-orchestrator/orchestrator/RedisQueueDriver.js b/packages/cubejs-query-orchestrator/orchestrator/RedisQueueDriver.js index 5a258ede160a9..687052e2ec985 100644 --- a/packages/cubejs-query-orchestrator/orchestrator/RedisQueueDriver.js +++ b/packages/cubejs-query-orchestrator/orchestrator/RedisQueueDriver.js @@ -15,7 +15,14 @@ class RedisQueueDriverConnection { async getResultBlocking(queryKey) { const resultListKey = this.resultListKey(queryKey); + if (!(await this.redisClient.hgetAsync([this.queriesDefKey(), this.redisHash(queryKey)]))) { + return this.getResult(queryKey); + } const result = await this.redisClient.brpopAsync([resultListKey, this.continueWaitTimeout]); + if (result) { + await this.redisClient.lpushAsync([resultListKey, result[1]]); + await this.redisClient.rpopAsync(resultListKey); + } return result && JSON.parse(result[1]); } @@ -58,29 +65,30 @@ class RedisQueueDriverConnection { const [query, ...restResult] = await this.redisClient.multi() .hget([this.queriesDefKey(), this.redisHash(queryKey)]) .zrem([this.activeRedisKey(), this.redisHash(queryKey)]) + .zrem([this.heartBeatRedisKey(), this.redisHash(queryKey)]) .zrem([this.toProcessRedisKey(), this.redisHash(queryKey)]) .zrem([this.recentRedisKey(), this.redisHash(queryKey)]) .hdel([this.queriesDefKey(), this.redisHash(queryKey)]) + .del(this.queryProcessingLockKey(queryKey)) .execAsync(); return [JSON.parse(query), ...restResult]; } - setResultAndRemoveQuery(queryKey, executionResult) { - return this.redisClient.multi() - .lpush([this.resultListKey(queryKey), JSON.stringify(executionResult)]) - .zrem([this.activeRedisKey(), this.redisHash(queryKey)]) - .zrem([this.toProcessRedisKey(), this.redisHash(queryKey)]) - .zrem([this.recentRedisKey(), this.redisHash(queryKey)]) - .hdel([this.queriesDefKey(), this.redisHash(queryKey)]) - .execAsync(); - } + async setResultAndRemoveQuery(queryKey, executionResult, processingId) { + await this.redisClient.watchAsync(this.queryProcessingLockKey(queryKey)); + const currentProcessId = await this.redisClient.getAsync(this.queryProcessingLockKey(queryKey)); + if (processingId !== currentProcessId) { + return false; + } - removeQuery(queryKey) { return this.redisClient.multi() + .lpush([this.resultListKey(queryKey), JSON.stringify(executionResult)]) .zrem([this.activeRedisKey(), this.redisHash(queryKey)]) + .zrem([this.heartBeatRedisKey(), this.redisHash(queryKey)]) .zrem([this.toProcessRedisKey(), this.redisHash(queryKey)]) .zrem([this.recentRedisKey(), this.redisHash(queryKey)]) .hdel([this.queriesDefKey(), this.redisHash(queryKey)]) + .del(this.queryProcessingLockKey(queryKey)) .execAsync(); } @@ -92,7 +100,7 @@ class RedisQueueDriverConnection { getStalledQueries() { return this.redisClient.zrangebyscoreAsync( - [this.activeRedisKey(), 0, (new Date().getTime() - this.heartBeatTimeout * 1000)] + [this.heartBeatRedisKey(), 0, (new Date().getTime() - this.heartBeatTimeout * 1000)] ); } @@ -113,23 +121,49 @@ class RedisQueueDriverConnection { } updateHeartBeat(queryKey) { - return this.redisClient.zaddAsync([this.activeRedisKey(), new Date().getTime(), this.redisHash(queryKey)]); - } - - retrieveForProcessing(queryKey) { - return this.redisClient.multi() - .zadd([this.activeRedisKey(), 'NX', new Date().getTime(), this.redisHash(queryKey)]) - .zremrangebyrank([this.activeRedisKey(), this.concurrency, -1]) - .zrange([this.activeRedisKey(), 0, this.concurrency - 1]) - .zcard(this.toProcessRedisKey()) - .execAsync(); + return this.redisClient.zaddAsync([this.heartBeatRedisKey(), new Date().getTime(), this.redisHash(queryKey)]); + } + + async getNextProcessingId() { + const id = await this.redisClient.incrAsync(this.processingIdKey()); + return id && id.toString(); + } + + async retrieveForProcessing(queryKey, processingId) { + const [insertedCount, removedCount, activeKeys, queueSize, queryDef, processingLockAcquired] = + await this.redisClient.multi() + .zadd([this.activeRedisKey(), 'NX', processingId, this.redisHash(queryKey)]) + .zremrangebyrank([this.activeRedisKey(), this.concurrency, -1]) + .zrange([this.activeRedisKey(), 0, this.concurrency - 1]) + .zcard(this.toProcessRedisKey()) + .hget(([this.queriesDefKey(), this.redisHash(queryKey)])) + .set(this.queryProcessingLockKey(queryKey), processingId, 'NX') + .zadd([this.heartBeatRedisKey(), 'NX', new Date().getTime(), this.redisHash(queryKey)]) + .execAsync(); + return [insertedCount, removedCount, activeKeys, queueSize, JSON.parse(queryDef), processingLockAcquired]; + } + + async freeProcessingLock(queryKey, processingId) { + const lockKey = this.queryProcessingLockKey(queryKey); + await this.redisClient.watchAsync(lockKey); + const currentProcessId = await this.redisClient.getAsync(lockKey); + if (currentProcessId === processingId) { + await this.redisClient.multi() + .del(lockKey) + .execAsync(); + } } - async optimisticQueryUpdate(queryKey, toUpdate) { + async optimisticQueryUpdate(queryKey, toUpdate, processingId) { let query = await this.getQueryDef(queryKey); for (let i = 0; i < 10; i++) { if (query) { // eslint-disable-next-line no-await-in-loop + await this.redisClient.watchAsync(this.queryProcessingLockKey(queryKey)); + const currentProcessId = await this.redisClient.getAsync(this.queryProcessingLockKey(queryKey)); + if (currentProcessId !== processingId) { + return false; + } let [beforeUpdate] = await this.redisClient .multi() .hget([this.queriesDefKey(), this.redisHash(queryKey)]) @@ -161,6 +195,10 @@ class RedisQueueDriverConnection { return this.queueRedisKey('ACTIVE'); } + heartBeatRedisKey() { + return this.queueRedisKey('HEART_BEAT'); + } + queryRedisKey(queryKey, suffix) { return `${this.redisQueuePrefix}_${this.redisHash(queryKey)}_${suffix}`; } @@ -173,10 +211,18 @@ class RedisQueueDriverConnection { return this.queueRedisKey('QUERIES'); } + processingIdKey() { + return this.queueRedisKey('PROCESSING_COUNTER'); + } + resultListKey(queryKey) { return this.queryRedisKey(queryKey, 'RESULT'); } + queryProcessingLockKey(queryKey) { + return this.queryRedisKey(queryKey, 'LOCK'); + } + redisHash(queryKey) { return this.driver.redisHash(queryKey); } diff --git a/packages/cubejs-query-orchestrator/test/QueryQueue.test.js b/packages/cubejs-query-orchestrator/test/QueryQueue.test.js index 2957e5e1f3b75..e4154cfdf6fb9 100644 --- a/packages/cubejs-query-orchestrator/test/QueryQueue.test.js +++ b/packages/cubejs-query-orchestrator/test/QueryQueue.test.js @@ -40,6 +40,14 @@ const QueryQueueTest = (name, options) => { expect(result).toBe('select * from bar'); }); + test('instant double wait resolve', async () => { + const results = await Promise.all([ + queue.executeInQueue('delay', `instant`, { delay: 400, result: '2' }), + queue.executeInQueue('delay', `instant`, { delay: 400, result: '2' }) + ]); + expect(results).toStrictEqual(['20', '20']); + }); + test('priority', async () => { delayCount = 0; const result = await Promise.all([ @@ -50,6 +58,7 @@ const QueryQueueTest = (name, options) => { expect(parseInt(result.find(f => f[0] === '3'), 10) % 10).toBeLessThan(2); }); + test('timeout', async () => { delayCount = 0; const query = ['select * from 2']; @@ -57,6 +66,7 @@ const QueryQueueTest = (name, options) => { for (let i = 0; i < 5; i++) { try { await queue.executeInQueue('delay', query, { delay: 3000, result: '1' }); + console.log(`Delay ${i}`); } catch (e) { if (e.message === 'Continue wait') { // eslint-disable-next-line no-continue @@ -66,9 +76,10 @@ const QueryQueueTest = (name, options) => { break; } } - expect(errorString.indexOf('timeout')).not.toEqual(-1); + expect(errorString).toEqual(expect.stringContaining('timeout')); }); + test('stage reporting', async () => { delayCount = 0; const resultPromise = queue.executeInQueue('delay', '1', { delay: 200, result: '1' }, 0, { stageQueryKey: '1' });