Skip to content

Commit

Permalink
[FIXUP] replace generic-pool with just a client
Browse files Browse the repository at this point in the history
  • Loading branch information
mcheshkov committed Nov 18, 2024
1 parent c724f59 commit 75c2d68
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 70 deletions.
1 change: 0 additions & 1 deletion packages/cubejs-clickhouse-driver/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
"@clickhouse/client": "^1.7.0",
"@cubejs-backend/base-driver": "1.1.4",
"@cubejs-backend/shared": "1.1.4",
"generic-pool": "^3.6.0",
"moment": "^2.24.0",
"sqlstring": "^2.3.1",
"uuid": "^8.3.2"
Expand Down
108 changes: 39 additions & 69 deletions packages/cubejs-clickhouse-driver/src/ClickHouseDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ import * as process from 'node:process';
import { Readable } from 'node:stream';
import { ClickHouseClient, createClient } from '@clickhouse/client';
import type { ClickHouseSettings, ResponseJSON } from '@clickhouse/client';
import genericPool from 'generic-pool';
import type { Factory, Pool } from 'generic-pool';
import { v4 as uuidv4 } from 'uuid';
import sqlstring from 'sqlstring';

Expand Down Expand Up @@ -119,9 +117,8 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface {
return 5;
}

protected readonly connectionFactory: Factory<ClickHouseClient>;

protected readonly pool: Pool<ClickHouseClient>;
// ClickHouseClient has internal pool of several sockets, no need for generic-pool
protected readonly client: ClickHouseClient;

protected readonly readOnlyMode: boolean;

Expand Down Expand Up @@ -168,81 +165,54 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface {
},
};

this.connectionFactory = {
create: async () => createClient({
url: this.config.url,
username: this.config.username,
password: this.config.password,
database: this.config.database,
clickhouse_settings: this.config.clickhouseSettings,
// TODO max_open_connections vs generic pool
max_open_connections: 1,
}),
validate: async (client) => {
const result = await client.ping();
if (!result.success) {
this.databasePoolError(result.error);
}
return result.success;
},
destroy: (client) => client.close(),
};
const maxPoolSize = config.maxPoolSize || getEnv("dbMaxPoolSize", { dataSource }) || 8;

// TODO @clickhouse/client have internal pool, that does NOT guarantee same connection, and can break with temp tables. Disable it?
this.pool = genericPool.createPool(
this.connectionFactory,
{
min: 0,
max: config.maxPoolSize || getEnv("dbMaxPoolSize", { dataSource }) || 8,
evictionRunIntervalMillis: 10000,
softIdleTimeoutMillis: 30000,
idleTimeoutMillis: 30000,
acquireTimeoutMillis: 20000,
}
);

// https://github.com/coopernurse/node-pool/blob/ee5db9ddb54ce3a142fde3500116b393d4f2f755/README.md#L220-L226
this.pool.on('factoryCreateError', (err) => {
this.databasePoolError(err);
});
this.pool.on('factoryDestroyError', (err) => {
this.databasePoolError(err);
});
this.client = this.createClient(maxPoolSize);
}

protected withConnection<T>(fn: (con: ClickHouseClient, queryId: string) => Promise<T>): Promise<T> {
console.log("withConnection call");
protected withCancel<T>(fn: (con: ClickHouseClient, queryId: string) => Promise<T>): Promise<T> {
console.log("withCancel call");
const queryId = uuidv4();

const abortController = new AbortController();
const { signal } = abortController;

const promise = (async () => {
const connection = await this.pool.acquire();
try {
signal.throwIfAborted();
// TODO pass signal deeper, new driver supports abort signal, but does not do autokill
const result = await fn(connection, queryId);
signal.throwIfAborted();
return result;
} finally {
await this.pool.release(connection);
}
await this.client.ping();
signal.throwIfAborted();
// TODO pass signal deeper, new driver supports abort signal, but does not do autokill
const result = await fn(this.client, queryId);
signal.throwIfAborted();
return result;
})();
// TODO why do we need this?
(promise as any).cancel = async () => {
abortController.abort();
// TODO kill is sent thru same pool, which can be busy, use separate pool/connection.
await this.withConnection(async conn => {
await conn.command({
// Use separate client for kill query, usual pool may be busy
const killClient = this.createClient(1);
try {
await killClient.command({
query: `KILL QUERY WHERE query_id = '${queryId}'`,
});
});
} finally {
await killClient.close();
}
};

return promise;
}

protected createClient(maxPoolSize: number): ClickHouseClient {
return createClient({
url: this.config.url,
username: this.config.username,
password: this.config.password,
database: this.config.database,
clickhouse_settings: this.config.clickhouseSettings,
max_open_connections: maxPoolSize,
});
}

public async testConnection() {
await this.query('SELECT 1', []);
}
Expand Down Expand Up @@ -274,7 +244,7 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface {

console.log('ClickHouse queryResponse prepared', formattedQuery);

return this.withConnection(async (connection, queryId) => {
return this.withCancel(async (connection, queryId) => {
// if (formattedQuery.startsWith("CREATE TABLE")) {
//
// }
Expand Down Expand Up @@ -343,8 +313,7 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface {
}

public async release() {
await this.pool.drain();
await this.pool.clear();
await this.client.close();
}

public informationSchemaQuery() {
Expand Down Expand Up @@ -400,14 +369,15 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface {
): Promise<StreamTableDataWithTypes> {
console.log('ClickHouse stream call', query, values);

const conn = await this.connectionFactory.create();
// Use separate client for this long-living query
const client = this.createClient(1);

try {
const formattedQuery = sqlstring.format(query, values);

console.log('ClickHouse stream prepared', formattedQuery);

const resultSet = await conn.query({
const resultSet = await client.query({
query: formattedQuery,
query_id: uuidv4(),
format: 'JSONCompactEachRowWithNamesAndTypes',
Expand Down Expand Up @@ -462,11 +432,11 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface {
};
}),
release: async () => {
await this.connectionFactory.destroy(conn);
await client.close();
}
};
} catch (e) {
await this.connectionFactory.destroy(conn);
await client.close();

throw e;
}
Expand Down Expand Up @@ -682,7 +652,7 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface {

// This is not part of a driver interface, and marked public only for testing
public async command(query: string): Promise<void> {
await this.withConnection(async (connection) => {
await this.withCancel(async (connection) => {
await connection.command({
query,
});
Expand All @@ -691,7 +661,7 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface {

// This is not part of a driver interface, and marked public only for testing
public async insert(table: string, values: Array<Array<unknown>>): Promise<void> {
await this.withConnection(async (connection) => {
await this.withCancel(async (connection) => {
await connection.insert({
table,
values,
Expand Down

0 comments on commit 75c2d68

Please sign in to comment.