From 8768b0e0e71a4f564ec5c7b3eb4d2cb028abaa54 Mon Sep 17 00:00:00 2001 From: Pavel Tiunov Date: Sat, 18 Jan 2020 14:30:14 -0800 Subject: [PATCH] feat: Slow Query Warning and scheduled refresh for cube refresh keys --- .../orchestrator/QueryCache.js | 12 +++ .../orchestrator/QueryOrchestrator.js | 4 + .../adapter/BaseQuery.js | 2 +- .../compiler/CubeEvaluator.js | 4 + .../compiler/CubeValidator.js | 2 +- .../core/OrchestratorApi.js | 11 +++ .../core/RefreshScheduler.js | 79 ++++++++++++++----- packages/cubejs-server-core/core/index.js | 56 +++++++++---- 8 files changed, 134 insertions(+), 36 deletions(-) diff --git a/packages/cubejs-query-orchestrator/orchestrator/QueryCache.js b/packages/cubejs-query-orchestrator/orchestrator/QueryCache.js index 7b70359b70ab2..ad4750f5d5a40 100644 --- a/packages/cubejs-query-orchestrator/orchestrator/QueryCache.js +++ b/packages/cubejs-query-orchestrator/orchestrator/QueryCache.js @@ -350,6 +350,18 @@ class QueryCache { return cachedValue && new Date(cachedValue.time); } + async resultFromCacheIfExists(queryBody) { + const cacheKey = QueryCache.queryCacheKey(queryBody); + const cachedValue = await this.cacheDriver.get(this.queryRedisKey(cacheKey)); + if (cachedValue) { + return { + data: cachedValue.result, + lastRefreshTime: new Date(cachedValue.time) + }; + } + return null; + } + queryRedisKey(cacheKey) { return `SQL_QUERY_RESULT_${this.redisPrefix}_${crypto.createHash('md5').update(JSON.stringify(cacheKey)).digest("hex")}`; } diff --git a/packages/cubejs-query-orchestrator/orchestrator/QueryOrchestrator.js b/packages/cubejs-query-orchestrator/orchestrator/QueryOrchestrator.js index 49db74fc4e96d..ffd437522cbb8 100644 --- a/packages/cubejs-query-orchestrator/orchestrator/QueryOrchestrator.js +++ b/packages/cubejs-query-orchestrator/orchestrator/QueryOrchestrator.js @@ -79,6 +79,10 @@ class QueryOrchestrator { return { ...preAggregationStage, stage: stageMessage }; } } + + resultFromCacheIfExists(queryBody) { + return this.queryCache.resultFromCacheIfExists(queryBody); + } } module.exports = QueryOrchestrator; diff --git a/packages/cubejs-schema-compiler/adapter/BaseQuery.js b/packages/cubejs-schema-compiler/adapter/BaseQuery.js index 8e8e26d4654ea..c46ebf86d7933 100644 --- a/packages/cubejs-schema-compiler/adapter/BaseQuery.js +++ b/packages/cubejs-schema-compiler/adapter/BaseQuery.js @@ -1463,7 +1463,7 @@ class BaseQuery { } parseSecondDuration(interval) { - const intervalMatch = interval.match(/^(\d+) (minute|hour|day|week)s?$/); + const intervalMatch = interval.match(/^(\d+) (second|minute|hour|day|week)s?$/); if (!intervalMatch) { throw new UserError(`Invalid interval: ${interval}`); } diff --git a/packages/cubejs-schema-compiler/compiler/CubeEvaluator.js b/packages/cubejs-schema-compiler/compiler/CubeEvaluator.js index a7da29439a306..f8ae0a3a09e6d 100644 --- a/packages/cubejs-schema-compiler/compiler/CubeEvaluator.js +++ b/packages/cubejs-schema-compiler/compiler/CubeEvaluator.js @@ -62,6 +62,10 @@ class CubeEvaluator extends CubeSymbols { }).reduce((a, b) => a.concat(b), []); } + cubeNamesWithRefreshKeys() { + return Object.keys(this.evaluatedCubes).filter(c => !!this.evaluatedCubes[c].refreshKey); + } + isMeasure(measurePath) { return this.isInstanceOfType('measures', measurePath); } diff --git a/packages/cubejs-schema-compiler/compiler/CubeValidator.js b/packages/cubejs-schema-compiler/compiler/CubeValidator.js index 0536fb456b0f2..17ec1e54a3ce6 100644 --- a/packages/cubejs-schema-compiler/compiler/CubeValidator.js +++ b/packages/cubejs-schema-compiler/compiler/CubeValidator.js @@ -7,7 +7,7 @@ const timeInterval = Joi.string().regex(/^(-?\d+) (minute|hour|day|week|month|year)$/, 'time interval'), Joi.any().valid('unbounded') ]); -const everyInterval = Joi.string().regex(/^(\d+) (minute|hour|day|week)s?$/, 'refresh time interval'); +const everyInterval = Joi.string().regex(/^(\d+) (second|minute|hour|day|week)s?$/, 'refresh time interval'); const BaseDimensionWithoutSubQuery = { aliases: Joi.array().items(Joi.string()), diff --git a/packages/cubejs-server-core/core/OrchestratorApi.js b/packages/cubejs-server-core/core/OrchestratorApi.js index ddffd12570ff8..baff549fa2ca8 100644 --- a/packages/cubejs-server-core/core/OrchestratorApi.js +++ b/packages/cubejs-server-core/core/OrchestratorApi.js @@ -6,6 +6,7 @@ const ContinueWaitError = require('@cubejs-backend/query-orchestrator/orchestrat class OrchestratorApi { constructor(driverFactory, logger, options) { options = options || {}; + this.options = options; this.orchestrator = new QueryOrchestrator(options.redisPrefix || 'STANDALONE', driverFactory, logger, options); this.driverFactory = driverFactory; const { externalDriverFactory } = options; @@ -47,6 +48,16 @@ class OrchestratorApi { requestId: query.requestId }); + const fromCache = await this.orchestrator.resultFromCacheIfExists(query); + if (!query.renewQuery && fromCache) { + this.logger('Slow Query Warning', { + query: queryForLog, + requestId: query.requestId, + warning: `Query is too slow to be renewed during the user request and was served from the cache. Please consider using low latency pre-aggregations.` + }); + return fromCache; + } + throw { error: 'Continue wait', stage: await this.orchestrator.queryStage(query) }; } diff --git a/packages/cubejs-server-core/core/RefreshScheduler.js b/packages/cubejs-server-core/core/RefreshScheduler.js index bbd27340a6555..4c8028c2a8369 100644 --- a/packages/cubejs-server-core/core/RefreshScheduler.js +++ b/packages/cubejs-server-core/core/RefreshScheduler.js @@ -82,25 +82,10 @@ class RefreshScheduler { }); try { const compilerApi = this.serverCore.getCompilerApi(context); - const scheduledPreAggregations = await compilerApi.scheduledPreAggregations(); - await Promise.all(scheduledPreAggregations.map(async preAggregation => { - const queries = await this.refreshQueriesForPreAggregation( - context, compilerApi, preAggregation, queryingOptions - ); - await Promise.all(queries.map(async (query, i) => { - const sqlQuery = await compilerApi.getSql(query); - const orchestratorApi = this.serverCore.getOrchestratorApi({ ...context, dataSource: sqlQuery.dataSource }); - await orchestratorApi.executeQuery({ - ...sqlQuery, - preAggregations: sqlQuery.preAggregations.map( - (p) => ({ ...p, priority: i - queries.length }) - ), - continueWait: true, - renewQuery: true, - requestId: context.requestId - }); - })); - })); + await Promise.all([ + this.refreshCubesRefreshKey(context, compilerApi, queryingOptions), + this.refreshPreAggregations(context, compilerApi, queryingOptions) + ]); } catch (e) { if (e.error !== 'Continue wait') { this.serverCore.logger('Refresh Scheduler Error', { @@ -111,6 +96,62 @@ class RefreshScheduler { } } } + + async refreshCubesRefreshKey(context, compilerApi, queryingOptions) { + const dbType = compilerApi.getDbType(); + const compilers = await compilerApi.getCompilers(); + const queryForEvaluation = compilerApi.createQuery(compilers, dbType, {}); + await Promise.all(queryForEvaluation.cubeEvaluator.cubeNamesWithRefreshKeys().map(async cube => { + const cubeFromPath = queryForEvaluation.cubeEvaluator.cubeFromPath(cube); + const measuresCount = Object.keys(cubeFromPath.measures || {}).length; + const dimensionsCount = Object.keys(cubeFromPath.dimensions || {}).length; + if (measuresCount === 0 && dimensionsCount === 0) { + return; + } + const query = { + ...queryingOptions, + ...( + measuresCount && + { measures: [`${cube}.${Object.keys(cubeFromPath.measures)[0]}`] } + ), + ...( + dimensionsCount && + { dimensions: [`${cube}.${Object.keys(cubeFromPath.dimensions)[0]}`] } + ) + }; + const sqlQuery = await compilerApi.getSql(query); + const orchestratorApi = this.serverCore.getOrchestratorApi({ ...context, dataSource: sqlQuery.dataSource }); + await orchestratorApi.executeQuery({ + ...sqlQuery, + query: 'SELECT 1', // TODO get rid off it + continueWait: true, + renewQuery: true, + requestId: context.requestId + }); + })); + } + + async refreshPreAggregations(context, compilerApi, queryingOptions) { + const scheduledPreAggregations = await compilerApi.scheduledPreAggregations(); + await Promise.all(scheduledPreAggregations.map(async preAggregation => { + const queries = await this.refreshQueriesForPreAggregation( + context, compilerApi, preAggregation, queryingOptions + ); + await Promise.all(queries.map(async (query, i) => { + const sqlQuery = await compilerApi.getSql(query); + const orchestratorApi = this.serverCore.getOrchestratorApi({ ...context, dataSource: sqlQuery.dataSource }); + await orchestratorApi.executeQuery({ + ...sqlQuery, + preAggregations: sqlQuery.preAggregations.map( + (p) => ({ ...p, priority: i - queries.length }) + ), + continueWait: true, + renewQuery: true, + requestId: context.requestId + }); + })); + })); + } } module.exports = RefreshScheduler; diff --git a/packages/cubejs-server-core/core/index.js b/packages/cubejs-server-core/core/index.js index 15d19010a0d2c..ff54586df70f8 100644 --- a/packages/cubejs-server-core/core/index.js +++ b/packages/cubejs-server-core/core/index.js @@ -48,10 +48,11 @@ const checkEnvForPlaceholders = () => { } }; -const devLogger = (level) => (type, message, error) => { +const devLogger = (level) => (type, { error, warning, ...message }) => { const colors = { red: '31', // ERROR green: '32', // INFO + yellow: '33', // WARNING }; const withColor = (str, color = colors.green) => `\u001b[${color}m${str}\u001b[0m`; @@ -79,8 +80,10 @@ const devLogger = (level) => (type, message, error) => { return restParams; }; + const logWarning = () => console.log( + `${withColor(type, colors.yellow)}: ${format({ ...message, allSqlLines: true })} \n${withColor(warning, colors.yellow)}` + ); const logError = () => console.log(`${withColor(type, colors.red)}: ${format({ ...message, allSqlLines: true })} \n${error}`); - const logType = () => console.log(`${withColor(type)}`); const logDetails = () => console.log(`${withColor(type)}: ${format(message)}`); if (error) { @@ -88,21 +91,37 @@ const devLogger = (level) => (type, message, error) => { return; } - switch (level) { - case "ERROR": - logType(); - break; - case "TRACE": - logDetails(); - break; - case "INFO": - default: { - if ([ + // eslint-disable-next-line default-case + switch ((level || 'info').toLowerCase()) { + case "trace": { + if (!error && !warning) { + logDetails(); + break; + } + } + // eslint-disable-next-line no-fallthrough + case "info": { + if (!error && !warning && [ 'Load Request Success', 'Performing query', 'Performing query completed', ].includes(type)) { logDetails(); + break; + } + } + // eslint-disable-next-line no-fallthrough + case "warn": { + if (!error && warning) { + logWarning(); + break; + } + } + // eslint-disable-next-line no-fallthrough + case "error": { + if (error) { + logError(); + break; } } } @@ -133,9 +152,8 @@ class CubejsServerCore { this.schemaPath = options.schemaPath || 'schema'; this.dbType = options.dbType; this.logger = options.logger || ((msg, params) => { - const { error, ...restParams } = params; if (process.env.NODE_ENV !== 'production') { - devLogger(process.env.CUBEJS_LOG_LEVEL)(msg, restParams, error); + devLogger(process.env.CUBEJS_LOG_LEVEL)(msg, params); } else { console.log(JSON.stringify({ message: msg, ...params })); } @@ -166,6 +184,13 @@ class CubejsServerCore { setInterval(() => this.compilerCache.prune(), options.maxCompilerCacheKeepAlive); } + if (options.scheduledRefreshTimer) { + setInterval( + () => this.runScheduledRefresh(), + typeof options.scheduledRefreshTimer === 'number' ? (options.scheduledRefreshTimer * 1000) : 5000 + ); + } + const { machineIdSync } = require('node-machine-id'); let anonymousId = 'unknown'; try { @@ -212,7 +237,8 @@ class CubejsServerCore { msg === 'Internal Server Error' || msg === 'User Error' || msg === 'Compiling schema' || - msg === 'Recompiling schema' + msg === 'Recompiling schema' || + msg === 'Slow Query Warning' ) { this.event(msg, { error: params.error }); }