Skip to content

Commit

Permalink
fix: Redis driver execAsync ignores watch directives
Browse files Browse the repository at this point in the history
  • Loading branch information
paveltiunov committed Jul 18, 2020
1 parent 6f9393b commit ac67e5b
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@ const redis = require('redis');
const { promisify } = require('util');

module.exports = function createRedisClient(url) {
redis.Multi.prototype.execAsync = promisify(redis.Multi.prototype.exec);
redis.Multi.prototype.execAsync = function execAsync() {
return new Promise((resolve, reject) => this.exec((err, res) => (
err ? reject(err) : resolve(res)
)));
};

const options = {
url,
Expand Down Expand Up @@ -30,6 +34,7 @@ module.exports = function createRedisClient(url) {
'zrangebyscore',
'keys',
'watch',
'unwatch',
'incr',
'decr',
'lpush'
Expand Down
98 changes: 55 additions & 43 deletions packages/cubejs-query-orchestrator/orchestrator/RedisQueueDriver.js
Original file line number Diff line number Diff line change
Expand Up @@ -75,21 +75,25 @@ class RedisQueueDriverConnection {
}

async setResultAndRemoveQuery(queryKey, executionResult, processingId) {
await this.redisClient.watchAsync(this.queryProcessingLockKey(queryKey));
const currentProcessId = await this.redisClient.getAsync(this.queryProcessingLockKey(queryKey));
if (processingId !== currentProcessId) {
return false;
}
try {
await this.redisClient.watchAsync(this.queryProcessingLockKey(queryKey));
const currentProcessId = await this.redisClient.getAsync(this.queryProcessingLockKey(queryKey));
if (processingId !== currentProcessId) {
return false;
}

return this.redisClient.multi()
.lpush([this.resultListKey(queryKey), JSON.stringify(executionResult)])
.zrem([this.activeRedisKey(), this.redisHash(queryKey)])
.zrem([this.heartBeatRedisKey(), this.redisHash(queryKey)])
.zrem([this.toProcessRedisKey(), this.redisHash(queryKey)])
.zrem([this.recentRedisKey(), this.redisHash(queryKey)])
.hdel([this.queriesDefKey(), this.redisHash(queryKey)])
.del(this.queryProcessingLockKey(queryKey))
.execAsync();
return this.redisClient.multi()
.lpush([this.resultListKey(queryKey), JSON.stringify(executionResult)])
.zrem([this.activeRedisKey(), this.redisHash(queryKey)])
.zrem([this.heartBeatRedisKey(), this.redisHash(queryKey)])
.zrem([this.toProcessRedisKey(), this.redisHash(queryKey)])
.zrem([this.recentRedisKey(), this.redisHash(queryKey)])
.hdel([this.queriesDefKey(), this.redisHash(queryKey)])
.del(this.queryProcessingLockKey(queryKey))
.execAsync();
} finally {
await this.redisClient.unwatchAsync();
}
}

getOrphanedQueries() {
Expand Down Expand Up @@ -144,43 +148,51 @@ class RedisQueueDriverConnection {
}

async freeProcessingLock(queryKey, processingId, activated) {
const lockKey = this.queryProcessingLockKey(queryKey);
await this.redisClient.watchAsync(lockKey);
const currentProcessId = await this.redisClient.getAsync(lockKey);
if (currentProcessId === processingId) {
let removeCommand = this.redisClient.multi()
.del(lockKey);
if (activated) {
removeCommand = removeCommand.zrem([this.activeRedisKey(), this.redisHash(queryKey)]);
try {
const lockKey = this.queryProcessingLockKey(queryKey);
await this.redisClient.watchAsync(lockKey);
const currentProcessId = await this.redisClient.getAsync(lockKey);
if (currentProcessId === processingId) {
let removeCommand = this.redisClient.multi()
.del(lockKey);
if (activated) {
removeCommand = removeCommand.zrem([this.activeRedisKey(), this.redisHash(queryKey)]);
}
await removeCommand
.execAsync();
}
await removeCommand
.execAsync();
} finally {
await this.redisClient.unwatchAsync();
}
}

async optimisticQueryUpdate(queryKey, toUpdate, processingId) {
let query = await this.getQueryDef(queryKey);
for (let i = 0; i < 10; i++) {
if (query) {
// eslint-disable-next-line no-await-in-loop
await this.redisClient.watchAsync(this.queryProcessingLockKey(queryKey));
const currentProcessId = await this.redisClient.getAsync(this.queryProcessingLockKey(queryKey));
if (currentProcessId !== processingId) {
return false;
}
let [beforeUpdate] = await this.redisClient
.multi()
.hget([this.queriesDefKey(), this.redisHash(queryKey)])
.hset([this.queriesDefKey(), this.redisHash(queryKey), JSON.stringify({ ...query, ...toUpdate })])
.execAsync();
beforeUpdate = JSON.parse(beforeUpdate);
if (JSON.stringify(query) === JSON.stringify(beforeUpdate)) {
return true;
try {
let query = await this.getQueryDef(queryKey);
for (let i = 0; i < 10; i++) {
if (query) {
// eslint-disable-next-line no-await-in-loop
await this.redisClient.watchAsync(this.queryProcessingLockKey(queryKey));
const currentProcessId = await this.redisClient.getAsync(this.queryProcessingLockKey(queryKey));
if (currentProcessId !== processingId) {
return false;
}
let [beforeUpdate] = await this.redisClient
.multi()
.hget([this.queriesDefKey(), this.redisHash(queryKey)])
.hset([this.queriesDefKey(), this.redisHash(queryKey), JSON.stringify({ ...query, ...toUpdate })])
.execAsync();
beforeUpdate = JSON.parse(beforeUpdate);
if (JSON.stringify(query) === JSON.stringify(beforeUpdate)) {
return true;
}
query = beforeUpdate;
}
query = beforeUpdate;
}
throw new Error(`Can't update ${queryKey} with ${JSON.stringify(toUpdate)}`);
} finally {
await this.redisClient.unwatchAsync();
}
throw new Error(`Can't update ${queryKey} with ${JSON.stringify(toUpdate)}`);
}

release() {
Expand Down
55 changes: 55 additions & 0 deletions packages/cubejs-query-orchestrator/test/QueryQueue.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,61 @@ const QueryQueueTest = (name, options) => {
const result = await queue.executeInQueue('foo', query, query);
expect(result).toBe('select * from bar');
});

test('queue driver lock obtain race condition', async () => {
const redisClient = await queue.queueDriver.createConnection();
const redisClient2 = await queue.queueDriver.createConnection();
const priority = 10;
const time = new Date().getTime();
const keyScore = time + (10000 - priority) * 1E14;

// console.log(await redisClient.getQueryAndRemove('race'));
// console.log(await redisClient.getQueryAndRemove('race1'));

if (redisClient.redisClient) {
await redisClient2.redisClient.setAsync(redisClient.queryProcessingLockKey('race'), '100');
await redisClient.redisClient.watchAsync(redisClient.queryProcessingLockKey('race'));
await redisClient2.redisClient.setAsync(redisClient.queryProcessingLockKey('race'), Math.random());

const res = await redisClient.redisClient.multi()
.set(redisClient.queryProcessingLockKey('race'), '100')
.set(redisClient.queryProcessingLockKey('race1'), '100')
.execAsync();

expect(res).toBe(null);
await redisClient.redisClient.delAsync(redisClient.queryProcessingLockKey('race'));
await redisClient.redisClient.delAsync(redisClient.queryProcessingLockKey('race1'));
}

await queue.reconcileQueue();

await redisClient.addToQueue(
keyScore, 'race', time, 'handler', ['select'], priority, { stageQueryKey: 'race' }
);

await redisClient.addToQueue(
keyScore + 100, 'race2', time + 100, 'handler2', ['select2'], priority, { stageQueryKey: 'race2' }
);

const processingId1 = await redisClient.getNextProcessingId();
const processingId4 = await redisClient.getNextProcessingId();

await redisClient.freeProcessingLock('race', processingId1, true);
await redisClient.freeProcessingLock('race2', processingId4, true);

await redisClient2.retrieveForProcessing('race2', await redisClient.getNextProcessingId());

const processingId = await redisClient.getNextProcessingId();
const retrieve6 = await redisClient.retrieveForProcessing('race', processingId);
console.log(retrieve6);
expect(!!retrieve6[5]).toBe(true);

console.log(await redisClient.getQueryAndRemove('race'));
console.log(await redisClient.getQueryAndRemove('race2'));

await queue.queueDriver.release(redisClient);
await queue.queueDriver.release(redisClient2);
});
});
};

Expand Down

0 comments on commit ac67e5b

Please sign in to comment.