diff --git a/.gitignore b/.gitignore index 6440537ad..37341edbb 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,4 @@ redis/ *.log /indexer/blocks/ +node_modules/ \ No newline at end of file diff --git a/runner/src/dml-handler/dml-handler.test.ts b/runner/src/dml-handler/dml-handler.test.ts index 38d637d07..e0c92c057 100644 --- a/runner/src/dml-handler/dml-handler.test.ts +++ b/runner/src/dml-handler/dml-handler.test.ts @@ -2,18 +2,27 @@ import pgFormat from 'pg-format'; import DmlHandler from './dml-handler'; describe('DML Handler tests', () => { - let pgClient: any; + const hasuraClient: any = { + getDbConnectionParameters: jest.fn().mockReturnValue({ + database: 'test_near', + host: 'postgres', + password: 'test_pass', + port: 5432, + username: 'test_near' + }) + }; + let PgClient: any; + let query: any; const ACCOUNT = 'test_near'; const SCHEMA = 'test_schema'; const TABLE_NAME = 'test_table'; beforeEach(() => { - pgClient = { - setUser: jest.fn(), - query: jest.fn().mockReturnValue({ rows: [] }), - format: pgFormat, - }; + query = jest.fn().mockReturnValue({ rows: [] }); + PgClient = jest.fn().mockImplementation(() => { + return { query, format: pgFormat }; + }); }); test('Test valid insert one with array', async () => { @@ -26,10 +35,10 @@ describe('DML Handler tests', () => { accounts_liked: JSON.stringify(['cwpuzzles.near', 'devbose.near']) }; - const dmlHandler = new DmlHandler(pgClient); + const dmlHandler = new DmlHandler(ACCOUNT, hasuraClient, PgClient); - await dmlHandler.insert(ACCOUNT, SCHEMA, TABLE_NAME, [inputObj]); - expect(pgClient.query.mock.calls).toEqual([ + await dmlHandler.insert(SCHEMA, TABLE_NAME, [inputObj]); + expect(query.mock.calls).toEqual([ ['INSERT INTO test_schema.test_table (account_id,block_height,block_timestamp,content,receipt_id,accounts_liked) VALUES (\'test_acc_near\', \'999\', \'UTC\', \'test_content\', \'111\', \'["cwpuzzles.near","devbose.near"]\') RETURNING *;', []] ]); }); @@ -46,10 +55,10 @@ describe('DML Handler tests', () => { receipt_id: 'abc', }]; - const dmlHandler = new DmlHandler(pgClient); + const dmlHandler = new DmlHandler(ACCOUNT, hasuraClient, PgClient); - await dmlHandler.insert(ACCOUNT, SCHEMA, TABLE_NAME, [inputObj]); - expect(pgClient.query.mock.calls).toEqual([ + await dmlHandler.insert(SCHEMA, TABLE_NAME, [inputObj]); + expect(query.mock.calls).toEqual([ ['INSERT INTO test_schema.test_table (0,1) VALUES (\'{"account_id":"morgs_near","block_height":1,"receipt_id":"abc"}\'::jsonb, \'{"account_id":"morgs_near","block_height":2,"receipt_id":"abc"}\'::jsonb) RETURNING *;', []] ]); }); @@ -60,10 +69,10 @@ describe('DML Handler tests', () => { block_height: 999, }; - const dmlHandler = new DmlHandler(pgClient); + const dmlHandler = new DmlHandler(ACCOUNT, hasuraClient, PgClient); - await dmlHandler.select(ACCOUNT, SCHEMA, TABLE_NAME, inputObj, 0); - expect(pgClient.query.mock.calls).toEqual([ + await dmlHandler.select(SCHEMA, TABLE_NAME, inputObj); + expect(query.mock.calls).toEqual([ ['SELECT * FROM test_schema.test_table WHERE account_id=$1 AND block_height=$2', Object.values(inputObj)] ]); }); @@ -74,10 +83,10 @@ describe('DML Handler tests', () => { block_height: 999, }; - const dmlHandler = new DmlHandler(pgClient); + const dmlHandler = new DmlHandler(ACCOUNT, hasuraClient, PgClient); - await dmlHandler.select(ACCOUNT, SCHEMA, TABLE_NAME, inputObj, 1); - expect(pgClient.query.mock.calls).toEqual([ + await dmlHandler.select(SCHEMA, TABLE_NAME, inputObj, 1); + expect(query.mock.calls).toEqual([ ['SELECT * FROM test_schema.test_table WHERE account_id=$1 AND block_height=$2 LIMIT 1', Object.values(inputObj)] ]); }); diff --git a/runner/src/dml-handler/dml-handler.ts b/runner/src/dml-handler/dml-handler.ts index d8b1c1fa0..45dbdcb73 100644 --- a/runner/src/dml-handler/dml-handler.ts +++ b/runner/src/dml-handler/dml-handler.ts @@ -1,26 +1,35 @@ import { wrapError } from '../utility'; -import PgClient from '../pg-client'; - -const sharedPgClient = new PgClient({ - user: process.env.PGUSER, - password: process.env.PGPASSWORD, - database: process.env.PGDATABASE, - host: process.env.PGHOST, - port: Number(process.env.PGPORT), -}); +import PgClientModule from '../pg-client'; +import HasuraClient from '../hasura-client/hasura-client'; export default class DmlHandler { + private pgClient!: PgClientModule; + private readonly initialized: Promise; + constructor ( - private readonly pgClient: PgClient = sharedPgClient, + private readonly account: string, + private readonly hasuraClient: HasuraClient = new HasuraClient(), + private readonly PgClient = PgClientModule, ) { - this.pgClient = pgClient; + this.initialized = this.initialize(); + } + + private async initialize (): Promise { + const connectionParameters = await this.hasuraClient.getDbConnectionParameters(this.account); + this.pgClient = new this.PgClient({ + user: connectionParameters.username, + password: connectionParameters.password, + host: process.env.PGHOST, + port: Number(connectionParameters.port), + database: connectionParameters.database, + }); } - async insert (account: string, schemaName: string, tableName: string, objects: any[]): Promise { + async insert (schemaName: string, tableName: string, objects: any[]): Promise { + await this.initialized; // Ensure constructor completed before proceeding if (!objects?.length) { return []; } - await this.pgClient.setUser(account); // Set Postgres user to account's user const keys = Object.keys(objects[0]); // Get array of values from each object, and return array of arrays as result. Expects all objects to have the same number of items in same order @@ -34,16 +43,15 @@ export default class DmlHandler { return result.rows; } - async select (account: string, schemaName: string, tableName: string, object: any, limit: number): Promise { - await this.pgClient.setUser(account); // Set Postgres user to account's user + async select (schemaName: string, tableName: string, object: any, limit: number | null = null): Promise { + await this.initialized; // Ensure constructor completed before proceeding - const roundedLimit = Math.round(limit); const keys = Object.keys(object); const values = Object.values(object); const param = Array.from({ length: keys.length }, (_, index) => `${keys[index]}=$${index + 1}`).join(' AND '); let query = `SELECT * FROM ${schemaName}.${tableName} WHERE ${param}`; - if (roundedLimit > 0) { - query = query.concat(' LIMIT ', roundedLimit.toString()); + if (limit !== null) { + query = query.concat(' LIMIT ', Math.round(limit).toString()); } const result = await wrapError(async () => await this.pgClient.query(this.pgClient.format(query), values), `Failed to execute '${query}' on ${schemaName}.${tableName}.`); diff --git a/runner/src/hasura-client/hasura-client.test.ts b/runner/src/hasura-client/hasura-client.test.ts index 56a19cef2..d4b621886 100644 --- a/runner/src/hasura-client/hasura-client.test.ts +++ b/runner/src/hasura-client/hasura-client.test.ts @@ -244,4 +244,80 @@ describe('HasuraClient', () => { expect(mockFetch).toBeCalledTimes(1); // to fetch the foreign keys }); + + it('returns connection parameters for valid and invalid users', async () => { + const testUsers = { + testA_near: 'passA', + testB_near: 'passB', + testC_near: 'passC' + }; + const TEST_METADATA = generateMetadata(testUsers); + const mockFetch = jest + .fn() + .mockResolvedValue({ + status: 200, + text: () => JSON.stringify({ metadata: TEST_METADATA }) + }); + const client = new HasuraClient({ fetch: mockFetch as unknown as typeof fetch }); + const result = await client.getDbConnectionParameters('testB_near'); + expect(result).toEqual(generateConnectionParameter('testB_near', 'passB')); + await expect(client.getDbConnectionParameters('fake_near')).rejects.toThrow('Could not find connection parameters for user fake_near on respective database.'); + }); }); + +function generateMetadata (testUsers: any): any { + const sources = []; + // Insert default source which has different format than the rest + sources.push({ + name: 'default', + kind: 'postgres', + tables: [], + configuration: { + connection_info: { + database_url: { from_env: 'HASURA_GRAPHQL_DATABASE_URL' }, + isolation_level: 'read-committed', + pool_settings: { + connection_lifetime: 600, + idle_timeout: 180, + max_connections: 50, + retries: 1 + }, + use_prepared_statements: true + } + } + }); + + Object.keys(testUsers).forEach((user) => { + sources.push(generateSource(user, testUsers[user])); + }); + + return { + version: 3, + sources + }; +} + +function generateSource (user: string, password: string): any { + return { + name: user, + kind: 'postgres', + tables: [], + configuration: { + connection_info: { + database_url: { connection_parameters: generateConnectionParameter(user, password) }, + isolation_level: 'read-committed', + use_prepared_statements: false + } + } + }; +} + +function generateConnectionParameter (user: string, password: string): any { + return { + database: user, + host: 'postgres', + password, + port: 5432, + username: user + }; +} diff --git a/runner/src/hasura-client/hasura-client.ts b/runner/src/hasura-client/hasura-client.ts index 4edf6f1bb..0162c4b41 100644 --- a/runner/src/hasura-client/hasura-client.ts +++ b/runner/src/hasura-client/hasura-client.ts @@ -99,6 +99,15 @@ export default class HasuraClient { return metadata; } + async getDbConnectionParameters (account: string): Promise { + const metadata = await this.exportMetadata(); + const source = metadata.sources.find((source: { name: any, configuration: any }) => source.name === account); + if (source === undefined) { + throw new Error(`Could not find connection parameters for user ${account} on respective database.`); + } + return source.configuration.connection_info.database_url.connection_parameters; + } + async doesSourceExist (source: string): Promise { const metadata = await this.exportMetadata(); return metadata.sources.filter(({ name }: { name: string }) => name === source).length > 0; diff --git a/runner/src/indexer/indexer.test.ts b/runner/src/indexer/indexer.test.ts index 4ed5e5c61..2ace38cc2 100644 --- a/runner/src/indexer/indexer.test.ts +++ b/runner/src/indexer/indexer.test.ts @@ -16,52 +16,53 @@ describe('Indexer unit tests', () => { const INDEXER_NAME = 'morgs.near/test_fn'; const SIMPLE_SCHEMA = `CREATE TABLE - "posts" ( - "id" SERIAL NOT NULL, - "account_id" VARCHAR NOT NULL, - "block_height" DECIMAL(58, 0) NOT NULL, - "receipt_id" VARCHAR NOT NULL, - "content" TEXT NOT NULL, - "block_timestamp" DECIMAL(20, 0) NOT NULL, - "accounts_liked" JSONB NOT NULL DEFAULT '[]', - "last_comment_timestamp" DECIMAL(20, 0), - CONSTRAINT "posts_pkey" PRIMARY KEY ("id") - );`; - - const SOCIAL_SCHEMA = `CREATE TABLE - "posts" ( - "id" SERIAL NOT NULL, - "account_id" VARCHAR NOT NULL, - "block_height" DECIMAL(58, 0) NOT NULL, - "receipt_id" VARCHAR NOT NULL, - "content" TEXT NOT NULL, - "block_timestamp" DECIMAL(20, 0) NOT NULL, - "accounts_liked" JSONB NOT NULL DEFAULT '[]', - "last_comment_timestamp" DECIMAL(20, 0), - CONSTRAINT "posts_pkey" PRIMARY KEY ("id") - ); - -CREATE TABLE - "comments" ( - "id" SERIAL NOT NULL, - "post_id" SERIAL NOT NULL, - "account_id" VARCHAR NOT NULL, - "block_height" DECIMAL(58, 0) NOT NULL, - "content" TEXT NOT NULL, - "block_timestamp" DECIMAL(20, 0) NOT NULL, - "receipt_id" VARCHAR NOT NULL, - CONSTRAINT "comments_pkey" PRIMARY KEY ("id") - ); - -CREATE TABLE - "post_likes" ( - "post_id" SERIAL NOT NULL, - "account_id" VARCHAR NOT NULL, - "block_height" DECIMAL(58, 0), - "block_timestamp" DECIMAL(20, 0) NOT NULL, - "receipt_id" VARCHAR NOT NULL, - CONSTRAINT "post_likes_pkey" PRIMARY KEY ("post_id", "account_id") - );'`; + "posts" ( + "id" SERIAL NOT NULL, + "account_id" VARCHAR NOT NULL, + "block_height" DECIMAL(58, 0) NOT NULL, + "receipt_id" VARCHAR NOT NULL, + "content" TEXT NOT NULL, + "block_timestamp" DECIMAL(20, 0) NOT NULL, + "accounts_liked" JSONB NOT NULL DEFAULT '[]', + "last_comment_timestamp" DECIMAL(20, 0), + CONSTRAINT "posts_pkey" PRIMARY KEY ("id") + );`; + + const SOCIAL_SCHEMA = ` + CREATE TABLE + "posts" ( + "id" SERIAL NOT NULL, + "account_id" VARCHAR NOT NULL, + "block_height" DECIMAL(58, 0) NOT NULL, + "receipt_id" VARCHAR NOT NULL, + "content" TEXT NOT NULL, + "block_timestamp" DECIMAL(20, 0) NOT NULL, + "accounts_liked" JSONB NOT NULL DEFAULT '[]', + "last_comment_timestamp" DECIMAL(20, 0), + CONSTRAINT "posts_pkey" PRIMARY KEY ("id") + ); + + CREATE TABLE + "comments" ( + "id" SERIAL NOT NULL, + "post_id" SERIAL NOT NULL, + "account_id" VARCHAR NOT NULL, + "block_height" DECIMAL(58, 0) NOT NULL, + "content" TEXT NOT NULL, + "block_timestamp" DECIMAL(20, 0) NOT NULL, + "receipt_id" VARCHAR NOT NULL, + CONSTRAINT "comments_pkey" PRIMARY KEY ("id") + ); + + CREATE TABLE + "post_likes" ( + "post_id" SERIAL NOT NULL, + "account_id" VARCHAR NOT NULL, + "block_height" DECIMAL(58, 0), + "block_timestamp" DECIMAL(20, 0) NOT NULL, + "receipt_id" VARCHAR NOT NULL, + CONSTRAINT "post_likes_pkey" PRIMARY KEY ("post_id", "account_id") + );'`; beforeEach(() => { process.env = { @@ -364,15 +365,11 @@ CREATE TABLE }); test('indexer builds context and inserts an objects into existing table', async () => { - // Set HASURA_ENDPOINT and HASURA_ADMIN_SECRET values in process.env before running with actual mockDmlHandler - // process.env.HASURA_ENDPOINT = ''; - // process.env.HASURA_ADMIN_SECRET = ''; - - const mockDmlHandler: any = { - insert: jest.fn().mockReturnValue([{ colA: 'valA' }, { colA: 'valA' }]) - }; + const mockDmlHandler: any = jest.fn().mockImplementation(() => { + return { insert: jest.fn().mockReturnValue([{ colA: 'valA' }, { colA: 'valA' }]) }; + }); - const indexer = new Indexer('mainnet', { dmlHandler: mockDmlHandler }); + const indexer = new Indexer('mainnet', { DmlHandler: mockDmlHandler }); const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres'); const objToInsert = [{ @@ -397,19 +394,16 @@ CREATE TABLE }); test('indexer builds context and selects objects from existing table', async () => { - // Set HASURA_ENDPOINT and HASURA_ADMIN_SECRET values in process.env before running with actual mockDmlHandler - // process.env.HASURA_ENDPOINT = ''; - // process.env.HASURA_ADMIN_SECRET = ''; const selectFn = jest.fn(); selectFn.mockImplementation((...lim) => { // Expects limit to be last parameter - return lim[lim.length - 1] === 0 ? [{ colA: 'valA' }, { colA: 'valA' }] : [{ colA: 'valA' }]; + return lim[lim.length - 1] === null ? [{ colA: 'valA' }, { colA: 'valA' }] : [{ colA: 'valA' }]; + }); + const mockDmlHandler: any = jest.fn().mockImplementation(() => { + return { select: selectFn }; }); - const mockDmlHandler: any = { - select: selectFn - }; - const indexer = new Indexer('mainnet', { dmlHandler: mockDmlHandler }); + const indexer = new Indexer('mainnet', { DmlHandler: mockDmlHandler }); const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres'); const objToSelect = { @@ -423,29 +417,18 @@ CREATE TABLE }); test('indexer builds context and verifies all methods generated', async () => { - const mockDmlHandler: any = { - insert: jest.fn().mockReturnValue(true), - select: jest.fn().mockReturnValue(true) - }; + const mockDmlHandler: any = jest.fn().mockImplementation(() => { + return { + insert: jest.fn().mockReturnValue(true), + select: jest.fn().mockReturnValue(true) + }; + }); - const indexer = new Indexer('mainnet', { dmlHandler: mockDmlHandler }); + const indexer = new Indexer('mainnet', { DmlHandler: mockDmlHandler }); const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres'); - const objToPassIn = {}; // These calls would fail on a real database, but we are merely checking to ensure they exist - let result = await context.db.insert_posts([objToPassIn]); - expect(result).toEqual(true); - result = await context.db.insert_comments(objToPassIn); // Verifying both array and single object input is supported for insert - expect(result).toEqual(true); - result = await context.db.insert_post_likes([objToPassIn]); - expect(result).toEqual(true); - - result = await context.db.select_posts(objToPassIn); - expect(result).toEqual(true); - result = await context.db.select_comments(objToPassIn); - expect(result).toEqual(true); - result = await context.db.select_post_likes(objToPassIn); - expect(result).toEqual(true); + expect(Object.keys(context.db)).toStrictEqual(['insert_posts', 'select_posts', 'insert_comments', 'select_comments', 'insert_post_likes', 'select_post_likes']); }); test('Indexer.runFunctions() allows imperative execution of GraphQL operations', async () => { diff --git a/runner/src/indexer/indexer.ts b/runner/src/indexer/indexer.ts index 9fba078ae..4cae6a0ca 100644 --- a/runner/src/indexer/indexer.ts +++ b/runner/src/indexer/indexer.ts @@ -10,7 +10,7 @@ interface Dependencies { fetch: typeof fetch s3: AWS.S3 provisioner: Provisioner - dmlHandler: DmlHandler + DmlHandler: typeof DmlHandler }; interface Context { @@ -44,7 +44,7 @@ export default class Indexer { fetch, s3: new AWS.S3(), provisioner: new Provisioner(), - dmlHandler: new DmlHandler(), + DmlHandler, ...deps, }; } @@ -242,15 +242,18 @@ export default class Indexer { } buildDatabaseContext (account: string, schemaName: string, tables: string[], blockHeight: number): Record any> { + let dmlHandler: DmlHandler | null = null; const result = tables.reduce((prev, tableName) => ({ ...prev, [`insert_${tableName}`]: async (objects: any) => { await this.writeLog(`context.db.insert_${tableName}`, blockHeight, `Calling context.db.insert_${tableName}.`, `Inserting object ${JSON.stringify(objects)} into table ${tableName} on schema ${schemaName}`); - return await this.deps.dmlHandler.insert(account, schemaName, tableName, Array.isArray(objects) ? objects : [objects]); + dmlHandler = dmlHandler ?? new this.deps.DmlHandler(account); + return await dmlHandler.insert(schemaName, tableName, Array.isArray(objects) ? objects : [objects]); }, - [`select_${tableName}`]: async (object: any, limit = 0) => { - await this.writeLog(`context.db.select_${tableName}`, blockHeight, `Calling context.db.select_${tableName}.`, `Selecting objects with values ${JSON.stringify(object)} from table ${tableName} on schema ${schemaName} with limit ${limit === 0 ? 'no' : limit}`); - return await this.deps.dmlHandler.select(account, schemaName, tableName, object, limit); + [`select_${tableName}`]: async (object: any, limit = null) => { + await this.writeLog(`context.db.select_${tableName}`, blockHeight, `Calling context.db.select_${tableName}.`, `Selecting objects with values ${JSON.stringify(object)} from table ${tableName} on schema ${schemaName} with limit ${limit === null ? 'no' : limit}`); + dmlHandler = dmlHandler ?? new this.deps.DmlHandler(account); + return await dmlHandler.select(schemaName, tableName, object, limit); }, }), {}); return result; diff --git a/runner/src/pg-client.ts b/runner/src/pg-client.ts new file mode 100644 index 000000000..ebca73a49 --- /dev/null +++ b/runner/src/pg-client.ts @@ -0,0 +1,41 @@ +import { Pool, type PoolConfig, type QueryResult, type QueryResultRow } from 'pg'; +import pgFormatModule from 'pg-format'; + +interface ConnectionParams { + user: string + password: string + host: string + port: number | string + database: string +} + +export default class PgClient { + private readonly pgPool: Pool; + public format: typeof pgFormatModule; + + constructor ( + connectionParams: ConnectionParams, + poolConfig: PoolConfig = { max: 10, idleTimeoutMillis: 30000 }, + PgPool: typeof Pool = Pool, + pgFormat: typeof pgFormatModule = pgFormatModule + ) { + this.pgPool = new PgPool({ + user: connectionParams.user, + password: connectionParams.password, + host: connectionParams.host, + port: Number(connectionParams.port), + database: connectionParams.database, + ...poolConfig, + }); + this.format = pgFormat; + } + + async query(query: string, params: any[] = []): Promise> { + const client = await this.pgPool.connect(); + try { + return await (client.query(query, params)); + } finally { + client.release(); + } + } +} diff --git a/runner/src/pg-client/index.ts b/runner/src/pg-client/index.ts deleted file mode 100644 index 6d8451c58..000000000 --- a/runner/src/pg-client/index.ts +++ /dev/null @@ -1 +0,0 @@ -export { default } from './pg-client'; diff --git a/runner/src/pg-client/pg-client.test.ts b/runner/src/pg-client/pg-client.test.ts deleted file mode 100644 index b6897d73c..000000000 --- a/runner/src/pg-client/pg-client.test.ts +++ /dev/null @@ -1,89 +0,0 @@ -import PgClient from './pg-client'; - -describe('Postgres Client Tests', () => { - let hasuraClient: any; - const testUsers = { - testA_near: 'passA', - testB_near: 'passB', - testC_near: 'passC' - }; - const TEST_METADATA = generateMetadata(testUsers); - const testPgClient = { - user: 'user', - password: 'password', - database: 'database', - host: 'host', - port: 'port', - }; - - beforeEach(() => { - hasuraClient = { - exportMetadata: jest.fn().mockReturnValue(TEST_METADATA) - }; - }); - - test('Test set user', async () => { - const pgClient = new PgClient(testPgClient, hasuraClient); - await pgClient.setUser('testA_near'); - await expect(pgClient.setUser('fake_near')).rejects.toThrow('Could not find password for user fake_near when trying to set user account to process database actions.'); - }); -}); - -function generateMetadata (testUsers: any): any { - const sources = []; - // Insert default source which has different format than the rest - sources.push({ - name: 'default', - kind: 'postgres', - tables: [], - configuration: { - connection_info: { - database_url: { from_env: 'HASURA_GRAPHQL_DATABASE_URL' }, - isolation_level: 'read-committed', - pool_settings: { - connection_lifetime: 600, - idle_timeout: 180, - max_connections: 50, - retries: 1 - }, - use_prepared_statements: true - } - } - }); - - Object.keys(testUsers).forEach((user) => { - sources.push(generateSource(user, testUsers[user])); - }); - - console.log(sources); - - return { - version: 3, - sources - }; -} - -function generateSource (user: string, password: string): any { - return { - name: user, - kind: 'postgres', - tables: [], - configuration: { - connection_info: { - database_url: { connection_parameters: generateConnectionParameter(user, password) }, - isolation_level: 'read-committed', - use_prepared_statements: false - } - } - }; -} - -function generateConnectionParameter (user: string, password: string): any { - return { - database: user, - host: 'postgres', - password, - port: 5432, - username: user - }; -} diff --git a/runner/src/pg-client/pg-client.ts b/runner/src/pg-client/pg-client.ts deleted file mode 100644 index 88ee0f9cb..000000000 --- a/runner/src/pg-client/pg-client.ts +++ /dev/null @@ -1,83 +0,0 @@ -import { Pool, type PoolConfig, type QueryResult, type QueryResultRow } from 'pg'; -import pgFormatModule from 'pg-format'; -import HasuraClient from '../hasura-client'; - -interface ConnectionParams { - user: string - password: string - host: string - port: number | string - database: string -} - -export default class PgClient { - private readonly connectionParams: ConnectionParams; - private readonly hasuraClient: HasuraClient; - private readonly poolConfig: PoolConfig; - private pgPool: Pool; - public format: typeof pgFormatModule; - private userPasswords: Record; - - constructor ( - connectionParams: ConnectionParams, - hasuraClient: HasuraClient = new HasuraClient(), - poolConfig: PoolConfig = { max: 10, idleTimeoutMillis: 30000 }, - PgPool: typeof Pool = Pool, - pgFormat: typeof pgFormatModule = pgFormatModule - ) { - this.connectionParams = connectionParams; - this.hasuraClient = hasuraClient; - this.poolConfig = poolConfig; - this.pgPool = new PgPool({ - user: connectionParams.user, - password: connectionParams.password, - host: connectionParams.host, - port: Number(connectionParams.port), - database: connectionParams.database, - ...poolConfig, - }); - this.format = pgFormat; - this.userPasswords = {}; - } - - private async collectPasswords (): Promise { - const metadata = await this.hasuraClient.exportMetadata(); - console.log(metadata.sources[2].configuration.connection_info.database_url.connection_parameters); - this.userPasswords = metadata.sources.reduce((prev: any, source: { name: any, configuration: any }) => ({ - ...prev, - [source.name]: source.name === 'default' ? 'N/A' : source.configuration.connection_info.database_url.connection_parameters.password - }), {}); - } - - async setUser (user: string): Promise { - if (Object.keys(this.userPasswords).length === 0) { - console.log('Collecting passwords for each user.'); - await this.collectPasswords(); - } - - const newUser = user === 'admin' ? this.connectionParams.user : user; - const newPassword = user === 'admin' ? this.connectionParams.password : this.userPasswords[user]; - - if (newPassword === undefined) { - throw new Error(`Could not find password for user ${user} when trying to set user account to process database actions.`); - } - - this.pgPool = new Pool({ - user: newUser, - password: newPassword, - host: this.connectionParams.host, - port: Number(this.connectionParams.port), - database: this.connectionParams.database, - ...this.poolConfig, - }); - } - - async query(query: string, params: any[] = []): Promise> { - const client = await this.pgPool.connect(); - try { - return await (client.query(query, params)); - } finally { - client.release(); - } - } -}