diff --git a/packages/editor-ui/src/components/NodeSettings.vue b/packages/editor-ui/src/components/NodeSettings.vue index cd74caa5e9016..13e1f0e0aa043 100644 --- a/packages/editor-ui/src/components/NodeSettings.vue +++ b/packages/editor-ui/src/components/NodeSettings.vue @@ -139,6 +139,7 @@ :parameters="parametersSetting" :node-values="nodeValues" :is-read-only="isReadOnly" + :hide-delete="true" :hidden-issues-inputs="hiddenIssuesInputs" path="parameters" @value-changed="valueChanged" diff --git a/packages/workflow/src/NodeHelpers.ts b/packages/workflow/src/NodeHelpers.ts index 3f41fb776aa3b..f218f9a88223e 100644 --- a/packages/workflow/src/NodeHelpers.ts +++ b/packages/workflow/src/NodeHelpers.ts @@ -272,6 +272,134 @@ const commonCORSParameters: INodeProperties[] = [ }, ]; +const declarativeNodeOptionParameters: INodeProperties = { + displayName: 'Request Options', + name: 'requestOptions', + type: 'collection', + isNodeSetting: true, + placeholder: 'Add Option', + default: {}, + options: [ + { + displayName: 'Batching', + name: 'batching', + placeholder: 'Add Batching', + type: 'fixedCollection', + typeOptions: { + multipleValues: false, + }, + default: { + batch: {}, + }, + options: [ + { + displayName: 'Batching', + name: 'batch', + values: [ + { + displayName: 'Items per Batch', + name: 'batchSize', + type: 'number', + typeOptions: { + minValue: -1, + }, + default: 50, + description: + 'Input will be split in batches to throttle requests. -1 for disabled. 0 will be treated as 1.', + }, + { + displayName: 'Batch Interval (ms)', + name: 'batchInterval', + type: 'number', + typeOptions: { + minValue: 0, + }, + default: 1000, + description: 'Time (in milliseconds) between each batch of requests. 0 for disabled.', + }, + ], + }, + ], + }, + { + displayName: 'Ignore SSL Issues', + name: 'allowUnauthorizedCerts', + type: 'boolean', + noDataExpression: true, + default: false, + description: + 'Whether to accept the response even if SSL certificate validation is not possible', + }, + { + displayName: 'Proxy', + name: 'proxy', + type: 'string', + default: '', + placeholder: 'e.g. http://myproxy:3128', + description: + 'HTTP proxy to use. If authentication is required it can be defined as follow: http://username:password@myproxy:3128', + }, + { + displayName: 'Timeout', + name: 'timeout', + type: 'number', + typeOptions: { + minValue: 1, + }, + default: 10000, + description: + 'Time in ms to wait for the server to send response headers (and start the response body) before aborting the request', + }, + ], +}; + +export function applyDeclarativeNodeOptionParameters(nodeType: INodeType): void { + if (nodeType.execute || nodeType.trigger || nodeType.webhook || nodeType.description.polling) { + return; + } + + const parameters = nodeType.description.properties; + + if (!parameters) { + return; + } + + // Was originally under "options" instead of "requestOptions" so the chance + // that that existed was quite high. With this name the chance is actually + // very low that it already exists but lets leave it in anyway to be sure. + const existingRequestOptionsIndex = parameters.findIndex( + (parameter) => parameter.name === 'requestOptions', + ); + if (existingRequestOptionsIndex !== -1) { + parameters[existingRequestOptionsIndex] = { + ...declarativeNodeOptionParameters, + options: [ + ...(declarativeNodeOptionParameters.options || []), + ...(parameters[existingRequestOptionsIndex]?.options || []), + ], + }; + + if (parameters[existingRequestOptionsIndex]?.options) { + parameters[existingRequestOptionsIndex].options!.sort((a, b) => { + if ('displayName' in a && 'displayName' in b) { + if (a.displayName < b.displayName) { + return -1; + } + if (a.displayName > b.displayName) { + return 1; + } + } + + return 0; + }); + } + } else { + parameters.push(declarativeNodeOptionParameters); + } + + return; +} + /** * Apply special parameters which should be added to nodeTypes depending on their type or configuration */ @@ -289,6 +417,8 @@ export function applySpecialNodeParameters(nodeType: INodeType): void { ]; else properties.push(...commonCORSParameters); } + + applyDeclarativeNodeOptionParameters(nodeType); } const getPropertyValues = ( diff --git a/packages/workflow/src/RoutingNode.ts b/packages/workflow/src/RoutingNode.ts index abd77a5d5b138..e41310c200084 100644 --- a/packages/workflow/src/RoutingNode.ts +++ b/packages/workflow/src/RoutingNode.ts @@ -9,6 +9,7 @@ import get from 'lodash/get'; import merge from 'lodash/merge'; import set from 'lodash/set'; +import url from 'node:url'; import type { ICredentialDataDecryptedObject, @@ -46,6 +47,7 @@ import type { Workflow } from './Workflow'; import { NodeOperationError } from './errors/node-operation.error'; import { NodeApiError } from './errors/node-api.error'; +import { sleep } from './utils'; export class RoutingNode { additionalData: IWorkflowExecuteAdditionalData; @@ -76,6 +78,7 @@ export class RoutingNode { this.workflow = workflow; } + // eslint-disable-next-line complexity async runNode( inputData: ITaskDataConnections, runIndex: number, @@ -87,7 +90,6 @@ export class RoutingNode { ): Promise { const items = inputData.main[0] as INodeExecutionData[]; const returnData: INodeExecutionData[] = []; - let responseData; let credentialType: string | undefined; @@ -129,24 +131,41 @@ export class RoutingNode { } } - // TODO: Think about how batching could be handled for REST APIs which support it - for (let i = 0; i < items.length; i++) { - let thisArgs: IExecuteSingleFunctions | undefined; - try { - thisArgs = nodeExecuteFunctions.getExecuteSingleFunctions( + const { batching } = executeFunctions.getNodeParameter('requestOptions', 0, {}) as { + batching: { batch: { batchSize: number; batchInterval: number } }; + }; + + const batchSize = batching?.batch?.batchSize > 0 ? batching?.batch?.batchSize : 1; + const batchInterval = batching?.batch.batchInterval; + + const requestPromises = []; + const itemContext: Array<{ + thisArgs: IExecuteSingleFunctions; + requestData: DeclarativeRestApiSettings.ResultOptions; + }> = []; + + for (let itemIndex = 0; itemIndex < items.length; itemIndex++) { + if (itemIndex > 0 && batchSize >= 0 && batchInterval > 0) { + if (itemIndex % batchSize === 0) { + await sleep(batchInterval); + } + } + + itemContext.push({ + thisArgs: nodeExecuteFunctions.getExecuteSingleFunctions( this.workflow, this.runExecutionData, runIndex, this.connectionInputData, inputData, this.node, - i, + itemIndex, this.additionalData, executeData, this.mode, abortSignal, - ); - const requestData: DeclarativeRestApiSettings.ResultOptions = { + ), + requestData: { options: { qs: {}, body: {}, @@ -155,88 +174,160 @@ export class RoutingNode { preSend: [], postReceive: [], requestOperations: {}, - }; - - if (nodeType.description.requestOperations) { - requestData.requestOperations = { ...nodeType.description.requestOperations }; - } + } as DeclarativeRestApiSettings.ResultOptions, + }); - if (nodeType.description.requestDefaults) { - for (const key of Object.keys(nodeType.description.requestDefaults)) { - // eslint-disable-next-line @typescript-eslint/no-explicit-any - let value = (nodeType.description.requestDefaults as Record)[key]; - // If the value is an expression resolve it - value = this.getParameterValue( - value, - i, - runIndex, - executeData, - { $credentials: credentials, $version: this.node.typeVersion }, - false, - ) as string; - // eslint-disable-next-line @typescript-eslint/no-explicit-any - (requestData.options as Record)[key] = value; - } - } + const { proxy, timeout, allowUnauthorizedCerts } = itemContext[ + itemIndex + ].thisArgs.getNodeParameter('requestOptions', 0, {}) as { + proxy: string; + timeout: number; + allowUnauthorizedCerts: boolean; + }; + + if (nodeType.description.requestOperations) { + itemContext[itemIndex].requestData.requestOperations = { + ...nodeType.description.requestOperations, + }; + } - for (const property of nodeType.description.properties) { - let value = get(this.node.parameters, property.name, []) as string | NodeParameterValue; + if (nodeType.description.requestDefaults) { + for (const key of Object.keys(nodeType.description.requestDefaults)) { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + let value = (nodeType.description.requestDefaults as Record)[key]; // If the value is an expression resolve it value = this.getParameterValue( value, - i, + itemIndex, runIndex, executeData, { $credentials: credentials, $version: this.node.typeVersion }, false, - ) as string | NodeParameterValue; + ) as string; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (itemContext[itemIndex].requestData.options as Record)[key] = value; + } + } - const tempOptions = this.getRequestOptionsFromParameters( - thisArgs, - property, - i, - runIndex, - '', - { $credentials: credentials, $value: value, $version: this.node.typeVersion }, - ); + for (const property of nodeType.description.properties) { + let value = get(this.node.parameters, property.name, []) as string | NodeParameterValue; + // If the value is an expression resolve it + value = this.getParameterValue( + value, + itemIndex, + runIndex, + executeData, + { $credentials: credentials, $version: this.node.typeVersion }, + false, + ) as string | NodeParameterValue; + + const tempOptions = this.getRequestOptionsFromParameters( + itemContext[itemIndex].thisArgs, + property, + itemIndex, + runIndex, + '', + { $credentials: credentials, $value: value, $version: this.node.typeVersion }, + ); + + this.mergeOptions(itemContext[itemIndex].requestData, tempOptions); + } - this.mergeOptions(requestData, tempOptions); + if (proxy) { + const proxyParsed = url.parse(proxy); + const proxyProperties = ['host', 'port']; + + for (const property of proxyProperties) { + if ( + !(property in proxyParsed) || + proxyParsed[property as keyof typeof proxyParsed] === null + ) { + throw new NodeOperationError(this.node, 'The proxy is not value', { + runIndex, + itemIndex, + description: `The proxy URL does not contain a valid value for "${property}"`, + }); + } } - // TODO: Change to handle some requests in parallel (should be configurable) - responseData = await this.makeRoutingRequest( - requestData, - thisArgs, - i, + itemContext[itemIndex].requestData.options.proxy = { + host: proxyParsed.hostname as string, + port: parseInt(proxyParsed.port!), + protocol: proxyParsed.protocol?.replace(/:$/, '') || undefined, + }; + + if (proxyParsed.auth) { + const [username, password] = proxyParsed.auth.split(':'); + itemContext[itemIndex].requestData.options.proxy!.auth = { + username, + password, + }; + } + } + + if (allowUnauthorizedCerts) { + itemContext[itemIndex].requestData.options.skipSslCertificateValidation = + allowUnauthorizedCerts; + } + + if (timeout) { + itemContext[itemIndex].requestData.options.timeout = timeout; + } else { + // set default timeout to 5 minutes + itemContext[itemIndex].requestData.options.timeout = 300_000; + } + + requestPromises.push( + this.makeRoutingRequest( + itemContext[itemIndex].requestData, + itemContext[itemIndex].thisArgs, + itemIndex, runIndex, credentialType, - requestData.requestOperations, + itemContext[itemIndex].requestData.requestOperations, credentialsDecrypted, - ); + ), + ); + } - if (requestData.maxResults) { - // Remove not needed items in case APIs return to many - responseData.splice(requestData.maxResults as number); + const promisesResponses = await Promise.allSettled(requestPromises); + let responseData: any; + for (let itemIndex = 0; itemIndex < items.length; itemIndex++) { + responseData = promisesResponses.shift(); + if (responseData!.status !== 'fulfilled') { + if (responseData.reason.statusCode === 429) { + responseData.reason.message = + "Try spacing your requests out using the batching settings under 'Options'"; } - returnData.push(...responseData); - } catch (error) { - if (thisArgs !== undefined && thisArgs.continueOnFail()) { + const error = responseData.reason; + + if (itemContext[itemIndex].thisArgs?.continueOnFail()) { returnData.push({ json: {}, error: error as NodeApiError }); continue; } if (error instanceof NodeApiError) { - set(error, 'context.itemIndex', i); + set(error, 'context.itemIndex', itemIndex); set(error, 'context.runIndex', runIndex); throw error; } throw new NodeApiError(this.node, error as JsonObject, { runIndex, - itemIndex: i, + itemIndex, + message: error?.message, + description: error?.description, + httpCode: error.isAxiosError && error.response ? String(error.response?.status) : 'none', }); } + + if (itemContext[itemIndex].requestData.maxResults) { + // Remove not needed items in case APIs return to many + responseData.value.splice(itemContext[itemIndex].requestData.maxResults as number); + } + + returnData.push(...responseData.value); } return [returnData]; diff --git a/packages/workflow/test/RoutingNode.test.ts b/packages/workflow/test/RoutingNode.test.ts index 4b0ac4a5e4998..e2638d8c90e6f 100644 --- a/packages/workflow/test/RoutingNode.test.ts +++ b/packages/workflow/test/RoutingNode.test.ts @@ -19,7 +19,10 @@ import type { import { RoutingNode } from '@/RoutingNode'; import { Workflow } from '@/Workflow'; +import * as utilsModule from '@/utils'; + import * as Helpers from './Helpers'; +import { applyDeclarativeNodeOptionParameters } from '@/NodeHelpers'; import { mock } from 'jest-mock-extended'; const postReceiveFunction1 = async function ( @@ -42,6 +45,23 @@ const preSendFunction1 = async function ( describe('RoutingNode', () => { const additionalData = mock(); + test('applyDeclarativeNodeOptionParameters', () => { + const nodeTypes = Helpers.NodeTypes(); + const nodeType = nodeTypes.getByNameAndVersion('test.setMulti'); + + applyDeclarativeNodeOptionParameters(nodeType); + + const options = nodeType.description.properties.find( + (property) => property.name === 'requestOptions', + ); + + expect(options?.options).toBeDefined; + + const optionNames = options!.options!.map((option) => option.name); + + expect(optionNames).toEqual(['batching', 'allowUnauthorizedCerts', 'proxy', 'timeout']); + }); + describe('getRequestOptionsFromParameters', () => { const tests: Array<{ description: string; @@ -717,6 +737,11 @@ describe('RoutingNode', () => { const tests: Array<{ description: string; input: { + specialTestOptions?: { + applyDeclarativeNodeOptionParameters?: boolean; + numberOfItems?: number; + sleepCalls?: number[][]; + }; nodeType: { properties?: INodeProperties[]; credentials?: INodeCredentialDescription[]; @@ -772,6 +797,7 @@ describe('RoutingNode', () => { }, baseURL: 'http://127.0.0.1:5678', returnFullResponse: true, + timeout: 300000, }, }, }, @@ -821,6 +847,7 @@ describe('RoutingNode', () => { }, baseURL: 'http://127.0.0.1:5678', returnFullResponse: true, + timeout: 300000, }, }, }, @@ -876,6 +903,7 @@ describe('RoutingNode', () => { }, baseURL: 'http://127.0.0.1:5678', returnFullResponse: true, + timeout: 300000, }, }, }, @@ -931,6 +959,7 @@ describe('RoutingNode', () => { }, baseURL: 'http://127.0.0.1:5678', returnFullResponse: true, + timeout: 300000, }, }, }, @@ -988,6 +1017,154 @@ describe('RoutingNode', () => { offset: 10, }, returnFullResponse: true, + timeout: 300000, + }, + }, + }, + ], + ], + }, + { + description: 'multiple parameters, from applyDeclarativeNodeOptionParameters', + input: { + specialTestOptions: { + applyDeclarativeNodeOptionParameters: true, + numberOfItems: 5, + sleepCalls: [[500], [500]], + }, + node: { + parameters: { + requestOptions: { + allowUnauthorizedCerts: true, + batching: { + batch: { + batchSize: 2, + batchInterval: 500, + }, + }, + proxy: 'http://user:password@127.0.0.1:8080', + timeout: 123, + }, + }, + }, + nodeType: { + properties: [], + }, + }, + output: [ + [ + { + json: { + headers: {}, + statusCode: 200, + requestOptions: { + qs: {}, + headers: {}, + proxy: { + auth: { + username: 'user', + password: 'password', + }, + host: '127.0.0.1', + protocol: 'http', + port: 8080, + }, + body: {}, + returnFullResponse: true, + skipSslCertificateValidation: true, + timeout: 123, + }, + }, + }, + { + json: { + headers: {}, + statusCode: 200, + requestOptions: { + qs: {}, + headers: {}, + proxy: { + auth: { + username: 'user', + password: 'password', + }, + host: '127.0.0.1', + protocol: 'http', + port: 8080, + }, + body: {}, + returnFullResponse: true, + skipSslCertificateValidation: true, + timeout: 123, + }, + }, + }, + { + json: { + headers: {}, + statusCode: 200, + requestOptions: { + qs: {}, + headers: {}, + proxy: { + auth: { + username: 'user', + password: 'password', + }, + host: '127.0.0.1', + protocol: 'http', + port: 8080, + }, + body: {}, + returnFullResponse: true, + skipSslCertificateValidation: true, + timeout: 123, + }, + }, + }, + { + json: { + headers: {}, + statusCode: 200, + requestOptions: { + qs: {}, + headers: {}, + proxy: { + auth: { + username: 'user', + password: 'password', + }, + host: '127.0.0.1', + protocol: 'http', + port: 8080, + }, + body: {}, + returnFullResponse: true, + skipSslCertificateValidation: true, + timeout: 123, + }, + }, + }, + { + json: { + headers: {}, + statusCode: 200, + requestOptions: { + qs: {}, + headers: {}, + proxy: { + auth: { + username: 'user', + password: 'password', + }, + host: '127.0.0.1', + protocol: 'http', + port: 8080, + }, + body: {}, + returnFullResponse: true, + skipSslCertificateValidation: true, + timeout: 123, }, }, }, @@ -1424,6 +1601,7 @@ describe('RoutingNode', () => { addedIn: 'preSendFunction1', }, returnFullResponse: true, + timeout: 300000, }, }, ], @@ -1519,6 +1697,7 @@ describe('RoutingNode', () => { baseURL: 'http://127.0.0.1:5678', url: '/test-url', returnFullResponse: true, + timeout: 300000, }, }, }, @@ -1698,15 +1877,12 @@ describe('RoutingNode', () => { const connectionInputData: INodeExecutionData[] = []; const runExecutionData: IRunExecutionData = { resultData: { runData: {} } }; const nodeType = nodeTypes.getByNameAndVersion(baseNode.type); + applyDeclarativeNodeOptionParameters(nodeType); + + const propertiesOriginal = nodeType.description.properties; const inputData: ITaskDataConnections = { - main: [ - [ - { - json: {}, - }, - ], - ], + main: [[]], }; for (const testData of tests) { @@ -1719,6 +1895,9 @@ describe('RoutingNode', () => { }; nodeType.description = { ...testData.input.nodeType } as INodeTypeDescription; + if (testData.input.specialTestOptions?.applyDeclarativeNodeOptionParameters) { + nodeType.description.properties = propertiesOriginal; + } const workflow = new Workflow({ nodes: workflowData.nodes, @@ -1742,18 +1921,46 @@ describe('RoutingNode', () => { source: null, } as IExecuteData; + const executeFunctions = mock(); + const executeSingleFunctions = Helpers.getExecuteSingleFunctions( + workflow, + runExecutionData, + runIndex, + node, + itemIndex, + ); + const nodeExecuteFunctions: Partial = { - getExecuteFunctions: () => mock(), - getExecuteSingleFunctions: () => - Helpers.getExecuteSingleFunctions( - workflow, - runExecutionData, - runIndex, - node, - itemIndex, - ), + getExecuteFunctions: () => executeFunctions, + getExecuteSingleFunctions: () => executeSingleFunctions, }; + const numberOfItems = testData.input.specialTestOptions?.numberOfItems ?? 1; + if (!inputData.main[0] || inputData.main[0].length !== numberOfItems) { + inputData.main[0] = []; + for (let i = 0; i < numberOfItems; i++) { + inputData.main[0].push({ json: {} }); + } + } + + const spy = jest.spyOn(utilsModule, 'sleep').mockReturnValue( + new Promise((resolve) => { + resolve(); + }), + ); + + spy.mockClear(); + + executeFunctions.getNodeParameter.mockImplementation( + (parameterName: string) => testData.input.node.parameters[parameterName] || {}, + ); + + const getNodeParameter = executeSingleFunctions.getNodeParameter; + executeSingleFunctions.getNodeParameter = (parameterName: string) => + parameterName in testData.input.node.parameters + ? testData.input.node.parameters[parameterName] + : getNodeParameter(parameterName) ?? {}; + const result = await routingNode.runNode( inputData, runIndex, @@ -1762,6 +1969,12 @@ describe('RoutingNode', () => { nodeExecuteFunctions as INodeExecuteFunctions, ); + if (testData.input.specialTestOptions?.sleepCalls) { + expect(spy.mock.calls).toEqual(testData.input.specialTestOptions?.sleepCalls); + } else { + expect(spy).toHaveBeenCalledTimes(0); + } + expect(result).toEqual(testData.output); }); } @@ -1820,6 +2033,7 @@ describe('RoutingNode', () => { }, baseURL: 'http://127.0.0.1:5678', returnFullResponse: true, + timeout: 300000, }, }, },