diff --git a/.github/workflows/push.yml b/.github/workflows/push.yml index eaea30e1cf390..7ba51c983b07d 100644 --- a/.github/workflows/push.yml +++ b/.github/workflows/push.yml @@ -300,7 +300,7 @@ jobs: CUBEJS_REDIS_SENTINEL: "redis+sentinel://localhost:5000,localhost:5001,localhost:5002/mymaster/0" integration-cubestore: - needs: [unit, lint, latest-tag-sha] + needs: [latest-tag-sha] runs-on: ubuntu-20.04 timeout-minutes: 60 if: (needs['latest-tag-sha'].outputs.sha != github.sha) @@ -316,7 +316,7 @@ jobs: - name: Install Rust uses: actions-rs/toolchain@v1 with: - toolchain: nightly-2022-03-08 + toolchain: nightly-2022-06-22 override: true components: rustfmt - name: Install Node.js ${{ matrix.node-version }} @@ -351,6 +351,17 @@ jobs: command: yarn install --frozen-lockfile - name: Lerna tsc run: yarn tsc + - uses: Swatinem/rust-cache@v1 + with: + working-directory: ./rust/cubestore + key: ubuntu-20.04 + - name: Build cubestore + uses: actions-rs/cargo@v1 + with: + command: build + args: --manifest-path rust/cubestore/Cargo.toml -j 4 + - name: Run Cube Store in background + run: RUNNER_TRACKING_ID="" && ./rust/cubestore/target/debug/cubestored & - name: Run Cubestore Integration timeout-minutes: 10 run: | @@ -367,7 +378,7 @@ jobs: node-version: [14.x] db: [ 'clickhouse', 'druid', 'elasticsearch', 'mssql', 'mysql', 'postgres', 'prestodb', - 'mysql-aurora-serverless', 'cubestore', 'crate' + 'mysql-aurora-serverless', 'crate' ] fail-fast: false diff --git a/package.json b/package.json index a7b272c15206e..e64fc283eb3cc 100644 --- a/package.json +++ b/package.json @@ -32,9 +32,9 @@ "@typescript-eslint/eslint-plugin": "^4.17.0", "core-js": "^3.6.5", "lerna": "^4.0.0", + "patch-package": "^6.4.7", "ramda": "^0.27.0", "rollup-plugin-dts": "^1.1.8", - "patch-package": "^6.4.7", "whatwg-fetch": "^3.0.0" }, "files": [ @@ -76,5 +76,6 @@ "@types/ramda": "0.27.40", "rc-tree": "4.1.5" }, - "license": "MIT" + "license": "MIT", + "packageManager": "yarn@1.22.19" } diff --git a/packages/cubejs-base-driver/src/queue-driver.interface.ts b/packages/cubejs-base-driver/src/queue-driver.interface.ts index 4e66e509b7d8f..2f7e5bee0862a 100644 --- a/packages/cubejs-base-driver/src/queue-driver.interface.ts +++ b/packages/cubejs-base-driver/src/queue-driver.interface.ts @@ -1,23 +1,61 @@ +export type QueryDef = unknown; +export type QueryKey = string | [string, any[]]; + +export type AddToQueueResponse = [added: number, _b: any, _c: any, queueSize: number, addedToQueueTime: number]; +export type QueryStageStateResponse = [active: string[], toProcess: string[]] | [active: string[], toProcess: string[], defs: Record]; +export type RetrieveForProcessingResponse = [added: any, removed: any, active: string[], toProcess: any, def: QueryDef, lockAquired: boolean] | null; + +export interface AddToQueueQuery { + isJob: boolean, + orphanedTimeout: unknown +} + +export interface AddToQueueOptions { + stageQueryKey: string, + requestId: string +} + +export interface QueueDriverOptions { + redisQueuePrefix: string, + concurrency: number, + continueWaitTimeout: number, + orphanedTimeout: number, + heartBeatTimeout: number, + getQueueEventsBus?: any, +} + export interface QueueDriverConnectionInterface { + redisHash(queryKey: QueryKey): string; getResultBlocking(queryKey: string): Promise; - getResult(queryKey: string): Promise; - addToQueue(queryKey: string): Promise; - getToProcessQueries(): Promise; - getActiveQueries(): Promise; - getOrphanedQueries(): Promise; - getStalledQueries(): Promise; - getQueryStageState(onlyKeys: any): Promise; + getResult(queryKey: string): Promise; + addToQueue(keyScore: number, queryKey: QueryKey, orphanedTime: any, queryHandler: any, query: AddToQueueQuery, priority: number, options: AddToQueueOptions): Promise; + // Return query keys which was sorted by priority and time + getToProcessQueries(): Promise; + getActiveQueries(): Promise; + getQueryDef(queryKey: string): Promise; + // Queries which was added to queue, but was not processed and not needed + getOrphanedQueries(): Promise; + // Queries which was not completed with old heartbeat + getStalledQueries(): Promise; + getQueryStageState(onlyKeys: boolean): Promise; updateHeartBeat(queryKey: string): Promise; - getNextProcessingId(): Promise; - retrieveForProcessing(queryKey: string, processingId: string): Promise; - freeProcessingLock(queryKe: string, processingId: string, activated: unknown): Promise; - optimisticQueryUpdate(queryKey, toUpdate, processingId): Promise; - cancelQuery(queryKey: string): Promise; + getNextProcessingId(): Promise; + // Trying to acquire a lock for processing a queue item, this method can return null when + // multiple nodes tries to process the same query + retrieveForProcessing(queryKey: string, processingId: number | string): Promise; + freeProcessingLock(queryKe: string, processingId: string | number, activated: unknown): Promise; + optimisticQueryUpdate(queryKey, toUpdate, processingId): Promise; + cancelQuery(queryKey: string): Promise<[QueryDef]>; + getQueryAndRemove(queryKey: string): Promise<[QueryDef]>; setResultAndRemoveQuery(queryKey: string, executionResult: any, processingId: any): Promise; - release(): Promise; + release(): void; + // + getQueriesToCancel(): Promise + getActiveAndToProcess(): Promise<[active: string[], toProcess: string[]]>; } export interface QueueDriverInterface { + redisHash(queryKey: QueryKey): string; createConnection(): Promise; - release(connection: QueueDriverConnectionInterface): Promise; + release(connection: QueueDriverConnectionInterface): void; } diff --git a/packages/cubejs-cubestore-driver/package.json b/packages/cubejs-cubestore-driver/package.json index 1b3d4b97d3d00..0104e46cb2767 100644 --- a/packages/cubejs-cubestore-driver/package.json +++ b/packages/cubejs-cubestore-driver/package.json @@ -34,9 +34,8 @@ "fs-extra": "^9.1.0", "generic-pool": "^3.6.0", "moment-timezone": "^0.5.31", - "mysql": "^2.16.0", "node-fetch": "^2.6.1", - "sqlstring": "^2.3.2", + "sqlstring": "^2.3.3", "tempy": "^1.0.1", "uuid": "^8.3.2", "ws": "^7.4.3" @@ -45,7 +44,6 @@ "@cubejs-backend/linter": "^0.31.0", "@types/flatbuffers": "^1.10.0", "@types/generic-pool": "^3.1.9", - "@types/mysql": "^2.15.17", "@types/ws": "^7.4.0", "jest": "^26.6.3", "typescript": "~4.1.5" diff --git a/packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts b/packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts new file mode 100644 index 0000000000000..1acf6aef9c2fb --- /dev/null +++ b/packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts @@ -0,0 +1,289 @@ +import { + QueueDriverInterface, + QueueDriverConnectionInterface, + QueryStageStateResponse, + QueryDef, + RetrieveForProcessingResponse, + QueueDriverOptions, + AddToQueueQuery, + AddToQueueOptions, AddToQueueResponse, QueryKey, +} from '@cubejs-backend/base-driver'; + +import crypto from 'crypto'; +import { CubeStoreDriver } from './CubeStoreDriver'; + +function hashQueryKey(queryKey: QueryKey) { + return crypto.createHash('md5').update(JSON.stringify(queryKey)).digest('hex'); +} + +class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface { + public constructor( + protected readonly driver: CubeStoreDriver, + protected readonly options: QueueDriverOptions, + ) { } + + public redisHash(queryKey: QueryKey): string { + return hashQueryKey(queryKey); + } + + protected prefixKey(queryKey: QueryKey): string { + return `${this.options.redisQueuePrefix}:${queryKey}`; + } + + public async addToQueue( + keyScore: number, + queryKey: QueryKey, + orphanedTime: any, + queryHandler: string, + query: AddToQueueQuery, + priority: number, + options: AddToQueueOptions + ): Promise { + // TODO: Fix sqlparser, support negative number + priority = priority < 0 ? 0 : priority; + + const data = { + queryHandler, + query, + queryKey, + stageQueryKey: options.stageQueryKey, + priority, + requestId: options.requestId, + addedToQueueTime: new Date().getTime() + }; + + const _rows = await this.driver.query('QUEUE ADD PRIORITY ? ? ?', [ + priority, + this.prefixKey(this.redisHash(queryKey)), + JSON.stringify(data) + ]); + + return [ + 1, + null, + null, + 1, + data.addedToQueueTime + ]; + } + + // TODO: Looks useless, because we can do it in one step - getQueriesToCancel + public async getQueryAndRemove(queryKey: string): Promise<[QueryDef]> { + return this.cancelQuery(queryKey); + } + + public async cancelQuery(queryKey: string): Promise<[QueryDef]> { + const rows = await this.driver.query('QUEUE CANCEL ?', [ + this.prefixKey(queryKey) + ]); + if (rows && rows.length) { + return [JSON.parse(rows[0].value)]; + } + + throw new Error(`Unable to cancel query with id: "${this.prefixKey(queryKey)}"`); + } + + public async freeProcessingLock(queryKey: string, processingId: string, activated: unknown): Promise { + // nothing to do + } + + public async getActiveQueries(): Promise { + const rows = await this.driver.query('/* getActiveQueries */ QUEUE ACTIVE ?', [ + this.options.redisQueuePrefix + ]); + return rows.map((row) => row.id); + } + + public async getToProcessQueries(): Promise { + const rows = await this.driver.query('/* getToProcessQueries */ QUEUE PENDING ?', [ + this.options.redisQueuePrefix + ]); + return rows.map((row) => row.id); + } + + public async getActiveAndToProcess(): Promise<[string[], string[]]> { + const rows = await this.driver.query('QUEUE LIST ?', [ + this.options.redisQueuePrefix + ]); + if (rows.length) { + return [ + rows.filter((item) => item.status === 'active').map((row) => row.id), + rows.filter((item) => item.status === 'pending').map((row) => row.id), + ]; + } + + return [[], []]; + } + + public async getNextProcessingId(): Promise { + const rows = await this.driver.query('CACHE INCR ?', [ + `${this.options.redisQueuePrefix}:PROCESSING_COUNTER` + ]); + if (rows && rows.length) { + return rows[0].value; + } + + throw new Error('Unable to get next processing id'); + } + + public async getQueryStageState(onlyKeys: boolean): Promise { + const rows = await this.driver.query(`QUEUE LIST ${onlyKeys ? '' : ' WITH_PAYLOAD '} ?`, [ + this.options.redisQueuePrefix + ]); + + const defs: Record = {}; + const toProcess: string[] = []; + const active: string[] = []; + + for (const row of rows) { + if (!onlyKeys) { + defs[row.id] = JSON.parse(row.payload); + } + + if (row.status === 'pending') { + toProcess.push(row.status); + } else if (row.status === 'active') { + active.push(row.status); + } + } + + return [toProcess, active, defs]; + } + + public async getResult(queryKey: string): Promise { + const rows = await this.driver.query('QUEUE RESULT ?', [ + this.prefixKey(this.redisHash(queryKey)), + ]); + if (rows && rows.length) { + return JSON.parse(rows[0].value); + } + + return null; + } + + public async getStalledQueries(): Promise { + const rows = await this.driver.query('/* getStalledQueries */ QUEUE STALLED ? ?', [ + this.options.heartBeatTimeout * 1000, + this.options.redisQueuePrefix + ]); + return rows.map((row) => row.id); + } + + public async getOrphanedQueries(): Promise { + const rows = await this.driver.query('/* getOrphanedQueries */ QUEUE ORPHANED ? ?', [ + this.options.orphanedTimeout * 1000, + this.options.redisQueuePrefix + ]); + return rows.map((row) => row.id); + } + + public async getQueriesToCancel(): Promise { + const rows = await this.driver.query('/* getQueriesToCancel */ QUEUE TO_CANCEL ? ? ?', [ + this.options.heartBeatTimeout * 1000, + this.options.orphanedTimeout * 1000, + this.options.redisQueuePrefix, + ]); + return rows.map((row) => row.id); + } + + public async getQueryDef(queryKey: string): Promise { + const rows = await this.driver.query('/* getQueryDef */ QUEUE GET ?', [ + this.prefixKey(this.redisHash(queryKey)) + ]); + if (rows && rows.length) { + return Object.assign(JSON.parse(rows[0].value), JSON.parse(rows[0].extra)); + } + + return null; + } + + public async optimisticQueryUpdate(queryKey: any, toUpdate: any, _processingId: any): Promise { + await this.driver.query('/* optimisticQueryUpdate */ QUEUE MERGE_EXTRA ? ?', [ + this.prefixKey(queryKey), + JSON.stringify(toUpdate) + ]); + + return true; + } + + public async release(): Promise { + // throw new Error('Unimplemented release'); + } + + public async retrieveForProcessing(queryKey: string, _processingId: string): Promise { + const rows = await this.driver.query('/* retrieveForProcessing */ QUEUE RETRIEVE CONCURRENCY ? ?', [ + this.options.concurrency, + this.prefixKey(queryKey), + ]); + if (rows && rows.length) { + const addedCount = 1; + const active = [this.redisHash(queryKey)]; + const toProcess = 0; + const lockAcquired = true; + const def = JSON.parse(rows[0].value); + + return [ + addedCount, null, active, toProcess, def, lockAcquired + ]; + } + + return null; + } + + public async getResultBlocking(queryKey: string): Promise { + const rows = await this.driver.query('QUEUE RESULT_BLOCKING ? ?', [ + this.options.continueWaitTimeout * 1000, + this.prefixKey(this.redisHash(queryKey)), + ]); + if (rows && rows.length) { + return JSON.parse(rows[0].value); + } + + return null; + } + + public async setResultAndRemoveQuery(queryKey: string, executionResult: any, processingId: any): Promise { + await this.driver.query('QUEUE ACK ? ? ', [ + this.prefixKey(queryKey), + JSON.stringify(executionResult) + ]); + + return true; + } + + public async updateHeartBeat(queryKey: string): Promise { + await this.driver.query('QUEUE HEARTBEAT ?', [ + this.prefixKey(queryKey) + ]); + } +} + +export class CubeStoreQueueDriver implements QueueDriverInterface { + public constructor( + protected readonly driverFactory: () => Promise, + protected readonly options: QueueDriverOptions + ) {} + + protected connection: CubeStoreDriver | null = null; + + public redisHash(queryKey: QueryKey) { + return hashQueryKey(queryKey); + } + + protected async getConnection(): Promise { + if (this.connection) { + return this.connection; + } + + // eslint-disable-next-line no-return-assign + return this.connection = await this.driverFactory(); + } + + public async createConnection(): Promise { + return new CubestoreQueueDriverConnection(await this.getConnection(), this.options); + } + + public async release(): Promise { + // nothing to release + } +} diff --git a/packages/cubejs-cubestore-driver/src/index.ts b/packages/cubejs-cubestore-driver/src/index.ts index 3783bc7eb435f..9904a08d58c09 100644 --- a/packages/cubejs-cubestore-driver/src/index.ts +++ b/packages/cubejs-cubestore-driver/src/index.ts @@ -1,4 +1,5 @@ export * from './CubeStoreCacheDriver'; export * from './CubeStoreDriver'; export * from './CubeStoreDevDriver'; +export * from './CubeStoreQueueDriver'; export * from './rexport'; diff --git a/packages/cubejs-mysql-driver/package.json b/packages/cubejs-mysql-driver/package.json index ffb349dd1df93..9da36f089160d 100644 --- a/packages/cubejs-mysql-driver/package.json +++ b/packages/cubejs-mysql-driver/package.json @@ -29,9 +29,9 @@ "dependencies": { "@cubejs-backend/base-driver": "^0.31.41", "@cubejs-backend/shared": "^0.31.41", - "@types/mysql": "^2.15.15", + "@types/mysql": "^2.15.21", "generic-pool": "^3.6.0", - "mysql": "^2.16.0" + "mysql": "^2.18.1" }, "devDependencies": { "@cubejs-backend/linter": "^0.31.0", diff --git a/packages/cubejs-query-orchestrator/DEVELOPMENT.md b/packages/cubejs-query-orchestrator/DEVELOPMENT.md new file mode 100644 index 0000000000000..f22c2309fe24d --- /dev/null +++ b/packages/cubejs-query-orchestrator/DEVELOPMENT.md @@ -0,0 +1,107 @@ +### General queue: + +```mermaid +sequenceDiagram + participant BackgroundQueryQueue + participant QueueQueue + participant QueueDriverInterface + participant CubeStore + + QueueQueue->>QueueDriverInterface: getResult + QueueDriverInterface->>+CubeStore: QUEUE RESULT ? + QueueDriverInterface-->>+QueueQueue: QueryResult|null + deactivate CubeStore + + QueueQueue->>QueueDriverInterface: addToQueue + QueueDriverInterface->>+CubeStore: QUEUE ADD PRIORITY N key payload + QueueDriverInterface-->>+QueueQueue: AddToQueueResponse + + loop reconcileQueueImpl + QueueQueue->>QueueDriverInterface: getQueriesToCancel + QueueQueue->>QueueDriverInterface: getQueryAndRemove + QueueDriverInterface->>CubeStore: QUEUE TO_CANCEL ?stalled_timeout ?orphaned_timeout ?prefix + + QueueQueue->>QueueDriverInterface: getActiveQueries + QueueDriverInterface->>CubeStore: QUEUE ACTIVE ?prefix + QueueDriverInterface-->>+QueueQueue: getActiveQueriesResponse + + QueueQueue->>QueueDriverInterface: getToProcessQueries + QueueDriverInterface->>CubeStore: QUEUE PENDING ?prefix + QueueDriverInterface-->>+QueueQueue: getToProcessQueriesResponse + + QueueQueue-)+BackgroundQueryQueue: processQuery + Note over QueueQueue,BackgroundQueryQueue: Async call to procesQuery, which doesnt block here + end + + alt lookUpInActive: Lookup query in processing + QueueQueue->>QueueDriverInterface: getQueryDef + activate CubeStore + QueueDriverInterface->>CubeStore: QUEUE GET ?key + CubeStore-->>+QueueQueue: QueryDef|null + deactivate CubeStore + + QueueQueue->>QueueDriverInterface: getQueryStageState + activate CubeStore + QueueDriverInterface->>CubeStore: QUEUE LIST + CubeStore-->>+QueueQueue: TODO + deactivate CubeStore + Note over QueueQueue,QueueDriverInterface: Show waiting for query + end + + QueueQueue->>QueueDriverInterface: getResultBlocking + activate CubeStore + QueueDriverInterface->>CubeStore: QUEUE RESULT_BLOCKING ?timeout ?key + CubeStore-->>+QueueQueue: QueryResult|null + deactivate CubeStore +``` + +### Background execution process: + +```mermaid +sequenceDiagram + participant QueryOrchestrator + participant BackgroundQueryQueue + participant QueueDriverInterface + participant CubeStore + + loop processQuery: Background execution + BackgroundQueryQueue->>QueueDriverInterface: getNextProcessingId + activate CubeStore + QueueDriverInterface->>CubeStore: CACHE INCR ? + CubeStore-->>+BackgroundQueryQueue: number + deactivate CubeStore + + BackgroundQueryQueue->>QueueDriverInterface: retrieveForProcessing + activate CubeStore + QueueDriverInterface->>CubeStore: QUEUE RETRIEVE CONCURRENCY ?number ?key + CubeStore-->>+BackgroundQueryQueue: QueryDef + deactivate CubeStore + + BackgroundQueryQueue->>QueueDriverInterface: optimisticQueryUpdate + activate CubeStore + QueueDriverInterface->>CubeStore: QUEUE MERGE_EXTRA ?key {"startTime"} + CubeStore-->>+BackgroundQueryQueue: ok + deactivate CubeStore + + BackgroundQueryQueue->>QueueDriverInterface: optimisticQueryUpdate + activate CubeStore + QueueDriverInterface->>CubeStore: QUEUE MERGE_EXTRA ?key {"cancelHandler"} + CubeStore-->>+BackgroundQueryQueue: ok + deactivate CubeStore + + par executing: Query + BackgroundQueryQueue->>QueueDriverInterface: updateHeartBeat + QueueDriverInterface-->>BackgroundQueryQueue: ok + Note over BackgroundQueryQueue,QueueDriverInterface: intervalTimer + + BackgroundQueryQueue->>QueryOrchestrator: execute + QueryOrchestrator-->>BackgroundQueryQueue: result + end + + BackgroundQueryQueue->>QueueDriverInterface: setResultAndRemoveQuery + activate CubeStore + QueueDriverInterface->>CubeStore: QUEUE ACK ?key ?result + CubeStore-->>+BackgroundQueryQueue: ok + deactivate CubeStore + end +``` diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/BaseQueueDriver.ts b/packages/cubejs-query-orchestrator/src/orchestrator/BaseQueueDriver.ts index 1e3e0091098d5..306fce0a3ce47 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/BaseQueueDriver.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/BaseQueueDriver.ts @@ -2,7 +2,7 @@ import { QueueDriverConnectionInterface, QueueDriverInterface } from '@cubejs-ba import { getCacheHash } from './utils'; export abstract class BaseQueueDriver implements QueueDriverInterface { - public redisHash(queryKey) { + public redisHash(queryKey: string) { return getCacheHash(queryKey); } diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/LocalQueueDriver.js b/packages/cubejs-query-orchestrator/src/orchestrator/LocalQueueDriver.js index 7b7a5d7b17a69..f4eb941188617 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/LocalQueueDriver.js +++ b/packages/cubejs-query-orchestrator/src/orchestrator/LocalQueueDriver.js @@ -1,6 +1,10 @@ -const R = require('ramda'); -const { BaseQueueDriver } = require('./BaseQueueDriver'); +import R from 'ramda'; +import { QueueDriverInterface, QueueDriverConnectionInterface } from '@cubejs-backend/base-driver'; +import { BaseQueueDriver } from './BaseQueueDriver'; +/** + * @implements {QueueDriverConnectionInterface} + */ export class LocalQueueDriverConnection { constructor(driver, options) { this.redisQueuePrefix = options.redisQueuePrefix; @@ -20,6 +24,22 @@ export class LocalQueueDriverConnection { this.getQueueEventsBus = options.getQueueEventsBus; } + async getQueriesToCancel() { + const [stalled, orphaned] = await Promise.all([ + this.getStalledQueries(), + this.getOrphanedQueries(), + ]); + + return stalled.concat(orphaned); + } + + async getActiveAndToProcess() { + return Promise.all([ + this.getActiveQueries(), + this.getToProcessQueries() + ]); + } + getResultPromise(resultListKey) { if (!this.resultPromises[resultListKey]) { let resolveMethod = null; @@ -312,6 +332,9 @@ const heartBeat = {}; const processingCounters = {}; const processingLocks = {}; +/** + * @implements {QueueDriverInterface} + */ export class LocalQueueDriver extends BaseQueueDriver { constructor(options) { super(); diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts index b02a1c8211f08..2ff7ec77317bc 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts @@ -1938,7 +1938,7 @@ export class PreAggregations { dataSource = 'default', schema: string, table: string, - key: any[], + key: string, token: string, ): Promise<[boolean, string]> { // fetching tables diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryOrchestrator.ts b/packages/cubejs-query-orchestrator/src/orchestrator/QueryOrchestrator.ts index d67becb244f7d..64b099e7eec18 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryOrchestrator.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryOrchestrator.ts @@ -148,7 +148,7 @@ export class QueryOrchestrator { dataSource = 'default', schema: string, table: string, - key: any[], + key: string, token: string, ): Promise<[boolean, string]> { return this.preAggregations.isPartitionExist( diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js b/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js index 064a5cd473f9d..9527315f604d0 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js @@ -1,5 +1,7 @@ import R from 'ramda'; import { getEnv } from '@cubejs-backend/shared'; +import { QueueDriverInterface } from '@cubejs-backend/base-driver'; +import { CubeStoreDriver, CubeStoreQueueDriver } from '@cubejs-backend/cubestore-driver'; import { TimeoutError } from './TimeoutError'; import { ContinueWaitError } from './ContinueWaitError'; @@ -9,8 +11,26 @@ import { getProcessUid } from './utils'; import { QueryStream } from './QueryStream'; /** - * QueryQueue class. + * @param cacheAndQueueDriver + * @param queueDriverOptions + * @returns {QueueDriverInterface} */ +function factoryQueueDriver(cacheAndQueueDriver, queueDriverOptions) { + switch (cacheAndQueueDriver || 'memory') { + case 'redis': + return new RedisQueueDriver(queueDriverOptions); + case 'memory': + return new LocalQueueDriver(queueDriverOptions); + case 'cubestore': + return new CubeStoreQueueDriver( + queueDriverOptions.cubeStoreDriverFactory || (async () => new CubeStoreDriver({})), + queueDriverOptions + ); + default: + throw new Error(`Unknown queue driver: ${cacheAndQueueDriver}`); + } +} + export class QueryQueue { /** * Class constructor. @@ -30,46 +50,55 @@ export class QueryQueue { this.concurrency = options.concurrency || 2; /** + * @protected * @type {number} */ this.continueWaitTimeout = options.continueWaitTimeout || 5; /** + * @protected * @type {number} */ this.executionTimeout = options.executionTimeout || getEnv('dbQueryTimeout'); /** + * @protected * @type {number} */ this.orphanedTimeout = options.orphanedTimeout || 120; /** + * @protected * @type {number} */ this.heartBeatInterval = options.heartBeatInterval || 30; /** + * @protected * @type {function(string): Promise} */ this.sendProcessMessageFn = options.sendProcessMessageFn || ((queryKey) => { this.processQuery(queryKey); }); /** + * @protected * @type {function(*): Promise} */ this.sendCancelMessageFn = options.sendCancelMessageFn || ((query) => { this.processCancel(query); }); /** + * @protected * @type {*} */ this.queryHandlers = options.queryHandlers; /** + * @protected * @type {*} */ this.cancelHandlers = options.cancelHandlers; /** + * @protected * @type {function(string, *): void} */ this.logger = options.logger || ((message, event) => console.log(`${message} ${JSON.stringify(event)}`)); @@ -81,33 +110,16 @@ export class QueryQueue { orphanedTimeout: this.orphanedTimeout, heartBeatTimeout: this.heartBeatInterval * 4, redisPool: options.redisPool, + cubeStoreDriverFactory: options.cubeStoreDriverFactory, getQueueEventsBus: options.getQueueEventsBus }; - switch (options.cacheAndQueueDriver || 'memory') { - case 'redis': - /** - * @type {LocalQueueDriver | RedisQueueDriver} - */ - this.queueDriver = new RedisQueueDriver(queueDriverOptions); - break; - case 'memory': - /** - * @type {LocalQueueDriver | RedisQueueDriver} - */ - this.queueDriver = new LocalQueueDriver(queueDriverOptions); - break; - case 'cubestore': - /** - * @type {LocalQueueDriver | RedisQueueDriver} - */ - this.queueDriver = new LocalQueueDriver(queueDriverOptions); - break; - default: - throw new Error(`Unknown queue driver: ${options.cacheAndQueueDriver}`); - } - /** + * @type {QueueDriverInterface} + */ + this.queueDriver = factoryQueueDriver(options.cacheAndQueueDriver, queueDriverOptions); + /** + * @protected * @type {boolean} */ this.skipQueue = options.skipQueue; @@ -206,7 +218,7 @@ export class QueryQueue { options, ) { options = options || {}; - const redisClient = await this.queueDriver.createConnection(); + const queueConnection = await this.queueDriver.createConnection(); try { priority = priority || 0; if (!(priority >= -10000 && priority <= 10000)) { @@ -220,7 +232,7 @@ export class QueryQueue { ? query.orphanedTimeout : this.orphanedTimeout; const orphanedTime = time + (orphanedTimeout * 1000); - const [added, _b, _c, queueSize, addedToQueueTime] = await redisClient.addToQueue( + const [added, _b, _c, queueSize, addedToQueueTime] = await queueConnection.addToQueue( keyScore, queryKey, orphanedTime, queryHandler, query, priority, options ); if (added > 0) { @@ -241,7 +253,7 @@ export class QueryQueue { } this.reconcileQueue(); } finally { - this.queueDriver.release(redisClient); + this.queueDriver.release(queueConnection); } } @@ -287,7 +299,8 @@ export class QueryQueue { const result = await this.processQuerySkipQueue(queryDef); return this.parseResult(result); } - const redisClient = await this.queueDriver.createConnection(); + + const queueConnection = await this.queueDriver.createConnection(); try { if (priority == null) { priority = 0; @@ -299,14 +312,13 @@ export class QueryQueue { // Result here won't be fetched for a forced build query and a jobed build // query (initialized by the /cubejs-system/v1/pre-aggregations/jobs // endpoint). - let result = !query.forceBuild && await redisClient.getResult(queryKey); - + let result = !query.forceBuild && await queueConnection.getResult(queryKey); if (result) { return this.parseResult(result); } if (query.forceBuild) { - const jobExists = await redisClient.getQueryDef(queryKey); + const jobExists = await queueConnection.getQueryDef(queryKey); if (jobExists) return null; } @@ -315,7 +327,7 @@ export class QueryQueue { const orphanedTimeout = 'orphanedTimeout' in query ? query.orphanedTimeout : this.orphanedTimeout; const orphanedTime = time + (orphanedTimeout * 1000); - const [added, _b, _c, queueSize, addedToQueueTime] = await redisClient.addToQueue( + const [added, _b, _c, queueSize, addedToQueueTime] = await queueConnection.addToQueue( keyScore, queryKey, orphanedTime, queryHandler, query, priority, options ); @@ -338,8 +350,8 @@ export class QueryQueue { await this.reconcileQueue(); - const queryDef = await redisClient.getQueryDef(queryKey); - const [active, toProcess] = await redisClient.getQueryStageState(true); + const queryDef = await queueConnection.getQueryDef(queryKey); + const [active, toProcess] = await queueConnection.getQueryStageState(true); if (queryDef) { this.logger('Waiting for query', { @@ -357,8 +369,8 @@ export class QueryQueue { // Result here won't be fetched for a jobed build query (initialized by // the /cubejs-system/v1/pre-aggregations/jobs endpoint). - result = !query.isJob && await redisClient.getResultBlocking(queryKey); - + result = !query.isJob && await queueConnection.getResultBlocking(queryKey); + // We don't want to throw the ContinueWaitError for a jobed build query. if (!query.isJob && !result) { throw new ContinueWaitError(); @@ -366,7 +378,7 @@ export class QueryQueue { return this.parseResult(result); } finally { - this.queueDriver.release(redisClient); + this.queueDriver.release(queueConnection); } } @@ -422,6 +434,12 @@ export class QueryQueue { return this.reconcilePromise; } + async shutdown() { + if (this.reconcilePromise) { + await this.reconcilePromise; + } + } + /** * Returns a full list of queued queries, including stalled, orphaned, active * and planned to be processed with their statuses and queries definitions. @@ -429,17 +447,17 @@ export class QueryQueue { * @returns {Promise} */ async getQueries() { - const redisClient = await this.queueDriver.createConnection(); + const queueConnection = await this.queueDriver.createConnection(); try { const [stalledQueries, orphanedQueries, activeQueries, toProcessQueries] = await Promise.all([ - redisClient.getStalledQueries(), - redisClient.getOrphanedQueries(), - redisClient.getActiveQueries(), - redisClient.getToProcessQueries() + queueConnection.getStalledQueries(), + queueConnection.getOrphanedQueries(), + queueConnection.getActiveQueries(), + queueConnection.getToProcessQueries() ]); const mapWithDefinition = (arr) => Promise.all(arr.map(async queryKey => ({ - ...(await redisClient.getQueryDef(queryKey)), + ...(await queueConnection.getQueryDef(queryKey)), queryKey }))); @@ -468,7 +486,7 @@ export class QueryQueue { return obj; }, {})); } finally { - this.queueDriver.release(redisClient); + this.queueDriver.release(queueConnection); } } @@ -479,9 +497,9 @@ export class QueryQueue { * @returns {void} */ async cancelQuery(queryKey) { - const redisClient = await this.queueDriver.createConnection(); + const queueConnection = await this.queueDriver.createConnection(); try { - const query = await redisClient.cancelQuery(queryKey); + const query = await queueConnection.cancelQuery(queryKey); if (query) { this.logger('Cancelling query manual', { @@ -499,7 +517,7 @@ export class QueryQueue { return true; } finally { - this.queueDriver.release(redisClient); + this.queueDriver.release(queueConnection); } } @@ -511,16 +529,11 @@ export class QueryQueue { * @returns {Promise} */ async reconcileQueueImpl() { - const redisClient = await this.queueDriver.createConnection(); + const queueConnection = await this.queueDriver.createConnection(); try { - const toCancel = /** @type {Array} */( - await redisClient.getStalledQueries() - ).concat( - await redisClient.getOrphanedQueries() - ); - + const toCancel = await queueConnection.getQueriesToCancel(); await Promise.all(toCancel.map(async queryKey => { - const [query] = await redisClient.getQueryAndRemove(queryKey); + const [query] = await queueConnection.getQueryAndRemove(queryKey); if (query) { this.logger('Removing orphaned query', { queryKey: query.queryKey, @@ -536,8 +549,11 @@ export class QueryQueue { } })); - const active = await redisClient.getActiveQueries(); - const toProcess = await redisClient.getToProcessQueries(); + const [active, toProcess] = await Promise.all([ + queueConnection.getActiveQueries(), + queueConnection.getToProcessQueries() + ]); + await Promise.all( R.pipe( R.filter(p => { @@ -562,7 +578,7 @@ export class QueryQueue { )(toProcess) ); } finally { - this.queueDriver.release(redisClient); + this.queueDriver.release(queueConnection); } } @@ -602,11 +618,11 @@ export class QueryQueue { * @returns {Array} */ async fetchQueryStageState() { - const redisClient = await this.queueDriver.createConnection(); + const queueConnection = await this.queueDriver.createConnection(); try { - return redisClient.getQueryStageState(); + return queueConnection.getQueryStageState(false); } finally { - this.queueDriver.release(redisClient); + this.queueDriver.release(queueConnection); } } @@ -716,7 +732,8 @@ export class QueryQueue { * @return {Promise<{ result: undefined | Object, error: string | undefined }>} */ async processQuery(queryKey) { - const redisClient = await this.queueDriver.createConnection(); + const queueConnection = await this.queueDriver.createConnection(); + let insertedCount; let _removedCount; let activeKeys; @@ -724,16 +741,18 @@ export class QueryQueue { let query; let processingLockAcquired; try { - const processingId = await redisClient.getNextProcessingId(); - const retrieveResult = await redisClient.retrieveForProcessing(queryKey, processingId); + const processingId = await queueConnection.getNextProcessingId(); + const retrieveResult = await queueConnection.retrieveForProcessing(queryKey, processingId); + if (retrieveResult) { [insertedCount, _removedCount, activeKeys, queueSize, query, processingLockAcquired] = retrieveResult; } const activated = activeKeys && activeKeys.indexOf(this.redisHash(queryKey)) !== -1; if (!query) { - query = await redisClient.getQueryDef(this.redisHash(queryKey)); + query = await queueConnection.getQueryDef(this.redisHash(queryKey)); } + if (query && insertedCount && activated && processingLockAcquired) { let executionResult; const startQueryTime = (new Date()).getTime(); @@ -751,10 +770,10 @@ export class QueryQueue { preAggregation: query.query?.preAggregation, addedToQueueTime: query.addedToQueueTime, }); - await redisClient.optimisticQueryUpdate(queryKey, { startQueryTime }, processingId); + await queueConnection.optimisticQueryUpdate(queryKey, { startQueryTime }, processingId); const heartBeatTimer = setInterval( - () => redisClient.updateHeartBeat(queryKey), + () => queueConnection.updateHeartBeat(queryKey), this.heartBeatInterval * 1000 ); try { @@ -772,7 +791,7 @@ export class QueryQueue { query.query, async (cancelHandler) => { try { - return redisClient.optimisticQueryUpdate(queryKey, { cancelHandler }, processingId); + return queueConnection.optimisticQueryUpdate(queryKey, { cancelHandler }, processingId); } catch (e) { this.logger('Error while query update', { queryKey: query.queryKey, @@ -793,6 +812,7 @@ export class QueryQueue { }; break; } + this.logger('Performing query completed', { processingId, queueSize, @@ -827,7 +847,7 @@ export class QueryQueue { error: (e.stack || e).toString() }); if (e instanceof TimeoutError) { - const queryWithCancelHandle = await redisClient.getQueryDef(queryKey); + const queryWithCancelHandle = await queueConnection.getQueryDef(queryKey); if (queryWithCancelHandle) { this.logger('Cancelling query due to timeout', { processingId, @@ -847,7 +867,7 @@ export class QueryQueue { clearInterval(heartBeatTimer); - if (!(await redisClient.setResultAndRemoveQuery(queryKey, executionResult, processingId))) { + if (!(await queueConnection.setResultAndRemoveQuery(queryKey, executionResult, processingId))) { this.logger('Orphaned execution result', { processingId, warn: 'Result for query was not set due to processing lock wasn\'t acquired', @@ -876,7 +896,7 @@ export class QueryQueue { activated, queryExists: !!query }); - const currentProcessingId = await redisClient.freeProcessingLock(queryKey, processingId, activated); + const currentProcessingId = await queueConnection.freeProcessingLock(queryKey, processingId, activated); if (currentProcessingId) { this.logger('Skipping free processing lock', { processingId, @@ -901,7 +921,7 @@ export class QueryQueue { queuePrefix: this.redisQueuePrefix }); } finally { - this.queueDriver.release(redisClient); + this.queueDriver.release(queueConnection); } } diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/RedisQueueDriver.js b/packages/cubejs-query-orchestrator/src/orchestrator/RedisQueueDriver.js index 07e98cc4633bc..45ceb164a7a7c 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/RedisQueueDriver.js +++ b/packages/cubejs-query-orchestrator/src/orchestrator/RedisQueueDriver.js @@ -1,6 +1,11 @@ import R from 'ramda'; +import { QueueDriverInterface, QueueDriverConnectionInterface } from '@cubejs-backend/base-driver'; + import { BaseQueueDriver } from './BaseQueueDriver'; +/** + * @implements {QueueDriverConnectionInterface} + */ export class RedisQueueDriverConnection { constructor(driver, options) { this.driver = driver; @@ -31,6 +36,22 @@ export class RedisQueueDriverConnection { return result && JSON.parse(result); } + async getQueriesToCancel() { + const [stalled, orphaned] = await Promise.all([ + this.getStalledQueries(), + this.getOrphanedQueries(), + ]); + + return stalled.concat(orphaned); + } + + async getActiveAndToProcess() { + return Promise.all([ + this.getActiveQueries(), + this.getToProcessQueries() + ]); + } + /** * Adds specified by the queryKey query to the queue, returns tuple * with the operation result. @@ -135,7 +156,7 @@ export class RedisQueueDriverConnection { .zrem([this.recentRedisKey(), this.redisHash(queryKey)]) .hdel([this.queriesDefKey(), this.redisHash(queryKey)]) .del(this.queryProcessingLockKey(queryKey)); - + if (this.getQueueEventsBus) { tx.publish( this.getQueueEventsBus().eventsChannel, @@ -337,6 +358,9 @@ export class RedisQueueDriverConnection { } } +/** + * @implements {QueueDriverInterface} + */ export class RedisQueueDriver extends BaseQueueDriver { constructor(options) { super(); diff --git a/packages/cubejs-query-orchestrator/test/integration/cubestore/QueryCacheCubestore.test.ts b/packages/cubejs-query-orchestrator/test/integration/cubestore/QueryCacheCubestore.test.ts index 8ef03049ab5f1..951af29721efc 100644 --- a/packages/cubejs-query-orchestrator/test/integration/cubestore/QueryCacheCubestore.test.ts +++ b/packages/cubejs-query-orchestrator/test/integration/cubestore/QueryCacheCubestore.test.ts @@ -2,10 +2,22 @@ import { CubeStoreDevDriver, CubeStoreDriver, CubeStoreHandler } from '@cubejs-b import { QueryCacheTest } from '../../unit/QueryCache.abstract'; let beforeAll; -let afterAll; -let cubeStoreDriverFactory = async () => new CubeStoreDriver({}); +let cubeStoreDriver; +let afterAll = async () => { + if (cubeStoreDriver) { + await cubeStoreDriver.release(); + } +}; +let cubeStoreDriverFactory = async () => { + if (cubeStoreDriver) { + return cubeStoreDriver; + } + + // eslint-disable-next-line no-return-assign + return cubeStoreDriver = new CubeStoreDriver({}); +}; -if ((process.env.CUBEJS_TESTING_CUBESTORE_AUTO_PROVISIONING || 'true') === 'true') { +if ((process.env.CUBEJS_TESTING_CUBESTORE_AUTO_PROVISIONING || 'false') === 'true') { const cubeStoreHandler = new CubeStoreHandler({ stdout: (data) => { console.log(data.toString().trim()); diff --git a/packages/cubejs-query-orchestrator/test/integration/cubestore/QueryQueueCubestore.test.ts b/packages/cubejs-query-orchestrator/test/integration/cubestore/QueryQueueCubestore.test.ts new file mode 100644 index 0000000000000..957ccbe8a8451 --- /dev/null +++ b/packages/cubejs-query-orchestrator/test/integration/cubestore/QueryQueueCubestore.test.ts @@ -0,0 +1,28 @@ +import { CubeStoreDriver } from '@cubejs-backend/cubestore-driver'; +import { QueryQueueTest } from '../../unit/QueryQueue.abstract'; + +let beforeAll; +let cubeStoreDriver; +const afterAll = async () => { + if (cubeStoreDriver) { + await cubeStoreDriver.release(); + } +}; +const cubeStoreDriverFactory = async () => { + if (cubeStoreDriver) { + return cubeStoreDriver; + } + + // eslint-disable-next-line no-return-assign + return cubeStoreDriver = new CubeStoreDriver({}); +}; + +QueryQueueTest( + 'CubeStore Queue Driver', + { + cacheAndQueueDriver: 'cubestore', + cubeStoreDriverFactory, + beforeAll, + afterAll + } +); diff --git a/packages/cubejs-query-orchestrator/test/unit/QueryCache.abstract.ts b/packages/cubejs-query-orchestrator/test/unit/QueryCache.abstract.ts index 3e7af2c955103..7d79adcebc049 100644 --- a/packages/cubejs-query-orchestrator/test/unit/QueryCache.abstract.ts +++ b/packages/cubejs-query-orchestrator/test/unit/QueryCache.abstract.ts @@ -56,7 +56,7 @@ export const QueryCacheTest = (name: string, options?: QueryCacheTestOptions) => doLock(1000) ]; - await pausePromise(25); + await pausePromise(50); locks.push(doLock(1000)); locks.push(doLock(1000)); diff --git a/packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts b/packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts index 4b28f444835e2..c034c3ff8a6f4 100644 --- a/packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts +++ b/packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts @@ -1,8 +1,17 @@ /* globals describe, test, expect, afterAll */ -import { QueryQueue } from '../../src/orchestrator/QueryQueue'; +import { CubeStoreDriver } from '@cubejs-backend/cubestore-driver'; +import { QueryQueue } from '../../src'; import { processUidRE } from '../../src/orchestrator/utils'; -export const QueryQueueTest = (name: string, options?: any) => { +export type QueryQueueTestOptions = { + cacheAndQueueDriver?: string, + redisPool?: any, + cubeStoreDriverFactory?: () => Promise, + beforeAll?: () => Promise, + afterAll?: () => Promise, +}; + +export const QueryQueueTest = (name: string, options: QueryQueueTestOptions = {}) => { describe(`QueryQueue${name}`, () => { let delayCount = 0; const delayFn = (result, delay) => new Promise(resolve => setTimeout(() => resolve(result), delay)); @@ -31,16 +40,30 @@ export const QueryQueueTest = (name: string, options?: any) => { }); afterAll(async () => { - await options.redisPool.cleanup(); + await queue.shutdown(); + + if (options?.afterAll) { + await options?.afterAll(); + } + + await options?.redisPool.cleanup(); }); + if (options?.beforeAll) { + beforeAll(async () => { + await options.beforeAll(); + }); + } + test('gutter', async () => { const query = ['select * from']; const result = await queue.executeInQueue('foo', query, query); expect(result).toBe('select * from bar'); }); - test('instant double wait resolve', async () => { + const nonCubestoreTest = options.cacheAndQueueDriver !== 'cubestore' ? test : xtest; + + nonCubestoreTest('instant double wait resolve', async () => { const results = await Promise.all([ queue.executeInQueue('delay', 'instant', { delay: 400, result: '2' }), queue.executeInQueue('delay', 'instant', { delay: 400, result: '2' }) @@ -48,7 +71,7 @@ export const QueryQueueTest = (name: string, options?: any) => { expect(results).toStrictEqual(['20', '20']); }); - test('priority', async () => { + nonCubestoreTest('priority', async () => { delayCount = 0; const result = await Promise.all([ queue.executeInQueue('delay', '11', { delay: 600, result: '1' }, 1), @@ -58,7 +81,7 @@ export const QueryQueueTest = (name: string, options?: any) => { expect(parseInt(result.find(f => f[0] === '3'), 10) % 10).toBeLessThan(2); }); - test('timeout', async () => { + nonCubestoreTest('timeout', async () => { delayCount = 0; const query = ['select * from 2']; let errorString = ''; @@ -78,7 +101,7 @@ export const QueryQueueTest = (name: string, options?: any) => { expect(errorString).toEqual(expect.stringContaining('timeout')); }); - test('stage reporting', async () => { + nonCubestoreTest('stage reporting', async () => { delayCount = 0; const resultPromise = queue.executeInQueue('delay', '1', { delay: 200, result: '1' }, 0, { stageQueryKey: '1' }); await delayFn(null, 50); @@ -87,7 +110,7 @@ export const QueryQueueTest = (name: string, options?: any) => { expect(await queue.getQueryStage('1')).toEqual(undefined); }); - test('priority stage reporting', async () => { + nonCubestoreTest('priority stage reporting', async () => { delayCount = 0; const resultPromise = queue.executeInQueue('delay', '31', { delay: 200, result: '1' }, 20, { stageQueryKey: '12' }); await delayFn(null, 50); @@ -99,7 +122,7 @@ export const QueryQueueTest = (name: string, options?: any) => { expect(await queue.getQueryStage('12')).toEqual(undefined); }); - test('negative priority', async () => { + nonCubestoreTest('negative priority', async () => { delayCount = 0; const results = []; @@ -116,7 +139,7 @@ export const QueryQueueTest = (name: string, options?: any) => { expect(results.map(r => parseInt(r[0], 10) - parseInt(results[0][0], 10))).toEqual([0, 1, 2]); }); - test('orphaned', async () => { + nonCubestoreTest('orphaned', async () => { for (let i = 1; i <= 4; i++) { await queue.executeInQueue('delay', `11${i}`, { delay: 50, result: `${i}` }, 0); } @@ -137,7 +160,7 @@ export const QueryQueueTest = (name: string, options?: any) => { await queue.executeInQueue('delay', '114', { delay: 50, result: '4' }, 0); }); - test('queue hash process persistent flag properly', () => { + nonCubestoreTest('queue hash process persistent flag properly', () => { const query = ['select * from table']; const key1 = queue.redisHash(query); // @ts-ignore @@ -156,7 +179,7 @@ export const QueryQueueTest = (name: string, options?: any) => { expect(queue.redisHash('string')).toBe('string'); }); - test('removed before reconciled', async () => { + nonCubestoreTest('removed before reconciled', async () => { const query = ['select * from']; const key = queue.redisHash(query); await queue.processQuery(key); @@ -164,7 +187,7 @@ export const QueryQueueTest = (name: string, options?: any) => { expect(result).toBe('select * from bar'); }); - test('queue driver lock obtain race condition', async () => { + nonCubestoreTest('queue driver lock obtain race condition', async () => { const redisClient: any = await queue.queueDriver.createConnection(); const redisClient2: any = await queue.queueDriver.createConnection(); const priority = 10; @@ -219,7 +242,7 @@ export const QueryQueueTest = (name: string, options?: any) => { await queue.queueDriver.release(redisClient2); }); - test('activated but lock is not acquired', async () => { + nonCubestoreTest('activated but lock is not acquired', async () => { const redisClient = await queue.queueDriver.createConnection(); const redisClient2 = await queue.queueDriver.createConnection(); const priority = 10; @@ -229,11 +252,11 @@ export const QueryQueueTest = (name: string, options?: any) => { await queue.reconcileQueue(); await redisClient.addToQueue( - keyScore, 'activated1', time, 'handler', ['select'], priority, { stageQueryKey: 'race' } + keyScore, 'activated1', time, 'handler', ['select'], priority, { stageQueryKey: 'race', requestId: '1' } ); await redisClient.addToQueue( - keyScore + 100, 'activated2', time + 100, 'handler2', ['select2'], priority, { stageQueryKey: 'race2' } + keyScore + 100, 'activated2', time + 100, 'handler2', ['select2'], priority, { stageQueryKey: 'race2', requestId: '1' } ); const processingId1 = await redisClient.getNextProcessingId(); diff --git a/packages/cubejs-server-core/src/core/OrchestratorApi.ts b/packages/cubejs-server-core/src/core/OrchestratorApi.ts index 8d6cc6b635114..49c21a91112d0 100644 --- a/packages/cubejs-server-core/src/core/OrchestratorApi.ts +++ b/packages/cubejs-server-core/src/core/OrchestratorApi.ts @@ -91,7 +91,7 @@ export class OrchestratorApi { const job = await fetchQueryPromise; return job; } - + fetchQueryPromise = pt.timeout(fetchQueryPromise, this.continueWaitTimeout * 1000); const data = await fetchQueryPromise; @@ -239,7 +239,7 @@ export class OrchestratorApi { dataSource = 'default', schema: string, table: string, - key: any[], + key: string, token: string, ): Promise<[boolean, string]> { return this.orchestrator.isPartitionExist( diff --git a/yarn.lock b/yarn.lock index 933f1ff80e09f..7fa1a6d703c08 100644 --- a/yarn.lock +++ b/yarn.lock @@ -6601,13 +6601,20 @@ dependencies: moment-range "*" -"@types/mysql@^2.15.15", "@types/mysql@^2.15.17", "@types/mysql@^2.15.19": +"@types/mysql@^2.15.15", "@types/mysql@^2.15.19": version "2.15.19" resolved "https://registry.yarnpkg.com/@types/mysql/-/mysql-2.15.19.tgz#d158927bb7c1a78f77e56de861a3b15cae0e7aed" integrity sha512-wSRg2QZv14CWcZXkgdvHbbV2ACufNy5EgI8mBBxnJIptchv7DBy/h53VMa2jDhyo0C9MO4iowE6z9vF8Ja1DkQ== dependencies: "@types/node" "*" +"@types/mysql@^2.15.21": + version "2.15.21" + resolved "https://registry.yarnpkg.com/@types/mysql/-/mysql-2.15.21.tgz#7516cba7f9d077f980100c85fd500c8210bd5e45" + integrity sha512-NPotx5CVful7yB+qZbWtXL2fA4e7aEHkihHLjklc6ID8aq7bhguHgeIoC1EmSNTAuCgI6ZXrjt2ZSaXnYX0EUg== + dependencies: + "@types/node" "*" + "@types/node-fetch@^2.5.0", "@types/node-fetch@^2.5.7", "@types/node-fetch@^2.5.8": version "2.5.12" resolved "https://registry.yarnpkg.com/@types/node-fetch/-/node-fetch-2.5.12.tgz#8a6f779b1d4e60b7a57fb6fd48d84fb545b9cc66" @@ -19604,7 +19611,7 @@ mysql2@^2.2.5, mysql2@^2.3.3: seq-queue "^0.0.5" sqlstring "^2.3.2" -mysql@^2.16.0, mysql@^2.18.1: +mysql@^2.18.1: version "2.18.1" resolved "https://registry.yarnpkg.com/mysql/-/mysql-2.18.1.tgz#2254143855c5a8c73825e4522baf2ea021766717" integrity sha512-Bca+gk2YWmqp2Uf6k5NFEurwY/0td0cpebAucFpY/3jhrwrVGuxU2uQFCHjU19SJfje0yQvi+rVWdq78hR5lig== @@ -25527,12 +25534,12 @@ sqlite3@^5.0.11: sqlstring@2.3.1: version "2.3.1" resolved "https://registry.yarnpkg.com/sqlstring/-/sqlstring-2.3.1.tgz#475393ff9e91479aea62dcaf0ca3d14983a7fb40" - integrity sha1-R1OT/56RR5rqYtyvDKPRSYOn+0A= + integrity sha512-ooAzh/7dxIG5+uDik1z/Rd1vli0+38izZhGzSa34FwR7IbelPWCCKSNIl8jlL/F7ERvy8CB2jNeM1E9i9mXMAQ== -sqlstring@^2.3.0, sqlstring@^2.3.1, sqlstring@^2.3.2: - version "2.3.2" - resolved "https://registry.yarnpkg.com/sqlstring/-/sqlstring-2.3.2.tgz#cdae7169389a1375b18e885f2e60b3e460809514" - integrity sha512-vF4ZbYdKS8OnoJAWBmMxCQDkiEBkGQYU7UZPtL8flbDRSNkhaXvRJ279ZtI6M+zDaQovVU4tuRgzK5fVhvFAhg== +sqlstring@^2.3.0, sqlstring@^2.3.1, sqlstring@^2.3.2, sqlstring@^2.3.3: + version "2.3.3" + resolved "https://registry.yarnpkg.com/sqlstring/-/sqlstring-2.3.3.tgz#2ddc21f03bce2c387ed60680e739922c65751d0c" + integrity sha512-qC9iz2FlN7DQl3+wjwn3802RTyjCx7sDvfQEXchwa6CWOx07/WVfh91gBmQ9fahw8snwGEWU3xGzOt4tFyHLxg== ssh-remote-port-forward@^1.0.4: version "1.0.4"