diff --git a/packages/cli/src/WorkflowRunner.ts b/packages/cli/src/WorkflowRunner.ts index 448f7c7f5e57f..a14bccb0fefe9 100644 --- a/packages/cli/src/WorkflowRunner.ts +++ b/packages/cli/src/WorkflowRunner.ts @@ -86,6 +86,24 @@ export class WorkflowRunner { ) { ErrorReporter.error(error); + const isQueueMode = config.getEnv('executions.mode') === 'queue'; + + // in queue mode, first do a sanity run for the edge case that the execution was not marked as stalled + // by Bull even though it executed successfully, see https://github.com/OptimalBits/bull/issues/1415 + + if (isQueueMode && executionMode !== 'manual') { + const executionWithoutData = await Container.get(ExecutionRepository).findSingleExecution( + executionId, + { + includeData: false, + }, + ); + if (executionWithoutData?.finished === true && executionWithoutData?.status === 'success') { + // false positive, execution was successful + return; + } + } + const fullRunData: IRun = { data: { resultData: { diff --git a/packages/cli/test/unit/WorkflowRunner.test.ts b/packages/cli/test/unit/WorkflowRunner.test.ts new file mode 100644 index 0000000000000..b950046b58a3a --- /dev/null +++ b/packages/cli/test/unit/WorkflowRunner.test.ts @@ -0,0 +1,83 @@ +import type { User } from '@db/entities/User'; +import * as testDb from '../integration/shared/testDb'; +import * as utils from '../integration/shared/utils/'; +import { createWorkflow, createExecution } from '../integration/shared/testDb'; +import { WorkflowRunner } from '@/WorkflowRunner'; +import { WorkflowHooks, type ExecutionError, type IWorkflowExecuteHooks } from 'n8n-workflow'; +import { Push } from '../../src/push'; +import { mockInstance } from '../integration/shared/utils'; +import Container from 'typedi'; +import config from '../../src/config'; + +let owner: User; +let runner: WorkflowRunner; +let hookFunctions: IWorkflowExecuteHooks; +utils.setupTestServer({ endpointGroups: [] }); + +class Watchers { + workflowExecuteAfter = jest.fn(); +} +const watchers = new Watchers(); +const watchedWorkflowExecuteAfter = jest.spyOn(watchers, 'workflowExecuteAfter'); + +beforeAll(async () => { + const globalOwnerRole = await testDb.getGlobalOwnerRole(); + owner = await testDb.createUser({ globalRole: globalOwnerRole }); + + mockInstance(Push); + Container.set(Push, new Push()); + + runner = new WorkflowRunner(); + + hookFunctions = { + workflowExecuteAfter: [watchers.workflowExecuteAfter], + }; +}); + +afterAll(() => { + jest.restoreAllMocks(); +}); + +beforeEach(async () => { + await testDb.truncate(['Workflow', 'SharedWorkflow']); +}); + +test('processError should return early in Bull stalled edge case', async () => { + const workflow = await createWorkflow({}, owner); + const execution = await createExecution( + { + status: 'success', + finished: true, + }, + workflow, + ); + config.set('executions.mode', 'queue'); + await runner.processError( + new Error('test') as ExecutionError, + new Date(), + 'webhook', + execution.id, + new WorkflowHooks(hookFunctions, 'webhook', execution.id, workflow), + ); + expect(watchedWorkflowExecuteAfter).toHaveBeenCalledTimes(0); +}); + +test('processError should process error', async () => { + const workflow = await createWorkflow({}, owner); + const execution = await createExecution( + { + status: 'success', + finished: true, + }, + workflow, + ); + config.set('executions.mode', 'regular'); + await runner.processError( + new Error('test') as ExecutionError, + new Date(), + 'webhook', + execution.id, + new WorkflowHooks(hookFunctions, 'webhook', execution.id, workflow), + ); + expect(watchedWorkflowExecuteAfter).toHaveBeenCalledTimes(1); +});