Skip to content

Commit

Permalink
feat(query-orchestrator): Introduce CubeStoreQueueDriver
Browse files Browse the repository at this point in the history
  • Loading branch information
ovr committed Jan 13, 2023
1 parent 929ffb6 commit 1f5a543
Show file tree
Hide file tree
Showing 20 changed files with 713 additions and 131 deletions.
17 changes: 14 additions & 3 deletions .github/workflows/push.yml
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ jobs:
CUBEJS_REDIS_SENTINEL: "redis+sentinel://localhost:5000,localhost:5001,localhost:5002/mymaster/0"

integration-cubestore:
needs: [unit, lint, latest-tag-sha]
needs: [latest-tag-sha]
runs-on: ubuntu-20.04
timeout-minutes: 60
if: (needs['latest-tag-sha'].outputs.sha != github.sha)
Expand All @@ -316,7 +316,7 @@ jobs:
- name: Install Rust
uses: actions-rs/toolchain@v1
with:
toolchain: nightly-2022-03-08
toolchain: nightly-2022-06-22
override: true
components: rustfmt
- name: Install Node.js ${{ matrix.node-version }}
Expand Down Expand Up @@ -351,6 +351,17 @@ jobs:
command: yarn install --frozen-lockfile
- name: Lerna tsc
run: yarn tsc
- uses: Swatinem/rust-cache@v1
with:
working-directory: ./rust/cubestore
key: ubuntu-20.04
- name: Build cubestore
uses: actions-rs/cargo@v1
with:
command: build
args: --manifest-path rust/cubestore/Cargo.toml -j 4
- name: Run Cube Store in background
run: RUNNER_TRACKING_ID="" && ./rust/cubestore/target/debug/cubestored &
- name: Run Cubestore Integration
timeout-minutes: 10
run: |
Expand All @@ -367,7 +378,7 @@ jobs:
node-version: [14.x]
db: [
'clickhouse', 'druid', 'elasticsearch', 'mssql', 'mysql', 'postgres', 'prestodb',
'mysql-aurora-serverless', 'cubestore', 'crate'
'mysql-aurora-serverless', 'crate'
]
fail-fast: false

Expand Down
5 changes: 3 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@
"@typescript-eslint/eslint-plugin": "^4.17.0",
"core-js": "^3.6.5",
"lerna": "^4.0.0",
"patch-package": "^6.4.7",
"ramda": "^0.27.0",
"rollup-plugin-dts": "^1.1.8",
"patch-package": "^6.4.7",
"whatwg-fetch": "^3.0.0"
},
"files": [
Expand Down Expand Up @@ -76,5 +76,6 @@
"@types/ramda": "0.27.40",
"rc-tree": "4.1.5"
},
"license": "MIT"
"license": "MIT",
"packageManager": "yarn@1.22.19"
}
66 changes: 52 additions & 14 deletions packages/cubejs-base-driver/src/queue-driver.interface.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,61 @@
export type QueryDef = unknown;
export type QueryKey = string | [string, any[]];

export type AddToQueueResponse = [added: number, _b: any, _c: any, queueSize: number, addedToQueueTime: number];
export type QueryStageStateResponse = [active: string[], toProcess: string[]] | [active: string[], toProcess: string[], defs: Record<string, QueryDef>];
export type RetrieveForProcessingResponse = [added: any, removed: any, active: string[], toProcess: any, def: QueryDef, lockAquired: boolean] | null;

export interface AddToQueueQuery {
isJob: boolean,
orphanedTimeout: unknown
}

export interface AddToQueueOptions {
stageQueryKey: string,
requestId: string
}

export interface QueueDriverOptions {
redisQueuePrefix: string,
concurrency: number,
continueWaitTimeout: number,
orphanedTimeout: number,
heartBeatTimeout: number,
getQueueEventsBus?: any,
}

export interface QueueDriverConnectionInterface {
redisHash(queryKey: QueryKey): string;
getResultBlocking(queryKey: string): Promise<unknown>;
getResult(queryKey: string): Promise<unknown>;
addToQueue(queryKey: string): Promise<unknown>;
getToProcessQueries(): Promise<unknown>;
getActiveQueries(): Promise<unknown>;
getOrphanedQueries(): Promise<unknown>;
getStalledQueries(): Promise<unknown>;
getQueryStageState(onlyKeys: any): Promise<unknown>;
getResult(queryKey: string): Promise<any>;
addToQueue(keyScore: number, queryKey: QueryKey, orphanedTime: any, queryHandler: any, query: AddToQueueQuery, priority: number, options: AddToQueueOptions): Promise<AddToQueueResponse>;
// Return query keys which was sorted by priority and time
getToProcessQueries(): Promise<string[]>;
getActiveQueries(): Promise<string[]>;
getQueryDef(queryKey: string): Promise<QueryDef | null>;
// Queries which was added to queue, but was not processed and not needed
getOrphanedQueries(): Promise<string[]>;
// Queries which was not completed with old heartbeat
getStalledQueries(): Promise<string[]>;
getQueryStageState(onlyKeys: boolean): Promise<QueryStageStateResponse>;
updateHeartBeat(queryKey: string): Promise<void>;
getNextProcessingId(): Promise<string>;
retrieveForProcessing(queryKey: string, processingId: string): Promise<unknown>;
freeProcessingLock(queryKe: string, processingId: string, activated: unknown): Promise<unknown>;
optimisticQueryUpdate(queryKey, toUpdate, processingId): Promise<unknown>;
cancelQuery(queryKey: string): Promise<unknown>;
getNextProcessingId(): Promise<string | number>;
// Trying to acquire a lock for processing a queue item, this method can return null when
// multiple nodes tries to process the same query
retrieveForProcessing(queryKey: string, processingId: number | string): Promise<RetrieveForProcessingResponse>;
freeProcessingLock(queryKe: string, processingId: string | number, activated: unknown): Promise<void>;
optimisticQueryUpdate(queryKey, toUpdate, processingId): Promise<boolean>;
cancelQuery(queryKey: string): Promise<[QueryDef]>;
getQueryAndRemove(queryKey: string): Promise<[QueryDef]>;
setResultAndRemoveQuery(queryKey: string, executionResult: any, processingId: any): Promise<unknown>;
release(): Promise<void>;
release(): void;
//
getQueriesToCancel(): Promise<string[]>
getActiveAndToProcess(): Promise<[active: string[], toProcess: string[]]>;
}

export interface QueueDriverInterface {
redisHash(queryKey: QueryKey): string;
createConnection(): Promise<QueueDriverConnectionInterface>;
release(connection: QueueDriverConnectionInterface): Promise<void>;
release(connection: QueueDriverConnectionInterface): void;
}
4 changes: 1 addition & 3 deletions packages/cubejs-cubestore-driver/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,8 @@
"fs-extra": "^9.1.0",
"generic-pool": "^3.6.0",
"moment-timezone": "^0.5.31",
"mysql": "^2.16.0",
"node-fetch": "^2.6.1",
"sqlstring": "^2.3.2",
"sqlstring": "^2.3.3",
"tempy": "^1.0.1",
"uuid": "^8.3.2",
"ws": "^7.4.3"
Expand All @@ -45,7 +44,6 @@
"@cubejs-backend/linter": "^0.31.0",
"@types/flatbuffers": "^1.10.0",
"@types/generic-pool": "^3.1.9",
"@types/mysql": "^2.15.17",
"@types/ws": "^7.4.0",
"jest": "^26.6.3",
"typescript": "~4.1.5"
Expand Down
Loading

0 comments on commit 1f5a543

Please sign in to comment.