Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(cubestore-driver): Cancel handling #6087

Merged
merged 6 commits into from
Jan 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 13 additions & 10 deletions packages/cubejs-base-driver/src/queue-driver.interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ export type QueryDef = unknown;
export type QueryKey = (string | [string, any[]]) & {
persistent?: true,
};
export interface QueryKeyHash extends String {
__type: 'QueryKeyHash'
}

export type AddToQueueResponse = [added: number, _b: any, _c: any, queueSize: number, addedToQueueTime: number];
export type QueryStageStateResponse = [active: string[], toProcess: string[]] | [active: string[], toProcess: string[], defs: Record<string, QueryDef>];
export type RetrieveForProcessingResponse = [added: any, removed: any, active: string[], toProcess: any, def: QueryDef, lockAquired: boolean] | null;
export type RetrieveForProcessingResponse = [added: any, removed: any, active: QueryKeyHash[], toProcess: any, def: QueryDef, lockAquired: boolean] | null;

export interface AddToQueueQuery {
isJob: boolean,
Expand All @@ -27,37 +30,37 @@ export interface QueueDriverOptions {
}

export interface QueueDriverConnectionInterface {
redisHash(queryKey: QueryKey): string;
redisHash(queryKey: QueryKey): QueryKeyHash;
getResultBlocking(queryKey: QueryKey): Promise<unknown>;
getResult(queryKey: QueryKey): Promise<any>;
addToQueue(keyScore: number, queryKey: QueryKey, orphanedTime: any, queryHandler: any, query: AddToQueueQuery, priority: number, options: AddToQueueOptions): Promise<AddToQueueResponse>;
// Return query keys which was sorted by priority and time
getToProcessQueries(): Promise<string[]>;
getActiveQueries(): Promise<string[]>;
getQueryDef(queryKey: QueryKey): Promise<QueryDef | null>;
getQueryDef(queryKey: QueryKeyHash): Promise<QueryDef | null>;
// Queries which was added to queue, but was not processed and not needed
getOrphanedQueries(): Promise<string[]>;
// Queries which was not completed with old heartbeat
getStalledQueries(): Promise<string[]>;
getQueryStageState(onlyKeys: boolean): Promise<QueryStageStateResponse>;
updateHeartBeat(queryKey: QueryKey): Promise<void>;
updateHeartBeat(hash: QueryKeyHash): Promise<void>;
getNextProcessingId(): Promise<string | number>;
// Trying to acquire a lock for processing a queue item, this method can return null when
// multiple nodes tries to process the same query
retrieveForProcessing(queryKey: QueryKey, processingId: number | string): Promise<RetrieveForProcessingResponse>;
freeProcessingLock(queryKey: QueryKey, processingId: string | number, activated: unknown): Promise<void>;
optimisticQueryUpdate(queryKey: QueryKey, toUpdate, processingId): Promise<boolean>;
retrieveForProcessing(hash: QueryKeyHash, processingId: number | string): Promise<RetrieveForProcessingResponse>;
freeProcessingLock(hash: QueryKeyHash, processingId: string | number, activated: unknown): Promise<void>;
optimisticQueryUpdate(hash: QueryKeyHash, toUpdate, processingId): Promise<boolean>;
cancelQuery(queryKey: QueryKey): Promise<QueryDef | null>;
getQueryAndRemove(queryKey: QueryKey): Promise<[QueryDef]>;
setResultAndRemoveQuery(queryKey: QueryKey, executionResult: any, processingId: any): Promise<unknown>;
getQueryAndRemove(hash: QueryKeyHash): Promise<[QueryDef]>;
setResultAndRemoveQuery(hash: QueryKeyHash, executionResult: any, processingId: any): Promise<unknown>;
release(): void;
//
getQueriesToCancel(): Promise<string[]>
getActiveAndToProcess(): Promise<[active: string[], toProcess: string[]]>;
}

export interface QueueDriverInterface {
redisHash(queryKey: QueryKey): string;
redisHash(queryKey: QueryKey): QueryKeyHash;
createConnection(): Promise<QueueDriverConnectionInterface>;
release(connection: QueueDriverConnectionInterface): void;
}
41 changes: 22 additions & 19 deletions packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,23 @@ import {
RetrieveForProcessingResponse,
QueueDriverOptions,
AddToQueueQuery,
AddToQueueOptions, AddToQueueResponse, QueryKey,
AddToQueueOptions,
AddToQueueResponse,
QueryKey,
QueryKeyHash
} from '@cubejs-backend/base-driver';
import { getProcessUid } from '@cubejs-backend/shared';

import { CubeStoreDriver } from './CubeStoreDriver';

function hashQueryKey(queryKey: QueryKey) {
function hashQueryKey(queryKey: QueryKey): QueryKeyHash {
const hash = crypto.createHash('md5').update(JSON.stringify(queryKey)).digest('hex');

if (typeof queryKey === 'object' && queryKey.persistent) {
return `${hash}@${getProcessUid()}`;
return `${hash}@${getProcessUid()}` as any;
}

return hash;
return hash as any;
}

class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
Expand All @@ -29,7 +32,7 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
protected readonly options: QueueDriverOptions,
) { }

public redisHash(queryKey: QueryKey): string {
public redisHash(queryKey: QueryKey): QueryKeyHash {
return hashQueryKey(queryKey);
}

Expand Down Expand Up @@ -75,13 +78,13 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
}

// TODO: Looks useless, because we can do it in one step - getQueriesToCancel
public async getQueryAndRemove(queryKey: string): Promise<[QueryDef]> {
return [await this.cancelQuery(queryKey)];
public async getQueryAndRemove(hash: QueryKeyHash): Promise<[QueryDef]> {
return [await this.cancelQuery(hash)];
}

public async cancelQuery(queryKey: string): Promise<QueryDef | null> {
public async cancelQuery(hash: QueryKeyHash): Promise<QueryDef | null> {
const rows = await this.driver.query('QUEUE CANCEL ?', [
this.prefixKey(queryKey)
this.prefixKey(hash)
]);
if (rows && rows.length) {
return this.decodeQueryDefFromRow(rows[0], 'cancelQuery');
Expand All @@ -90,7 +93,7 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
return null;
}

public async freeProcessingLock(_queryKey: string, _processingId: string, _activated: unknown): Promise<void> {
public async freeProcessingLock(_hash: QueryKeyHash, _processingId: string, _activated: unknown): Promise<void> {
// nothing to do
}

Expand Down Expand Up @@ -170,7 +173,7 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
return [active, toProcess, defs];
}

public async getResult(queryKey: string): Promise<unknown> {
public async getResult(queryKey: QueryKey): Promise<unknown> {
const rows = await this.driver.query('QUEUE RESULT ?', [
this.prefixKey(this.redisHash(queryKey)),
]);
Expand Down Expand Up @@ -220,9 +223,9 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
return payload;
}

public async getQueryDef(queryKey: string): Promise<QueryDef | null> {
public async getQueryDef(queryKey: QueryKeyHash): Promise<QueryDef | null> {
const rows = await this.driver.query('QUEUE GET ?', [
this.prefixKey(this.redisHash(queryKey))
this.prefixKey(queryKey)
]);
if (rows && rows.length) {
return this.decodeQueryDefFromRow(rows[0], 'getQueryDef');
Expand All @@ -244,7 +247,7 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
// nothing to release
}

public async retrieveForProcessing(queryKeyHashed: string, _processingId: string): Promise<RetrieveForProcessingResponse> {
public async retrieveForProcessing(queryKeyHashed: QueryKeyHash, _processingId: string): Promise<RetrieveForProcessingResponse> {
const rows = await this.driver.query('QUEUE RETRIEVE CONCURRENCY ? ?', [
this.options.concurrency,
this.prefixKey(queryKeyHashed),
Expand Down Expand Up @@ -276,18 +279,18 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
return null;
}

public async setResultAndRemoveQuery(queryKey: string, executionResult: any, _processingId: any): Promise<boolean> {
public async setResultAndRemoveQuery(hash: QueryKeyHash, executionResult: any, _processingId: any): Promise<boolean> {
await this.driver.query('QUEUE ACK ? ? ', [
this.prefixKey(queryKey),
this.prefixKey(hash),
executionResult ? JSON.stringify(executionResult) : executionResult
]);

return true;
}

public async updateHeartBeat(queryKey: string): Promise<void> {
public async updateHeartBeat(hash: QueryKeyHash): Promise<void> {
await this.driver.query('QUEUE HEARTBEAT ?', [
this.prefixKey(queryKey)
this.prefixKey(hash)
]);
}
}
Expand All @@ -300,7 +303,7 @@ export class CubeStoreQueueDriver implements QueueDriverInterface {

protected connection: CubeStoreDriver | null = null;

public redisHash(queryKey: QueryKey) {
public redisHash(queryKey: QueryKey): QueryKeyHash {
return hashQueryKey(queryKey);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
import { QueueDriverConnectionInterface, QueueDriverInterface } from '@cubejs-backend/base-driver';
import {
QueryKey,
QueryKeyHash,
QueueDriverConnectionInterface,
QueueDriverInterface,
} from '@cubejs-backend/base-driver';
import { getCacheHash } from './utils';

export abstract class BaseQueueDriver implements QueueDriverInterface {
public redisHash(queryKey: string) {
public redisHash(queryKey: QueryKey): QueryKeyHash {
return getCacheHash(queryKey);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ export class LocalQueueDriverConnection {
if (this.resultPromises[resultListKey] && this.resultPromises[resultListKey].resolved) {
return this.getResultBlocking(queryKey);
}

return null;
}

Expand Down Expand Up @@ -221,7 +222,7 @@ export class LocalQueueDriverConnection {
}

async getQueryDef(queryKey) {
return this.queryDef[this.redisHash(queryKey)];
return this.queryDef[queryKey];
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -960,7 +960,7 @@ export class QueryCache {
}

public queryRedisKey(cacheKey): string {
return this.getKey('SQL_QUERY_RESULT', getCacheHash(cacheKey));
return this.getKey('SQL_QUERY_RESULT', getCacheHash(cacheKey) as any);
}

public async cleanup() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ export class QueryQueue {
if (priority == null) {
priority = 0;
}

if (!(priority >= -10000 && priority <= 10000)) {
throw new Error('Priority should be between -10000 and 10000');
}
Expand All @@ -330,8 +331,10 @@ export class QueryQueue {
return this.parseResult(result);
}

const queryKeyHash = this.redisHash(queryKey);

if (query.forceBuild) {
const jobExists = await queueConnection.getQueryDef(queryKey);
const jobExists = await queueConnection.getQueryDef(queryKeyHash);
if (jobExists) return null;
}

Expand Down Expand Up @@ -363,7 +366,7 @@ export class QueryQueue {

await this.reconcileQueue();

const queryDef = await queueConnection.getQueryDef(queryKey);
const queryDef = await queueConnection.getQueryDef(queryKeyHash);
const [active, toProcess] = await queueConnection.getQueryStageState(true);

if (queryDef) {
Expand All @@ -374,8 +377,8 @@ export class QueryQueue {
requestId: options.requestId,
activeQueryKeys: active,
toProcessQueryKeys: toProcess,
active: active.indexOf(this.redisHash(queryKey)) !== -1,
queueIndex: toProcess.indexOf(this.redisHash(queryKey)),
active: active.indexOf(queryKeyHash) !== -1,
queueIndex: toProcess.indexOf(queryKeyHash),
waitingForRequestId: queryDef.requestId
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ export class RedisQueueDriverConnection {
}

async getQueryDef(queryKey) {
const query = await this.redisClient.hgetAsync([this.queriesDefKey(), this.redisHash(queryKey)]);
const query = await this.redisClient.hgetAsync([this.queriesDefKey(), queryKey]);
return JSON.parse(query);
}

Expand Down
29 changes: 16 additions & 13 deletions packages/cubejs-query-orchestrator/src/orchestrator/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import * as querystring from 'querystring';
import crypto from 'crypto';

import { getProcessUid } from '@cubejs-backend/shared';
import { QueryKey, QueryKeyHash } from '@cubejs-backend/base-driver';

function parseHostPort(addr: string): { host: string, port: number } {
if (addr.includes(':')) {
Expand Down Expand Up @@ -187,17 +188,19 @@ export const processUidRE = /^[0-9a-f]{8}\b-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-
/**
* Returns query hash by specified `queryKey`.
*/
export function getCacheHash(queryKey) {
return typeof queryKey === 'string' && queryKey.length < 256
? queryKey
: `${
crypto
.createHash('md5')
.update(JSON.stringify(queryKey))
.digest('hex')
}${
typeof queryKey === 'object' && queryKey.persistent
? `@${getProcessUid()}`
: ''
}`;
export function getCacheHash(queryKey: QueryKey): QueryKeyHash {
if (typeof queryKey === 'string' && queryKey.length < 256) {
return queryKey as any;
}

const hash = crypto
.createHash('md5')
.update(JSON.stringify(queryKey))
.digest('hex');

if (typeof queryKey === 'object' && queryKey.persistent) {
return `${hash}@${getProcessUid()}` as any;
}

return hash as any;
}
Loading