diff --git a/packages/cubejs-query-orchestrator/orchestrator/LocalQueueDriver.js b/packages/cubejs-query-orchestrator/orchestrator/LocalQueueDriver.js index d1c3baf972851..2b392ac34e6fa 100644 --- a/packages/cubejs-query-orchestrator/orchestrator/LocalQueueDriver.js +++ b/packages/cubejs-query-orchestrator/orchestrator/LocalQueueDriver.js @@ -176,10 +176,13 @@ class LocalQueueDriverConnection { ]; // TODO nulls } - freeProcessingLock(queryKey, processingId) { + freeProcessingLock(queryKey, processingId, activated) { const key = this.redisHash(queryKey); if (this.processingLocks[key] === processingId) { delete this.processingLocks[key]; + if (activated) { + delete this.active[key]; + } } } diff --git a/packages/cubejs-query-orchestrator/orchestrator/QueryQueue.js b/packages/cubejs-query-orchestrator/orchestrator/QueryQueue.js index 29e0f3d368c42..a71a323189fed 100644 --- a/packages/cubejs-query-orchestrator/orchestrator/QueryQueue.js +++ b/packages/cubejs-query-orchestrator/orchestrator/QueryQueue.js @@ -219,7 +219,8 @@ class QueryQueue { const processingId = await redisClient.getNextProcessingId(); [insertedCount, removedCount, activeKeys, queueSize, query, processingLockAcquired] = await redisClient.retrieveForProcessing(queryKey, processingId); - if (query && insertedCount && activeKeys.indexOf(this.redisHash(queryKey)) !== -1 && processingLockAcquired) { + const activated = activeKeys.indexOf(this.redisHash(queryKey)) !== -1; + if (query && insertedCount && activated && processingLockAcquired) { let executionResult; const startQueryTime = (new Date()).getTime(); const timeInQueue = (new Date()).getTime() - query.addedToQueueTime; @@ -319,7 +320,7 @@ class QueryQueue { insertedCount, activeKeys }); - await redisClient.freeProcessingLock(queryKey, processingId); + await redisClient.freeProcessingLock(queryKey, processingId, activated); } } catch (e) { this.logger('Queue storage error', { diff --git a/packages/cubejs-query-orchestrator/orchestrator/RedisQueueDriver.js b/packages/cubejs-query-orchestrator/orchestrator/RedisQueueDriver.js index 687052e2ec985..533840ea74c4e 100644 --- a/packages/cubejs-query-orchestrator/orchestrator/RedisQueueDriver.js +++ b/packages/cubejs-query-orchestrator/orchestrator/RedisQueueDriver.js @@ -143,13 +143,17 @@ class RedisQueueDriverConnection { return [insertedCount, removedCount, activeKeys, queueSize, JSON.parse(queryDef), processingLockAcquired]; } - async freeProcessingLock(queryKey, processingId) { + async freeProcessingLock(queryKey, processingId, activated) { const lockKey = this.queryProcessingLockKey(queryKey); await this.redisClient.watchAsync(lockKey); const currentProcessId = await this.redisClient.getAsync(lockKey); if (currentProcessId === processingId) { - await this.redisClient.multi() - .del(lockKey) + let removeCommand = this.redisClient.multi() + .del(lockKey); + if (activated) { + removeCommand = removeCommand.zrem([this.activeRedisKey(), this.redisHash(queryKey)]); + } + await removeCommand .execAsync(); } } diff --git a/packages/cubejs-query-orchestrator/test/QueryQueue.test.js b/packages/cubejs-query-orchestrator/test/QueryQueue.test.js index 9c9a2c9c65691..3126c761ce571 100644 --- a/packages/cubejs-query-orchestrator/test/QueryQueue.test.js +++ b/packages/cubejs-query-orchestrator/test/QueryQueue.test.js @@ -136,6 +136,13 @@ const QueryQueueTest = (name, options) => { expect(cancelledQuery).toBe('114'); await queue.executeInQueue('delay', `114`, { delay: 50, result: '4' }, 0); }); + + test('removed before reconciled', async () => { + const query = ['select * from']; + await queue.processQuery(query); + const result = await queue.executeInQueue('foo', query, query); + expect(result).toBe('select * from bar'); + }); }); };