From 321d6deef18806d88d97afef2f2c6f29e739ccb4 Mon Sep 17 00:00:00 2001 From: Danny Martini Date: Fri, 18 Oct 2024 17:30:26 +0200 Subject: [PATCH] feat(core): Handle cycles in workflows when partially executing them (#11187) --- .../PartialExecutionUtils/DirectedGraph.ts | 143 ++++++++++++++++++ .../__tests__/DirectedGraph.test.ts | 110 ++++++++++++++ .../__tests__/cleanRunData.test.ts | 6 +- .../__tests__/findStartNodes.test.ts | 48 +++--- .../__tests__/handleCycles.test.ts | 116 ++++++++++++++ .../recreateNodeExecutionStack.test.ts | 12 +- .../src/PartialExecutionUtils/cleanRunData.ts | 2 +- .../src/PartialExecutionUtils/findCycles.ts | 6 - .../PartialExecutionUtils/findStartNodes.ts | 4 +- .../src/PartialExecutionUtils/handleCycles.ts | 56 +++++++ .../core/src/PartialExecutionUtils/index.ts | 3 +- .../recreateNodeExecutionStack.ts | 2 +- packages/core/src/WorkflowExecute.ts | 14 +- 13 files changed, 469 insertions(+), 53 deletions(-) create mode 100644 packages/core/src/PartialExecutionUtils/__tests__/handleCycles.test.ts delete mode 100644 packages/core/src/PartialExecutionUtils/findCycles.ts create mode 100644 packages/core/src/PartialExecutionUtils/handleCycles.ts diff --git a/packages/core/src/PartialExecutionUtils/DirectedGraph.ts b/packages/core/src/PartialExecutionUtils/DirectedGraph.ts index 606f624d02d0b..6f8b43a660d31 100644 --- a/packages/core/src/PartialExecutionUtils/DirectedGraph.ts +++ b/packages/core/src/PartialExecutionUtils/DirectedGraph.ts @@ -286,6 +286,149 @@ export class DirectedGraph { ); } + /** + * Returns all strongly connected components. + * + * Strongly connected components are a set of nodes where it's possible to + * reach every node from every node. + * + * Strongly connected components are mutually exclusive in directed graphs, + * e.g. they cannot overlap. + * + * The smallest strongly connected component is a single node, since it can + * reach itself from itself by not following any edges. + * + * The algorithm implement here is Tarjan's algorithm. + * + * Example: + * ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ + * │node1├────►node2◄────┤node3├────►node5│ + * └─────┘ └──┬──┘ └──▲──┘ └▲───┬┘ + * │ │ │ │ + * ┌──▼──┐ │ ┌┴───▼┐ + * │node4├───────┘ │node6│ + * └─────┘ └─────┘ + * + * The strongly connected components are + * 1. node1 + * 2. node2, node4, node3 + * 3. node5, node6 + * + * Further reading: + * https://en.wikipedia.org/wiki/Strongly_connected_component + * https://www.youtube.com/watch?v=wUgWX0nc4NY + */ + getStronglyConnectedComponents(): Array> { + let id = 0; + const visited = new Set(); + const ids = new Map(); + const lowLinkValues = new Map(); + const stack: INode[] = []; + const stronglyConnectedComponents: Array> = []; + + const followNode = (node: INode) => { + if (visited.has(node)) { + return; + } + + visited.add(node); + lowLinkValues.set(node, id); + ids.set(node, id); + id++; + stack.push(node); + + const directChildren = this.getDirectChildConnections(node).map((c) => c.to); + for (const child of directChildren) { + followNode(child); + + // if node is on stack min the low id + if (stack.includes(child)) { + const childLowLinkValue = lowLinkValues.get(child); + const ownLowLinkValue = lowLinkValues.get(node); + a.ok(childLowLinkValue !== undefined); + a.ok(ownLowLinkValue !== undefined); + const lowestLowLinkValue = Math.min(childLowLinkValue, ownLowLinkValue); + + lowLinkValues.set(node, lowestLowLinkValue); + } + } + + // after we visited all children, check if the low id is the same as the + // nodes id, which means we found a strongly connected component + const ownId = ids.get(node); + const ownLowLinkValue = lowLinkValues.get(node); + a.ok(ownId !== undefined); + a.ok(ownLowLinkValue !== undefined); + + if (ownId === ownLowLinkValue) { + // pop from the stack until the stack is empty or we find a node that + // has a different low id + const scc: Set = new Set(); + let next = stack.at(-1); + + while (next && lowLinkValues.get(next) === ownId) { + stack.pop(); + scc.add(next); + next = stack.at(-1); + } + + if (scc.size > 0) { + stronglyConnectedComponents.push(scc); + } + } + }; + + for (const node of this.nodes.values()) { + followNode(node); + } + + return stronglyConnectedComponents; + } + + private depthFirstSearchRecursive( + from: INode, + fn: (node: INode) => boolean, + seen: Set, + ): INode | undefined { + if (seen.has(from)) { + return undefined; + } + seen.add(from); + + if (fn(from)) { + return from; + } + + for (const childConnection of this.getDirectChildConnections(from)) { + const found = this.depthFirstSearchRecursive(childConnection.to, fn, seen); + + if (found) { + return found; + } + } + + return undefined; + } + + /** + * Like `Array.prototype.find` but for directed graphs. + * + * Starting from, and including, the `from` node this calls the provided + * predicate function with every child node until the predicate function + * returns true. + * + * The search is depth first, meaning every branch is exhausted before the + * next branch is tried. + * + * The first node for which the predicate function returns true is returned. + * + * If the graph is exhausted and the predicate function never returned true, + * undefined is returned instead. + */ + depthFirstSearch({ from, fn }: { from: INode; fn: (node: INode) => boolean }): INode | undefined { + return this.depthFirstSearchRecursive(from, fn, new Set()); + } + toWorkflow(parameters: Omit): Workflow { return new Workflow({ ...parameters, diff --git a/packages/core/src/PartialExecutionUtils/__tests__/DirectedGraph.test.ts b/packages/core/src/PartialExecutionUtils/__tests__/DirectedGraph.test.ts index d6eedf416df3a..9530ed2217c29 100644 --- a/packages/core/src/PartialExecutionUtils/__tests__/DirectedGraph.test.ts +++ b/packages/core/src/PartialExecutionUtils/__tests__/DirectedGraph.test.ts @@ -9,6 +9,7 @@ // XX denotes that the node is disabled // PD denotes that the node has pinned data +import type { INode } from 'n8n-workflow'; import { NodeConnectionType } from 'n8n-workflow'; import { createNodeData, defaultWorkflowParameter } from './helpers'; @@ -89,6 +90,115 @@ describe('DirectedGraph', () => { }); }); + describe('getStronglyConnectedComponents', () => { + // ┌─────┐ ┌─────┐ ┌─────┐ + // │node1├───►│node2├───►│node4│ + // └─────┘ └──┬──┘ └─────┘ + // ▲ │ + // │ │ + // ┌──┴──┐ │ + // │node3│◄──────┘ + // └─────┘ + test('find strongly connected components', () => { + // ARRANGE + const node1 = createNodeData({ name: 'Node1' }); + const node2 = createNodeData({ name: 'Node2' }); + const node3 = createNodeData({ name: 'Node3' }); + const node4 = createNodeData({ name: 'Node4' }); + const graph = new DirectedGraph() + .addNodes(node1, node2, node3, node4) + .addConnections( + { from: node1, to: node2 }, + { from: node2, to: node3 }, + { from: node3, to: node1 }, + { from: node2, to: node4 }, + ); + + // ACT + const stronglyConnectedComponents = graph.getStronglyConnectedComponents(); + + // ASSERT + expect(stronglyConnectedComponents).toHaveLength(2); + expect(stronglyConnectedComponents).toContainEqual(new Set([node4])); + expect(stronglyConnectedComponents).toContainEqual(new Set([node3, node2, node1])); + }); + + // ┌────┐ + // ┌───────┐ │ ├─ + // │trigger├──┬──►loop│ + // └───────┘ │ │ ├────┐ + // │ └────┘ │ + // └─────────┐ │ + // ┌────┐ │ │ + // ┌───►node├─┘ │ + // │ └────┘ │ + // │ │ + // └─────────────┘ + test('find strongly connected components even if they use different output indexes', () => { + // ARRANGE + const trigger = createNodeData({ name: 'trigger' }); + const loop = createNodeData({ name: 'loop' }); + const node = createNodeData({ name: 'node' }); + const graph = new DirectedGraph() + .addNodes(trigger, loop, node) + .addConnections( + { from: trigger, to: loop }, + { from: loop, outputIndex: 1, to: node }, + { from: node, to: loop }, + ); + + // ACT + const stronglyConnectedComponents = graph.getStronglyConnectedComponents(); + + // ASSERT + expect(stronglyConnectedComponents).toHaveLength(2); + expect(stronglyConnectedComponents).toContainEqual(new Set([trigger])); + expect(stronglyConnectedComponents).toContainEqual(new Set([node, loop])); + }); + }); + + describe('depthFirstSearch', () => { + // ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ + // │node0├───►│node1├───►│node2├───►│node4│───►│node5│ + // └─────┘ └─────┘ └──┬──┘ └─────┘ └─────┘ + // ▲ │ + // │ │ + // ┌──┴──┐ │ + // │node3│◄──────┘ + // └─────┘ + test('calls nodes in the correct order and stops when it found the node', () => { + // ARRANGE + const node0 = createNodeData({ name: 'Node0' }); + const node1 = createNodeData({ name: 'Node1' }); + const node2 = createNodeData({ name: 'Node2' }); + const node3 = createNodeData({ name: 'Node3' }); + const node4 = createNodeData({ name: 'Node4' }); + const node5 = createNodeData({ name: 'Node5' }); + const graph = new DirectedGraph() + .addNodes(node0, node1, node2, node3, node4, node5) + .addConnections( + { from: node0, to: node1 }, + { from: node1, to: node2 }, + { from: node2, to: node3 }, + { from: node3, to: node1 }, + { from: node2, to: node4 }, + { from: node4, to: node5 }, + ); + const fn = jest.fn().mockImplementation((node: INode) => node === node4); + + // ACT + const foundNode = graph.depthFirstSearch({ + from: node0, + fn, + }); + + // ASSERT + expect(foundNode).toBe(node4); + expect(fn).toHaveBeenCalledTimes(5); + expect(fn.mock.calls).toEqual([[node0], [node1], [node2], [node3], [node4]]); + }); + }); + describe('getParentConnections', () => { // ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ // │node1├──►│node2├──►│node3│──►│node4│ diff --git a/packages/core/src/PartialExecutionUtils/__tests__/cleanRunData.test.ts b/packages/core/src/PartialExecutionUtils/__tests__/cleanRunData.test.ts index bf37ec7636916..5daea46ef6e7d 100644 --- a/packages/core/src/PartialExecutionUtils/__tests__/cleanRunData.test.ts +++ b/packages/core/src/PartialExecutionUtils/__tests__/cleanRunData.test.ts @@ -23,7 +23,7 @@ describe('cleanRunData', () => { }; // ACT - const newRunData = cleanRunData(runData, graph, [node1]); + const newRunData = cleanRunData(runData, graph, new Set([node1])); // ASSERT expect(newRunData).toEqual({}); @@ -47,7 +47,7 @@ describe('cleanRunData', () => { }; // ACT - const newRunData = cleanRunData(runData, graph, [node2]); + const newRunData = cleanRunData(runData, graph, new Set([node2])); // ASSERT expect(newRunData).toEqual({ [node1.name]: runData[node1.name] }); @@ -78,7 +78,7 @@ describe('cleanRunData', () => { }; // ACT - const newRunData = cleanRunData(runData, graph, [node2]); + const newRunData = cleanRunData(runData, graph, new Set([node2])); // ASSERT // TODO: Find out if this is a desirable result in milestone 2 diff --git a/packages/core/src/PartialExecutionUtils/__tests__/findStartNodes.test.ts b/packages/core/src/PartialExecutionUtils/__tests__/findStartNodes.test.ts index 57022d862c480..ab33ccf8edd1e 100644 --- a/packages/core/src/PartialExecutionUtils/__tests__/findStartNodes.test.ts +++ b/packages/core/src/PartialExecutionUtils/__tests__/findStartNodes.test.ts @@ -48,8 +48,8 @@ describe('findStartNodes', () => { const startNodes = findStartNodes({ graph, trigger: node, destination: node }); - expect(startNodes).toHaveLength(1); - expect(startNodes[0]).toEqual(node); + expect(startNodes.size).toBe(1); + expect(startNodes).toContainEqual(node); }); // ►► @@ -67,8 +67,8 @@ describe('findStartNodes', () => { { const startNodes = findStartNodes({ graph, trigger, destination }); - expect(startNodes).toHaveLength(1); - expect(startNodes[0]).toEqual(trigger); + expect(startNodes.size).toBe(1); + expect(startNodes).toContainEqual(trigger); } // if the trigger has run data @@ -79,8 +79,8 @@ describe('findStartNodes', () => { const startNodes = findStartNodes({ graph, trigger, destination, runData }); - expect(startNodes).toHaveLength(1); - expect(startNodes[0]).toEqual(destination); + expect(startNodes.size).toBe(1); + expect(startNodes).toContainEqual(destination); } }); @@ -115,8 +115,8 @@ describe('findStartNodes', () => { const startNodes = findStartNodes({ graph, trigger, destination: node, runData }); // ASSERT - expect(startNodes).toHaveLength(1); - expect(startNodes[0]).toEqual(node); + expect(startNodes.size).toBe(1); + expect(startNodes).toContainEqual(node); }); // ┌─────┐ ┌─────┐ ►► @@ -156,9 +156,9 @@ describe('findStartNodes', () => { const startNodes = findStartNodes({ graph, trigger, destination: node4 }); // ASSERT - expect(startNodes).toHaveLength(1); + expect(startNodes.size).toBe(1); // no run data means the trigger is the start node - expect(startNodes[0]).toEqual(trigger); + expect(startNodes).toContainEqual(trigger); } { @@ -175,8 +175,8 @@ describe('findStartNodes', () => { const startNodes = findStartNodes({ graph, trigger, destination: node4, runData }); // ASSERT - expect(startNodes).toHaveLength(1); - expect(startNodes[0]).toEqual(node4); + expect(startNodes.size).toBe(1); + expect(startNodes).toContainEqual(node4); } }); @@ -211,8 +211,8 @@ describe('findStartNodes', () => { }); // ASSERT - expect(startNodes).toHaveLength(1); - expect(startNodes[0]).toEqual(node); + expect(startNodes.size).toBe(1); + expect(startNodes).toContainEqual(node); }); // ►► @@ -246,8 +246,8 @@ describe('findStartNodes', () => { }); // ASSERT - expect(startNodes).toHaveLength(1); - expect(startNodes[0]).toEqual(node); + expect(startNodes.size).toBe(1); + expect(startNodes).toContainEqual(node); }); // ►► @@ -286,8 +286,8 @@ describe('findStartNodes', () => { }); // ASSERT - expect(startNodes).toHaveLength(1); - expect(startNodes[0]).toEqual(node); + expect(startNodes.size).toBe(1); + expect(startNodes).toContainEqual(node); }); // ►► @@ -324,8 +324,8 @@ describe('findStartNodes', () => { }); // ASSERT - expect(startNodes).toHaveLength(1); - expect(startNodes[0]).toEqual(node3); + expect(startNodes.size).toBe(1); + expect(startNodes).toContainEqual(node3); }); // ►► @@ -360,8 +360,8 @@ describe('findStartNodes', () => { }); // ASSERT - expect(startNodes).toHaveLength(1); - expect(startNodes[0]).toEqual(node2); + expect(startNodes.size).toBe(1); + expect(startNodes).toContainEqual(node2); }); // ►► @@ -392,7 +392,7 @@ describe('findStartNodes', () => { const startNodes = findStartNodes({ graph, trigger, destination: node2, runData, pinData }); // ASSERT - expect(startNodes).toHaveLength(1); - expect(startNodes[0]).toEqual(node2); + expect(startNodes.size).toBe(1); + expect(startNodes).toContainEqual(node2); }); }); diff --git a/packages/core/src/PartialExecutionUtils/__tests__/handleCycles.test.ts b/packages/core/src/PartialExecutionUtils/__tests__/handleCycles.test.ts new file mode 100644 index 0000000000000..def9fed0ff53c --- /dev/null +++ b/packages/core/src/PartialExecutionUtils/__tests__/handleCycles.test.ts @@ -0,0 +1,116 @@ +// NOTE: Diagrams in this file have been created with https://asciiflow.com/#/ +// If you update the tests, please update the diagrams as well. +// If you add a test, please create a new diagram. +// +// Map +// 0 means the output has no run data +// 1 means the output has run data +// ►► denotes the node that the user wants to execute to +// XX denotes that the node is disabled +// PD denotes that the node has pinned data + +import { createNodeData } from './helpers'; +import { DirectedGraph } from '../DirectedGraph'; +import { handleCycles } from '../handleCycles'; + +describe('handleCycles', () => { + // ┌────┐ ┌─────────┐ + //┌───────┐ │ ├──────────►afterLoop│ + //│trigger├────┬───►loop│ └─────────┘ + //└───────┘ │ │ ├─┐ ►► + // │ └────┘ │ ┌──────┐ + // │ └───►inLoop├────┐ + // │ └──────┘ │ + // │ │ + // └──────────────────────────┘ + test('if the start node is within a cycle it returns the start of the cycle as the new start node', () => { + // ARRANGE + const trigger = createNodeData({ name: 'trigger' }); + const loop = createNodeData({ name: 'loop' }); + const inLoop = createNodeData({ name: 'inLoop' }); + const afterLoop = createNodeData({ name: 'afterLoop' }); + const graph = new DirectedGraph() + .addNodes(trigger, loop, inLoop, afterLoop) + .addConnections( + { from: trigger, to: loop }, + { from: loop, outputIndex: 0, to: afterLoop }, + { from: loop, outputIndex: 1, to: inLoop }, + { from: inLoop, to: loop }, + ); + const startNodes = new Set([inLoop]); + + // ACT + const newStartNodes = handleCycles(graph, startNodes, trigger); + + // ASSERT + expect(newStartNodes.size).toBe(1); + expect(newStartNodes).toContainEqual(loop); + }); + + // ┌────┐ ┌─────────┐ + //┌───────┐ │ ├──────────►afterLoop│ + //│trigger├────┬───►loop│ └─────────┘ + //└───────┘ │ │ ├─┐ ►► + // │ └────┘ │ ┌──────┐ + // │ └───►inLoop├────┐ + // │ └──────┘ │ + // │ │ + // └──────────────────────────┘ + test('does not mutate `startNodes`', () => { + // ARRANGE + const trigger = createNodeData({ name: 'trigger' }); + const loop = createNodeData({ name: 'loop' }); + const inLoop = createNodeData({ name: 'inLoop' }); + const afterLoop = createNodeData({ name: 'afterLoop' }); + const graph = new DirectedGraph() + .addNodes(trigger, loop, inLoop, afterLoop) + .addConnections( + { from: trigger, to: loop }, + { from: loop, outputIndex: 0, to: afterLoop }, + { from: loop, outputIndex: 1, to: inLoop }, + { from: inLoop, to: loop }, + ); + const startNodes = new Set([inLoop]); + + // ACT + handleCycles(graph, startNodes, trigger); + + // ASSERT + expect(startNodes.size).toBe(1); + expect(startNodes).toContainEqual(inLoop); + }); + + // ►► + // ┌────┐ ┌─────────┐ + //┌───────┐ │ ├──────────►afterLoop│ + //│trigger├────┬───►loop│ └─────────┘ + //└───────┘ │ │ ├─┐ + // │ └────┘ │ ┌──────┐ + // │ └───►inLoop├────┐ + // │ └──────┘ │ + // │ │ + // └──────────────────────────┘ + test('if the start node is not within a cycle it returns the same node as the new start node', () => { + // ARRANGE + const trigger = createNodeData({ name: 'trigger' }); + const loop = createNodeData({ name: 'loop' }); + const inLoop = createNodeData({ name: 'inLoop' }); + const afterLoop = createNodeData({ name: 'afterLoop' }); + const graph = new DirectedGraph() + .addNodes(trigger, loop, inLoop, afterLoop) + .addConnections( + { from: trigger, to: loop }, + { from: loop, outputIndex: 0, to: afterLoop }, + { from: loop, outputIndex: 1, to: inLoop }, + { from: inLoop, to: loop }, + ); + const startNodes = new Set([afterLoop]); + + // ACT + const newStartNodes = handleCycles(graph, startNodes, trigger); + + // ASSERT + expect(newStartNodes.size).toBe(1); + expect(newStartNodes).toContainEqual(afterLoop); + }); +}); diff --git a/packages/core/src/PartialExecutionUtils/__tests__/recreateNodeExecutionStack.test.ts b/packages/core/src/PartialExecutionUtils/__tests__/recreateNodeExecutionStack.test.ts index a4bcac23a54ef..8bae766912fa6 100644 --- a/packages/core/src/PartialExecutionUtils/__tests__/recreateNodeExecutionStack.test.ts +++ b/packages/core/src/PartialExecutionUtils/__tests__/recreateNodeExecutionStack.test.ts @@ -33,7 +33,7 @@ describe('recreateNodeExecutionStack', () => { .addConnections({ from: trigger, to: node }); const workflow = findSubgraph({ graph, destination: node, trigger }); - const startNodes = [node]; + const startNodes = new Set([node]); const runData: IRunData = { [trigger.name]: [toITaskData([{ data: { value: 1 } }])], }; @@ -87,7 +87,7 @@ describe('recreateNodeExecutionStack', () => { const workflow = new DirectedGraph() .addNodes(trigger, node) .addConnections({ from: trigger, to: node }); - const startNodes = [trigger]; + const startNodes = new Set([trigger]); const runData: IRunData = {}; const pinData: IPinData = {}; @@ -121,7 +121,7 @@ describe('recreateNodeExecutionStack', () => { const workflow = new DirectedGraph() .addNodes(trigger, node) .addConnections({ from: trigger, to: node }); - const startNodes = [node]; + const startNodes = new Set([node]); const runData: IRunData = {}; const pinData: IPinData = { [trigger.name]: [{ json: { value: 1 } }], @@ -169,7 +169,7 @@ describe('recreateNodeExecutionStack', () => { .addNodes(trigger, node1, node2) .addConnections({ from: trigger, to: node1 }, { from: node1, to: node2 }); - const startNodes = [node2]; + const startNodes = new Set([node2]); const runData: IRunData = { [trigger.name]: [toITaskData([{ data: { value: 1 } }])], }; @@ -204,7 +204,7 @@ describe('recreateNodeExecutionStack', () => { { from: node2, to: node3 }, ); - const startNodes = [node3]; + const startNodes = new Set([node3]); const runData: IRunData = { [trigger.name]: [toITaskData([{ data: { value: 1 } }])], [node1.name]: [toITaskData([{ data: { value: 1 } }])], @@ -287,7 +287,7 @@ describe('recreateNodeExecutionStack', () => { { from: node1, to: node3, inputIndex: 0 }, { from: node2, to: node3, inputIndex: 1 }, ); - const startNodes = [node3]; + const startNodes = new Set([node3]); const runData: IRunData = { [trigger.name]: [toITaskData([{ data: { value: 1 } }])], [node1.name]: [toITaskData([{ data: { value: 1 } }])], diff --git a/packages/core/src/PartialExecutionUtils/cleanRunData.ts b/packages/core/src/PartialExecutionUtils/cleanRunData.ts index 5d74a3575ac4a..bcd60c423bc68 100644 --- a/packages/core/src/PartialExecutionUtils/cleanRunData.ts +++ b/packages/core/src/PartialExecutionUtils/cleanRunData.ts @@ -10,7 +10,7 @@ import type { DirectedGraph } from './DirectedGraph'; export function cleanRunData( runData: IRunData, graph: DirectedGraph, - startNodes: INode[], + startNodes: Set, ): IRunData { const newRunData: IRunData = { ...runData }; diff --git a/packages/core/src/PartialExecutionUtils/findCycles.ts b/packages/core/src/PartialExecutionUtils/findCycles.ts deleted file mode 100644 index 388518ae52d55..0000000000000 --- a/packages/core/src/PartialExecutionUtils/findCycles.ts +++ /dev/null @@ -1,6 +0,0 @@ -import type { Workflow } from 'n8n-workflow'; - -export function findCycles(_workflow: Workflow) { - // TODO: implement depth first search or Tarjan's Algorithm - return []; -} diff --git a/packages/core/src/PartialExecutionUtils/findStartNodes.ts b/packages/core/src/PartialExecutionUtils/findStartNodes.ts index a6165f6564043..5eb036bd888fe 100644 --- a/packages/core/src/PartialExecutionUtils/findStartNodes.ts +++ b/packages/core/src/PartialExecutionUtils/findStartNodes.ts @@ -137,7 +137,7 @@ export function findStartNodes(options: { destination: INode; runData?: IRunData; pinData?: IPinData; -}): INode[] { +}): Set { const graph = options.graph; const trigger = options.trigger; const destination = options.destination; @@ -156,5 +156,5 @@ export function findStartNodes(options: { new Set(), ); - return [...startNodes]; + return startNodes; } diff --git a/packages/core/src/PartialExecutionUtils/handleCycles.ts b/packages/core/src/PartialExecutionUtils/handleCycles.ts new file mode 100644 index 0000000000000..94a8ae8cbc91b --- /dev/null +++ b/packages/core/src/PartialExecutionUtils/handleCycles.ts @@ -0,0 +1,56 @@ +import type { INode } from 'n8n-workflow'; +import * as a from 'node:assert/strict'; + +import type { DirectedGraph } from './DirectedGraph'; + +/** + * Returns a new set of start nodes. + * + * For every start node this checks if it is part of a cycle and if it is it + * replaces the start node with the start of the cycle. + * + * This is useful because it prevents executing cycles partially, e.g. figuring + * our which run of the cycle has to be repeated etc. + */ +export function handleCycles( + graph: DirectedGraph, + startNodes: Set, + trigger: INode, +): Set { + // Strongly connected components can also be nodes that are not part of a + // cycle. They form a strongly connected component of one. E.g the trigger is + // always a strongly connected component by itself because it does not have + // any inputs and thus cannot build a cycle. + // + // We're not interested in them so we filter them out. + const cycles = graph.getStronglyConnectedComponents().filter((cycle) => cycle.size >= 1); + const newStartNodes: Set = new Set(startNodes); + + // For each start node, check if the node is part of a cycle and if it is + // replace the start node with the start of the cycle. + if (cycles.length === 0) { + return newStartNodes; + } + + for (const startNode of startNodes) { + for (const cycle of cycles) { + const isPartOfCycle = cycle.has(startNode); + if (isPartOfCycle) { + const firstNode = graph.depthFirstSearch({ + from: trigger, + fn: (node) => cycle.has(node), + }); + + a.ok( + firstNode, + "the trigger must be connected to the cycle, otherwise the cycle wouldn't be part of the subgraph", + ); + + newStartNodes.delete(startNode); + newStartNodes.add(firstNode); + } + } + } + + return newStartNodes; +} diff --git a/packages/core/src/PartialExecutionUtils/index.ts b/packages/core/src/PartialExecutionUtils/index.ts index 6a6f1a233aa11..cea8ded9b9ca5 100644 --- a/packages/core/src/PartialExecutionUtils/index.ts +++ b/packages/core/src/PartialExecutionUtils/index.ts @@ -2,5 +2,6 @@ export { DirectedGraph } from './DirectedGraph'; export { findTriggerForPartialExecution } from './findTriggerForPartialExecution'; export { findStartNodes } from './findStartNodes'; export { findSubgraph } from './findSubgraph'; -export { findCycles } from './findCycles'; export { recreateNodeExecutionStack } from './recreateNodeExecutionStack'; +export { cleanRunData } from './cleanRunData'; +export { handleCycles } from './handleCycles'; diff --git a/packages/core/src/PartialExecutionUtils/recreateNodeExecutionStack.ts b/packages/core/src/PartialExecutionUtils/recreateNodeExecutionStack.ts index 4926becb79385..534969f960cb0 100644 --- a/packages/core/src/PartialExecutionUtils/recreateNodeExecutionStack.ts +++ b/packages/core/src/PartialExecutionUtils/recreateNodeExecutionStack.ts @@ -32,7 +32,7 @@ import { getSourceDataGroups } from './getSourceDataGroups'; */ export function recreateNodeExecutionStack( graph: DirectedGraph, - startNodes: INode[], + startNodes: Set, destinationNode: INode, runData: IRunData, pinData: IPinData, diff --git a/packages/core/src/WorkflowExecute.ts b/packages/core/src/WorkflowExecute.ts index ec5963a54b4f1..1d9aee76c655b 100644 --- a/packages/core/src/WorkflowExecute.ts +++ b/packages/core/src/WorkflowExecute.ts @@ -51,13 +51,13 @@ import PCancelable from 'p-cancelable'; import * as NodeExecuteFunctions from './NodeExecuteFunctions'; import { DirectedGraph, - findCycles, findStartNodes, findSubgraph, findTriggerForPartialExecution, + cleanRunData, + recreateNodeExecutionStack, + handleCycles, } from './PartialExecutionUtils'; -import { cleanRunData } from './PartialExecutionUtils/cleanRunData'; -import { recreateNodeExecutionStack } from './PartialExecutionUtils/recreateNodeExecutionStack'; export class WorkflowExecute { private status: ExecutionStatus = 'new'; @@ -352,15 +352,11 @@ export class WorkflowExecute { const filteredNodes = subgraph.getNodes(); // 3. Find the Start Nodes - const startNodes = findStartNodes({ graph: subgraph, trigger, destination, runData }); + let startNodes = findStartNodes({ graph: subgraph, trigger, destination, runData }); // 4. Detect Cycles - const cycles = findCycles(workflow); - // 5. Handle Cycles - if (cycles.length) { - // TODO: handle - } + startNodes = handleCycles(graph, startNodes, trigger); // 6. Clean Run Data const newRunData: IRunData = cleanRunData(runData, graph, startNodes);