diff --git a/packages/components/nodes/memory/AgentMemory/MySQLAgentMemory/mysqlSaver.ts b/packages/components/nodes/memory/AgentMemory/MySQLAgentMemory/mysqlSaver.ts index 525886ee21f..4214734f53c 100644 --- a/packages/components/nodes/memory/AgentMemory/MySQLAgentMemory/mysqlSaver.ts +++ b/packages/components/nodes/memory/AgentMemory/MySQLAgentMemory/mysqlSaver.ts @@ -21,6 +21,13 @@ export class MySQLSaver extends BaseCheckpointSaver implements MemoryMethods { private async getDataSource(): Promise { const { datasourceOptions } = this.config + if (!datasourceOptions) { + throw new Error('No datasource options provided') + } + // Prevent using default Postgres port, otherwise will throw uncaught error and crashing the app + if (datasourceOptions.port === 5432) { + throw new Error('Invalid port number') + } const dataSource = new DataSource(datasourceOptions) await dataSource.initialize() return dataSource diff --git a/packages/components/nodes/memory/AgentMemory/PostgresAgentMemory/pgSaver.ts b/packages/components/nodes/memory/AgentMemory/PostgresAgentMemory/pgSaver.ts index 11f261a47e3..702c8cfad56 100644 --- a/packages/components/nodes/memory/AgentMemory/PostgresAgentMemory/pgSaver.ts +++ b/packages/components/nodes/memory/AgentMemory/PostgresAgentMemory/pgSaver.ts @@ -21,6 +21,13 @@ export class PostgresSaver extends BaseCheckpointSaver implements MemoryMethods private async getDataSource(): Promise { const { datasourceOptions } = this.config + if (!datasourceOptions) { + throw new Error('No datasource options provided') + } + // Prevent using default MySQL port, otherwise will throw uncaught error and crashing the app + if (datasourceOptions.port === 3006) { + throw new Error('Invalid port number') + } const dataSource = new DataSource(datasourceOptions) await dataSource.initialize() return dataSource diff --git a/packages/components/nodes/recordmanager/MySQLRecordManager/MySQLrecordManager.ts b/packages/components/nodes/recordmanager/MySQLRecordManager/MySQLrecordManager.ts index f246fcf4726..193c13ba82a 100644 --- a/packages/components/nodes/recordmanager/MySQLRecordManager/MySQLrecordManager.ts +++ b/packages/components/nodes/recordmanager/MySQLRecordManager/MySQLrecordManager.ts @@ -1,7 +1,7 @@ import { ICommonObject, INode, INodeData, INodeParams } from '../../../src/Interface' import { getBaseClasses, getCredentialData, getCredentialParam } from '../../../src/utils' import { ListKeyOptions, RecordManagerInterface, UpdateOptions } from '@langchain/community/indexes/base' -import { DataSource, QueryRunner } from 'typeorm' +import { DataSource } from 'typeorm' class MySQLRecordManager_RecordManager implements INode { label: string @@ -167,29 +167,37 @@ type MySQLRecordManagerOptions = { class MySQLRecordManager implements RecordManagerInterface { lc_namespace = ['langchain', 'recordmanagers', 'mysql'] - - datasource: DataSource - - queryRunner: QueryRunner - + config: MySQLRecordManagerOptions tableName: string - namespace: string constructor(namespace: string, config: MySQLRecordManagerOptions) { - const { mysqlOptions, tableName } = config + const { tableName } = config this.namespace = namespace this.tableName = tableName || 'upsertion_records' - this.datasource = new DataSource(mysqlOptions) + this.config = config + } + + private async getDataSource(): Promise { + const { mysqlOptions } = this.config + if (!mysqlOptions) { + throw new Error('No datasource options provided') + } + // Prevent using default Postgres port, otherwise will throw uncaught error and crashing the app + if (mysqlOptions.port === 5432) { + throw new Error('Invalid port number') + } + const dataSource = new DataSource(mysqlOptions) + await dataSource.initialize() + return dataSource } async createSchema(): Promise { try { - const appDataSource = await this.datasource.initialize() - - this.queryRunner = appDataSource.createQueryRunner() + const dataSource = await this.getDataSource() + const queryRunner = dataSource.createQueryRunner() - await this.queryRunner.manager.query(`create table if not exists \`${this.tableName}\` ( + await queryRunner.manager.query(`create table if not exists \`${this.tableName}\` ( \`uuid\` varchar(36) primary key default (UUID()), \`key\` varchar(255) not null, \`namespace\` varchar(255) not null, @@ -197,17 +205,20 @@ class MySQLRecordManager implements RecordManagerInterface { \`group_id\` longtext, unique key \`unique_key_namespace\` (\`key\`, \`namespace\`));`) + const columns = [`updated_at`, `key`, `namespace`, `group_id`] for (const column of columns) { // MySQL does not support 'IF NOT EXISTS' function for Index - const Check = await this.queryRunner.manager.query( + const Check = await queryRunner.manager.query( `SELECT COUNT(1) IndexIsThere FROM INFORMATION_SCHEMA.STATISTICS WHERE table_schema=DATABASE() AND table_name='${this.tableName}' AND index_name='${column}_index';` ) if (Check[0].IndexIsThere === 0) - await this.queryRunner.manager.query(`CREATE INDEX \`${column}_index\` + await queryRunner.manager.query(`CREATE INDEX \`${column}_index\` ON \`${this.tableName}\` (\`${column}\`);`) } + + await queryRunner.release() } catch (e: any) { // This error indicates that the table already exists // Due to asynchronous nature of the code, it is possible that @@ -221,12 +232,17 @@ class MySQLRecordManager implements RecordManagerInterface { } async getTime(): Promise { + const dataSource = await this.getDataSource() try { - const res = await this.queryRunner.manager.query(`SELECT UNIX_TIMESTAMP(NOW()) AS epoch`) + const queryRunner = dataSource.createQueryRunner() + const res = await queryRunner.manager.query(`SELECT UNIX_TIMESTAMP(NOW()) AS epoch`) + await queryRunner.release() return Number.parseFloat(res[0].epoch) } catch (error) { console.error('Error getting time in MySQLRecordManager:') throw error + } finally { + await dataSource.destroy() } } @@ -235,6 +251,9 @@ class MySQLRecordManager implements RecordManagerInterface { return } + const dataSource = await this.getDataSource() + const queryRunner = dataSource.createQueryRunner() + const updatedAt = await this.getTime() const { timeAtLeast, groupIds: _groupIds } = updateOptions ?? {} @@ -261,9 +280,18 @@ class MySQLRecordManager implements RecordManagerInterface { ON DUPLICATE KEY UPDATE \`updated_at\` = VALUES(\`updated_at\`)` // To handle multiple files upsert - for (const record of recordsToUpsert) { - // Consider using a transaction for batch operations - await this.queryRunner.manager.query(query, record.flat()) + try { + for (const record of recordsToUpsert) { + // Consider using a transaction for batch operations + await queryRunner.manager.query(query, record.flat()) + } + + await queryRunner.release() + } catch (error) { + console.error('Error updating in MySQLRecordManager:') + throw error + } finally { + await dataSource.destroy() } } @@ -272,6 +300,9 @@ class MySQLRecordManager implements RecordManagerInterface { return [] } + const dataSource = await this.getDataSource() + const queryRunner = dataSource.createQueryRunner() + // Prepare the placeholders and the query const placeholders = keys.map(() => `?`).join(', ') const query = ` @@ -284,21 +315,27 @@ class MySQLRecordManager implements RecordManagerInterface { try { // Execute the query - const rows = await this.queryRunner.manager.query(query, [this.namespace, ...keys.flat()]) + const rows = await queryRunner.manager.query(query, [this.namespace, ...keys.flat()]) // Create a set of existing keys for faster lookup const existingKeysSet = new Set(rows.map((row: { key: string }) => row.key)) // Map the input keys to booleans indicating if they exist keys.forEach((key, index) => { existsArray[index] = existingKeysSet.has(key) }) + await queryRunner.release() return existsArray } catch (error) { console.error('Error checking existence of keys') - throw error // Allow the caller to handle the error + throw error + } finally { + await dataSource.destroy() } } async listKeys(options?: ListKeyOptions): Promise { + const dataSource = await this.getDataSource() + const queryRunner = dataSource.createQueryRunner() + try { const { before, after, limit, groupIds } = options ?? {} let query = `SELECT \`key\` FROM \`${this.tableName}\` WHERE \`namespace\` = ?` @@ -330,11 +367,14 @@ class MySQLRecordManager implements RecordManagerInterface { query += ';' // Directly using try/catch with async/await for cleaner flow - const result = await this.queryRunner.manager.query(query, values) + const result = await queryRunner.manager.query(query, values) + await queryRunner.release() return result.map((row: { key: string }) => row.key) } catch (error) { console.error('MySQLRecordManager listKeys Error: ') - throw error // Re-throw the error to be handled by the caller + throw error + } finally { + await dataSource.destroy() } } @@ -343,16 +383,22 @@ class MySQLRecordManager implements RecordManagerInterface { return } + const dataSource = await this.getDataSource() + const queryRunner = dataSource.createQueryRunner() + const placeholders = keys.map(() => '?').join(', ') const query = `DELETE FROM \`${this.tableName}\` WHERE \`namespace\` = ? AND \`key\` IN (${placeholders});` const values = [this.namespace, ...keys].map((v) => (typeof v !== 'string' ? `${v}` : v)) // Directly using try/catch with async/await for cleaner flow try { - await this.queryRunner.manager.query(query, values) + await queryRunner.manager.query(query, values) + await queryRunner.release() } catch (error) { console.error('Error deleting keys') - throw error // Re-throw the error to be handled by the caller + throw error + } finally { + await dataSource.destroy() } } } diff --git a/packages/components/nodes/recordmanager/PostgresRecordManager/PostgresRecordManager.ts b/packages/components/nodes/recordmanager/PostgresRecordManager/PostgresRecordManager.ts index 59f640c6a30..94d154da2c0 100644 --- a/packages/components/nodes/recordmanager/PostgresRecordManager/PostgresRecordManager.ts +++ b/packages/components/nodes/recordmanager/PostgresRecordManager/PostgresRecordManager.ts @@ -1,7 +1,7 @@ import { ICommonObject, INode, INodeData, INodeParams } from '../../../src/Interface' import { getBaseClasses, getCredentialData, getCredentialParam } from '../../../src/utils' import { ListKeyOptions, RecordManagerInterface, UpdateOptions } from '@langchain/community/indexes/base' -import { DataSource, QueryRunner } from 'typeorm' +import { DataSource } from 'typeorm' import { getHost } from '../../vectorstores/Postgres/utils' import { getDatabase, getPort, getTableName } from './utils' @@ -175,29 +175,37 @@ type PostgresRecordManagerOptions = { class PostgresRecordManager implements RecordManagerInterface { lc_namespace = ['langchain', 'recordmanagers', 'postgres'] - - datasource: DataSource - - queryRunner: QueryRunner - + config: PostgresRecordManagerOptions tableName: string - namespace: string constructor(namespace: string, config: PostgresRecordManagerOptions) { - const { postgresConnectionOptions, tableName } = config + const { tableName } = config this.namespace = namespace - this.datasource = new DataSource(postgresConnectionOptions) this.tableName = tableName + this.config = config + } + + private async getDataSource(): Promise { + const { postgresConnectionOptions } = this.config + if (!postgresConnectionOptions) { + throw new Error('No datasource options provided') + } + // Prevent using default MySQL port, otherwise will throw uncaught error and crashing the app + if (postgresConnectionOptions.port === 3006) { + throw new Error('Invalid port number') + } + const dataSource = new DataSource(postgresConnectionOptions) + await dataSource.initialize() + return dataSource } async createSchema(): Promise { try { - const appDataSource = await this.datasource.initialize() - - this.queryRunner = appDataSource.createQueryRunner() + const dataSource = await this.getDataSource() + const queryRunner = dataSource.createQueryRunner() - await this.queryRunner.manager.query(` + await queryRunner.manager.query(` CREATE TABLE IF NOT EXISTS "${this.tableName}" ( uuid UUID PRIMARY KEY DEFAULT gen_random_uuid(), key TEXT NOT NULL, @@ -210,6 +218,8 @@ class PostgresRecordManager implements RecordManagerInterface { CREATE INDEX IF NOT EXISTS key_index ON "${this.tableName}" (key); CREATE INDEX IF NOT EXISTS namespace_index ON "${this.tableName}" (namespace); CREATE INDEX IF NOT EXISTS group_id_index ON "${this.tableName}" (group_id);`) + + await queryRunner.release() } catch (e: any) { // This error indicates that the table already exists // Due to asynchronous nature of the code, it is possible that @@ -223,8 +233,18 @@ class PostgresRecordManager implements RecordManagerInterface { } async getTime(): Promise { - const res = await this.queryRunner.manager.query('SELECT EXTRACT(EPOCH FROM CURRENT_TIMESTAMP)') - return Number.parseFloat(res[0].extract) + const dataSource = await this.getDataSource() + try { + const queryRunner = dataSource.createQueryRunner() + const res = await queryRunner.manager.query('SELECT EXTRACT(EPOCH FROM CURRENT_TIMESTAMP)') + await queryRunner.release() + return Number.parseFloat(res[0].extract) + } catch (error) { + console.error('Error getting time in PostgresRecordManager:') + throw error + } finally { + await dataSource.destroy() + } } /** @@ -247,6 +267,9 @@ class PostgresRecordManager implements RecordManagerInterface { return } + const dataSource = await this.getDataSource() + const queryRunner = dataSource.createQueryRunner() + const updatedAt = await this.getTime() const { timeAtLeast, groupIds: _groupIds } = updateOptions ?? {} @@ -265,7 +288,15 @@ class PostgresRecordManager implements RecordManagerInterface { const valuesPlaceholders = recordsToUpsert.map((_, j) => this.generatePlaceholderForRowAt(j, recordsToUpsert[0].length)).join(', ') const query = `INSERT INTO "${this.tableName}" (key, namespace, updated_at, group_id) VALUES ${valuesPlaceholders} ON CONFLICT (key, namespace) DO UPDATE SET updated_at = EXCLUDED.updated_at;` - await this.queryRunner.manager.query(query, recordsToUpsert.flat()) + try { + await queryRunner.manager.query(query, recordsToUpsert.flat()) + await queryRunner.release() + } catch (error) { + console.error('Error updating in PostgresRecordManager:') + throw error + } finally { + await dataSource.destroy() + } } async exists(keys: string[]): Promise { @@ -273,14 +304,25 @@ class PostgresRecordManager implements RecordManagerInterface { return [] } + const dataSource = await this.getDataSource() + const queryRunner = dataSource.createQueryRunner() + const startIndex = 2 const arrayPlaceholders = keys.map((_, i) => `$${i + startIndex}`).join(', ') const query = ` SELECT k, (key is not null) ex from unnest(ARRAY[${arrayPlaceholders}]) k left join "${this.tableName}" on k=key and namespace = $1; ` - const res = await this.queryRunner.manager.query(query, [this.namespace, ...keys.flat()]) - return res.map((row: { ex: boolean }) => row.ex) + try { + const res = await queryRunner.manager.query(query, [this.namespace, ...keys.flat()]) + await queryRunner.release() + return res.map((row: { ex: boolean }) => row.ex) + } catch (error) { + console.error('Error checking existence of keys in PostgresRecordManager:') + throw error + } finally { + await dataSource.destroy() + } } async listKeys(options?: ListKeyOptions): Promise { @@ -314,8 +356,20 @@ class PostgresRecordManager implements RecordManagerInterface { } query += ';' - const res = await this.queryRunner.manager.query(query, values) - return res.map((row: { key: string }) => row.key) + + const dataSource = await this.getDataSource() + const queryRunner = dataSource.createQueryRunner() + + try { + const res = await queryRunner.manager.query(query, values) + await queryRunner.release() + return res.map((row: { key: string }) => row.key) + } catch (error) { + console.error('Error listing keys in PostgresRecordManager:') + throw error + } finally { + await dataSource.destroy() + } } async deleteKeys(keys: string[]): Promise { @@ -323,16 +377,19 @@ class PostgresRecordManager implements RecordManagerInterface { return } - const query = `DELETE FROM "${this.tableName}" WHERE namespace = $1 AND key = ANY($2);` - await this.queryRunner.manager.query(query, [this.namespace, keys]) - } + const dataSource = await this.getDataSource() + const queryRunner = dataSource.createQueryRunner() - /** - * Terminates the connection pool. - * @returns {Promise} - */ - async end(): Promise { - if (this.datasource && this.datasource.isInitialized) await this.datasource.destroy() + try { + const query = `DELETE FROM "${this.tableName}" WHERE namespace = $1 AND key = ANY($2);` + await queryRunner.manager.query(query, [this.namespace, keys]) + await queryRunner.release() + } catch (error) { + console.error('Error deleting keys') + throw error + } finally { + await dataSource.destroy() + } } } diff --git a/packages/components/nodes/recordmanager/SQLiteRecordManager/SQLiteRecordManager.ts b/packages/components/nodes/recordmanager/SQLiteRecordManager/SQLiteRecordManager.ts index daf4b73501c..3246e250ef8 100644 --- a/packages/components/nodes/recordmanager/SQLiteRecordManager/SQLiteRecordManager.ts +++ b/packages/components/nodes/recordmanager/SQLiteRecordManager/SQLiteRecordManager.ts @@ -1,7 +1,7 @@ import { ICommonObject, INode, INodeData, INodeParams } from '../../../src/Interface' -import { getBaseClasses } from '../../../src/utils' +import { getBaseClasses, getUserHome } from '../../../src/utils' import { ListKeyOptions, RecordManagerInterface, UpdateOptions } from '@langchain/community/indexes/base' -import { DataSource, QueryRunner } from 'typeorm' +import { DataSource } from 'typeorm' import path from 'path' class SQLiteRecordManager_RecordManager implements INode { @@ -19,19 +19,19 @@ class SQLiteRecordManager_RecordManager implements INode { constructor() { this.label = 'SQLite Record Manager' this.name = 'SQLiteRecordManager' - this.version = 1.0 + this.version = 1.1 this.type = 'SQLite RecordManager' this.icon = 'sqlite.png' this.category = 'Record Manager' this.description = 'Use SQLite to keep track of document writes into the vector databases' this.baseClasses = [this.type, 'RecordManager', ...getBaseClasses(SQLiteRecordManager)] this.inputs = [ - { + /*{ label: 'Database File Path', name: 'databaseFilePath', type: 'string', placeholder: 'C:\\Users\\User\\.flowise\\database.sqlite' - }, + },*/ { label: 'Additional Connection Configuration', name: 'additionalConfig', @@ -106,7 +106,6 @@ class SQLiteRecordManager_RecordManager implements INode { const cleanup = nodeData.inputs?.cleanup as string const _sourceIdKey = nodeData.inputs?.sourceIdKey as string const sourceIdKey = _sourceIdKey ? _sourceIdKey : 'source' - const databaseFilePath = nodeData.inputs?.databaseFilePath as string let additionalConfiguration = {} if (additionalConfig) { @@ -117,10 +116,12 @@ class SQLiteRecordManager_RecordManager implements INode { } } + const database = path.join(process.env.DATABASE_PATH ?? path.join(getUserHome(), '.flowise'), 'database.sqlite') + const sqliteOptions = { + database, ...additionalConfiguration, - type: 'sqlite', - database: path.resolve(databaseFilePath) + type: 'sqlite' } const args = { @@ -144,29 +145,33 @@ type SQLiteRecordManagerOptions = { class SQLiteRecordManager implements RecordManagerInterface { lc_namespace = ['langchain', 'recordmanagers', 'sqlite'] - - datasource: DataSource - - queryRunner: QueryRunner - tableName: string - namespace: string + config: SQLiteRecordManagerOptions constructor(namespace: string, config: SQLiteRecordManagerOptions) { - const { sqliteOptions, tableName } = config + const { tableName } = config this.namespace = namespace this.tableName = tableName || 'upsertion_records' - this.datasource = new DataSource(sqliteOptions) + this.config = config + } + + private async getDataSource(): Promise { + const { sqliteOptions } = this.config + if (!sqliteOptions) { + throw new Error('No datasource options provided') + } + const dataSource = new DataSource(sqliteOptions) + await dataSource.initialize() + return dataSource } async createSchema(): Promise { try { - const appDataSource = await this.datasource.initialize() + const dataSource = await this.getDataSource() + const queryRunner = dataSource.createQueryRunner() - this.queryRunner = appDataSource.createQueryRunner() - - await this.queryRunner.manager.query(` + await queryRunner.manager.query(` CREATE TABLE IF NOT EXISTS "${this.tableName}" ( uuid TEXT PRIMARY KEY DEFAULT (lower(hex(randomblob(16)))), key TEXT NOT NULL, @@ -179,6 +184,8 @@ CREATE INDEX IF NOT EXISTS updated_at_index ON "${this.tableName}" (updated_at); CREATE INDEX IF NOT EXISTS key_index ON "${this.tableName}" (key); CREATE INDEX IF NOT EXISTS namespace_index ON "${this.tableName}" (namespace); CREATE INDEX IF NOT EXISTS group_id_index ON "${this.tableName}" (group_id);`) + + await queryRunner.release() } catch (e: any) { // This error indicates that the table already exists // Due to asynchronous nature of the code, it is possible that @@ -192,12 +199,17 @@ CREATE INDEX IF NOT EXISTS group_id_index ON "${this.tableName}" (group_id);`) } async getTime(): Promise { + const dataSource = await this.getDataSource() try { - const res = await this.queryRunner.manager.query(`SELECT strftime('%s', 'now') AS epoch`) + const queryRunner = dataSource.createQueryRunner() + const res = await queryRunner.manager.query(`SELECT strftime('%s', 'now') AS epoch`) + await queryRunner.release() return Number.parseFloat(res[0].epoch) } catch (error) { console.error('Error getting time in SQLiteRecordManager:') throw error + } finally { + await dataSource.destroy() } } @@ -205,6 +217,8 @@ CREATE INDEX IF NOT EXISTS group_id_index ON "${this.tableName}" (group_id);`) if (keys.length === 0) { return } + const dataSource = await this.getDataSource() + const queryRunner = dataSource.createQueryRunner() const updatedAt = await this.getTime() const { timeAtLeast, groupIds: _groupIds } = updateOptions ?? {} @@ -231,10 +245,18 @@ CREATE INDEX IF NOT EXISTS group_id_index ON "${this.tableName}" (group_id);`) VALUES (?, ?, ?, ?) ON CONFLICT (key, namespace) DO UPDATE SET updated_at = excluded.updated_at` - // To handle multiple files upsert - for (const record of recordsToUpsert) { - // Consider using a transaction for batch operations - await this.queryRunner.manager.query(query, record.flat()) + try { + // To handle multiple files upsert + for (const record of recordsToUpsert) { + // Consider using a transaction for batch operations + await queryRunner.manager.query(query, record.flat()) + } + await queryRunner.release() + } catch (error) { + console.error('Error updating in SQLiteRecordManager:') + throw error + } finally { + await dataSource.destroy() } } @@ -253,19 +275,25 @@ CREATE INDEX IF NOT EXISTS group_id_index ON "${this.tableName}" (group_id);`) // Initialize an array to fill with the existence checks const existsArray = new Array(keys.length).fill(false) + const dataSource = await this.getDataSource() + const queryRunner = dataSource.createQueryRunner() + try { // Execute the query - const rows = await this.queryRunner.manager.query(sql, [this.namespace, ...keys.flat()]) + const rows = await queryRunner.manager.query(sql, [this.namespace, ...keys.flat()]) // Create a set of existing keys for faster lookup const existingKeysSet = new Set(rows.map((row: { key: string }) => row.key)) // Map the input keys to booleans indicating if they exist keys.forEach((key, index) => { existsArray[index] = existingKeysSet.has(key) }) + await queryRunner.release() return existsArray } catch (error) { console.error('Error checking existence of keys') throw error // Allow the caller to handle the error + } finally { + await dataSource.destroy() } } @@ -299,13 +327,19 @@ CREATE INDEX IF NOT EXISTS group_id_index ON "${this.tableName}" (group_id);`) query += ';' + const dataSource = await this.getDataSource() + const queryRunner = dataSource.createQueryRunner() + // Directly using try/catch with async/await for cleaner flow try { - const result = await this.queryRunner.manager.query(query, values) + const result = await queryRunner.manager.query(query, values) + await queryRunner.release() return result.map((row: { key: string }) => row.key) } catch (error) { console.error('Error listing keys.') throw error // Re-throw the error to be handled by the caller + } finally { + await dataSource.destroy() } } @@ -314,16 +348,22 @@ CREATE INDEX IF NOT EXISTS group_id_index ON "${this.tableName}" (group_id);`) return } + const dataSource = await this.getDataSource() + const queryRunner = dataSource.createQueryRunner() + const placeholders = keys.map(() => '?').join(', ') const query = `DELETE FROM "${this.tableName}" WHERE namespace = ? AND key IN (${placeholders});` const values = [this.namespace, ...keys].map((v) => (typeof v !== 'string' ? `${v}` : v)) // Directly using try/catch with async/await for cleaner flow try { - await this.queryRunner.manager.query(query, values) + await queryRunner.manager.query(query, values) + await queryRunner.release() } catch (error) { console.error('Error deleting keys') throw error // Re-throw the error to be handled by the caller + } finally { + await dataSource.destroy() } } } diff --git a/packages/components/nodes/vectorstores/Postgres/driver/PGVector.ts b/packages/components/nodes/vectorstores/Postgres/driver/PGVector.ts index 39ec62ad9e7..727d7f31379 100644 --- a/packages/components/nodes/vectorstores/Postgres/driver/PGVector.ts +++ b/packages/components/nodes/vectorstores/Postgres/driver/PGVector.ts @@ -33,6 +33,11 @@ export class PGVectorDriver extends VectorStoreDriver { password: password, database: this.getDatabase() } + + // Prevent using default MySQL port, otherwise will throw uncaught error and crashing the app + if (this.getHost() === '3006') { + throw new Error('Invalid port number') + } } return this._postgresConnectionOptions diff --git a/packages/components/nodes/vectorstores/Postgres/driver/TypeORM.ts b/packages/components/nodes/vectorstores/Postgres/driver/TypeORM.ts index 0217713b13f..65593499462 100644 --- a/packages/components/nodes/vectorstores/Postgres/driver/TypeORM.ts +++ b/packages/components/nodes/vectorstores/Postgres/driver/TypeORM.ts @@ -34,6 +34,11 @@ export class TypeORMDriver extends VectorStoreDriver { password: password, database: this.getDatabase() } as DataSourceOptions + + // Prevent using default MySQL port, otherwise will throw uncaught error and crashing the app + if (this.getHost() === '3006') { + throw new Error('Invalid port number') + } } return this._postgresConnectionOptions }