Skip to content

Commit

Permalink
feat: use local queue and cache for local dev server instead of Redis…
Browse files Browse the repository at this point in the history
… one
  • Loading branch information
paveltiunov committed Apr 1, 2019
1 parent f45468b commit 50f1bbb
Show file tree
Hide file tree
Showing 10 changed files with 624 additions and 242 deletions.
1 change: 1 addition & 0 deletions .eslintrc.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ module.exports = {
"no-mixed-operators": 0,
"no-else-return": 0,
"prefer-promise-reject-errors": 0,
"no-plusplus": 0,
"operator-linebreak": ["error", "after"],
'max-len': ['error', 120, 2, {
ignoreUrls: true,
Expand Down
11 changes: 11 additions & 0 deletions packages/cubejs-query-orchestrator/orchestrator/BaseQueueDriver.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
const crypto = require('crypto');

class BaseQueueDriver {
redisHash(queryKey) {
return typeof queryKey === 'string' && queryKey.length < 256 ?
queryKey :
crypto.createHash('md5').update(JSON.stringify(queryKey)).digest("hex");
}
}

module.exports = BaseQueueDriver;
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
class LocalCacheDriver {
constructor() {
this.store = {};
}

async get(key) {
if (this.store[key] && this.store[key].exp < new Date().getTime()) {
delete this.store[key];
}
return this.store[key] && this.store[key].value;
}

async set(key, value, expiration) {
this.store[key] = {
value,
exp: new Date().getTime() + expiration * 1000
};
}

async remove(key) {
delete this.store[key];
}
}

module.exports = LocalCacheDriver;
190 changes: 190 additions & 0 deletions packages/cubejs-query-orchestrator/orchestrator/LocalQueueDriver.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
const R = require('ramda');
const BaseQueueDriver = require('./BaseQueueDriver');

class LocalQueueDriverConnection {
constructor(driver, options) {
this.redisQueuePrefix = options.redisQueuePrefix;
this.continueWaitTimeout = options.continueWaitTimeout;
this.orphanedTimeout = options.orphanedTimeout;
this.heartBeatTimeout = options.heartBeatTimeout;
this.concurrency = options.concurrency;
this.driver = driver;
this.results = driver.results;
this.resultPromises = driver.resultPromises;
this.queryDef = driver.queryDef;
this.toProcess = driver.toProcess;
this.recent = driver.recent;
this.active = driver.active;
}

getResultPromise(resultListKey) {
if (!this.resultPromises[resultListKey]) {
let resolveMethod = null;
this.resultPromises[resultListKey] = new Promise(resolve => {
resolveMethod = resolve;
});
this.resultPromises[resultListKey].resolve = resolveMethod;
}
return this.resultPromises[resultListKey];
}

async getResultBlocking(queryKey, continueWaitTimeout) {
const resultListKey = this.resultListKey(queryKey);
const timeoutPromise = (timeout) => new Promise((resolve) => setTimeout(() => resolve(null), timeout));

const res = await Promise.race([
this.getResultPromise(resultListKey),
timeoutPromise(continueWaitTimeout || this.continueWaitTimeout * 1000),
]);
if (res) {
delete this.resultPromises[resultListKey];
}
return res;
}

async getResult(queryKey) {
const resultListKey = this.resultListKey(queryKey);
if (this.resultPromises[resultListKey]) {
return this.getResultBlocking(queryKey, 5);
}
return null;
}

queueArray(queueObj, orderFilterLessThan) {
return R.pipe(
R.values,
R.filter(orderFilterLessThan ? q => q.order < orderFilterLessThan : R.identity),
R.sortBy(q => q.order),
R.map(q => q.key)
)(queueObj);
}

addToQueue(keyScore, queryKey, time, queryHandler, query, priority, options) {
const queryQueueObj = {
queryHandler, query, queryKey, stageQueryKey: options.stageQueryKey, priority
};
const key = this.redisHash(queryKey);
if (!this.queryDef[key]) {
this.queryDef[key] = queryQueueObj;
}
let added = 0;
if (!this.toProcess[key]) {
this.toProcess[key] = {
order: keyScore,
key
};
added = 1;
}
this.recent[key] = { order: time, key };

return [added, null, null, Object.keys(this.toProcess).length]; // TODO nulls
}

getToProcessQueries() {
return this.queueArray(this.toProcess);
}

getActiveQueries() {
return this.queueArray(this.active);
}

async getQueryAndRemove(queryKey) {
const key = this.redisHash(queryKey);
const query = this.queryDef[key];
delete this.active[key];
delete this.toProcess[key];
delete this.recent[key];
delete this.queryDef[key];
return [query];
}

setResultAndRemoveQuery(queryKey, executionResult) {
const key = this.redisHash(queryKey);
const promise = this.getResultPromise(this.resultListKey(queryKey));
delete this.active[key];
delete this.toProcess[key];
delete this.recent[key];
delete this.queryDef[key];
promise.resolve(executionResult);
}

removeQuery(queryKey) {
const key = this.redisHash(queryKey);
delete this.active[key];
delete this.toProcess[key];
delete this.recent[key];
delete this.queryDef[key];
}

getOrphanedQueries() {
return this.queueArray(this.recent, new Date().getTime() - this.orphanedTimeout * 1000);
}

getStalledQueries() {
return this.queueArray(this.active, new Date().getTime() - this.heartBeatTimeout * 1000);
}

async getQueryStageState() {
return [this.queueArray(this.active), this.queueArray(this.toProcess), R.clone(this.queryDef)];
}

async getQueryDef(queryKey) {
return this.queryDef[this.redisHash(queryKey)];
}

updateHeartBeat(queryKey) {
const key = this.redisHash(queryKey);
if (this.active[key]) {
this.active[key] = { key, order: new Date().getTime() };
}
}

retrieveForProcessing(queryKey) {
const key = this.redisHash(queryKey);
let added = 0;
if (Object.keys(this.active).length < this.concurrency && !this.active[key]) {
this.active[key] = { key, order: new Date().getTime() };
added = 1;
}
return [added, null, this.queueArray(this.active), Object.keys(this.toProcess).length]; // TODO nulls
}

async optimisticQueryUpdate(queryKey, toUpdate) {
const key = this.redisHash(queryKey);
this.queryDef[key] = { ...this.queryDef[key], ...toUpdate };
}

release() {
}

queryRedisKey(queryKey, suffix) {
return `${this.redisQueuePrefix}_${this.redisHash(queryKey)}_${suffix}`;
}

resultListKey(queryKey) {
return this.queryRedisKey(queryKey, 'RESULT');
}

redisHash(queryKey) {
return this.driver.redisHash(queryKey);
}
}

class LocalQueueDriver extends BaseQueueDriver {
constructor(options) {
super();
this.options = options;
this.results = {};
this.resultPromises = {};
this.queryDef = {};
this.toProcess = {};
this.recent = {};
this.active = {};
}

createConnection() {
return new LocalQueueDriverConnection(this, this.options);
}
}

module.exports = LocalQueueDriver;
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
const crypto = require('crypto');
const R = require('ramda');
const redis = require('redis');
const RedisCacheDriver = require('./RedisCacheDriver');
const LocalCacheDriver = require('./LocalCacheDriver');

const QueryCache = require('./QueryCache');
const ContinueWaitError = require('./ContinueWaitError');
Expand Down Expand Up @@ -50,15 +51,15 @@ class PreAggregationLoadCache {
this.queryCache = queryCache;
this.preAggregations = preAggregations;
this.queryResults = {};
this.redisClient = preAggregations.redisClient;
this.cacheDriver = preAggregations.cacheDriver;
}

async tablesFromCache(schema) {
let tables = JSON.parse(await this.redisClient.getAsync(this.tablesRedisKey()));
let tables = await this.cacheDriver.get(this.tablesRedisKey());
if (!tables) {
const client = await this.driverFactory();
tables = await client.getTablesQuery(schema);
await this.redisClient.setAsync(this.tablesRedisKey(), JSON.stringify(tables), 'EX', 120);
await this.cacheDriver.set(this.tablesRedisKey(), tables, 120);
}
return tables;
}
Expand Down Expand Up @@ -106,7 +107,7 @@ class PreAggregationLoadCache {
this.tables = undefined;
this.queryStageState = undefined;
this.versionEnries = undefined;
await this.redisClient.delAsync(this.tablesRedisKey());
await this.cacheDriver.remove(this.tablesRedisKey());
}
}

Expand Down Expand Up @@ -312,7 +313,9 @@ class PreAggregations {
this.queryCache = queryCache;
this.refreshErrors = {}; // TODO should be in redis
this.tablesUsedInQuery = {}; // TODO should be in redis
this.redisClient = redis.createClient(process.env.REDIS_URL);
this.cacheDriver = process.env.NODE_ENV === 'production' || process.env.REDIS_URL ?
new RedisCacheDriver() :
new LocalCacheDriver();
}

loadAllPreAggregationsIfNeeded (queryBody) {
Expand Down
15 changes: 9 additions & 6 deletions packages/cubejs-query-orchestrator/orchestrator/QueryCache.js
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
const redis = require('redis');
const crypto = require('crypto');
const QueryQueue = require('./QueryQueue');
const ContinueWaitError = require('./ContinueWaitError');
const RedisCacheDriver = require('./RedisCacheDriver');
const LocalCacheDriver = require('./LocalCacheDriver');

class QueryCache {
constructor(redisPrefix, clientFactory, logger, options) {
this.options = options || {};
this.redisPrefix = redisPrefix;
this.driverFactory = clientFactory;
this.logger = logger;
this.redisClient = redis.createClient(process.env.REDIS_URL);
this.cacheDriver = process.env.NODE_ENV === 'production' || process.env.REDIS_URL ?
new RedisCacheDriver() :
new LocalCacheDriver();
}

cachedQueryResult (queryBody, preAggregationsTablesToTempTables) {
Expand Down Expand Up @@ -193,15 +196,15 @@ class QueryCache {
result: res,
renewalKey
};
return this.redisClient.setAsync(redisKey, JSON.stringify(result), 'EX', expiration)
return this.cacheDriver.set(redisKey, result, expiration)
.then(() => {
this.logger('Renewed', { cacheKey });
return res
});
}).catch(e => {
if (!(e instanceof ContinueWaitError)) {
this.logger('Dropping Cache', { cacheKey, error: e.stack || e });
this.redisClient.delAsync(redisKey)
this.cacheDriver.remove(redisKey)
.catch(e => this.logger('Error removing key', { cacheKey, error: e.stack || e }));
}
throw e;
Expand All @@ -213,9 +216,9 @@ class QueryCache {
return fetchNew();
}

return this.redisClient.getAsync(redisKey).then(res => {
return this.cacheDriver.get(redisKey).then(res => {
if (res) {
const parsedResult = JSON.parse(res);
const parsedResult = res;
const renewedAgo = (new Date()).getTime() - parsedResult.time;
this.logger('Found cache entry', {
cacheKey,
Expand Down
Loading

0 comments on commit 50f1bbb

Please sign in to comment.