diff --git a/packages/cubejs-query-orchestrator/orchestrator/PreAggregations.js b/packages/cubejs-query-orchestrator/orchestrator/PreAggregations.js index fe199a8eefa8c..3a834b48bd227 100644 --- a/packages/cubejs-query-orchestrator/orchestrator/PreAggregations.js +++ b/packages/cubejs-query-orchestrator/orchestrator/PreAggregations.js @@ -58,16 +58,14 @@ class PreAggregationLoadCache { async tablesFromCache(preAggregation, forceRenew) { let tables = forceRenew ? null : await this.cacheDriver.get(this.tablesRedisKey(preAggregation)); if (!tables) { - if (this.fetchTablesPromise) { - tables = await this.fetchTablesPromise; - } else { - this.fetchTablesPromise = this.fetchTables(preAggregation); - try { - tables = await this.fetchTablesPromise; - } finally { - this.fetchTablesPromise = null; - } - } + tables = await this.preAggregations.getLoadCacheQueue().executeInQueue( + 'query', + preAggregation.preAggregationsSchema, + { + preAggregation + }, + 0 + ); } return tables; } @@ -596,6 +594,25 @@ class PreAggregations { return this.queue; } + getLoadCacheQueue() { + if (!this.loadCacheQueue) { + this.loadCacheQueue = QueryCache.createQueue(`SQL_PRE_AGGREGATIONS_CACHE_${this.redisPrefix}`, this.driverFactory, (client, q) => { + const { + preAggregation + } = q; + const loadCache = new PreAggregationLoadCache(this.redisPrefix, this.driverFactory, this.queryCache, this); + return loadCache.fetchTables(preAggregation); + }, { + concurrency: 4, + logger: this.logger, + cacheAndQueueDriver: this.options.cacheAndQueueDriver, + redisPool: this.options.redisPool, + ...this.options.loadCacheQueueOptions + }); + } + return this.loadCacheQueue; + } + static preAggregationQueryCacheKey(preAggregation) { return preAggregation.tableName; }