Skip to content

Commit

Permalink
fix: Redis query queue locking redesign
Browse files Browse the repository at this point in the history
Usage of time as sorting active queries list leads to random flipping of active keys.
Such behavior leads to plural execution of single query in queue under load.
Replace active keys time sorting with unique `processingId` locks for queries.

Fixes #459
  • Loading branch information
paveltiunov committed Mar 1, 2020
1 parent 5634b62 commit a2eb9b2
Show file tree
Hide file tree
Showing 7 changed files with 291 additions and 143 deletions.
68 changes: 54 additions & 14 deletions packages/cubejs-query-orchestrator/orchestrator/LocalQueueDriver.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ class LocalQueueDriverConnection {
this.toProcess = driver.toProcess;
this.recent = driver.recent;
this.active = driver.active;
this.heartBeat = driver.heartBeat;
this.processingCounter = driver.processingCounter;
this.processingLocks = driver.processingLocks;
}

getResultPromise(resultListKey) {
Expand All @@ -30,12 +33,16 @@ class LocalQueueDriverConnection {

async getResultBlocking(queryKey) {
const resultListKey = this.resultListKey(queryKey);
if (!this.queryDef[this.redisHash(queryKey)] && !this.resultPromises[resultListKey]) {
return null;
}
const timeoutPromise = (timeout) => new Promise((resolve) => setTimeout(() => resolve(null), timeout));

const res = await Promise.race([
this.getResultPromise(resultListKey),
timeoutPromise(this.continueWaitTimeout * 1000),
]);

if (res) {
delete this.resultPromises[resultListKey];
}
Expand Down Expand Up @@ -98,36 +105,41 @@ class LocalQueueDriverConnection {
const key = this.redisHash(queryKey);
const query = this.queryDef[key];
delete this.active[key];
delete this.heartBeat[key];
delete this.toProcess[key];
delete this.recent[key];
delete this.queryDef[key];
delete this.processingLocks[key];
return [query];
}

setResultAndRemoveQuery(queryKey, executionResult) {
async setResultAndRemoveQuery(queryKey, executionResult, processingId) {
const key = this.redisHash(queryKey);
if (this.processingLocks[key] !== processingId) {
return false;
}
const promise = this.getResultPromise(this.resultListKey(queryKey));
delete this.active[key];
delete this.heartBeat[key];
delete this.toProcess[key];
delete this.recent[key];
delete this.queryDef[key];
delete this.processingLocks[key];
promise.resolve(executionResult);
return true;
}

removeQuery(queryKey) {
const key = this.redisHash(queryKey);
delete this.active[key];
delete this.toProcess[key];
delete this.recent[key];
delete this.queryDef[key];
getNextProcessingId() {
this.processingCounter.counter = this.processingCounter.counter ? this.processingCounter.counter + 1 : 1;
return this.processingCounter.counter;
}

getOrphanedQueries() {
return this.queueArray(this.recent, new Date().getTime() - this.orphanedTimeout * 1000);
}

getStalledQueries() {
return this.queueArray(this.active, new Date().getTime() - this.heartBeatTimeout * 1000);
return this.queueArray(this.heartBeat, new Date().getTime() - this.heartBeatTimeout * 1000);
}

async getQueryStageState(onlyKeys) {
Expand All @@ -140,24 +152,43 @@ class LocalQueueDriverConnection {

updateHeartBeat(queryKey) {
const key = this.redisHash(queryKey);
if (this.active[key]) {
this.active[key] = { key, order: new Date().getTime() };
if (this.heartBeat[key]) {
this.heartBeat[key] = { key, order: new Date().getTime() };
}
}

retrieveForProcessing(queryKey) {
retrieveForProcessing(queryKey, processingId) {
const key = this.redisHash(queryKey);
let lockAcquired = false;
if (!this.processingLocks[key]) {
this.processingLocks[key] = processingId;
lockAcquired = true;
}
let added = 0;
if (Object.keys(this.active).length < this.concurrency && !this.active[key]) {
this.active[key] = { key, order: new Date().getTime() };
this.active[key] = { key, order: processingId };
added = 1;
}
return [added, null, this.queueArray(this.active), Object.keys(this.toProcess).length]; // TODO nulls
this.heartBeat[key] = { key, order: new Date().getTime() };
return [
added, null, this.queueArray(this.active), Object.keys(this.toProcess).length, this.queryDef[key], lockAcquired
]; // TODO nulls
}

async optimisticQueryUpdate(queryKey, toUpdate) {
freeProcessingLock(queryKey, processingId) {
const key = this.redisHash(queryKey);
if (this.processingLocks[key] === processingId) {
delete this.processingLocks[key];
}
}

async optimisticQueryUpdate(queryKey, toUpdate, processingId) {
const key = this.redisHash(queryKey);
if (this.processingLocks[key] !== processingId) {
return false;
}
this.queryDef[key] = { ...this.queryDef[key], ...toUpdate };
return true;
}

release() {
Expand All @@ -182,6 +213,9 @@ const queryDef = {};
const toProcess = {};
const recent = {};
const active = {};
const heartBeat = {};
const processingCounters = {};
const processingLocks = {};

class LocalQueueDriver extends BaseQueueDriver {
constructor(options) {
Expand All @@ -193,12 +227,18 @@ class LocalQueueDriver extends BaseQueueDriver {
toProcess[options.redisQueuePrefix] = toProcess[options.redisQueuePrefix] || {};
recent[options.redisQueuePrefix] = recent[options.redisQueuePrefix] || {};
active[options.redisQueuePrefix] = active[options.redisQueuePrefix] || {};
heartBeat[options.redisQueuePrefix] = heartBeat[options.redisQueuePrefix] || {};
processingCounters[options.redisQueuePrefix] = processingCounters[options.redisQueuePrefix] || {};
processingLocks[options.redisQueuePrefix] = processingLocks[options.redisQueuePrefix] || {};
this.results = results[options.redisQueuePrefix];
this.resultPromises = resultPromises[options.redisQueuePrefix];
this.queryDef = queryDef[options.redisQueuePrefix];
this.toProcess = toProcess[options.redisQueuePrefix];
this.recent = recent[options.redisQueuePrefix];
this.active = active[options.redisQueuePrefix];
this.heartBeat = heartBeat[options.redisQueuePrefix];
this.processingCounter = processingCounters[options.redisQueuePrefix];
this.processingLocks = processingLocks[options.redisQueuePrefix];
}

createConnection() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ class QueryOrchestrator {
this.redisPrefix = redisPrefix;
this.driverFactory = driverFactory;
this.logger = logger;
const redisPool = new RedisPool();
const { externalDriverFactory } = options;
const cacheAndQueueDriver = options.cacheAndQueueDriver || process.env.CUBEJS_CACHE_AND_QUEUE_DRIVER || (
process.env.NODE_ENV === 'production' || process.env.REDIS_URL ? 'redis' : 'memory'
);
if (cacheAndQueueDriver !== 'redis' && cacheAndQueueDriver !== 'memory') {
throw new Error(`Only 'redis' or 'memory' are supported for cacheAndQueueDriver option`);
}
const redisPool = cacheAndQueueDriver === 'redis' ? new RedisPool() : undefined;

this.redisPool = redisPool;
this.queryCache = new QueryCache(
Expand Down
Loading

0 comments on commit a2eb9b2

Please sign in to comment.