diff --git a/packages/cubejs-mysql-driver/driver/MySqlDriver.js b/packages/cubejs-mysql-driver/driver/MySqlDriver.js index 21eda9d74f781..411a1b2b6aade 100644 --- a/packages/cubejs-mysql-driver/driver/MySqlDriver.js +++ b/packages/cubejs-mysql-driver/driver/MySqlDriver.js @@ -63,13 +63,12 @@ class MySqlDriver extends BaseDriver { let cancelled = false; const cancelObj = {}; - const promise = connectionPromise.then(conn => { + const promise = connectionPromise.then(async conn => { + const [{ connectionId }] = await conn.execute('select connection_id() as connectionId'); cancelObj.cancel = async () => { cancelled = true; await self.withConnection(async processConnection => { - const processRows = await processConnection.execute('SHOW PROCESSLIST'); - await Promise.all(processRows.filter(row => row.Time >= 599) - .map(row => processConnection.execute(`KILL ${row.Id}`))); + await processConnection.execute(`KILL ${connectionId}`); }); }; return fn(conn) @@ -128,10 +127,12 @@ class MySqlDriver extends BaseDriver { return GenericTypeToMySql[columnType] || super.fromGenericType(columnType); } - async loadPreAggregationIntoTable(preAggregationTableName, loadSql, params, tx) { + loadPreAggregationIntoTable(preAggregationTableName, loadSql, params, tx) { if (this.config.loadPreAggregationWithoutMetaLock) { - await this.query(`${loadSql} LIMIT 0`, params); - return this.query(loadSql.replace(/^CREATE TABLE (\S+) AS/i, 'INSERT INTO $1'), params); + return this.cancelCombinator(async saveCancelFn => { + await saveCancelFn(this.query(`${loadSql} LIMIT 0`, params)); + await saveCancelFn(this.query(loadSql.replace(/^CREATE TABLE (\S+) AS/i, 'INSERT INTO $1'), params)); + }); } return super.loadPreAggregationIntoTable(preAggregationTableName, loadSql, params, tx); } diff --git a/packages/cubejs-query-orchestrator/driver/BaseDriver.js b/packages/cubejs-query-orchestrator/driver/BaseDriver.js index 1e65bb0e50cb8..e75887fd255dc 100644 --- a/packages/cubejs-query-orchestrator/driver/BaseDriver.js +++ b/packages/cubejs-query-orchestrator/driver/BaseDriver.js @@ -1,4 +1,5 @@ const { reduce } = require('ramda'); +const { cancelCombinator } = require('./utils'); const sortByKeys = (unordered) => { const ordered = {}; @@ -169,6 +170,10 @@ class BaseDriver { quoteIdentifier(identifier) { return `"${identifier}"`; } + + cancelCombinator(fn) { + return cancelCombinator(fn); + } } module.exports = BaseDriver; diff --git a/packages/cubejs-query-orchestrator/driver/utils.js b/packages/cubejs-query-orchestrator/driver/utils.js new file mode 100644 index 0000000000000..2dcdcaf99f4e7 --- /dev/null +++ b/packages/cubejs-query-orchestrator/driver/utils.js @@ -0,0 +1,12 @@ +exports.cancelCombinator = (fn) => { + const cancelFnArray = []; + const saveCancelFn = promise => { + if (promise.cancel) { + cancelFnArray.push(promise.cancel); + } + return promise; + }; + const promise = fn(saveCancelFn); + promise.cancel = () => Promise.all(cancelFnArray.map(cancel => cancel())); + return promise; +}; diff --git a/packages/cubejs-query-orchestrator/orchestrator/PreAggregations.js b/packages/cubejs-query-orchestrator/orchestrator/PreAggregations.js index d6dbb21412b81..a9aac14e17ca9 100644 --- a/packages/cubejs-query-orchestrator/orchestrator/PreAggregations.js +++ b/packages/cubejs-query-orchestrator/orchestrator/PreAggregations.js @@ -1,5 +1,6 @@ const crypto = require('crypto'); const R = require('ramda'); +const { cancelCombinator } = require('../driver/utils'); const RedisCacheDriver = require('./RedisCacheDriver'); const LocalCacheDriver = require('./LocalCacheDriver'); @@ -380,13 +381,13 @@ class PreAggregationLoader { refreshStrategy = readOnly ? this.refreshImplStreamExternalStrategy : this.refreshImplTempTableExternalStrategy; } - const resultPromise = refreshStrategy.bind(this)(client, newVersionEntry); - resultPromise.cancel = () => {}; // TODO implement cancel (loading rollup into table and external upload) - return resultPromise; + return cancelCombinator( + saveCancelFn => refreshStrategy.bind(this)(client, newVersionEntry, saveCancelFn) + ); }; } - async refreshImplStoreInSourceStrategy(client, newVersionEntry) { + async refreshImplStoreInSourceStrategy(client, newVersionEntry, saveCancelFn) { const [loadSql, params] = Array.isArray(this.preAggregation.loadSql) ? this.preAggregation.loadSql : [this.preAggregation.loadSql, []]; const targetTableName = this.targetTableName(newVersionEntry); @@ -402,18 +403,18 @@ class PreAggregationLoader { targetTableName, requestId: this.requestId }); - await client.loadPreAggregationIntoTable( + await saveCancelFn(client.loadPreAggregationIntoTable( targetTableName, query, params - ); - await this.createIndexes(client, newVersionEntry); + )); + await this.createIndexes(client, newVersionEntry, saveCancelFn); await this.loadCache.reset(this.preAggregation); - await this.dropOrphanedTables(client, targetTableName); + await this.dropOrphanedTables(client, targetTableName, saveCancelFn); await this.loadCache.reset(this.preAggregation); } - async refreshImplTempTableExternalStrategy(client, newVersionEntry) { + async refreshImplTempTableExternalStrategy(client, newVersionEntry, saveCancelFn) { const [loadSql, params] = Array.isArray(this.preAggregation.loadSql) ? this.preAggregation.loadSql : [this.preAggregation.loadSql, []]; await client.createSchemaIfNotExists(this.preAggregation.preAggregationsSchema); @@ -430,18 +431,18 @@ class PreAggregationLoader { targetTableName, requestId: this.requestId }); - await client.loadPreAggregationIntoTable( + await saveCancelFn(client.loadPreAggregationIntoTable( targetTableName, query, params - ); - const tableData = await this.downloadTempExternalPreAggregation(client, newVersionEntry); - await this.uploadExternalPreAggregation(tableData, newVersionEntry); + )); + const tableData = await this.downloadTempExternalPreAggregation(client, newVersionEntry, saveCancelFn); + await this.uploadExternalPreAggregation(tableData, newVersionEntry, saveCancelFn); await this.loadCache.reset(this.preAggregation); - await this.dropOrphanedTables(client, targetTableName); + await this.dropOrphanedTables(client, targetTableName, saveCancelFn); } - async refreshImplStreamExternalStrategy(client, newVersionEntry) { + async refreshImplStreamExternalStrategy(client, newVersionEntry, saveCancelFn) { const [sql, params] = Array.isArray(this.preAggregation.sql) ? this.preAggregation.sql : [this.preAggregation.sql, []]; if (!client.downloadQueryResults) { @@ -452,12 +453,12 @@ class PreAggregationLoader { preAggregation: this.preAggregation, requestId: this.requestId }); - const tableData = await client.downloadQueryResults(sql, params); - await this.uploadExternalPreAggregation(tableData, newVersionEntry); + const tableData = await saveCancelFn(client.downloadQueryResults(sql, params)); + await this.uploadExternalPreAggregation(tableData, newVersionEntry, saveCancelFn); await this.loadCache.reset(this.preAggregation); } - async downloadTempExternalPreAggregation(client, newVersionEntry) { + async downloadTempExternalPreAggregation(client, newVersionEntry, saveCancelFn) { if (!client.downloadTable) { throw new Error(`Can't load external pre-aggregation: source driver doesn't support downloadTable()`); } @@ -466,12 +467,12 @@ class PreAggregationLoader { preAggregation: this.preAggregation, requestId: this.requestId }); - const tableData = await client.downloadTable(table); - tableData.types = await client.tableColumnTypes(table); + const tableData = await saveCancelFn(client.downloadTable(table)); + tableData.types = await saveCancelFn(client.tableColumnTypes(table)); return tableData; } - async uploadExternalPreAggregation(tableData, newVersionEntry) { + async uploadExternalPreAggregation(tableData, newVersionEntry, saveCancelFn) { const table = this.targetTableName(newVersionEntry); const externalDriver = await this.externalDriverFactory(); if (!externalDriver.uploadTable) { @@ -481,13 +482,13 @@ class PreAggregationLoader { preAggregation: this.preAggregation, requestId: this.requestId }); - await externalDriver.uploadTable(table, tableData.types, tableData); - await this.createIndexes(externalDriver, newVersionEntry); + await saveCancelFn(externalDriver.uploadTable(table, tableData.types, tableData)); + await this.createIndexes(externalDriver, newVersionEntry, saveCancelFn); await this.loadCache.reset(this.preAggregation); - await this.dropOrphanedTables(externalDriver, table); + await this.dropOrphanedTables(externalDriver, table, saveCancelFn); } - async createIndexes(driver, newVersionEntry) { + async createIndexes(driver, newVersionEntry, saveCancelFn) { if (!this.preAggregation.indexesSql || !this.preAggregation.indexesSql.length) { return; } @@ -503,7 +504,7 @@ class PreAggregationLoader { requestId: this.requestId, sql }); - await driver.query( + await saveCancelFn(driver.query( QueryCache.replacePreAggregationTableNames( query, this.preAggregationsTablesToTempTables.concat([ @@ -512,11 +513,11 @@ class PreAggregationLoader { ]) ), params - ); + )); } } - async dropOrphanedTables(client, justCreatedTable) { + async dropOrphanedTables(client, justCreatedTable, saveCancelFn) { await this.preAggregations.addTableUsed(justCreatedTable); const actualTables = await client.getTablesQuery(this.preAggregation.preAggregationsSchema); const versionEntries = tablesToVersionEntries(this.preAggregation.preAggregationsSchema, actualTables); @@ -536,7 +537,7 @@ class PreAggregationLoader { tablesToDrop: JSON.stringify(toDrop), requestId: this.requestId }); - await Promise.all(toDrop.map(table => client.dropTable(table))); + await Promise.all(toDrop.map(table => saveCancelFn(client.dropTable(table)))); } } diff --git a/packages/cubejs-query-orchestrator/orchestrator/QueryQueue.js b/packages/cubejs-query-orchestrator/orchestrator/QueryQueue.js index d43994c5c99af..d1dfd7de3a046 100644 --- a/packages/cubejs-query-orchestrator/orchestrator/QueryQueue.js +++ b/packages/cubejs-query-orchestrator/orchestrator/QueryQueue.js @@ -274,13 +274,16 @@ class QueryQueue { error: (e.stack || e).toString() }); if (e instanceof TimeoutError) { - this.logger('Cancelling query due to timeout', { - processingId, - queryKey: query.queryKey, - queuePrefix: this.redisQueuePrefix, - requestId: query.requestId - }); - await this.sendCancelMessageFn(query); + const queryWithCancelHandle = await redisClient.getQueryDef(queryKey); + if (queryWithCancelHandle) { + this.logger('Cancelling query due to timeout', { + processingId, + queryKey: queryWithCancelHandle.queryKey, + queuePrefix: this.redisQueuePrefix, + requestId: queryWithCancelHandle.requestId + }); + await this.sendCancelMessageFn(queryWithCancelHandle); + } } } diff --git a/packages/cubejs-query-orchestrator/test/QueryOrchestrator.test.js b/packages/cubejs-query-orchestrator/test/QueryOrchestrator.test.js index 87fb470c8e96c..9bd6a68e2aede 100644 --- a/packages/cubejs-query-orchestrator/test/QueryOrchestrator.test.js +++ b/packages/cubejs-query-orchestrator/test/QueryOrchestrator.test.js @@ -1,15 +1,23 @@ -/* globals describe, it, should, before */ +/* globals describe, beforeAll, afterAll, test, expect */ const QueryOrchestrator = require('../orchestrator/QueryOrchestrator'); class MockDriver { constructor() { this.tables = []; this.executedQueries = []; + this.cancelledQueries = []; } - async query(query) { + query(query) { this.executedQueries.push(query); - return [query]; + let promise = Promise.resolve([query]); + if (query.match(`orders_too_big`)) { + promise = promise.then((res) => new Promise(resolve => setTimeout(() => resolve(res), 3000))); + } + promise.cancel = async () => { + this.cancelledQueries.push(query); + }; + return promise; } async getTablesQuery(schema) { @@ -21,19 +29,29 @@ class MockDriver { return null; } - async loadPreAggregationIntoTable(preAggregationTableName) { + loadPreAggregationIntoTable(preAggregationTableName, loadSql) { this.tables.push(preAggregationTableName.substring(0, 100)); + return this.query(loadSql); } async dropTable(tableName) { this.tables = this.tables.filter(t => t !== tableName.split('.')[1]); + return this.query(`DROP TABLE ${tableName}`); } } describe('QueryOrchestrator', () => { let mockDriver = null; const queryOrchestrator = new QueryOrchestrator( - 'TEST', async () => mockDriver, (msg, params) => console.log(msg, params) + 'TEST', + async () => mockDriver, + (msg, params) => console.log(msg, params), { + preAggregationsOptions: { + queueOptions: { + executionTimeout: 1 + } + } + } ); beforeAll(() => { @@ -119,4 +137,29 @@ describe('QueryOrchestrator', () => { } expect(thrown).toBe(true); }); + + test('cancel pre-aggregation', async () => { + const query = { + query: "SELECT \"orders__created_at_week\" \"orders__created_at_week\", sum(\"orders__count\") \"orders__count\" FROM (SELECT * FROM stb_pre_aggregations.orders_number_and_count20181101) as partition_union WHERE (\"orders__created_at_week\" >= ($1::timestamptz::timestamptz AT TIME ZONE 'UTC') AND \"orders__created_at_week\" <= ($2::timestamptz::timestamptz AT TIME ZONE 'UTC')) GROUP BY 1 ORDER BY 1 ASC LIMIT 10000", + values: ["2018-11-01T00:00:00Z", "2018-11-30T23:59:59Z"], + cacheKeyQueries: { + renewalThreshold: 21600, + queries: [["SELECT date_trunc('hour', (NOW()::timestamptz AT TIME ZONE 'UTC')) as current_hour", []]] + }, + preAggregations: [{ + preAggregationsSchema: "stb_pre_aggregations", + tableName: "stb_pre_aggregations.orders_number_and_count20181101", + loadSql: ["CREATE TABLE stb_pre_aggregations.orders_number_and_count20181101 AS SELECT\n date_trunc('week', (\"orders\".created_at::timestamptz AT TIME ZONE 'UTC')) \"orders__created_at_week\", count(\"orders\".id) \"orders__count\", sum(\"orders\".number) \"orders__number\"\n FROM\n public.orders_too_big AS \"orders\"\n WHERE (\"orders\".created_at >= $1::timestamptz AND \"orders\".created_at <= $2::timestamptz) GROUP BY 1", ["2018-11-01T00:00:00Z", "2018-11-30T23:59:59Z"]], + invalidateKeyQueries: [["SELECT date_trunc('hour', (NOW()::timestamptz AT TIME ZONE 'UTC')) as current_hour", []]] + }], + renewQuery: true, + requestId: 'cancel pre-aggregation' + }; + try { + await queryOrchestrator.fetchQuery(query); + } catch (e) { + expect(e.toString()).toMatch(/timeout/); + } + expect(mockDriver.cancelledQueries[0]).toMatch(/orders_too_big/); + }); });