From ac67e5b2903bb0515317ed67da4a930dea8b91f7 Mon Sep 17 00:00:00 2001 From: Pavel Tiunov Date: Fri, 17 Jul 2020 23:25:52 -0700 Subject: [PATCH] fix: Redis driver execAsync ignores watch directives --- .../orchestrator/RedisFactory.js | 7 +- .../orchestrator/RedisQueueDriver.js | 98 +++++++++++-------- .../test/QueryQueue.test.js | 55 +++++++++++ 3 files changed, 116 insertions(+), 44 deletions(-) diff --git a/packages/cubejs-query-orchestrator/orchestrator/RedisFactory.js b/packages/cubejs-query-orchestrator/orchestrator/RedisFactory.js index 63131f649a7bb..926a76ee50769 100644 --- a/packages/cubejs-query-orchestrator/orchestrator/RedisFactory.js +++ b/packages/cubejs-query-orchestrator/orchestrator/RedisFactory.js @@ -2,7 +2,11 @@ const redis = require('redis'); const { promisify } = require('util'); module.exports = function createRedisClient(url) { - redis.Multi.prototype.execAsync = promisify(redis.Multi.prototype.exec); + redis.Multi.prototype.execAsync = function execAsync() { + return new Promise((resolve, reject) => this.exec((err, res) => ( + err ? reject(err) : resolve(res) + ))); + }; const options = { url, @@ -30,6 +34,7 @@ module.exports = function createRedisClient(url) { 'zrangebyscore', 'keys', 'watch', + 'unwatch', 'incr', 'decr', 'lpush' diff --git a/packages/cubejs-query-orchestrator/orchestrator/RedisQueueDriver.js b/packages/cubejs-query-orchestrator/orchestrator/RedisQueueDriver.js index 533840ea74c4e..6d422d2f8c377 100644 --- a/packages/cubejs-query-orchestrator/orchestrator/RedisQueueDriver.js +++ b/packages/cubejs-query-orchestrator/orchestrator/RedisQueueDriver.js @@ -75,21 +75,25 @@ class RedisQueueDriverConnection { } async setResultAndRemoveQuery(queryKey, executionResult, processingId) { - await this.redisClient.watchAsync(this.queryProcessingLockKey(queryKey)); - const currentProcessId = await this.redisClient.getAsync(this.queryProcessingLockKey(queryKey)); - if (processingId !== currentProcessId) { - return false; - } + try { + await this.redisClient.watchAsync(this.queryProcessingLockKey(queryKey)); + const currentProcessId = await this.redisClient.getAsync(this.queryProcessingLockKey(queryKey)); + if (processingId !== currentProcessId) { + return false; + } - return this.redisClient.multi() - .lpush([this.resultListKey(queryKey), JSON.stringify(executionResult)]) - .zrem([this.activeRedisKey(), this.redisHash(queryKey)]) - .zrem([this.heartBeatRedisKey(), this.redisHash(queryKey)]) - .zrem([this.toProcessRedisKey(), this.redisHash(queryKey)]) - .zrem([this.recentRedisKey(), this.redisHash(queryKey)]) - .hdel([this.queriesDefKey(), this.redisHash(queryKey)]) - .del(this.queryProcessingLockKey(queryKey)) - .execAsync(); + return this.redisClient.multi() + .lpush([this.resultListKey(queryKey), JSON.stringify(executionResult)]) + .zrem([this.activeRedisKey(), this.redisHash(queryKey)]) + .zrem([this.heartBeatRedisKey(), this.redisHash(queryKey)]) + .zrem([this.toProcessRedisKey(), this.redisHash(queryKey)]) + .zrem([this.recentRedisKey(), this.redisHash(queryKey)]) + .hdel([this.queriesDefKey(), this.redisHash(queryKey)]) + .del(this.queryProcessingLockKey(queryKey)) + .execAsync(); + } finally { + await this.redisClient.unwatchAsync(); + } } getOrphanedQueries() { @@ -144,43 +148,51 @@ class RedisQueueDriverConnection { } async freeProcessingLock(queryKey, processingId, activated) { - const lockKey = this.queryProcessingLockKey(queryKey); - await this.redisClient.watchAsync(lockKey); - const currentProcessId = await this.redisClient.getAsync(lockKey); - if (currentProcessId === processingId) { - let removeCommand = this.redisClient.multi() - .del(lockKey); - if (activated) { - removeCommand = removeCommand.zrem([this.activeRedisKey(), this.redisHash(queryKey)]); + try { + const lockKey = this.queryProcessingLockKey(queryKey); + await this.redisClient.watchAsync(lockKey); + const currentProcessId = await this.redisClient.getAsync(lockKey); + if (currentProcessId === processingId) { + let removeCommand = this.redisClient.multi() + .del(lockKey); + if (activated) { + removeCommand = removeCommand.zrem([this.activeRedisKey(), this.redisHash(queryKey)]); + } + await removeCommand + .execAsync(); } - await removeCommand - .execAsync(); + } finally { + await this.redisClient.unwatchAsync(); } } async optimisticQueryUpdate(queryKey, toUpdate, processingId) { - let query = await this.getQueryDef(queryKey); - for (let i = 0; i < 10; i++) { - if (query) { - // eslint-disable-next-line no-await-in-loop - await this.redisClient.watchAsync(this.queryProcessingLockKey(queryKey)); - const currentProcessId = await this.redisClient.getAsync(this.queryProcessingLockKey(queryKey)); - if (currentProcessId !== processingId) { - return false; - } - let [beforeUpdate] = await this.redisClient - .multi() - .hget([this.queriesDefKey(), this.redisHash(queryKey)]) - .hset([this.queriesDefKey(), this.redisHash(queryKey), JSON.stringify({ ...query, ...toUpdate })]) - .execAsync(); - beforeUpdate = JSON.parse(beforeUpdate); - if (JSON.stringify(query) === JSON.stringify(beforeUpdate)) { - return true; + try { + let query = await this.getQueryDef(queryKey); + for (let i = 0; i < 10; i++) { + if (query) { + // eslint-disable-next-line no-await-in-loop + await this.redisClient.watchAsync(this.queryProcessingLockKey(queryKey)); + const currentProcessId = await this.redisClient.getAsync(this.queryProcessingLockKey(queryKey)); + if (currentProcessId !== processingId) { + return false; + } + let [beforeUpdate] = await this.redisClient + .multi() + .hget([this.queriesDefKey(), this.redisHash(queryKey)]) + .hset([this.queriesDefKey(), this.redisHash(queryKey), JSON.stringify({ ...query, ...toUpdate })]) + .execAsync(); + beforeUpdate = JSON.parse(beforeUpdate); + if (JSON.stringify(query) === JSON.stringify(beforeUpdate)) { + return true; + } + query = beforeUpdate; } - query = beforeUpdate; } + throw new Error(`Can't update ${queryKey} with ${JSON.stringify(toUpdate)}`); + } finally { + await this.redisClient.unwatchAsync(); } - throw new Error(`Can't update ${queryKey} with ${JSON.stringify(toUpdate)}`); } release() { diff --git a/packages/cubejs-query-orchestrator/test/QueryQueue.test.js b/packages/cubejs-query-orchestrator/test/QueryQueue.test.js index 3126c761ce571..50c7981822f18 100644 --- a/packages/cubejs-query-orchestrator/test/QueryQueue.test.js +++ b/packages/cubejs-query-orchestrator/test/QueryQueue.test.js @@ -143,6 +143,61 @@ const QueryQueueTest = (name, options) => { const result = await queue.executeInQueue('foo', query, query); expect(result).toBe('select * from bar'); }); + + test('queue driver lock obtain race condition', async () => { + const redisClient = await queue.queueDriver.createConnection(); + const redisClient2 = await queue.queueDriver.createConnection(); + const priority = 10; + const time = new Date().getTime(); + const keyScore = time + (10000 - priority) * 1E14; + + // console.log(await redisClient.getQueryAndRemove('race')); + // console.log(await redisClient.getQueryAndRemove('race1')); + + if (redisClient.redisClient) { + await redisClient2.redisClient.setAsync(redisClient.queryProcessingLockKey('race'), '100'); + await redisClient.redisClient.watchAsync(redisClient.queryProcessingLockKey('race')); + await redisClient2.redisClient.setAsync(redisClient.queryProcessingLockKey('race'), Math.random()); + + const res = await redisClient.redisClient.multi() + .set(redisClient.queryProcessingLockKey('race'), '100') + .set(redisClient.queryProcessingLockKey('race1'), '100') + .execAsync(); + + expect(res).toBe(null); + await redisClient.redisClient.delAsync(redisClient.queryProcessingLockKey('race')); + await redisClient.redisClient.delAsync(redisClient.queryProcessingLockKey('race1')); + } + + await queue.reconcileQueue(); + + await redisClient.addToQueue( + keyScore, 'race', time, 'handler', ['select'], priority, { stageQueryKey: 'race' } + ); + + await redisClient.addToQueue( + keyScore + 100, 'race2', time + 100, 'handler2', ['select2'], priority, { stageQueryKey: 'race2' } + ); + + const processingId1 = await redisClient.getNextProcessingId(); + const processingId4 = await redisClient.getNextProcessingId(); + + await redisClient.freeProcessingLock('race', processingId1, true); + await redisClient.freeProcessingLock('race2', processingId4, true); + + await redisClient2.retrieveForProcessing('race2', await redisClient.getNextProcessingId()); + + const processingId = await redisClient.getNextProcessingId(); + const retrieve6 = await redisClient.retrieveForProcessing('race', processingId); + console.log(retrieve6); + expect(!!retrieve6[5]).toBe(true); + + console.log(await redisClient.getQueryAndRemove('race')); + console.log(await redisClient.getQueryAndRemove('race2')); + + await queue.queueDriver.release(redisClient); + await queue.queueDriver.release(redisClient2); + }); }); };