Skip to content

Commit

Permalink
fix: Limit pre-aggregations fetch table requests using queue -- handl…
Browse files Browse the repository at this point in the history
…e HA for pre-aggregations
  • Loading branch information
paveltiunov committed Mar 1, 2020
1 parent 7bcb941 commit 75833b1
Showing 1 changed file with 27 additions and 10 deletions.
37 changes: 27 additions & 10 deletions packages/cubejs-query-orchestrator/orchestrator/PreAggregations.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,14 @@ class PreAggregationLoadCache {
async tablesFromCache(preAggregation, forceRenew) {
let tables = forceRenew ? null : await this.cacheDriver.get(this.tablesRedisKey(preAggregation));
if (!tables) {
if (this.fetchTablesPromise) {
tables = await this.fetchTablesPromise;
} else {
this.fetchTablesPromise = this.fetchTables(preAggregation);
try {
tables = await this.fetchTablesPromise;
} finally {
this.fetchTablesPromise = null;
}
}
tables = await this.preAggregations.getLoadCacheQueue().executeInQueue(
'query',
preAggregation.preAggregationsSchema,
{
preAggregation
},
0
);
}
return tables;
}
Expand Down Expand Up @@ -596,6 +594,25 @@ class PreAggregations {
return this.queue;
}

getLoadCacheQueue() {
if (!this.loadCacheQueue) {
this.loadCacheQueue = QueryCache.createQueue(`SQL_PRE_AGGREGATIONS_CACHE_${this.redisPrefix}`, this.driverFactory, (client, q) => {
const {
preAggregation
} = q;
const loadCache = new PreAggregationLoadCache(this.redisPrefix, this.driverFactory, this.queryCache, this);
return loadCache.fetchTables(preAggregation);
}, {
concurrency: 4,
logger: this.logger,
cacheAndQueueDriver: this.options.cacheAndQueueDriver,
redisPool: this.options.redisPool,
...this.options.loadCacheQueueOptions
});
}
return this.loadCacheQueue;
}

static preAggregationQueryCacheKey(preAggregation) {
return preAggregation.tableName;
}
Expand Down

0 comments on commit 75833b1

Please sign in to comment.