diff --git a/packages/workflow/src/Workflow.ts b/packages/workflow/src/Workflow.ts index 9b2042d7f8394..29d789562a014 100644 --- a/packages/workflow/src/Workflow.ts +++ b/packages/workflow/src/Workflow.ts @@ -1424,13 +1424,6 @@ export class Workflow { return { data: null }; } - if (triggerResponse.manualTriggerFunction !== undefined) { - // If a manual trigger function is defined call it and wait till it did run - await triggerResponse.manualTriggerFunction(); - } - - const response = await triggerResponse.manualTriggerResponse!; - let closeFunction; if (triggerResponse.closeFunction) { // In manual mode we return the trigger closeFunction. That allows it to be called directly @@ -1439,8 +1432,18 @@ export class Workflow { // If we would not be able to wait for it to close would it cause problems with "own" mode as the // process would be killed directly after it and so the acknowledge would not have been finished yet. closeFunction = triggerResponse.closeFunction; + + // Manual testing of Trigger nodes creates an execution. If the execution is cancelled, `closeFunction` should be called to cleanup any open connections/consumers + abortSignal?.addEventListener('abort', closeFunction); } + if (triggerResponse.manualTriggerFunction !== undefined) { + // If a manual trigger function is defined call it and wait till it did run + await triggerResponse.manualTriggerFunction(); + } + + const response = await triggerResponse.manualTriggerResponse!; + if (response.length === 0) { return { data: null, closeFunction }; } diff --git a/packages/workflow/test/Workflow.test.ts b/packages/workflow/test/Workflow.test.ts index ba2873e4853c0..df0e85f49b364 100644 --- a/packages/workflow/test/Workflow.test.ts +++ b/packages/workflow/test/Workflow.test.ts @@ -4,13 +4,18 @@ import type { IBinaryKeyData, IConnections, IDataObject, + IExecuteData, INode, + INodeExecuteFunctions, INodeExecutionData, INodeParameters, INodeType, INodeTypeDescription, INodeTypes, IRunExecutionData, + ITriggerFunctions, + ITriggerResponse, + IWorkflowExecuteAdditionalData, NodeParameterValueType, } from '@/Interfaces'; import { Workflow, type WorkflowParameters } from '@/Workflow'; @@ -2015,4 +2020,67 @@ describe('Workflow', () => { ]); }); }); + + describe('runNode', () => { + const nodeTypes = mock(); + const triggerNode = mock(); + const triggerResponse = mock({ + closeFunction: jest.fn(), + // This node should never trigger, or return + manualTriggerFunction: async () => await new Promise(() => {}), + }); + const triggerNodeType = mock({ + description: { + properties: [], + }, + execute: undefined, + poll: undefined, + webhook: undefined, + async trigger(this: ITriggerFunctions) { + return triggerResponse; + }, + }); + + nodeTypes.getByNameAndVersion.mockReturnValue(triggerNodeType); + + const workflow = new Workflow({ + nodeTypes, + nodes: [triggerNode], + connections: {}, + active: false, + }); + + const executionData = mock(); + const runExecutionData = mock(); + const additionalData = mock(); + const nodeExecuteFunctions = mock(); + const triggerFunctions = mock(); + nodeExecuteFunctions.getExecuteTriggerFunctions.mockReturnValue(triggerFunctions); + const abortController = new AbortController(); + + test('should call closeFunction when manual trigger is aborted', async () => { + const runPromise = workflow.runNode( + executionData, + runExecutionData, + 0, + additionalData, + nodeExecuteFunctions, + 'manual', + abortController.signal, + ); + // Yield back to the event-loop to let async parts of `runNode` execute + await new Promise((resolve) => setImmediate(resolve)); + + let isSettled = false; + void runPromise.then(() => { + isSettled = true; + }); + expect(isSettled).toBe(false); + expect(abortController.signal.aborted).toBe(false); + expect(triggerResponse.closeFunction).not.toHaveBeenCalled(); + + abortController.abort(); + expect(triggerResponse.closeFunction).toHaveBeenCalled(); + }); + }); });