diff --git a/packages/pg-cursor/index.js b/packages/pg-cursor/index.js index d3c0266b0..a6017d96c 100644 --- a/packages/pg-cursor/index.js +++ b/packages/pg-cursor/index.js @@ -151,6 +151,9 @@ class Cursor extends EventEmitter { } handleError(msg) { + // If this cursor has already closed, don't try to handle the error. + if (this.state === 'done') return + // If we're in an initialized state we've never been submitted // and don't have a connection instance reference yet. // This can happen if you queue a stream and close the client before diff --git a/packages/pg-query-stream/test/error.ts b/packages/pg-query-stream/test/error.ts index 220a52485..9f1d136cf 100644 --- a/packages/pg-query-stream/test/error.ts +++ b/packages/pg-query-stream/test/error.ts @@ -89,4 +89,85 @@ describe('error recovery', () => { await client.end() }) }) + + it('should work if used after timeout error', async () => { + const pool = new Pool({ max: 1, connectionTimeoutMillis: 400, statement_timeout: 400 }) + + const res1 = await pool.query('SELECT 1 AS a') + assert.deepStrictEqual(res1.rows, [{ a: 1 }]) + + const query = new QueryStream('SELECT 2 AS b') + const client = await pool.connect() + const stream = await client.query(query) + + await assert.rejects(() => pool.query('SELECT TRUE'), { message: 'timeout exceeded when trying to connect' }) + + await stream.destroy() + await client.release() + + const res2 = await pool.query('SELECT 4 AS d') + assert.deepStrictEqual(res2.rows, [{ d: 4 }]) + + await pool.end() + }) + + it('should work if used after syntax error', async () => { + const pool = new Pool({ max: 1, statement_timeout: 100 }) // statement_timeout is required here, so maybe this is just another timeout error? + + const res1 = await pool.query('SELECT 1 AS a') + assert.deepStrictEqual(res1.rows, [{ a: 1 }]) + + const query = new QueryStream('SELECT 2 AS b') + const client = await pool.connect() + const stream = await client.query(query) + + await new Promise((resolve) => setTimeout(resolve, 10)) + + await stream.destroy() + await client.release() + + const res2 = await pool.query('SELECT 4 AS d') + assert.deepStrictEqual(res2.rows, [{ d: 4 }]) + + await pool.end() + }) + + it('should work after cancelling query', async () => { + const pool = new Pool() + const conn = await pool.connect() + + // Get connection PID for sake of pg_cancel_backend() call + const result = await conn.query('SELECT pg_backend_pid() AS pid;') + const { pid } = result.rows[0] + + const stream = conn.query(new QueryStream('SELECT pg_sleep(10);')) + stream.on('data', (chunk) => { + // Switches stream into readableFlowing === true mode + }) + stream.on('error', (err) => { + // Errors are expected due to pg_cancel_backend() call + }) + + // Create a promise that is resolved when the stream is closed + const closed = new Promise((res) => { + stream.on('close', res) + }) + + // Wait 100ms before cancelling the query + await new Promise((res) => setTimeout(res, 100)) + + // Cancel pg_sleep(10) query + await pool.query('SELECT pg_cancel_backend($1);', [pid]) + + // Destroy stream and wait for it to be closed + stream.destroy() + await closed + + // Subsequent query on same connection should succeed + const res = await conn.query('SELECT 1 AS a;') + assert.deepStrictEqual(res.rows, [{ a: 1 }]) + + conn.release() + await pool.end() + }) })