Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(core): Adjust starter node priority for manual executions with pinned activators #8305

11 changes: 5 additions & 6 deletions packages/cli/src/ActiveWorkflowRunner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import {
ErrorReporterProxy as ErrorReporter,
WebhookPathTakenError,
ApplicationError,
STARTERS_TO_IGNORE_IN_REGULAR_EXECUTION,
} from 'n8n-workflow';

import type { IWorkflowDb, IWorkflowExecutionDataProcess } from '@/Interfaces';
Expand All @@ -36,11 +37,7 @@ import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData'
import type { WorkflowEntity } from '@db/entities/WorkflowEntity';
import { ActiveExecutions } from '@/ActiveExecutions';
import { ExecutionsService } from './executions/executions.service';
import {
STARTING_NODES,
WORKFLOW_REACTIVATE_INITIAL_TIMEOUT,
WORKFLOW_REACTIVATE_MAX_TIMEOUT,
} from '@/constants';
import { WORKFLOW_REACTIVATE_INITIAL_TIMEOUT, WORKFLOW_REACTIVATE_MAX_TIMEOUT } from '@/constants';
import { NodeTypes } from '@/NodeTypes';
import { WorkflowRunner } from '@/WorkflowRunner';
import { ExternalHooks } from '@/ExternalHooks';
Expand Down Expand Up @@ -599,7 +596,9 @@ export class ActiveWorkflowRunner {
settings: dbWorkflow.settings,
});

const canBeActivated = workflow.checkIfWorkflowCanBeActivated(STARTING_NODES);
const canBeActivated = workflow.checkIfWorkflowCanBeActivated(
STARTERS_TO_IGNORE_IN_REGULAR_EXECUTION,
);

if (!canBeActivated) {
throw new WorkflowActivationError(
Expand Down
4 changes: 2 additions & 2 deletions packages/cli/src/WorkflowExecuteAdditionalData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ import type {
import { NodeTypes } from '@/NodeTypes';
import { Push } from '@/push';
import * as WorkflowHelpers from '@/WorkflowHelpers';
import { findSubworkflowStart, isWorkflowIdValid } from '@/utils';
import { isWorkflowIdValid } from '@/utils';
import { PermissionChecker } from './UserManagement/PermissionChecker';
import { InternalHooks } from '@/InternalHooks';
import { ExecutionRepository } from '@db/repositories/execution.repository';
Expand Down Expand Up @@ -656,7 +656,7 @@ export async function getRunData(
): Promise<IWorkflowExecutionDataProcess> {
const mode = 'integrated';

const startingNode = findSubworkflowStart(workflowData.nodes);
const startingNode = Workflow.selectSubworkflowStarter(workflowData.nodes);

// Always start with empty data if no inputData got supplied
inputData = inputData || [
Expand Down
6 changes: 3 additions & 3 deletions packages/cli/src/commands/execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ import { promises as fs } from 'fs';
import { flags } from '@oclif/command';
import { PLACEHOLDER_EMPTY_WORKFLOW_ID } from 'n8n-core';
import type { IWorkflowBase } from 'n8n-workflow';
import { ApplicationError, ExecutionBaseError } from 'n8n-workflow';
import { ApplicationError, ExecutionBaseError, Workflow } from 'n8n-workflow';

import { ActiveExecutions } from '@/ActiveExecutions';
import { WorkflowRunner } from '@/WorkflowRunner';
import type { IWorkflowExecutionDataProcess } from '@/Interfaces';
import { findCliWorkflowStart, isWorkflowIdValid } from '@/utils';
import { isWorkflowIdValid } from '@/utils';
import { BaseCommand } from './BaseCommand';
import { Container } from 'typedi';
import { WorkflowRepository } from '@db/repositories/workflow.repository';
Expand Down Expand Up @@ -96,7 +96,7 @@ export class Execute extends BaseCommand {
workflowId = undefined;
}

const startingNode = findCliWorkflowStart(workflowData.nodes);
const startingNode = Workflow.selectSubworkflowStarter(workflowData.nodes);

const user = await Container.get(OwnershipService).getInstanceOwner();
const runData: IWorkflowExecutionDataProcess = {
Expand Down
5 changes: 2 additions & 3 deletions packages/cli/src/commands/executeBatch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import fs from 'fs';
import os from 'os';
import { flags } from '@oclif/command';
import type { IRun, ITaskData } from 'n8n-workflow';
import { ApplicationError, jsonParse, sleep } from 'n8n-workflow';
import { ApplicationError, Workflow, jsonParse, sleep } from 'n8n-workflow';
import { sep } from 'path';
import { diff } from 'json-diff';
import pick from 'lodash/pick';
Expand All @@ -12,7 +12,6 @@ import { ActiveExecutions } from '@/ActiveExecutions';
import { WorkflowRunner } from '@/WorkflowRunner';
import type { IWorkflowDb, IWorkflowExecutionDataProcess } from '@/Interfaces';
import type { User } from '@db/entities/User';
import { findCliWorkflowStart } from '@/utils';
import { BaseCommand } from './BaseCommand';
import { Container } from 'typedi';
import type {
Expand Down Expand Up @@ -635,7 +634,7 @@ export class ExecuteBatch extends BaseCommand {
}, ExecuteBatch.executionTimeout);

try {
const startingNode = findCliWorkflowStart(workflowData.nodes);
const startingNode = Workflow.selectSubworkflowStarter(workflowData.nodes);

const runData: IWorkflowExecutionDataProcess = {
executionMode: 'cli',
Expand Down
6 changes: 0 additions & 6 deletions packages/cli/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,6 @@ export function getN8nPackageJson() {
return jsonParse<n8n.PackageJson>(readFileSync(join(CLI_DIR, 'package.json'), 'utf8'));
}

export const STARTING_NODES = [
'@n8n/n8n-nodes-langchain.manualChatTrigger',
'n8n-nodes-base.start',
'n8n-nodes-base.manualTrigger',
];

export const N8N_VERSION = getN8nPackageJson().version;

export const NODE_PACKAGE_PREFIX = 'n8n-nodes-';
Expand Down
30 changes: 0 additions & 30 deletions packages/cli/src/utils.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
import { CliWorkflowOperationError, SubworkflowOperationError } from 'n8n-workflow';
import type { INode } from 'n8n-workflow';
import { STARTING_NODES } from './constants';

/**
* Returns if the given id is a valid workflow id
Expand All @@ -11,34 +9,6 @@ export function isWorkflowIdValid(id: string | null | undefined): boolean {
return typeof id === 'string' && id?.length <= 16;
}

function findWorkflowStart(executionMode: 'integrated' | 'cli') {
return function (nodes: INode[]) {
const executeWorkflowTriggerNode = nodes.find(
(node) => node.type === 'n8n-nodes-base.executeWorkflowTrigger',
);

if (executeWorkflowTriggerNode) return executeWorkflowTriggerNode;

const startNode = nodes.find((node) => STARTING_NODES.includes(node.type));

if (startNode) return startNode;

const title = 'Missing node to start execution';
const description =
"Please make sure the workflow you're calling contains an Execute Workflow Trigger node";

if (executionMode === 'integrated') {
throw new SubworkflowOperationError(title, description);
}

throw new CliWorkflowOperationError(title, description);
};
}

export const findSubworkflowStart = findWorkflowStart('integrated');

export const findCliWorkflowStart = findWorkflowStart('cli');

export const alphabetizeKeys = (obj: INode) =>
Object.keys(obj)
.sort()
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/src/workflows/workflow.request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ export declare namespace WorkflowRequest {
type ManualRunPayload = {
workflowData: IWorkflowDb;
runData: IRunData;
pinData: IPinData;
pinData?: IPinData;
startNodes?: string[];
destinationNode?: string;
};
Expand Down
57 changes: 12 additions & 45 deletions packages/cli/src/workflows/workflow.service.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import Container, { Service } from 'typedi';
import type { INode, IPinData } from 'n8n-workflow';
import type { INode } from 'n8n-workflow';
import { NodeApiError, Workflow } from 'n8n-workflow';
import pick from 'lodash/pick';
import omit from 'lodash/omit';
Expand All @@ -14,7 +14,7 @@ import { ExternalHooks } from '@/ExternalHooks';
import { hasSharing, type ListQuery } from '@/requests';
import type { WorkflowRequest } from '@/workflows/workflow.request';
import { TagService } from '@/services/tag.service';
import type { IWorkflowDb, IWorkflowExecutionDataProcess } from '@/Interfaces';
import type { IWorkflowExecutionDataProcess } from '@/Interfaces';
import { NodeTypes } from '@/NodeTypes';
import { WorkflowRunner } from '@/WorkflowRunner';
import * as WorkflowExecuteAdditionalData from '@/WorkflowExecuteAdditionalData';
Expand Down Expand Up @@ -51,48 +51,6 @@ export class WorkflowService {
private readonly activeWorkflowRunner: ActiveWorkflowRunner,
) {}

/**
* Find the pinned trigger to execute the workflow from, if any.
*
* - In a full execution, select the _first_ pinned trigger.
* - In a partial execution,
* - select the _first_ pinned trigger that leads to the executed node,
* - else select the executed pinned trigger.
*/
findPinnedTrigger(workflow: IWorkflowDb, startNodes?: string[], pinData?: IPinData) {
if (!pinData || !startNodes) return null;

const isTrigger = (nodeTypeName: string) =>
['trigger', 'webhook'].some((suffix) => nodeTypeName.toLowerCase().includes(suffix));

const pinnedTriggers = workflow.nodes.filter(
(node) => !node.disabled && pinData[node.name] && isTrigger(node.type),
);

if (pinnedTriggers.length === 0) return null;

if (startNodes?.length === 0) return pinnedTriggers[0]; // full execution

const [startNodeName] = startNodes;

const parentNames = new Workflow({
nodes: workflow.nodes,
connections: workflow.connections,
active: workflow.active,
nodeTypes: this.nodeTypes,
}).getParentNodes(startNodeName);

let checkNodeName = '';

if (parentNames.length === 0) {
checkNodeName = startNodeName;
} else {
checkNodeName = parentNames.find((pn) => pn === pinnedTriggers[0].name) as string;
}

return pinnedTriggers.find((pt) => pt.name === checkNodeName) ?? null; // partial execution
}

async getMany(sharedWorkflowIds: string[], options?: ListQuery.Options) {
const { workflows, count } = await this.workflowRepository.getMany(sharedWorkflowIds, options);

Expand Down Expand Up @@ -304,7 +262,16 @@ export class WorkflowService {
user: User,
sessionId?: string,
) {
const pinnedTrigger = this.findPinnedTrigger(workflowData, startNodes, pinData);
const pinnedTrigger =
pinData && startNodes
? new Workflow({
nodes: workflowData.nodes,
connections: workflowData.connections,
active: workflowData.active,
nodeTypes: this.nodeTypes,
pinData,
}).selectPinnedActivatorStarter(startNodes)
: null;

// If webhooks nodes exist and are active we have to wait for till we receive a call
if (
Expand Down
7 changes: 4 additions & 3 deletions packages/cli/test/integration/publicApi/workflows.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import type { SuperAgentTest } from 'supertest';
import Container from 'typedi';
import type { INode } from 'n8n-workflow';
import { STARTING_NODES } from '@/constants';
import { SUBWORKFLOW_STARTER_NODES, type INode } from 'n8n-workflow';
import type { Role } from '@db/entities/Role';
import type { TagEntity } from '@db/entities/TagEntity';
import type { User } from '@db/entities/User';
Expand Down Expand Up @@ -809,7 +808,9 @@ describe('POST /workflows', () => {
},
});

const found = response.body.nodes.find((node: INode) => STARTING_NODES.includes(node.type));
const found = response.body.nodes.find((node: INode) =>
SUBWORKFLOW_STARTER_NODES.includes(node.type),
);

expect(found).toBeUndefined();
});
Expand Down
18 changes: 18 additions & 0 deletions packages/workflow/src/Constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,21 @@ export const CREDENTIAL_EMPTY_VALUE =
'__n8n_EMPTY_VALUE_7b1af746-3729-4c60-9b9b-e08eb29e58da' as const;

export const FORM_TRIGGER_PATH_IDENTIFIER = 'n8n-form';

export const NODE_TYPES = {
EXECUTE_WORKFLOW_TRIGGER: 'n8n-nodes-base.executeWorkflowTrigger',
START: 'n8n-nodes-base.start',
MANUAL_TRIGGER: 'n8n-nodes-base.manualTrigger',
MANUAL_CHAT_TRIGGER: '@n8n/n8n-nodes-langchain.manualChatTrigger',
};

export const SUBWORKFLOW_STARTER_NODES = [
NODE_TYPES.EXECUTE_WORKFLOW_TRIGGER,
NODE_TYPES.MANUAL_CHAT_TRIGGER,
NODE_TYPES.START,
NODE_TYPES.MANUAL_TRIGGER,
];

export const STARTERS_TO_IGNORE_IN_REGULAR_EXECUTION = SUBWORKFLOW_STARTER_NODES.filter(
(node) => node !== NODE_TYPES.EXECUTE_WORKFLOW_TRIGGER,
);
Loading
Loading