Skip to content

Commit

Permalink
feat: Scheduled refresh for pre-aggregations
Browse files Browse the repository at this point in the history
  • Loading branch information
paveltiunov committed Jan 13, 2020
1 parent 452086d commit c87b525
Show file tree
Hide file tree
Showing 14 changed files with 238 additions and 39 deletions.
22 changes: 15 additions & 7 deletions packages/cubejs-query-orchestrator/orchestrator/PreAggregations.js
Original file line number Diff line number Diff line change
Expand Up @@ -109,14 +109,18 @@ class PreAggregationLoadCache {
return this.versionEntries;
}

async keyQueryResult(keyQuery) {
async keyQueryResult(keyQuery, waitForRenew) {
if (!this.queryResults[this.queryCache.queryRedisKey(keyQuery)]) {
this.queryResults[this.queryCache.queryRedisKey(keyQuery)] = await this.queryCache.cacheQueryResult(
Array.isArray(keyQuery) ? keyQuery[0] : keyQuery,
Array.isArray(keyQuery) ? keyQuery[1] : [],
keyQuery,
60 * 60,
{ renewalThreshold: 5 * 60, renewalKey: keyQuery }
{
renewalThreshold: this.queryCache.options.refreshKeyRenewalThreshold || 2 * 60,
renewalKey: keyQuery,
waitForRenew
}
);
}
return this.queryResults[this.queryCache.queryRedisKey(keyQuery)];
Expand Down Expand Up @@ -265,15 +269,15 @@ class PreAggregationLoader {
preAggregation: this.preAggregation,
requestId: this.requestId
});
await this.executeInQueue(invalidationKeys, 10, newVersionEntry);
await this.executeInQueue(invalidationKeys, this.priority(10), newVersionEntry);
return mostRecentTargetTableName();
} else if (versionEntry.content_version !== newVersionEntry.content_version) {
if (this.waitForRenew) {
this.logger('Waiting for pre-aggregation renew', {
preAggregation: this.preAggregation,
requestId: this.requestId
});
await this.executeInQueue(invalidationKeys, 0, newVersionEntry);
await this.executeInQueue(invalidationKeys, this.priority(0), newVersionEntry);
return mostRecentTargetTableName();
} else {
if (
Expand All @@ -291,16 +295,20 @@ class PreAggregationLoader {
preAggregation: this.preAggregation,
requestId: this.requestId
});
await this.executeInQueue(invalidationKeys, 10, newVersionEntry);
await this.executeInQueue(invalidationKeys, this.priority(10), newVersionEntry);
return mostRecentTargetTableName();
}
return this.targetTableName(versionEntry);
}

priority(defaultValue) {
return this.preAggregation.priority != null ? this.preAggregation.priority : defaultValue;
}

getInvalidationKeyValues() {
return Promise.all(
(this.preAggregation.invalidateKeyQueries || [])
.map(keyQuery => this.loadCache.keyQueryResult(keyQuery))
.map(keyQuery => this.loadCache.keyQueryResult(keyQuery, this.waitForRenew))
);
}

Expand All @@ -309,7 +317,7 @@ class PreAggregationLoader {
preAggregation: this.preAggregation,
requestId: this.requestId
});
this.executeInQueue(invalidationKeys, 0, newVersionEntry)
this.executeInQueue(invalidationKeys, this.priority(0), newVersionEntry)
.then(() => {
delete this.preAggregations.refreshErrors[newVersionEntry.table_name];
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,18 @@ class QueryOrchestrator {
async fetchQuery(queryBody) {
return this.preAggregations.loadAllPreAggregationsIfNeeded(queryBody)
.then(async preAggregationsTablesToTempTables => {
const usedPreAggregations = R.fromPairs(preAggregationsTablesToTempTables);
if (!queryBody.query) {
return {
usedPreAggregations
};
}
const result = await this.queryCache.cachedQueryResult(
queryBody, preAggregationsTablesToTempTables
);
return {
...result,
usedPreAggregations: R.fromPairs(preAggregationsTablesToTempTables)
usedPreAggregations
};
});
}
Expand Down
6 changes: 3 additions & 3 deletions packages/cubejs-query-orchestrator/orchestrator/QueryQueue.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,15 @@ class QueryQueue {
if (priority == null) {
priority = 0;
}
if (!(priority >= 0 && priority <= 100)) {
throw new Error('Priority should be between 0 and 100');
if (!(priority >= -10000 && priority <= 10000)) {
throw new Error('Priority should be between -10000 and 10000');
}
let result = await redisClient.getResult(queryKey);
if (result) {
return this.parseResult(result);
}
const time = new Date().getTime();
const keyScore = time + (100 - priority) * 1E14;
const keyScore = time + (10000 - priority) * 1E14;

// eslint-disable-next-line no-unused-vars
const [added, b, c, queueSize] = await redisClient.addToQueue(
Expand Down
13 changes: 13 additions & 0 deletions packages/cubejs-query-orchestrator/test/QueryQueueTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,19 @@ const QueryQueueTest = (name, options) => {
should(await queue.getQueryStage('12')).be.eql(undefined);
});

it('negative priority', async () => {
delayCount = 0;
const results = [];
await Promise.all([
queue.executeInQueue('delay', '31', { delay: 100, result: '4' }, -10).then(r => results.push(r)),
queue.executeInQueue('delay', '32', { delay: 100, result: '3' }, -9).then(r => results.push(r)),
queue.executeInQueue('delay', '33', { delay: 100, result: '2' }, -8).then(r => results.push(r)),
queue.executeInQueue('delay', '34', { delay: 100, result: '1' }, -7).then(r => results.push(r))
]);

should(results).be.eql(['10', '21', '32', '43']);
});

it('orphaned', async () => {
for (let i = 1; i <= 4; i++) {
await queue.executeInQueue('delay', `11` + i, { delay: 50, result: `${i}` }, 0);
Expand Down
38 changes: 30 additions & 8 deletions packages/cubejs-schema-compiler/adapter/BaseQuery.js
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,17 @@ class BaseQuery {
}

get dataSource() {
const dataSources = R.uniq(this.allCubeNames.map(c => this.cubeEvaluator.cubeFromPath(c).dataSource || 'default'));
const dataSources = R.uniq(this.allCubeNames.map(c => this.cubeDataSource(c)));
if (dataSources.length > 1) {
throw new UserError(`Joins across data sources aren't supported in community edition. Found data sources: ${dataSources.join(', ')}`);
}
return dataSources[0];
}

cubeDataSource(cube) {
return this.cubeEvaluator.cubeFromPath(cube).dataSource || 'default';
}

get aliasNameToMember() {
return R.fromPairs(
this.measures.map(m => [m.unescapedAliasName(), m.measure]).concat(
Expand Down Expand Up @@ -1335,13 +1339,9 @@ class BaseQuery {
if (timeDimensions.length) {
const dimension = timeDimensions.filter(f => f.toLowerCase().indexOf('update') !== -1)[0] || timeDimensions[0];
const foundMainTimeDimension = this.newTimeDimension({ dimension });
const cubeNamesForTimeDimension = this.collectFrom(
[foundMainTimeDimension],
this.collectCubeNamesFor.bind(this)
);
if (cubeNamesForTimeDimension.length === 1 && cubeNamesForTimeDimension[0] === cube) {
const dimensionSql = this.dimensionSql(foundMainTimeDimension);
return `select max(${dimensionSql}) from ${this.cubeSql(cube)} ${this.asSyntaxTable} ${this.cubeAlias(cube)}`;
const aggSelect = this.aggSelectForDimension(cube, foundMainTimeDimension, 'max');
if (aggSelect) {
return aggSelect;
}
}
return `select count(*) from ${this.cubeSql(cube)} ${this.asSyntaxTable} ${this.cubeAlias(cube)}`;
Expand All @@ -1352,6 +1352,18 @@ class BaseQuery {
};
}

aggSelectForDimension(cube, dimension, aggFunction) {
const cubeNamesForTimeDimension = this.collectFrom(
[dimension],
this.collectCubeNamesFor.bind(this)
);
if (cubeNamesForTimeDimension.length === 1 && cubeNamesForTimeDimension[0] === cube) {
const dimensionSql = this.dimensionSql(dimension);
return `select ${aggFunction}(${dimensionSql}) from ${this.cubeSql(cube)} ${this.asSyntaxTable} ${this.cubeAlias(cube)}`;
}
return null;
}

cubeCardinalityQueries() { // TODO collect sub queries
return R.fromPairs(this.collectCubeNames()
.map(cube => [
Expand Down Expand Up @@ -1420,6 +1432,16 @@ class BaseQuery {
)];
}

preAggregationStartEndQueries(cube, preAggregation) {
const references = this.cubeEvaluator.evaluatePreAggregationReferences(cube, preAggregation);
const timeDimension = this.newTimeDimension(references.timeDimensions[0]);

return this.evaluateSymbolSqlWithContext(() => [
this.paramAllocator.buildSqlAndParams(this.aggSelectForDimension(cube, timeDimension, 'min')),
this.paramAllocator.buildSqlAndParams(this.aggSelectForDimension(cube, timeDimension, 'max'))
], { preAggregationQuery: true });
}

parametrizedContextSymbols() {
return Object.assign({
filterParams: this.filtersProxy(),
Expand Down
18 changes: 1 addition & 17 deletions packages/cubejs-schema-compiler/adapter/PreAggregations.js
Original file line number Diff line number Diff line change
Expand Up @@ -422,23 +422,7 @@ class PreAggregations {
}

evaluateAllReferences(cube, aggregation) {
const timeDimensions = aggregation.timeDimensionReference ? [{
dimension: this.evaluateReferences(cube, aggregation.timeDimensionReference),
granularity: this.castGranularity(aggregation.granularity)
}] : [];
return {
dimensions:
(aggregation.dimensionReferences && this.evaluateReferences(cube, aggregation.dimensionReferences) || []).concat(
aggregation.segmentReferences && this.evaluateReferences(cube, aggregation.segmentReferences) || []
),
measures:
aggregation.measureReferences && this.evaluateReferences(cube, aggregation.measureReferences) || [],
timeDimensions
};
}

evaluateReferences(cube, referencesFn) {
return this.query.cubeEvaluator.evaluateReferences(cube, referencesFn);
return this.query.cubeEvaluator.evaluatePreAggregationReferences(cube, aggregation);
}

rollupPreAggregation(preAggregationForQuery) {
Expand Down
33 changes: 33 additions & 0 deletions packages/cubejs-schema-compiler/compiler/CubeEvaluator.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,22 @@ class CubeEvaluator extends CubeSymbols {
return this.cubeFromPath(path).preAggregations || {};
}

scheduledPreAggregations() {
return Object.keys(this.evaluatedCubes).map(cube => {
const preAggregations = this.preAggregationsForCube(cube);
return Object.keys(preAggregations)
.filter(name => preAggregations[name].scheduledRefresh)
.map(preAggregationName => ({
preAggregationName,
preAggregation: preAggregations[preAggregationName],
cube,
references: preAggregations[preAggregationName].type === 'rollup' ?
this.evaluatePreAggregationReferences(cube, preAggregations[preAggregationName]) :
null
}));
}).reduce((a, b) => a.concat(b), []);
}

isMeasure(measurePath) {
return this.isInstanceOfType('measures', measurePath);
}
Expand Down Expand Up @@ -147,6 +163,23 @@ class CubeEvaluator extends CubeSymbols {
const references = arrayOrSingle.map(p => p.toString());
return options.originalSorting ? references : R.sortBy(R.identity, references);
}

evaluatePreAggregationReferences(cube, aggregation) {
const timeDimensions = aggregation.timeDimensionReference ? [{
dimension: this.evaluateReferences(cube, aggregation.timeDimensionReference),
granularity: aggregation.granularity
}] : [];
return {
dimensions:
(aggregation.dimensionReferences && this.evaluateReferences(cube, aggregation.dimensionReferences) || [])
.concat(
aggregation.segmentReferences && this.evaluateReferences(cube, aggregation.segmentReferences) || []
),
measures:
aggregation.measureReferences && this.evaluateReferences(cube, aggregation.measureReferences) || [],
timeDimensions
};
}
}

module.exports = CubeEvaluator;
3 changes: 2 additions & 1 deletion packages/cubejs-schema-compiler/compiler/CubeValidator.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ const BasePreAggregation = {
}),
useOriginalSqlPreAggregations: Joi.boolean(),
external: Joi.boolean(),
partitionGranularity: Joi.any().valid('day', 'week', 'month', 'year')
partitionGranularity: Joi.any().valid('day', 'week', 'month', 'year'),
scheduledRefresh: Joi.boolean()
};

const cubeSchema = Joi.object().keys({
Expand Down
5 changes: 5 additions & 0 deletions packages/cubejs-server-core/core/CompilerApi.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ class CompilerApi {
}));
}

async scheduledPreAggregations() {
const { cubeEvaluator } = await this.getCompilers();
return cubeEvaluator.scheduledPreAggregations();
}

createQuery(compilers, dbType, query) {
return QueryBuilder.query(
compilers,
Expand Down
2 changes: 1 addition & 1 deletion packages/cubejs-server-core/core/OrchestratorApi.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class OrchestratorApi {
}

async executeQuery(query) {
const queryForLog = query.query.replace(/\s+/g, ' ');
const queryForLog = query.query && query.query.replace(/\s+/g, ' ');
const startQueryTime = (new Date()).getTime();

try {
Expand Down
Loading

0 comments on commit c87b525

Please sign in to comment.