From 0e1f2ad1d4aaf4095b3fa768319a1fcf3888dc04 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Mon, 30 Jan 2023 15:40:17 +0300 Subject: [PATCH 1/6] fix(cubestore-driver): Correct cancel handling --- .../src/queue-driver.interface.ts | 11 ++++--- .../src/CubeStoreQueueDriver.ts | 23 ++++++++------- .../src/orchestrator/BaseQueueDriver.ts | 9 ++++-- .../src/orchestrator/LocalQueueDriver.js | 3 +- .../src/orchestrator/QueryCache.ts | 2 +- .../src/orchestrator/QueryQueue.js | 11 ++++--- .../src/orchestrator/utils.ts | 29 ++++++++++--------- .../test/unit/QueryQueue.abstract.ts | 15 +++++++--- .../src/cachestore/cache_rocksstore.rs | 12 ++++++-- 9 files changed, 73 insertions(+), 42 deletions(-) diff --git a/packages/cubejs-base-driver/src/queue-driver.interface.ts b/packages/cubejs-base-driver/src/queue-driver.interface.ts index b64d7f84e199c..a2628726f52bb 100644 --- a/packages/cubejs-base-driver/src/queue-driver.interface.ts +++ b/packages/cubejs-base-driver/src/queue-driver.interface.ts @@ -2,10 +2,13 @@ export type QueryDef = unknown; export type QueryKey = (string | [string, any[]]) & { persistent?: true, }; +export interface QueryKeyHash extends String { + __type: 'QueryKeyHash' +} export type AddToQueueResponse = [added: number, _b: any, _c: any, queueSize: number, addedToQueueTime: number]; export type QueryStageStateResponse = [active: string[], toProcess: string[]] | [active: string[], toProcess: string[], defs: Record]; -export type RetrieveForProcessingResponse = [added: any, removed: any, active: string[], toProcess: any, def: QueryDef, lockAquired: boolean] | null; +export type RetrieveForProcessingResponse = [added: any, removed: any, active: QueryKeyHash[], toProcess: any, def: QueryDef, lockAquired: boolean] | null; export interface AddToQueueQuery { isJob: boolean, @@ -27,14 +30,14 @@ export interface QueueDriverOptions { } export interface QueueDriverConnectionInterface { - redisHash(queryKey: QueryKey): string; + redisHash(queryKey: QueryKey): QueryKeyHash; getResultBlocking(queryKey: QueryKey): Promise; getResult(queryKey: QueryKey): Promise; addToQueue(keyScore: number, queryKey: QueryKey, orphanedTime: any, queryHandler: any, query: AddToQueueQuery, priority: number, options: AddToQueueOptions): Promise; // Return query keys which was sorted by priority and time getToProcessQueries(): Promise; getActiveQueries(): Promise; - getQueryDef(queryKey: QueryKey): Promise; + getQueryDef(queryKey: QueryKeyHash): Promise; // Queries which was added to queue, but was not processed and not needed getOrphanedQueries(): Promise; // Queries which was not completed with old heartbeat @@ -57,7 +60,7 @@ export interface QueueDriverConnectionInterface { } export interface QueueDriverInterface { - redisHash(queryKey: QueryKey): string; + redisHash(queryKey: QueryKey): QueryKeyHash; createConnection(): Promise; release(connection: QueueDriverConnectionInterface): void; } diff --git a/packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts b/packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts index 9d77547990696..a755318555df7 100644 --- a/packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts +++ b/packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts @@ -7,20 +7,23 @@ import { RetrieveForProcessingResponse, QueueDriverOptions, AddToQueueQuery, - AddToQueueOptions, AddToQueueResponse, QueryKey, + AddToQueueOptions, + AddToQueueResponse, + QueryKey, + QueryKeyHash } from '@cubejs-backend/base-driver'; import { getProcessUid } from '@cubejs-backend/shared'; import { CubeStoreDriver } from './CubeStoreDriver'; -function hashQueryKey(queryKey: QueryKey) { +function hashQueryKey(queryKey: QueryKey): QueryKeyHash { const hash = crypto.createHash('md5').update(JSON.stringify(queryKey)).digest('hex'); if (typeof queryKey === 'object' && queryKey.persistent) { - return `${hash}@${getProcessUid()}`; + return `${hash}@${getProcessUid()}` as any; } - return hash; + return hash as any; } class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface { @@ -29,7 +32,7 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface { protected readonly options: QueueDriverOptions, ) { } - public redisHash(queryKey: QueryKey): string { + public redisHash(queryKey: QueryKey): QueryKeyHash { return hashQueryKey(queryKey); } @@ -170,7 +173,7 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface { return [active, toProcess, defs]; } - public async getResult(queryKey: string): Promise { + public async getResult(queryKey: QueryKey): Promise { const rows = await this.driver.query('QUEUE RESULT ?', [ this.prefixKey(this.redisHash(queryKey)), ]); @@ -220,9 +223,9 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface { return payload; } - public async getQueryDef(queryKey: string): Promise { + public async getQueryDef(queryKey: QueryKeyHash): Promise { const rows = await this.driver.query('QUEUE GET ?', [ - this.prefixKey(this.redisHash(queryKey)) + this.prefixKey(queryKey) ]); if (rows && rows.length) { return this.decodeQueryDefFromRow(rows[0], 'getQueryDef'); @@ -244,7 +247,7 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface { // nothing to release } - public async retrieveForProcessing(queryKeyHashed: string, _processingId: string): Promise { + public async retrieveForProcessing(queryKeyHashed: QueryKeyHash, _processingId: string): Promise { const rows = await this.driver.query('QUEUE RETRIEVE CONCURRENCY ? ?', [ this.options.concurrency, this.prefixKey(queryKeyHashed), @@ -300,7 +303,7 @@ export class CubeStoreQueueDriver implements QueueDriverInterface { protected connection: CubeStoreDriver | null = null; - public redisHash(queryKey: QueryKey) { + public redisHash(queryKey: QueryKey): QueryKeyHash { return hashQueryKey(queryKey); } diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/BaseQueueDriver.ts b/packages/cubejs-query-orchestrator/src/orchestrator/BaseQueueDriver.ts index 306fce0a3ce47..bc7cd5ea4945b 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/BaseQueueDriver.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/BaseQueueDriver.ts @@ -1,8 +1,13 @@ -import { QueueDriverConnectionInterface, QueueDriverInterface } from '@cubejs-backend/base-driver'; +import { + QueryKey, + QueryKeyHash, + QueueDriverConnectionInterface, + QueueDriverInterface, +} from '@cubejs-backend/base-driver'; import { getCacheHash } from './utils'; export abstract class BaseQueueDriver implements QueueDriverInterface { - public redisHash(queryKey: string) { + public redisHash(queryKey: QueryKey): QueryKeyHash { return getCacheHash(queryKey); } diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/LocalQueueDriver.js b/packages/cubejs-query-orchestrator/src/orchestrator/LocalQueueDriver.js index 939bed6574e95..643e3b548c3b7 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/LocalQueueDriver.js +++ b/packages/cubejs-query-orchestrator/src/orchestrator/LocalQueueDriver.js @@ -78,6 +78,7 @@ export class LocalQueueDriverConnection { if (this.resultPromises[resultListKey] && this.resultPromises[resultListKey].resolved) { return this.getResultBlocking(queryKey); } + return null; } @@ -221,7 +222,7 @@ export class LocalQueueDriverConnection { } async getQueryDef(queryKey) { - return this.queryDef[this.redisHash(queryKey)]; + return this.queryDef[queryKey]; } /** diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts index f0c07908b0264..4bddccc474f5e 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts @@ -960,7 +960,7 @@ export class QueryCache { } public queryRedisKey(cacheKey): string { - return this.getKey('SQL_QUERY_RESULT', getCacheHash(cacheKey)); + return this.getKey('SQL_QUERY_RESULT', getCacheHash(cacheKey) as any); } public async cleanup() { diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js b/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js index 78ae3e1bee2fb..8c067f04c931f 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js @@ -318,6 +318,7 @@ export class QueryQueue { if (priority == null) { priority = 0; } + if (!(priority >= -10000 && priority <= 10000)) { throw new Error('Priority should be between -10000 and 10000'); } @@ -330,8 +331,10 @@ export class QueryQueue { return this.parseResult(result); } + const queryKeyHash = this.redisHash(queryKey); + if (query.forceBuild) { - const jobExists = await queueConnection.getQueryDef(queryKey); + const jobExists = await queueConnection.getQueryDef(queryKeyHash); if (jobExists) return null; } @@ -363,7 +366,7 @@ export class QueryQueue { await this.reconcileQueue(); - const queryDef = await queueConnection.getQueryDef(queryKey); + const queryDef = await queueConnection.getQueryDef(queryKeyHash); const [active, toProcess] = await queueConnection.getQueryStageState(true); if (queryDef) { @@ -374,8 +377,8 @@ export class QueryQueue { requestId: options.requestId, activeQueryKeys: active, toProcessQueryKeys: toProcess, - active: active.indexOf(this.redisHash(queryKey)) !== -1, - queueIndex: toProcess.indexOf(this.redisHash(queryKey)), + active: active.indexOf(queryKeyHash) !== -1, + queueIndex: toProcess.indexOf(queryKeyHash), waitingForRequestId: queryDef.requestId }); } diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/utils.ts b/packages/cubejs-query-orchestrator/src/orchestrator/utils.ts index b98119949a914..15c580238726d 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/utils.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/utils.ts @@ -3,6 +3,7 @@ import * as querystring from 'querystring'; import crypto from 'crypto'; import { getProcessUid } from '@cubejs-backend/shared'; +import { QueryKey, QueryKeyHash } from '@cubejs-backend/base-driver'; function parseHostPort(addr: string): { host: string, port: number } { if (addr.includes(':')) { @@ -187,17 +188,19 @@ export const processUidRE = /^[0-9a-f]{8}\b-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}- /** * Returns query hash by specified `queryKey`. */ -export function getCacheHash(queryKey) { - return typeof queryKey === 'string' && queryKey.length < 256 - ? queryKey - : `${ - crypto - .createHash('md5') - .update(JSON.stringify(queryKey)) - .digest('hex') - }${ - typeof queryKey === 'object' && queryKey.persistent - ? `@${getProcessUid()}` - : '' - }`; +export function getCacheHash(queryKey: QueryKey): QueryKeyHash { + if (typeof queryKey === 'string' && queryKey.length < 256) { + return queryKey as any; + } + + const hash = crypto + .createHash('md5') + .update(JSON.stringify(queryKey)) + .digest('hex'); + + if (typeof queryKey === 'object' && queryKey.persistent) { + return `${hash}@${getProcessUid()}` as any; + } + + return hash as any; } diff --git a/packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts b/packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts index 8861037093036..3e65367f3e58d 100644 --- a/packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts +++ b/packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts @@ -133,7 +133,14 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions = {} break; } } + await awaitProcessing(); + expect(errorString).toEqual(expect.stringContaining('timeout')); + + expect(logger.mock.calls.length).toEqual(6); + // assert that query queue is able to get query def by query key + expect(logger.mock.calls[5][0]).toEqual('Cancelling query due to timeout'); + expect(logger.mock.calls[4][0]).toEqual('Error while querying'); }); test('stage reporting', async () => { @@ -321,12 +328,12 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions = {} console.log(retrieve1); const retrieve2 = await redisClient2.retrieveForProcessing('activated2', processingId2); console.log(retrieve2); - console.log(await redisClient.freeProcessingLock('activated1', processingId1, retrieve1 && retrieve1[2].indexOf('activated1') !== -1)); + console.log(await redisClient.freeProcessingLock('activated1', processingId1, retrieve1 && retrieve1[2].indexOf('activated1' as any) !== -1)); const retrieve3 = await redisClient.retrieveForProcessing('activated2', processingId3); console.log(retrieve3); - console.log(await redisClient.freeProcessingLock('activated2', processingId3, retrieve3 && retrieve3[2].indexOf('activated2') !== -1)); - console.log(retrieve2[2].indexOf('activated2') !== -1); - console.log(await redisClient2.freeProcessingLock('activated2', processingId2, retrieve2 && retrieve2[2].indexOf('activated2') !== -1)); + console.log(await redisClient.freeProcessingLock('activated2', processingId3, retrieve3 && retrieve3[2].indexOf('activated2' as any) !== -1)); + console.log(retrieve2[2].indexOf('activated2' as any) !== -1); + console.log(await redisClient2.freeProcessingLock('activated2', processingId2, retrieve2 && retrieve2[2].indexOf('activated2' as any) !== -1)); const retrieve4 = await redisClient.retrieveForProcessing('activated2', await redisClient.getNextProcessingId()); console.log(retrieve4); diff --git a/rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs b/rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs index df7f4be241b4b..f739fd391a236 100644 --- a/rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs +++ b/rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs @@ -520,10 +520,16 @@ impl CacheStore for RocksCacheStore { async fn queue_truncate(&self) -> Result<(), CubeError> { self.store .write_operation(move |db_ref, batch_pipe| { - let queue_schema = QueueItemRocksTable::new(db_ref); - let rows = queue_schema.all_rows()?; + let queue_item_schema = QueueItemRocksTable::new(db_ref.clone()); + let rows = queue_item_schema.all_rows()?; for row in rows.iter() { - queue_schema.delete(row.get_id(), batch_pipe)?; + queue_item_schema.delete(row.get_id(), batch_pipe)?; + } + + let queue_result_schema = QueueResultRocksTable::new(db_ref); + let rows = queue_result_schema.all_rows()?; + for row in rows.iter() { + queue_result_schema.delete(row.get_id(), batch_pipe)?; } Ok(()) From 31ca047374489f808486ebed5f19c6636cca9f30 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Mon, 30 Jan 2023 19:15:48 +0300 Subject: [PATCH 2/6] chore: stable tests --- .../test/unit/QueryQueue.abstract.ts | 16 +++++++++++----- yarn.lock | 11 +++++++++++ 2 files changed, 22 insertions(+), 5 deletions(-) diff --git a/packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts b/packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts index 3e65367f3e58d..ec9bc5220d8db 100644 --- a/packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts +++ b/packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts @@ -116,7 +116,7 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions = {} expect(parseInt(result.find(f => f[0] === '3'), 10) % 10).toBeLessThan(2); }); - test('timeout', async () => { + test('timeout - continue wait', async () => { const query = ['select * from 2']; let errorString = ''; @@ -133,14 +133,20 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions = {} break; } } - await awaitProcessing(); expect(errorString).toEqual(expect.stringContaining('timeout')); + }); + + test('timeout', async () => { + const query = ['select * from 3']; + + await queue.executeInQueue('delay', query, { delay: 60 * 60 * 1000, result: '1', isJob: true }); + await awaitProcessing(); - expect(logger.mock.calls.length).toEqual(6); + expect(logger.mock.calls.length).toEqual(5); // assert that query queue is able to get query def by query key - expect(logger.mock.calls[5][0]).toEqual('Cancelling query due to timeout'); - expect(logger.mock.calls[4][0]).toEqual('Error while querying'); + expect(logger.mock.calls[4][0]).toEqual('Cancelling query due to timeout'); + expect(logger.mock.calls[3][0]).toEqual('Error while querying'); }); test('stage reporting', async () => { diff --git a/yarn.lock b/yarn.lock index 2efb702f6af5f..3857b0257f3ab 100644 --- a/yarn.lock +++ b/yarn.lock @@ -23889,6 +23889,17 @@ rc-tree-select@~4.3.0: rc-tree "^4.0.0" rc-util "^5.0.5" +rc-tree@^4.0.0, rc-tree@~4.2.1: + version "4.2.2" + resolved "https://registry.yarnpkg.com/rc-tree/-/rc-tree-4.2.2.tgz#4429187cbbfbecbe989714a607e3de8b3ab7763f" + integrity sha512-V1hkJt092VrOVjNyfj5IYbZKRMHxWihZarvA5hPL/eqm7o2+0SNkeidFYm7LVVBrAKBpOpa0l8xt04uiqOd+6w== + dependencies: + "@babel/runtime" "^7.10.1" + classnames "2.x" + rc-motion "^2.0.1" + rc-util "^5.0.0" + rc-virtual-list "^3.0.1" + rc-trigger@^5.0.0, rc-trigger@^5.0.4, rc-trigger@^5.1.2, rc-trigger@^5.2.10: version "5.2.10" resolved "https://registry.yarnpkg.com/rc-trigger/-/rc-trigger-5.2.10.tgz#8a0057a940b1b9027eaa33beec8a6ecd85cce2b1" From 7e5570a72e5b44115cc47dda73a204e8a668f17b Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Mon, 30 Jan 2023 19:37:55 +0300 Subject: [PATCH 3/6] chore: random prefix isolation for redis testing --- .../test/unit/QueryQueue.abstract.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts b/packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts index ec9bc5220d8db..5466d88a76640 100644 --- a/packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts +++ b/packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts @@ -1,6 +1,8 @@ import { CubeStoreDriver } from '@cubejs-backend/cubestore-driver'; import type { QueryKey } from '@cubejs-backend/base-driver'; import { pausePromise } from '@cubejs-backend/shared'; +import crypto from 'crypto'; + import { QueryQueue } from '../../src'; import { processUidRE } from '../../src/orchestrator/utils'; @@ -23,7 +25,9 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions = {} let processCancelPromises = []; let cancelledQuery; - const queue = new QueryQueue('test_query_queue', { + const tenantPrefix = crypto.randomBytes(6).toString('hex'); + + const queue = new QueryQueue(`${tenantPrefix}#test_query_queue`, { queryHandlers: { foo: async (query) => `${query[0]} bar`, delay: async (query, setCancelHandler) => { From c2cd3a10f95e894937193273b2349044a072a6d0 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Mon, 30 Jan 2023 21:16:57 +0300 Subject: [PATCH 4/6] chore: remove useless redis hash --- .../src/orchestrator/RedisQueueDriver.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/RedisQueueDriver.js b/packages/cubejs-query-orchestrator/src/orchestrator/RedisQueueDriver.js index cc9409d955190..cbbb299741ca4 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/RedisQueueDriver.js +++ b/packages/cubejs-query-orchestrator/src/orchestrator/RedisQueueDriver.js @@ -197,7 +197,7 @@ export class RedisQueueDriverConnection { } async getQueryDef(queryKey) { - const query = await this.redisClient.hgetAsync([this.queriesDefKey(), this.redisHash(queryKey)]); + const query = await this.redisClient.hgetAsync([this.queriesDefKey(), queryKey]); return JSON.parse(query); } From dedd960c4d5af6d97edd397344a2178ff7ee6bb0 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Mon, 30 Jan 2023 23:29:22 +0300 Subject: [PATCH 5/6] chore: better types --- .../src/queue-driver.interface.ts | 12 ++++++------ .../src/CubeStoreQueueDriver.ts | 18 +++++++++--------- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/packages/cubejs-base-driver/src/queue-driver.interface.ts b/packages/cubejs-base-driver/src/queue-driver.interface.ts index a2628726f52bb..665948dc69333 100644 --- a/packages/cubejs-base-driver/src/queue-driver.interface.ts +++ b/packages/cubejs-base-driver/src/queue-driver.interface.ts @@ -43,16 +43,16 @@ export interface QueueDriverConnectionInterface { // Queries which was not completed with old heartbeat getStalledQueries(): Promise; getQueryStageState(onlyKeys: boolean): Promise; - updateHeartBeat(queryKey: QueryKey): Promise; + updateHeartBeat(hash: QueryKeyHash): Promise; getNextProcessingId(): Promise; // Trying to acquire a lock for processing a queue item, this method can return null when // multiple nodes tries to process the same query - retrieveForProcessing(queryKey: QueryKey, processingId: number | string): Promise; - freeProcessingLock(queryKey: QueryKey, processingId: string | number, activated: unknown): Promise; - optimisticQueryUpdate(queryKey: QueryKey, toUpdate, processingId): Promise; + retrieveForProcessing(hash: QueryKeyHash, processingId: number | string): Promise; + freeProcessingLock(hash: QueryKeyHash, processingId: string | number, activated: unknown): Promise; + optimisticQueryUpdate(hash: QueryKeyHash, toUpdate, processingId): Promise; cancelQuery(queryKey: QueryKey): Promise; - getQueryAndRemove(queryKey: QueryKey): Promise<[QueryDef]>; - setResultAndRemoveQuery(queryKey: QueryKey, executionResult: any, processingId: any): Promise; + getQueryAndRemove(hash: QueryKeyHash): Promise<[QueryDef]>; + setResultAndRemoveQuery(hash: QueryKeyHash, executionResult: any, processingId: any): Promise; release(): void; // getQueriesToCancel(): Promise diff --git a/packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts b/packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts index a755318555df7..c966792ceb8b3 100644 --- a/packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts +++ b/packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts @@ -78,13 +78,13 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface { } // TODO: Looks useless, because we can do it in one step - getQueriesToCancel - public async getQueryAndRemove(queryKey: string): Promise<[QueryDef]> { - return [await this.cancelQuery(queryKey)]; + public async getQueryAndRemove(hash: QueryKeyHash): Promise<[QueryDef]> { + return [await this.cancelQuery(hash)]; } - public async cancelQuery(queryKey: string): Promise { + public async cancelQuery(hash: QueryKeyHash): Promise { const rows = await this.driver.query('QUEUE CANCEL ?', [ - this.prefixKey(queryKey) + this.prefixKey(hash) ]); if (rows && rows.length) { return this.decodeQueryDefFromRow(rows[0], 'cancelQuery'); @@ -93,7 +93,7 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface { return null; } - public async freeProcessingLock(_queryKey: string, _processingId: string, _activated: unknown): Promise { + public async freeProcessingLock(_hash: QueryKeyHash, _processingId: string, _activated: unknown): Promise { // nothing to do } @@ -279,18 +279,18 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface { return null; } - public async setResultAndRemoveQuery(queryKey: string, executionResult: any, _processingId: any): Promise { + public async setResultAndRemoveQuery(hash: QueryKeyHash, executionResult: any, _processingId: any): Promise { await this.driver.query('QUEUE ACK ? ? ', [ - this.prefixKey(queryKey), + this.prefixKey(hash), executionResult ? JSON.stringify(executionResult) : executionResult ]); return true; } - public async updateHeartBeat(queryKey: string): Promise { + public async updateHeartBeat(hash: QueryKeyHash): Promise { await this.driver.query('QUEUE HEARTBEAT ?', [ - this.prefixKey(queryKey) + this.prefixKey(hash) ]); } } From c6305291f5fcfd8f8cfafcb3183fdd9b37b6c825 Mon Sep 17 00:00:00 2001 From: Dmitry Patsura Date: Tue, 31 Jan 2023 00:01:12 +0300 Subject: [PATCH 6/6] chore: fix tsc --- .../test/unit/QueryQueue.abstract.ts | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts b/packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts index 5466d88a76640..ecc04f6850429 100644 --- a/packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts +++ b/packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts @@ -1,5 +1,5 @@ import { CubeStoreDriver } from '@cubejs-backend/cubestore-driver'; -import type { QueryKey } from '@cubejs-backend/base-driver'; +import type { QueryKey, QueryKeyHash } from '@cubejs-backend/base-driver'; import { pausePromise } from '@cubejs-backend/shared'; import crypto from 'crypto'; @@ -334,24 +334,24 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions = {} const processingId2 = await redisClient.getNextProcessingId(); const processingId3 = await redisClient.getNextProcessingId(); - const retrieve1 = await redisClient.retrieveForProcessing('activated1', processingId1); + const retrieve1 = await redisClient.retrieveForProcessing('activated1' as any, processingId1); console.log(retrieve1); - const retrieve2 = await redisClient2.retrieveForProcessing('activated2', processingId2); + const retrieve2 = await redisClient2.retrieveForProcessing('activated2' as any, processingId2); console.log(retrieve2); - console.log(await redisClient.freeProcessingLock('activated1', processingId1, retrieve1 && retrieve1[2].indexOf('activated1' as any) !== -1)); - const retrieve3 = await redisClient.retrieveForProcessing('activated2', processingId3); + console.log(await redisClient.freeProcessingLock('activated1' as any, processingId1, retrieve1 && retrieve1[2].indexOf('activated1' as any) !== -1)); + const retrieve3 = await redisClient.retrieveForProcessing('activated2' as any, processingId3); console.log(retrieve3); - console.log(await redisClient.freeProcessingLock('activated2', processingId3, retrieve3 && retrieve3[2].indexOf('activated2' as any) !== -1)); + console.log(await redisClient.freeProcessingLock('activated2' as any, processingId3, retrieve3 && retrieve3[2].indexOf('activated2' as any) !== -1)); console.log(retrieve2[2].indexOf('activated2' as any) !== -1); - console.log(await redisClient2.freeProcessingLock('activated2', processingId2, retrieve2 && retrieve2[2].indexOf('activated2' as any) !== -1)); + console.log(await redisClient2.freeProcessingLock('activated2' as any, processingId2, retrieve2 && retrieve2[2].indexOf('activated2' as any) !== -1)); - const retrieve4 = await redisClient.retrieveForProcessing('activated2', await redisClient.getNextProcessingId()); + const retrieve4 = await redisClient.retrieveForProcessing('activated2' as any, await redisClient.getNextProcessingId()); console.log(retrieve4); expect(retrieve4[0]).toBe(1); expect(!!retrieve4[5]).toBe(true); - console.log(await redisClient.getQueryAndRemove('activated1')); - console.log(await redisClient.getQueryAndRemove('activated2')); + console.log(await redisClient.getQueryAndRemove('activated1' as any)); + console.log(await redisClient.getQueryAndRemove('activated2' as any)); await queue.queueDriver.release(redisClient); await queue.queueDriver.release(redisClient2);