Skip to content

Commit

Permalink
feat: Slow Query Warning and scheduled refresh for cube refresh keys
Browse files Browse the repository at this point in the history
  • Loading branch information
paveltiunov committed Jan 18, 2020
1 parent 9fb0abb commit 8768b0e
Show file tree
Hide file tree
Showing 8 changed files with 134 additions and 36 deletions.
12 changes: 12 additions & 0 deletions packages/cubejs-query-orchestrator/orchestrator/QueryCache.js
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,18 @@ class QueryCache {
return cachedValue && new Date(cachedValue.time);
}

async resultFromCacheIfExists(queryBody) {
const cacheKey = QueryCache.queryCacheKey(queryBody);
const cachedValue = await this.cacheDriver.get(this.queryRedisKey(cacheKey));
if (cachedValue) {
return {
data: cachedValue.result,
lastRefreshTime: new Date(cachedValue.time)
};
}
return null;
}

queryRedisKey(cacheKey) {
return `SQL_QUERY_RESULT_${this.redisPrefix}_${crypto.createHash('md5').update(JSON.stringify(cacheKey)).digest("hex")}`;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ class QueryOrchestrator {
return { ...preAggregationStage, stage: stageMessage };
}
}

resultFromCacheIfExists(queryBody) {
return this.queryCache.resultFromCacheIfExists(queryBody);
}
}

module.exports = QueryOrchestrator;
2 changes: 1 addition & 1 deletion packages/cubejs-schema-compiler/adapter/BaseQuery.js
Original file line number Diff line number Diff line change
Expand Up @@ -1463,7 +1463,7 @@ class BaseQuery {
}

parseSecondDuration(interval) {
const intervalMatch = interval.match(/^(\d+) (minute|hour|day|week)s?$/);
const intervalMatch = interval.match(/^(\d+) (second|minute|hour|day|week)s?$/);
if (!intervalMatch) {
throw new UserError(`Invalid interval: ${interval}`);
}
Expand Down
4 changes: 4 additions & 0 deletions packages/cubejs-schema-compiler/compiler/CubeEvaluator.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ class CubeEvaluator extends CubeSymbols {
}).reduce((a, b) => a.concat(b), []);
}

cubeNamesWithRefreshKeys() {
return Object.keys(this.evaluatedCubes).filter(c => !!this.evaluatedCubes[c].refreshKey);
}

isMeasure(measurePath) {
return this.isInstanceOfType('measures', measurePath);
}
Expand Down
2 changes: 1 addition & 1 deletion packages/cubejs-schema-compiler/compiler/CubeValidator.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ const timeInterval =
Joi.string().regex(/^(-?\d+) (minute|hour|day|week|month|year)$/, 'time interval'),
Joi.any().valid('unbounded')
]);
const everyInterval = Joi.string().regex(/^(\d+) (minute|hour|day|week)s?$/, 'refresh time interval');
const everyInterval = Joi.string().regex(/^(\d+) (second|minute|hour|day|week)s?$/, 'refresh time interval');

const BaseDimensionWithoutSubQuery = {
aliases: Joi.array().items(Joi.string()),
Expand Down
11 changes: 11 additions & 0 deletions packages/cubejs-server-core/core/OrchestratorApi.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const ContinueWaitError = require('@cubejs-backend/query-orchestrator/orchestrat
class OrchestratorApi {
constructor(driverFactory, logger, options) {
options = options || {};
this.options = options;
this.orchestrator = new QueryOrchestrator(options.redisPrefix || 'STANDALONE', driverFactory, logger, options);
this.driverFactory = driverFactory;
const { externalDriverFactory } = options;
Expand Down Expand Up @@ -47,6 +48,16 @@ class OrchestratorApi {
requestId: query.requestId
});

const fromCache = await this.orchestrator.resultFromCacheIfExists(query);
if (!query.renewQuery && fromCache) {
this.logger('Slow Query Warning', {
query: queryForLog,
requestId: query.requestId,
warning: `Query is too slow to be renewed during the user request and was served from the cache. Please consider using low latency pre-aggregations.`
});
return fromCache;
}

throw { error: 'Continue wait', stage: await this.orchestrator.queryStage(query) };
}

Expand Down
79 changes: 60 additions & 19 deletions packages/cubejs-server-core/core/RefreshScheduler.js
Original file line number Diff line number Diff line change
Expand Up @@ -82,25 +82,10 @@ class RefreshScheduler {
});
try {
const compilerApi = this.serverCore.getCompilerApi(context);
const scheduledPreAggregations = await compilerApi.scheduledPreAggregations();
await Promise.all(scheduledPreAggregations.map(async preAggregation => {
const queries = await this.refreshQueriesForPreAggregation(
context, compilerApi, preAggregation, queryingOptions
);
await Promise.all(queries.map(async (query, i) => {
const sqlQuery = await compilerApi.getSql(query);
const orchestratorApi = this.serverCore.getOrchestratorApi({ ...context, dataSource: sqlQuery.dataSource });
await orchestratorApi.executeQuery({
...sqlQuery,
preAggregations: sqlQuery.preAggregations.map(
(p) => ({ ...p, priority: i - queries.length })
),
continueWait: true,
renewQuery: true,
requestId: context.requestId
});
}));
}));
await Promise.all([
this.refreshCubesRefreshKey(context, compilerApi, queryingOptions),
this.refreshPreAggregations(context, compilerApi, queryingOptions)
]);
} catch (e) {
if (e.error !== 'Continue wait') {
this.serverCore.logger('Refresh Scheduler Error', {
Expand All @@ -111,6 +96,62 @@ class RefreshScheduler {
}
}
}

async refreshCubesRefreshKey(context, compilerApi, queryingOptions) {
const dbType = compilerApi.getDbType();
const compilers = await compilerApi.getCompilers();
const queryForEvaluation = compilerApi.createQuery(compilers, dbType, {});
await Promise.all(queryForEvaluation.cubeEvaluator.cubeNamesWithRefreshKeys().map(async cube => {
const cubeFromPath = queryForEvaluation.cubeEvaluator.cubeFromPath(cube);
const measuresCount = Object.keys(cubeFromPath.measures || {}).length;
const dimensionsCount = Object.keys(cubeFromPath.dimensions || {}).length;
if (measuresCount === 0 && dimensionsCount === 0) {
return;
}
const query = {
...queryingOptions,
...(
measuresCount &&
{ measures: [`${cube}.${Object.keys(cubeFromPath.measures)[0]}`] }
),
...(
dimensionsCount &&
{ dimensions: [`${cube}.${Object.keys(cubeFromPath.dimensions)[0]}`] }
)
};
const sqlQuery = await compilerApi.getSql(query);
const orchestratorApi = this.serverCore.getOrchestratorApi({ ...context, dataSource: sqlQuery.dataSource });
await orchestratorApi.executeQuery({
...sqlQuery,
query: 'SELECT 1', // TODO get rid off it
continueWait: true,
renewQuery: true,
requestId: context.requestId
});
}));
}

async refreshPreAggregations(context, compilerApi, queryingOptions) {
const scheduledPreAggregations = await compilerApi.scheduledPreAggregations();
await Promise.all(scheduledPreAggregations.map(async preAggregation => {
const queries = await this.refreshQueriesForPreAggregation(
context, compilerApi, preAggregation, queryingOptions
);
await Promise.all(queries.map(async (query, i) => {
const sqlQuery = await compilerApi.getSql(query);
const orchestratorApi = this.serverCore.getOrchestratorApi({ ...context, dataSource: sqlQuery.dataSource });
await orchestratorApi.executeQuery({
...sqlQuery,
preAggregations: sqlQuery.preAggregations.map(
(p) => ({ ...p, priority: i - queries.length })
),
continueWait: true,
renewQuery: true,
requestId: context.requestId
});
}));
}));
}
}

module.exports = RefreshScheduler;
56 changes: 41 additions & 15 deletions packages/cubejs-server-core/core/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,11 @@ const checkEnvForPlaceholders = () => {
}
};

const devLogger = (level) => (type, message, error) => {
const devLogger = (level) => (type, { error, warning, ...message }) => {
const colors = {
red: '31', // ERROR
green: '32', // INFO
yellow: '33', // WARNING
};

const withColor = (str, color = colors.green) => `\u001b[${color}m${str}\u001b[0m`;
Expand Down Expand Up @@ -79,30 +80,48 @@ const devLogger = (level) => (type, message, error) => {
return restParams;
};

const logWarning = () => console.log(
`${withColor(type, colors.yellow)}: ${format({ ...message, allSqlLines: true })} \n${withColor(warning, colors.yellow)}`
);
const logError = () => console.log(`${withColor(type, colors.red)}: ${format({ ...message, allSqlLines: true })} \n${error}`);
const logType = () => console.log(`${withColor(type)}`);
const logDetails = () => console.log(`${withColor(type)}: ${format(message)}`);

if (error) {
logError();
return;
}

switch (level) {
case "ERROR":
logType();
break;
case "TRACE":
logDetails();
break;
case "INFO":
default: {
if ([
// eslint-disable-next-line default-case
switch ((level || 'info').toLowerCase()) {
case "trace": {
if (!error && !warning) {
logDetails();
break;
}
}
// eslint-disable-next-line no-fallthrough
case "info": {
if (!error && !warning && [
'Load Request Success',
'Performing query',
'Performing query completed',
].includes(type)) {
logDetails();
break;
}
}
// eslint-disable-next-line no-fallthrough
case "warn": {
if (!error && warning) {
logWarning();
break;
}
}
// eslint-disable-next-line no-fallthrough
case "error": {
if (error) {
logError();
break;
}
}
}
Expand Down Expand Up @@ -133,9 +152,8 @@ class CubejsServerCore {
this.schemaPath = options.schemaPath || 'schema';
this.dbType = options.dbType;
this.logger = options.logger || ((msg, params) => {
const { error, ...restParams } = params;
if (process.env.NODE_ENV !== 'production') {
devLogger(process.env.CUBEJS_LOG_LEVEL)(msg, restParams, error);
devLogger(process.env.CUBEJS_LOG_LEVEL)(msg, params);
} else {
console.log(JSON.stringify({ message: msg, ...params }));
}
Expand Down Expand Up @@ -166,6 +184,13 @@ class CubejsServerCore {
setInterval(() => this.compilerCache.prune(), options.maxCompilerCacheKeepAlive);
}

if (options.scheduledRefreshTimer) {
setInterval(
() => this.runScheduledRefresh(),
typeof options.scheduledRefreshTimer === 'number' ? (options.scheduledRefreshTimer * 1000) : 5000
);
}

const { machineIdSync } = require('node-machine-id');
let anonymousId = 'unknown';
try {
Expand Down Expand Up @@ -212,7 +237,8 @@ class CubejsServerCore {
msg === 'Internal Server Error' ||
msg === 'User Error' ||
msg === 'Compiling schema' ||
msg === 'Recompiling schema'
msg === 'Recompiling schema' ||
msg === 'Slow Query Warning'
) {
this.event(msg, { error: params.error });
}
Expand Down

0 comments on commit 8768b0e

Please sign in to comment.