diff --git a/src/error.ts b/src/error.ts index c728e03d8..5e1da789e 100644 --- a/src/error.ts +++ b/src/error.ts @@ -174,3 +174,10 @@ export class DBOSDeadLetterQueueError extends DBOSError { super(`Workflow ${workflowUUID} has been moved to the dead-letter queue after exceeding the maximum of ${maxRetries} retries`, DeadLetterQueueError); } } + +const FailedSqlTransactionError = 19; +export class DBOSFailedSqlTransactionError extends DBOSError { + constructor(workflowUUID: string, txnName: string) { + super(`Postgres aborted the ${txnName} transaction of Workflow ${workflowUUID}.`, FailedSqlTransactionError); + } +} diff --git a/src/user_database.ts b/src/user_database.ts index 361f9375b..dc2f8a0b0 100644 --- a/src/user_database.ts +++ b/src/user_database.ts @@ -23,6 +23,8 @@ export interface UserDatabase { isRetriableTransactionError(error: unknown): boolean; // Is a database error caused by a key conflict (key constraint violation or serialization error)? isKeyConflictError(error: unknown): boolean; + // Is the database error caused by a failed or aported transaciton? + isFailedSqlTransactionError(error: unknown): boolean; // Not all databases support this, TypeORM can. // Drop the user database tables (for testing) @@ -164,6 +166,13 @@ export class PGNodeUserDatabase implements UserDatabase { return pge === "23505"; } + isFailedSqlTransactionError(error: unknown): boolean { + if (!(error instanceof PGDatabaseError)) { + return false; + } + return this.getPostgresErrorCode(error) === "25P02"; + } + async createSchema(): Promise { return Promise.reject(new Error("createSchema() is not supported in PG user database.")); } @@ -268,8 +277,11 @@ export class PrismaUserDatabase implements UserDatabase { } isKeyConflictError(error: unknown): boolean { - const pge = this.getPostgresErrorCode(error); - return pge === "23505"; + return this.getPostgresErrorCode(error) === "23505"; + } + + isFailedSqlTransactionError(error: unknown): boolean { + return this.getPostgresErrorCode(error) === "25P02"; } async createSchema(): Promise { @@ -385,8 +397,11 @@ export class TypeORMDatabase implements UserDatabase { } isKeyConflictError(error: unknown): boolean { - const pge = this.getPostgresErrorCode(error); - return pge === "23505"; + return this.getPostgresErrorCode(error) === "23505"; + } + + isFailedSqlTransactionError(error: unknown): boolean { + return this.getPostgresErrorCode(error) === "25P02"; } async createSchema(): Promise { @@ -482,8 +497,11 @@ export class KnexUserDatabase implements UserDatabase { } isKeyConflictError(error: unknown): boolean { - const pge = this.getPostgresErrorCode(error); - return pge === "23505"; + return this.getPostgresErrorCode(error) === "23505"; + } + + isFailedSqlTransactionError(error: unknown): boolean { + return this.getPostgresErrorCode(error) === "25P02"; } async createSchema(): Promise { @@ -590,8 +608,11 @@ export class DrizzleUserDatabase implements UserDatabase { } isKeyConflictError(error: unknown): boolean { - const pge = this.getPostgresErrorCode(error); - return pge === "23505"; + return this.getPostgresErrorCode(error) === "23505"; + } + + isFailedSqlTransactionError(error: unknown): boolean { + return this.getPostgresErrorCode(error) === "25P02"; } async createSchema(): Promise { diff --git a/src/workflow.ts b/src/workflow.ts index 2b727e5a9..cd7923aa3 100644 --- a/src/workflow.ts +++ b/src/workflow.ts @@ -3,7 +3,7 @@ import { DBOSExecutor, DBOSNull, OperationType, dbosNull } from "./dbos-executor import { transaction_outputs } from "../schemas/user_db_schema"; import { IsolationLevel, Transaction, TransactionContext, TransactionContextImpl } from "./transaction"; import { StepFunction, StepContext, StepContextImpl } from "./step"; -import { DBOSError, DBOSNotRegisteredError, DBOSWorkflowConflictUUIDError } from "./error"; +import { DBOSError, DBOSFailedSqlTransactionError, DBOSNotRegisteredError, DBOSWorkflowConflictUUIDError } from "./error"; import { serializeError, deserializeError } from "serialize-error"; import { DBOSJSON, sleepms } from "./utils"; import { SystemDatabase } from "./system_database"; @@ -281,7 +281,7 @@ export class WorkflowContextImpl extends DBOSContextImpl implements WorkflowCont * Write all entries in the workflow result buffer to the database. * If it encounters a primary key error, this indicates a concurrent execution with the same UUID, so throw an DBOSError. */ - async #flushResultBuffer(query: QueryFunction, isKeyConflict: (error: unknown)=> boolean): Promise { + async #flushResultBuffer(query: QueryFunction, isKeyConflict: (error: unknown) => boolean): Promise { const funcIDs = Array.from(this.resultBuffer.keys()); if (funcIDs.length === 0) { return; @@ -330,7 +330,7 @@ export class WorkflowContextImpl extends DBOSContextImpl implements WorkflowCont /** * Write a operation's output to the database. */ - async #recordOutput(query: QueryFunction, funcID: number, txnSnapshot: string, output: R, isKeyConflict: (error: unknown)=> boolean): Promise { + async #recordOutput(query: QueryFunction, funcID: number, txnSnapshot: string, output: R, isKeyConflict: (error: unknown) => boolean): Promise { try { const serialOutput = DBOSJSON.stringify(output); const rows = await query("INSERT INTO dbos.transaction_outputs (workflow_uuid, function_id, output, txn_id, txn_snapshot, created_at) VALUES ($1, $2, $3, (select pg_current_xact_id_if_assigned()::text), $4, $5) RETURNING txn_id;", @@ -359,7 +359,7 @@ export class WorkflowContextImpl extends DBOSContextImpl implements WorkflowCont /** * Record an error in an operation to the database. */ - async #recordError(query: QueryFunction, funcID: number, txnSnapshot: string, err: Error, isKeyConflict: (error: unknown)=> boolean): Promise { + async #recordError(query: QueryFunction, funcID: number, txnSnapshot: string, err: Error, isKeyConflict: (error: unknown) => boolean): Promise { try { const serialErr = DBOSJSON.stringify(serializeError(err)); await query("INSERT INTO dbos.transaction_outputs (workflow_uuid, function_id, error, txn_id, txn_snapshot, created_at) VALUES ($1, $2, $3, null, $4, $5) RETURNING txn_id;", @@ -661,6 +661,7 @@ export class WorkflowContextImpl extends DBOSContextImpl implements WorkflowCont while (true) { let txn_snapshot = "invalid"; + const workflowUUID = this.workflowUUID; const wrappedTransaction = async (client: UserDatabaseClient): Promise => { const tCtxt = new TransactionContextImpl( this.#dbosExec.userDatabase.getName(), client, this, @@ -700,10 +701,19 @@ export class WorkflowContextImpl extends DBOSContextImpl implements WorkflowCont } this.resultBuffer.set(funcId, readOutput); } else { - // Synchronously record the output of write transactions and obtain the transaction ID. - const pg_txn_id = await this.recordOutputTx(client, funcId, txn_snapshot, result); - tCtxt.span.setAttribute("pg_txn_id", pg_txn_id); - this.resultBuffer.clear(); + try { + // Synchronously record the output of write transactions and obtain the transaction ID. + const pg_txn_id = await this.recordOutputTx(client, funcId, txn_snapshot, result); + tCtxt.span.setAttribute("pg_txn_id", pg_txn_id); + this.resultBuffer.clear(); + } catch (error) { + if (this.#dbosExec.userDatabase.isFailedSqlTransactionError(error)) { + this.logger.error(`Postgres aborted the ${txn.name} @Transaction of Workflow ${workflowUUID}, but the function did not raise an exception. Please ensure that the @Transaction method raises an exception if the database transaction is aborted.`); + throw new DBOSFailedSqlTransactionError(workflowUUID, txn.name) + } else { + throw error; + } + } } return result; diff --git a/tests/dbos.test.ts b/tests/dbos.test.ts index 20c76b818..f4eeeb3c6 100644 --- a/tests/dbos.test.ts +++ b/tests/dbos.test.ts @@ -6,6 +6,7 @@ import { DBOSConfig } from "../src/dbos-executor"; import { PoolClient } from "pg"; import { TestingRuntime, TestingRuntimeImpl, createInternalTestRuntime } from "../src/testing/testing_runtime"; import { transaction_outputs } from "../schemas/user_db_schema"; +import { DBOSFailedSqlTransactionError } from "../src/error"; type TestTransactionContext = TransactionContext; const testTableName = "dbos_test_kv"; @@ -231,6 +232,13 @@ describe("dbos-tests", () => { status: StatusString.SUCCESS, }); }); + + test("aborted-transaction", async () => { + const workflowUUID: string = uuidv1(); + await expect(testRuntime.invoke(DBOSTestClass, workflowUUID).attemptToCatchAbortingStoredProc()) + .rejects + .toThrow(new DBOSFailedSqlTransactionError(workflowUUID, "attemptToCatchAbortingStoredProc")); + }) }); class DBOSTestClass { @@ -244,6 +252,11 @@ class DBOSTestClass { expect(_ctx.getConfig("counter")).toBe(3); await _ctx.queryUserDB(`DROP TABLE IF EXISTS ${testTableName};`); await _ctx.queryUserDB(`CREATE TABLE IF NOT EXISTS ${testTableName} (id SERIAL PRIMARY KEY, value TEXT);`); + await _ctx.queryUserDB(`CREATE OR REPLACE FUNCTION test_proc_raise() returns void as $$ + BEGIN + raise 'something bad happened'; + END + $$ language plpgsql;`); } @Transaction() @@ -296,6 +309,15 @@ class DBOSTestClass { } } + @Transaction() + static async attemptToCatchAbortingStoredProc(txnCtxt: TestTransactionContext) { + try { + return await txnCtxt.client.query("select xx()"); + } catch (e) { + return "all good" + } + } + @Workflow() static async testFailWorkflow(workflowCtxt: WorkflowContext, name: string) { expect(DBOSTestClass.initialized).toBe(true); @@ -341,4 +363,5 @@ class DBOSTestClass { return 0; } + }