Skip to content

Commit

Permalink
fix: Re-use external connection for CubeStore in queue (#6028)
Browse files Browse the repository at this point in the history
  • Loading branch information
ovr authored Jan 18, 2023
1 parent 0f4a431 commit ff724f7
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ import {
StreamOptions,
UnloadOptions
} from '@cubejs-backend/base-driver';
import { CubeStoreDriver } from '@cubejs-backend/cubestore-driver';
import { PreAggTableToTempTable, Query, QueryBody, QueryCache, QueryTuple, QueryWithParams } from './QueryCache';
import { ContinueWaitError } from './ContinueWaitError';
import { DriverFactory, DriverFactoryByDataSource } from './DriverFactory';
import { QueryQueue } from './QueryQueue';
import { LargeStreamWarning } from './StreamObjectsCounter';
import { CacheAndQueryDriverType } from './QueryOrchestrator';
import { RedisPool } from './RedisPool';

/// Name of the inline table containing the lambda rows.
export const LAMBDA_TABLE_PREFIX = 'lambda';
Expand Down Expand Up @@ -1883,7 +1885,8 @@ type PreAggregationsOptions = {
orphanedTimeout?: number;
heartBeatInterval?: number;
}>;
redisPool?: any;
redisPool?: RedisPool;
cubeStoreDriverFactory?: () => Promise<CubeStoreDriver>;
continueWaitTimeout?: number;
cacheAndQueueDriver?: CacheAndQueryDriverType;
skipExternalCacheAndQueue?: boolean;
Expand Down Expand Up @@ -2264,6 +2267,7 @@ export class PreAggregations {
logger: this.logger,
cacheAndQueueDriver: this.options.cacheAndQueueDriver,
redisPool: this.options.redisPool,
cubeStoreDriverFactory: this.options.cubeStoreDriverFactory,
// Centralized continueWaitTimeout that can be overridden in queueOptions
continueWaitTimeout: this.options.continueWaitTimeout,
...(await this.options.queueOptions(dataSource)),
Expand Down Expand Up @@ -2310,6 +2314,7 @@ export class PreAggregations {
logger: this.logger,
cacheAndQueueDriver: this.options.cacheAndQueueDriver,
redisPool: this.options.redisPool,
cubeStoreDriverFactory: this.options.cubeStoreDriverFactory,
...this.options.loadCacheQueueOptions
}
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,12 @@ export class QueryCache {
this.cacheDriver = new LocalCacheDriver();
break;
case 'cubestore':
if (!options.cubeStoreDriverFactory) {
throw new Error('cubeStoreDriverFactory is a required option for Cube Store cache driver');
}

this.cacheDriver = new CubeStoreCacheDriver(
options.cubeStoreDriverFactory || (async () => new CubeStoreDriver({}))
options.cubeStoreDriverFactory
);
break;
default:
Expand Down Expand Up @@ -498,6 +502,7 @@ export class QueryCache {
logger: this.logger,
cacheAndQueueDriver: this.options.cacheAndQueueDriver,
redisPool: this.options.redisPool,
cubeStoreDriverFactory: this.options.cubeStoreDriverFactory,
// Centralized continueWaitTimeout that can be overridden in queueOptions
continueWaitTimeout: this.options.continueWaitTimeout,
...(await this.options.queueOptions(dataSource)),
Expand Down Expand Up @@ -546,6 +551,7 @@ export class QueryCache {
logger: this.logger,
cacheAndQueueDriver: this.options.cacheAndQueueDriver,
redisPool: this.options.redisPool,
cubeStoreDriverFactory: this.options.cubeStoreDriverFactory,
// Centralized continueWaitTimeout that can be overridden in queueOptions
continueWaitTimeout: this.options.continueWaitTimeout,
skipQueue: this.options.skipExternalCacheAndQueue,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ export class QueryOrchestrator {
externalDriverFactory,
cacheAndQueueDriver,
redisPool,
cubeStoreDriverFactory,
continueWaitTimeout,
skipExternalCacheAndQueue,
...options.preAggregationsOptions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,12 @@ function factoryQueueDriver(cacheAndQueueDriver, queueDriverOptions) {
case 'memory':
return new LocalQueueDriver(queueDriverOptions);
case 'cubestore':
if (!queueDriverOptions.cubeStoreDriverFactory) {
throw new Error('cubeStoreDriverFactory is a required option for Cube Store queue driver');
}

return new CubeStoreQueueDriver(
queueDriverOptions.cubeStoreDriverFactory || (async () => new CubeStoreDriver({})),
queueDriverOptions.cubeStoreDriverFactory,
queueDriverOptions
);
default:
Expand Down

0 comments on commit ff724f7

Please sign in to comment.