Skip to content

Commit

Permalink
fix: Broken query and pre-aggregation cancel
Browse files Browse the repository at this point in the history
  • Loading branch information
paveltiunov committed Apr 12, 2020
1 parent 950ba84 commit aa82256
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 48 deletions.
15 changes: 8 additions & 7 deletions packages/cubejs-mysql-driver/driver/MySqlDriver.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,12 @@ class MySqlDriver extends BaseDriver {

let cancelled = false;
const cancelObj = {};
const promise = connectionPromise.then(conn => {
const promise = connectionPromise.then(async conn => {
const [{ connectionId }] = await conn.execute('select connection_id() as connectionId');
cancelObj.cancel = async () => {
cancelled = true;
await self.withConnection(async processConnection => {
const processRows = await processConnection.execute('SHOW PROCESSLIST');
await Promise.all(processRows.filter(row => row.Time >= 599)
.map(row => processConnection.execute(`KILL ${row.Id}`)));
await processConnection.execute(`KILL ${connectionId}`);
});
};
return fn(conn)
Expand Down Expand Up @@ -128,10 +127,12 @@ class MySqlDriver extends BaseDriver {
return GenericTypeToMySql[columnType] || super.fromGenericType(columnType);
}

async loadPreAggregationIntoTable(preAggregationTableName, loadSql, params, tx) {
loadPreAggregationIntoTable(preAggregationTableName, loadSql, params, tx) {
if (this.config.loadPreAggregationWithoutMetaLock) {
await this.query(`${loadSql} LIMIT 0`, params);
return this.query(loadSql.replace(/^CREATE TABLE (\S+) AS/i, 'INSERT INTO $1'), params);
return this.cancelCombinator(async saveCancelFn => {
await saveCancelFn(this.query(`${loadSql} LIMIT 0`, params));
await saveCancelFn(this.query(loadSql.replace(/^CREATE TABLE (\S+) AS/i, 'INSERT INTO $1'), params));
});
}
return super.loadPreAggregationIntoTable(preAggregationTableName, loadSql, params, tx);
}
Expand Down
5 changes: 5 additions & 0 deletions packages/cubejs-query-orchestrator/driver/BaseDriver.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
const { reduce } = require('ramda');
const { cancelCombinator } = require('./utils');

const sortByKeys = (unordered) => {
const ordered = {};
Expand Down Expand Up @@ -169,6 +170,10 @@ class BaseDriver {
quoteIdentifier(identifier) {
return `"${identifier}"`;
}

cancelCombinator(fn) {
return cancelCombinator(fn);
}
}

module.exports = BaseDriver;
12 changes: 12 additions & 0 deletions packages/cubejs-query-orchestrator/driver/utils.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
exports.cancelCombinator = (fn) => {
const cancelFnArray = [];
const saveCancelFn = promise => {
if (promise.cancel) {
cancelFnArray.push(promise.cancel);
}
return promise;
};
const promise = fn(saveCancelFn);
promise.cancel = () => Promise.all(cancelFnArray.map(cancel => cancel()));
return promise;
};
59 changes: 30 additions & 29 deletions packages/cubejs-query-orchestrator/orchestrator/PreAggregations.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
const crypto = require('crypto');
const R = require('ramda');
const { cancelCombinator } = require('../driver/utils');
const RedisCacheDriver = require('./RedisCacheDriver');
const LocalCacheDriver = require('./LocalCacheDriver');

Expand Down Expand Up @@ -380,13 +381,13 @@ class PreAggregationLoader {
refreshStrategy = readOnly ?
this.refreshImplStreamExternalStrategy : this.refreshImplTempTableExternalStrategy;
}
const resultPromise = refreshStrategy.bind(this)(client, newVersionEntry);
resultPromise.cancel = () => {}; // TODO implement cancel (loading rollup into table and external upload)
return resultPromise;
return cancelCombinator(
saveCancelFn => refreshStrategy.bind(this)(client, newVersionEntry, saveCancelFn)
);
};
}

async refreshImplStoreInSourceStrategy(client, newVersionEntry) {
async refreshImplStoreInSourceStrategy(client, newVersionEntry, saveCancelFn) {
const [loadSql, params] =
Array.isArray(this.preAggregation.loadSql) ? this.preAggregation.loadSql : [this.preAggregation.loadSql, []];
const targetTableName = this.targetTableName(newVersionEntry);
Expand All @@ -402,18 +403,18 @@ class PreAggregationLoader {
targetTableName,
requestId: this.requestId
});
await client.loadPreAggregationIntoTable(
await saveCancelFn(client.loadPreAggregationIntoTable(
targetTableName,
query,
params
);
await this.createIndexes(client, newVersionEntry);
));
await this.createIndexes(client, newVersionEntry, saveCancelFn);
await this.loadCache.reset(this.preAggregation);
await this.dropOrphanedTables(client, targetTableName);
await this.dropOrphanedTables(client, targetTableName, saveCancelFn);
await this.loadCache.reset(this.preAggregation);
}

async refreshImplTempTableExternalStrategy(client, newVersionEntry) {
async refreshImplTempTableExternalStrategy(client, newVersionEntry, saveCancelFn) {
const [loadSql, params] =
Array.isArray(this.preAggregation.loadSql) ? this.preAggregation.loadSql : [this.preAggregation.loadSql, []];
await client.createSchemaIfNotExists(this.preAggregation.preAggregationsSchema);
Expand All @@ -430,18 +431,18 @@ class PreAggregationLoader {
targetTableName,
requestId: this.requestId
});
await client.loadPreAggregationIntoTable(
await saveCancelFn(client.loadPreAggregationIntoTable(
targetTableName,
query,
params
);
const tableData = await this.downloadTempExternalPreAggregation(client, newVersionEntry);
await this.uploadExternalPreAggregation(tableData, newVersionEntry);
));
const tableData = await this.downloadTempExternalPreAggregation(client, newVersionEntry, saveCancelFn);
await this.uploadExternalPreAggregation(tableData, newVersionEntry, saveCancelFn);
await this.loadCache.reset(this.preAggregation);
await this.dropOrphanedTables(client, targetTableName);
await this.dropOrphanedTables(client, targetTableName, saveCancelFn);
}

async refreshImplStreamExternalStrategy(client, newVersionEntry) {
async refreshImplStreamExternalStrategy(client, newVersionEntry, saveCancelFn) {
const [sql, params] =
Array.isArray(this.preAggregation.sql) ? this.preAggregation.sql : [this.preAggregation.sql, []];
if (!client.downloadQueryResults) {
Expand All @@ -452,12 +453,12 @@ class PreAggregationLoader {
preAggregation: this.preAggregation,
requestId: this.requestId
});
const tableData = await client.downloadQueryResults(sql, params);
await this.uploadExternalPreAggregation(tableData, newVersionEntry);
const tableData = await saveCancelFn(client.downloadQueryResults(sql, params));
await this.uploadExternalPreAggregation(tableData, newVersionEntry, saveCancelFn);
await this.loadCache.reset(this.preAggregation);
}

async downloadTempExternalPreAggregation(client, newVersionEntry) {
async downloadTempExternalPreAggregation(client, newVersionEntry, saveCancelFn) {
if (!client.downloadTable) {
throw new Error(`Can't load external pre-aggregation: source driver doesn't support downloadTable()`);
}
Expand All @@ -466,12 +467,12 @@ class PreAggregationLoader {
preAggregation: this.preAggregation,
requestId: this.requestId
});
const tableData = await client.downloadTable(table);
tableData.types = await client.tableColumnTypes(table);
const tableData = await saveCancelFn(client.downloadTable(table));
tableData.types = await saveCancelFn(client.tableColumnTypes(table));
return tableData;
}

async uploadExternalPreAggregation(tableData, newVersionEntry) {
async uploadExternalPreAggregation(tableData, newVersionEntry, saveCancelFn) {
const table = this.targetTableName(newVersionEntry);
const externalDriver = await this.externalDriverFactory();
if (!externalDriver.uploadTable) {
Expand All @@ -481,13 +482,13 @@ class PreAggregationLoader {
preAggregation: this.preAggregation,
requestId: this.requestId
});
await externalDriver.uploadTable(table, tableData.types, tableData);
await this.createIndexes(externalDriver, newVersionEntry);
await saveCancelFn(externalDriver.uploadTable(table, tableData.types, tableData));
await this.createIndexes(externalDriver, newVersionEntry, saveCancelFn);
await this.loadCache.reset(this.preAggregation);
await this.dropOrphanedTables(externalDriver, table);
await this.dropOrphanedTables(externalDriver, table, saveCancelFn);
}

async createIndexes(driver, newVersionEntry) {
async createIndexes(driver, newVersionEntry, saveCancelFn) {
if (!this.preAggregation.indexesSql || !this.preAggregation.indexesSql.length) {
return;
}
Expand All @@ -503,7 +504,7 @@ class PreAggregationLoader {
requestId: this.requestId,
sql
});
await driver.query(
await saveCancelFn(driver.query(
QueryCache.replacePreAggregationTableNames(
query,
this.preAggregationsTablesToTempTables.concat([
Expand All @@ -512,11 +513,11 @@ class PreAggregationLoader {
])
),
params
);
));
}
}

async dropOrphanedTables(client, justCreatedTable) {
async dropOrphanedTables(client, justCreatedTable, saveCancelFn) {
await this.preAggregations.addTableUsed(justCreatedTable);
const actualTables = await client.getTablesQuery(this.preAggregation.preAggregationsSchema);
const versionEntries = tablesToVersionEntries(this.preAggregation.preAggregationsSchema, actualTables);
Expand All @@ -536,7 +537,7 @@ class PreAggregationLoader {
tablesToDrop: JSON.stringify(toDrop),
requestId: this.requestId
});
await Promise.all(toDrop.map(table => client.dropTable(table)));
await Promise.all(toDrop.map(table => saveCancelFn(client.dropTable(table))));
}
}

Expand Down
17 changes: 10 additions & 7 deletions packages/cubejs-query-orchestrator/orchestrator/QueryQueue.js
Original file line number Diff line number Diff line change
Expand Up @@ -274,13 +274,16 @@ class QueryQueue {
error: (e.stack || e).toString()
});
if (e instanceof TimeoutError) {
this.logger('Cancelling query due to timeout', {
processingId,
queryKey: query.queryKey,
queuePrefix: this.redisQueuePrefix,
requestId: query.requestId
});
await this.sendCancelMessageFn(query);
const queryWithCancelHandle = await redisClient.getQueryDef(queryKey);
if (queryWithCancelHandle) {
this.logger('Cancelling query due to timeout', {
processingId,
queryKey: queryWithCancelHandle.queryKey,
queuePrefix: this.redisQueuePrefix,
requestId: queryWithCancelHandle.requestId
});
await this.sendCancelMessageFn(queryWithCancelHandle);
}
}
}

Expand Down
53 changes: 48 additions & 5 deletions packages/cubejs-query-orchestrator/test/QueryOrchestrator.test.js
Original file line number Diff line number Diff line change
@@ -1,15 +1,23 @@
/* globals describe, it, should, before */
/* globals describe, beforeAll, afterAll, test, expect */
const QueryOrchestrator = require('../orchestrator/QueryOrchestrator');

class MockDriver {
constructor() {
this.tables = [];
this.executedQueries = [];
this.cancelledQueries = [];
}

async query(query) {
query(query) {
this.executedQueries.push(query);
return [query];
let promise = Promise.resolve([query]);
if (query.match(`orders_too_big`)) {
promise = promise.then((res) => new Promise(resolve => setTimeout(() => resolve(res), 3000)));
}
promise.cancel = async () => {
this.cancelledQueries.push(query);
};
return promise;
}

async getTablesQuery(schema) {
Expand All @@ -21,19 +29,29 @@ class MockDriver {
return null;
}

async loadPreAggregationIntoTable(preAggregationTableName) {
loadPreAggregationIntoTable(preAggregationTableName, loadSql) {
this.tables.push(preAggregationTableName.substring(0, 100));
return this.query(loadSql);
}

async dropTable(tableName) {
this.tables = this.tables.filter(t => t !== tableName.split('.')[1]);
return this.query(`DROP TABLE ${tableName}`);
}
}

describe('QueryOrchestrator', () => {
let mockDriver = null;
const queryOrchestrator = new QueryOrchestrator(
'TEST', async () => mockDriver, (msg, params) => console.log(msg, params)
'TEST',
async () => mockDriver,
(msg, params) => console.log(msg, params), {
preAggregationsOptions: {
queueOptions: {
executionTimeout: 1
}
}
}
);

beforeAll(() => {
Expand Down Expand Up @@ -119,4 +137,29 @@ describe('QueryOrchestrator', () => {
}
expect(thrown).toBe(true);
});

test('cancel pre-aggregation', async () => {
const query = {
query: "SELECT \"orders__created_at_week\" \"orders__created_at_week\", sum(\"orders__count\") \"orders__count\" FROM (SELECT * FROM stb_pre_aggregations.orders_number_and_count20181101) as partition_union WHERE (\"orders__created_at_week\" >= ($1::timestamptz::timestamptz AT TIME ZONE 'UTC') AND \"orders__created_at_week\" <= ($2::timestamptz::timestamptz AT TIME ZONE 'UTC')) GROUP BY 1 ORDER BY 1 ASC LIMIT 10000",
values: ["2018-11-01T00:00:00Z", "2018-11-30T23:59:59Z"],
cacheKeyQueries: {
renewalThreshold: 21600,
queries: [["SELECT date_trunc('hour', (NOW()::timestamptz AT TIME ZONE 'UTC')) as current_hour", []]]
},
preAggregations: [{
preAggregationsSchema: "stb_pre_aggregations",
tableName: "stb_pre_aggregations.orders_number_and_count20181101",
loadSql: ["CREATE TABLE stb_pre_aggregations.orders_number_and_count20181101 AS SELECT\n date_trunc('week', (\"orders\".created_at::timestamptz AT TIME ZONE 'UTC')) \"orders__created_at_week\", count(\"orders\".id) \"orders__count\", sum(\"orders\".number) \"orders__number\"\n FROM\n public.orders_too_big AS \"orders\"\n WHERE (\"orders\".created_at >= $1::timestamptz AND \"orders\".created_at <= $2::timestamptz) GROUP BY 1", ["2018-11-01T00:00:00Z", "2018-11-30T23:59:59Z"]],
invalidateKeyQueries: [["SELECT date_trunc('hour', (NOW()::timestamptz AT TIME ZONE 'UTC')) as current_hour", []]]
}],
renewQuery: true,
requestId: 'cancel pre-aggregation'
};
try {
await queryOrchestrator.fetchQuery(query);
} catch (e) {
expect(e.toString()).toMatch(/timeout/);
}
expect(mockDriver.cancelledQueries[0]).toMatch(/orders_too_big/);
});
});

0 comments on commit aa82256

Please sign in to comment.