Skip to content

Commit

Permalink
fix: Dead queries added to queue in serverless
Browse files Browse the repository at this point in the history
  • Loading branch information
paveltiunov committed Apr 15, 2020
1 parent e242806 commit eca3d0c
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -176,10 +176,13 @@ class LocalQueueDriverConnection {
]; // TODO nulls
}

freeProcessingLock(queryKey, processingId) {
freeProcessingLock(queryKey, processingId, activated) {
const key = this.redisHash(queryKey);
if (this.processingLocks[key] === processingId) {
delete this.processingLocks[key];
if (activated) {
delete this.active[key];
}
}
}

Expand Down
5 changes: 3 additions & 2 deletions packages/cubejs-query-orchestrator/orchestrator/QueryQueue.js
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,8 @@ class QueryQueue {
const processingId = await redisClient.getNextProcessingId();
[insertedCount, removedCount, activeKeys, queueSize, query, processingLockAcquired] =
await redisClient.retrieveForProcessing(queryKey, processingId);
if (query && insertedCount && activeKeys.indexOf(this.redisHash(queryKey)) !== -1 && processingLockAcquired) {
const activated = activeKeys.indexOf(this.redisHash(queryKey)) !== -1;
if (query && insertedCount && activated && processingLockAcquired) {
let executionResult;
const startQueryTime = (new Date()).getTime();
const timeInQueue = (new Date()).getTime() - query.addedToQueueTime;
Expand Down Expand Up @@ -319,7 +320,7 @@ class QueryQueue {
insertedCount,
activeKeys
});
await redisClient.freeProcessingLock(queryKey, processingId);
await redisClient.freeProcessingLock(queryKey, processingId, activated);
}
} catch (e) {
this.logger('Queue storage error', {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,13 +143,17 @@ class RedisQueueDriverConnection {
return [insertedCount, removedCount, activeKeys, queueSize, JSON.parse(queryDef), processingLockAcquired];
}

async freeProcessingLock(queryKey, processingId) {
async freeProcessingLock(queryKey, processingId, activated) {
const lockKey = this.queryProcessingLockKey(queryKey);
await this.redisClient.watchAsync(lockKey);
const currentProcessId = await this.redisClient.getAsync(lockKey);
if (currentProcessId === processingId) {
await this.redisClient.multi()
.del(lockKey)
let removeCommand = this.redisClient.multi()
.del(lockKey);
if (activated) {
removeCommand = removeCommand.zrem([this.activeRedisKey(), this.redisHash(queryKey)]);
}
await removeCommand
.execAsync();
}
}
Expand Down
7 changes: 7 additions & 0 deletions packages/cubejs-query-orchestrator/test/QueryQueue.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,13 @@ const QueryQueueTest = (name, options) => {
expect(cancelledQuery).toBe('114');
await queue.executeInQueue('delay', `114`, { delay: 50, result: '4' }, 0);
});

test('removed before reconciled', async () => {
const query = ['select * from'];
await queue.processQuery(query);
const result = await queue.executeInQueue('foo', query, query);
expect(result).toBe('select * from bar');
});
});
};

Expand Down

0 comments on commit eca3d0c

Please sign in to comment.