From d3556e03286d5de75aba2140e820e7e0d0a233e0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E0=A4=95=E0=A4=BE=E0=A4=B0=E0=A4=A4=E0=A5=8B=E0=A4=AB?= =?UTF-8?q?=E0=A5=8D=E0=A4=AB=E0=A5=87=E0=A4=B2=E0=A4=B8=E0=A5=8D=E0=A4=95?= =?UTF-8?q?=E0=A5=8D=E0=A4=B0=E0=A4=BF=E0=A4=AA=E0=A5=8D=E0=A4=9F=E2=84=A2?= Date: Wed, 4 Sep 2024 12:18:02 +0200 Subject: [PATCH 1/3] fix(RabbitMQ Trigger Node): Handle existing messages during manual testing --- .../nodes/RabbitMQ/RabbitMQTrigger.node.ts | 224 ++++++++++-------- 1 file changed, 126 insertions(+), 98 deletions(-) diff --git a/packages/nodes-base/nodes/RabbitMQ/RabbitMQTrigger.node.ts b/packages/nodes-base/nodes/RabbitMQ/RabbitMQTrigger.node.ts index 31e3fec8eb7fa..d3abf36b2a0bd 100644 --- a/packages/nodes-base/nodes/RabbitMQ/RabbitMQTrigger.node.ts +++ b/packages/nodes-base/nodes/RabbitMQ/RabbitMQTrigger.node.ts @@ -1,4 +1,5 @@ /* eslint-disable n8n-nodes-base/node-filename-against-convention */ +import type { Message } from 'amqplib'; import type { IDataObject, IDeferredPromise, @@ -11,7 +12,7 @@ import type { ITriggerFunctions, ITriggerResponse, } from 'n8n-workflow'; -import { NodeConnectionType, NodeOperationError } from 'n8n-workflow'; +import { jsonParse, NodeConnectionType, NodeOperationError } from 'n8n-workflow'; import { rabbitDefaultOptions } from './DefaultOptions'; @@ -207,9 +208,67 @@ export class RabbitMQTrigger implements INodeType { const queue = this.getNodeParameter('queue') as string; const options = this.getNodeParameter('options', {}) as IDataObject; + const parseMessage = async (message: Message): Promise => { + if (options.contentIsBinary === true) { + const { content } = message; + message.content = undefined as unknown as Buffer; + return { + binary: { + data: await this.helpers.prepareBinaryData(content), + }, + json: message as unknown as IDataObject, + }; + } else { + const item: INodeExecutionData = { json: {} }; + let content: IDataObject | string = message.content.toString(); + if (options.jsonParseBody === true) { + content = jsonParse(content); + } + if (options.onlyContent === true) { + item.json = content as IDataObject; + } else { + message.content = content as unknown as Buffer; + item.json = message as unknown as IDataObject; + } + return item; + } + }; + const channel = await rabbitmqConnectQueue.call(this, queue, options); - let parallelMessages = + if (this.getMode() === 'manual') { + const manualTriggerFunction = async () => { + // Do only catch a single message when executing manually, else messages will leak + await channel.prefetch(1); + + const processMessage = async (message: Message | null) => { + if (message !== null) { + const item = await parseMessage(message); + channel.ack(message); + this.emit([[item]]); + } else { + // TODO: throw an error saying that rabbitmq closed the consumer + } + }; + + const existingMessage = await channel.get(queue); + if (existingMessage) await processMessage(existingMessage); + else await channel.consume(queue, processMessage); + }; + + const closeFunction = async () => { + await channel.close(); + await channel.connection.close(); + return; + }; + + return { + closeFunction, + manualTriggerFunction, + }; + } + + const parallelMessages = options.parallelMessages !== undefined && options.parallelMessages !== -1 ? parseInt(options.parallelMessages as string, 10) : -1; @@ -221,11 +280,6 @@ export class RabbitMQTrigger implements INodeType { ); } - if (this.getMode() === 'manual') { - // Do only catch a single message when executing manually, else messages will leak - parallelMessages = 1; - } - let acknowledgeMode = options.acknowledge ? options.acknowledge : 'immediately'; if (parallelMessages !== -1 && acknowledgeMode === 'immediately') { @@ -236,108 +290,82 @@ export class RabbitMQTrigger implements INodeType { } const messageTracker = new MessageTracker(); - let consumerTag: string; let closeGotCalled = false; - const startConsumer = async () => { - if (parallelMessages !== -1) { - await channel.prefetch(parallelMessages); - } - - channel.on('close', () => { - if (!closeGotCalled) { - this.emitError(new Error('Connection got closed unexpectedly')); - } - }); - - const consumerInfo = await channel.consume(queue, async (message) => { - if (message !== null) { - try { - if (acknowledgeMode !== 'immediately') { - messageTracker.received(message); - } + if (parallelMessages !== -1) { + await channel.prefetch(parallelMessages); + } - let content: IDataObject | string = message.content.toString(); + channel.on('close', () => { + if (!closeGotCalled) { + this.emitError(new Error('Connection got closed unexpectedly')); + } + }); - const item: INodeExecutionData = { - json: {}, - }; - if (options.contentIsBinary === true) { - item.binary = { - data: await this.helpers.prepareBinaryData(message.content), - }; + const consumerInfo = await channel.consume(queue, async (message) => { + if (message !== null) { + try { + if (acknowledgeMode !== 'immediately') { + messageTracker.received(message); + } - item.json = message as unknown as IDataObject; - message.content = undefined as unknown as Buffer; - } else { - if (options.jsonParseBody === true) { - content = JSON.parse(content); - } - if (options.onlyContent === true) { - item.json = content as IDataObject; - } else { - message.content = content as unknown as Buffer; - item.json = message as unknown as IDataObject; - } - } + const item = await parseMessage(message); - let responsePromise: IDeferredPromise | undefined = undefined; - let responsePromiseHook: IDeferredPromise | undefined = - undefined; - if (acknowledgeMode !== 'immediately' && acknowledgeMode !== 'laterMessageNode') { - responsePromise = await this.helpers.createDeferredPromise(); - } else if (acknowledgeMode === 'laterMessageNode') { - responsePromiseHook = - await this.helpers.createDeferredPromise(); - } - if (responsePromiseHook) { - this.emit([[item]], responsePromiseHook, undefined); - } else { - this.emit([[item]], undefined, responsePromise); - } - if (responsePromise && acknowledgeMode !== 'laterMessageNode') { - // Acknowledge message after the execution finished - await responsePromise.promise().then(async (data: IRun) => { - if (data.data.resultData.error) { - // The execution did fail - if (acknowledgeMode === 'executionFinishesSuccessfully') { - channel.nack(message); - messageTracker.answered(message); - return; - } + let responsePromise: IDeferredPromise | undefined = undefined; + let responsePromiseHook: IDeferredPromise | undefined = + undefined; + if (acknowledgeMode !== 'immediately' && acknowledgeMode !== 'laterMessageNode') { + responsePromise = await this.helpers.createDeferredPromise(); + } else if (acknowledgeMode === 'laterMessageNode') { + responsePromiseHook = + await this.helpers.createDeferredPromise(); + } + if (responsePromiseHook) { + this.emit([[item]], responsePromiseHook, undefined); + } else { + this.emit([[item]], undefined, responsePromise); + } + if (responsePromise && acknowledgeMode !== 'laterMessageNode') { + // Acknowledge message after the execution finished + await responsePromise.promise().then(async (data: IRun) => { + if (data.data.resultData.error) { + // The execution did fail + if (acknowledgeMode === 'executionFinishesSuccessfully') { + channel.nack(message); + messageTracker.answered(message); + return; } - channel.ack(message); - messageTracker.answered(message); - }); - } else if (responsePromiseHook && acknowledgeMode === 'laterMessageNode') { - await responsePromiseHook.promise().then(() => { - channel.ack(message); - messageTracker.answered(message); - }); - } else { - // Acknowledge message directly + } channel.ack(message); - } - } catch (error) { - const workflow = this.getWorkflow(); - const node = this.getNode(); - if (acknowledgeMode !== 'immediately') { messageTracker.answered(message); - } - - this.logger.error( - `There was a problem with the RabbitMQ Trigger node "${node.name}" in workflow "${workflow.id}": "${error.message}"`, - { - node: node.name, - workflowId: workflow.id, - }, - ); + }); + } else if (responsePromiseHook && acknowledgeMode === 'laterMessageNode') { + await responsePromiseHook.promise().then(() => { + channel.ack(message); + messageTracker.answered(message); + }); + } else { + // Acknowledge message directly + channel.ack(message); + } + } catch (error) { + const workflow = this.getWorkflow(); + const node = this.getNode(); + if (acknowledgeMode !== 'immediately') { + messageTracker.answered(message); } + + this.logger.error( + `There was a problem with the RabbitMQ Trigger node "${node.name}" in workflow "${workflow.id}": "${error.message}"`, + { + node: node.name, + workflowId: workflow.id, + }, + ); } - }); - consumerTag = consumerInfo.consumerTag; - }; - await startConsumer(); + } + }); + const consumerTag = consumerInfo.consumerTag; // The "closeFunction" function gets called by n8n whenever // the workflow gets deactivated and can so clean up. From 40a80e3515094941caaa4aec6ff7fe360a47662b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E0=A4=95=E0=A4=BE=E0=A4=B0=E0=A4=A4=E0=A5=8B=E0=A4=AB?= =?UTF-8?q?=E0=A5=8D=E0=A4=AB=E0=A5=87=E0=A4=B2=E0=A4=B8=E0=A5=8D=E0=A4=95?= =?UTF-8?q?=E0=A5=8D=E0=A4=B0=E0=A4=BF=E0=A4=AA=E0=A5=8D=E0=A4=9F=E2=84=A2?= Date: Wed, 4 Sep 2024 13:24:42 +0200 Subject: [PATCH 2/3] improve type-safety --- .../credentials/RabbitMQ.credentials.ts | 4 +- .../nodes/RabbitMQ/GenericFunctions.ts | 135 ++++++++++-------- .../nodes/RabbitMQ/RabbitMQ.node.ts | 54 ++----- .../nodes/RabbitMQ/RabbitMQTrigger.node.ts | 54 ++----- packages/nodes-base/nodes/RabbitMQ/types.ts | 69 +++++++++ 5 files changed, 175 insertions(+), 141 deletions(-) create mode 100644 packages/nodes-base/nodes/RabbitMQ/types.ts diff --git a/packages/nodes-base/credentials/RabbitMQ.credentials.ts b/packages/nodes-base/credentials/RabbitMQ.credentials.ts index daebd2fcf36e6..5ea71e07430c8 100644 --- a/packages/nodes-base/credentials/RabbitMQ.credentials.ts +++ b/packages/nodes-base/credentials/RabbitMQ.credentials.ts @@ -1,4 +1,4 @@ -import type { ICredentialType, IDisplayOptions, INodeProperties } from 'n8n-workflow'; +import type { ICredentialType, INodeProperties } from 'n8n-workflow'; export class RabbitMQ implements ICredentialType { name = 'rabbitmq'; @@ -90,7 +90,7 @@ export class RabbitMQ implements ICredentialType { ssl: [true], passwordless: [true], }, - } as IDisplayOptions, + }, default: '', description: 'SSL Client Certificate to use', }, diff --git a/packages/nodes-base/nodes/RabbitMQ/GenericFunctions.ts b/packages/nodes-base/nodes/RabbitMQ/GenericFunctions.ts index f85b20f078aa2..ed283bb31c7ca 100644 --- a/packages/nodes-base/nodes/RabbitMQ/GenericFunctions.ts +++ b/packages/nodes-base/nodes/RabbitMQ/GenericFunctions.ts @@ -1,64 +1,55 @@ -import type { IDataObject, IExecuteFunctions, ITriggerFunctions } from 'n8n-workflow'; -import { sleep } from 'n8n-workflow'; +import type { + IDataObject, + IExecuteFunctions, + INodeExecutionData, + ITriggerFunctions, +} from 'n8n-workflow'; +import { jsonParse, sleep } from 'n8n-workflow'; import * as amqplib from 'amqplib'; import { formatPrivateKey } from '@utils/utilities'; +import type { Options, RabbitMQCredentials, TriggerOptions } from './types'; -export async function rabbitmqConnect( - this: IExecuteFunctions | ITriggerFunctions, - options: IDataObject, -): Promise { - const credentials = await this.getCredentials('rabbitmq'); +const credentialKeys = ['hostname', 'port', 'username', 'password', 'vhost'] as const; - const credentialKeys = ['hostname', 'port', 'username', 'password', 'vhost']; - - const credentialData: IDataObject = {}; - credentialKeys.forEach((key) => { - credentialData[key] = credentials[key] === '' ? undefined : credentials[key]; - }); +export async function rabbitmqConnect( + credentials: RabbitMQCredentials, +): Promise { + const credentialData = credentialKeys.reduce((acc, key) => { + acc[key] = credentials[key] === '' ? undefined : credentials[key]; + return acc; + }, {} as IDataObject) as amqplib.Options.Connect; const optsData: IDataObject = {}; - if (credentials.ssl === true) { + if (credentials.ssl) { credentialData.protocol = 'amqps'; optsData.ca = - credentials.ca === '' ? undefined : [Buffer.from(formatPrivateKey(credentials.ca as string))]; - if (credentials.passwordless === true) { + credentials.ca === '' ? undefined : [Buffer.from(formatPrivateKey(credentials.ca))]; + if (credentials.passwordless) { optsData.cert = - credentials.cert === '' - ? undefined - : Buffer.from(formatPrivateKey(credentials.cert as string)); + credentials.cert === '' ? undefined : Buffer.from(formatPrivateKey(credentials.cert)); optsData.key = - credentials.key === '' - ? undefined - : Buffer.from(formatPrivateKey(credentials.key as string)); + credentials.key === '' ? undefined : Buffer.from(formatPrivateKey(credentials.key)); optsData.passphrase = credentials.passphrase === '' ? undefined : credentials.passphrase; optsData.credentials = amqplib.credentials.external(); } } + return await amqplib.connect(credentialData, optsData); +} + +export async function rabbitmqCreateChannel( + this: IExecuteFunctions | ITriggerFunctions, +): Promise { + const credentials = await this.getCredentials('rabbitmq'); + return await new Promise(async (resolve, reject) => { try { - const connection = await amqplib.connect(credentialData, optsData); - - connection.on('error', (error: Error) => { - reject(error); - }); - - const channel = (await connection.createChannel().catch(console.warn)) as amqplib.Channel; - - if ( - options.arguments && - ((options.arguments as IDataObject).argument! as IDataObject[]).length - ) { - const additionalArguments: IDataObject = {}; - ((options.arguments as IDataObject).argument as IDataObject[]).forEach( - (argument: IDataObject) => { - additionalArguments[argument.key as string] = argument.value; - }, - ); - options.arguments = additionalArguments; - } + const connection = await rabbitmqConnect(credentials); + // TODO: why is this error handler being added here? + connection.on('error', reject); + const channel = await connection.createChannel(); resolve(channel); } catch (error) { reject(error); @@ -69,9 +60,9 @@ export async function rabbitmqConnect( export async function rabbitmqConnectQueue( this: IExecuteFunctions | ITriggerFunctions, queue: string, - options: IDataObject, + options: Options | TriggerOptions, ): Promise { - const channel = await rabbitmqConnect.call(this, options); + const channel = await rabbitmqCreateChannel.call(this); return await new Promise(async (resolve, reject) => { try { @@ -81,16 +72,10 @@ export async function rabbitmqConnectQueue( await channel.checkQueue(queue); } - if (options.binding && ((options.binding as IDataObject).bindings! as IDataObject[]).length) { - ((options.binding as IDataObject).bindings as IDataObject[]).forEach( - async (binding: IDataObject) => { - await channel.bindQueue( - queue, - binding.exchange as string, - binding.routingKey as string, - ); - }, - ); + if ('binding' in options && options.binding?.bindings.length) { + options.binding.bindings.forEach(async (binding) => { + await channel.bindQueue(queue, binding.exchange, binding.routingKey); + }); } resolve(channel); @@ -104,9 +89,9 @@ export async function rabbitmqConnectExchange( this: IExecuteFunctions | ITriggerFunctions, exchange: string, type: string, - options: IDataObject, + options: Options | TriggerOptions, ): Promise { - const channel = await rabbitmqConnect.call(this, options); + const channel = await rabbitmqCreateChannel.call(this); return await new Promise(async (resolve, reject) => { try { @@ -170,3 +155,41 @@ export class MessageTracker { await channel.connection.close(); } } + +export const parsePublishArguments = (options: Options) => { + const additionalArguments: IDataObject = {}; + if (options.arguments?.argument.length) { + options.arguments.argument.forEach((argument) => { + additionalArguments[argument.key] = argument.value; + }); + } + return additionalArguments as amqplib.Options.Publish; +}; + +export const parseMessage = async ( + message: amqplib.Message, + options: TriggerOptions, + helpers: ITriggerFunctions['helpers'], +): Promise => { + if (options.contentIsBinary) { + const { content } = message; + message.content = undefined as unknown as Buffer; + return { + binary: { + data: await helpers.prepareBinaryData(content), + }, + json: message as unknown as IDataObject, + }; + } else { + let content: IDataObject | string = message.content.toString(); + if (options.jsonParseBody) { + content = jsonParse(content); + } + if (options.onlyContent) { + return { json: content as IDataObject }; + } else { + message.content = content as unknown as Buffer; + return { json: message as unknown as IDataObject }; + } + } +}; diff --git a/packages/nodes-base/nodes/RabbitMQ/RabbitMQ.node.ts b/packages/nodes-base/nodes/RabbitMQ/RabbitMQ.node.ts index 377ca289ba729..387eb2d8c4def 100644 --- a/packages/nodes-base/nodes/RabbitMQ/RabbitMQ.node.ts +++ b/packages/nodes-base/nodes/RabbitMQ/RabbitMQ.node.ts @@ -1,6 +1,5 @@ /* eslint-disable n8n-nodes-base/node-filename-against-convention */ -import * as amqplib from 'amqplib'; -import type { Options } from 'amqplib'; +import type * as amqplib from 'amqplib'; import type { IExecuteFunctions, ICredentialsDecrypted, @@ -14,8 +13,13 @@ import type { } from 'n8n-workflow'; import { NodeApiError, NodeConnectionType, NodeOperationError } from 'n8n-workflow'; -import { rabbitmqConnectExchange, rabbitmqConnectQueue } from './GenericFunctions'; -import { formatPrivateKey } from '@utils/utilities'; +import { + parsePublishArguments, + rabbitmqConnect, + rabbitmqConnectExchange, + rabbitmqConnectQueue, +} from './GenericFunctions'; +import type { Options, RabbitMQCredentials } from './types'; export class RabbitMQ implements INodeType { description: INodeTypeDescription = { @@ -363,38 +367,8 @@ export class RabbitMQ implements INodeType { this: ICredentialTestFunctions, credential: ICredentialsDecrypted, ): Promise { - const credentials = credential.data as IDataObject; try { - const credentialKeys = ['hostname', 'port', 'username', 'password', 'vhost']; - - const credentialData: IDataObject = {}; - credentialKeys.forEach((key) => { - credentialData[key] = credentials[key] === '' ? undefined : credentials[key]; - }); - - const optsData: IDataObject = {}; - if (credentials.ssl === true) { - credentialData.protocol = 'amqps'; - - optsData.ca = - credentials.ca === '' - ? undefined - : [Buffer.from(formatPrivateKey(credentials.ca as string))]; - if (credentials.passwordless === true) { - optsData.cert = - credentials.cert === '' - ? undefined - : Buffer.from(formatPrivateKey(credentials.cert as string)); - optsData.key = - credentials.key === '' - ? undefined - : Buffer.from(formatPrivateKey(credentials.key as string)); - optsData.passphrase = - credentials.passphrase === '' ? undefined : credentials.passphrase; - optsData.credentials = amqplib.credentials.external(); - } - } - const connection = await amqplib.connect(credentialData, optsData); + const connection = await rabbitmqConnect(credential.data as RabbitMQCredentials); await connection.close(); } catch (error) { return { @@ -411,7 +385,7 @@ export class RabbitMQ implements INodeType { }; async execute(this: IExecuteFunctions): Promise { - let channel, options: IDataObject; + let channel: amqplib.Channel | undefined; try { const items = this.getInputData(); const operation = this.getNodeParameter('operation', 0); @@ -424,7 +398,7 @@ export class RabbitMQ implements INodeType { if (mode === 'queue') { const queue = this.getNodeParameter('queue', 0) as string; - options = this.getNodeParameter('options', 0, {}); + const options = this.getNodeParameter('options', 0, {}) as Options; channel = await rabbitmqConnectQueue.call(this, queue, options); @@ -457,7 +431,7 @@ export class RabbitMQ implements INodeType { queuePromises.push( channel.sendToQueue(queue, Buffer.from(message), { headers, - ...(options.arguments ? (options.arguments as Options.Publish) : {}), + ...parsePublishArguments(options), }), ); } @@ -495,7 +469,7 @@ export class RabbitMQ implements INodeType { const type = this.getNodeParameter('exchangeType', 0) as string; const routingKey = this.getNodeParameter('routingKey', 0) as string; - options = this.getNodeParameter('options', 0, {}); + const options = this.getNodeParameter('options', 0, {}) as Options; channel = await rabbitmqConnectExchange.call(this, exchange, type, options); @@ -529,7 +503,7 @@ export class RabbitMQ implements INodeType { exchangePromises.push( channel.publish(exchange, routingKey, Buffer.from(message), { headers, - ...(options.arguments ? (options.arguments as Options.Publish) : {}), + ...parsePublishArguments(options), }), ); } diff --git a/packages/nodes-base/nodes/RabbitMQ/RabbitMQTrigger.node.ts b/packages/nodes-base/nodes/RabbitMQ/RabbitMQTrigger.node.ts index d3abf36b2a0bd..a12bd54f2ae70 100644 --- a/packages/nodes-base/nodes/RabbitMQ/RabbitMQTrigger.node.ts +++ b/packages/nodes-base/nodes/RabbitMQ/RabbitMQTrigger.node.ts @@ -1,10 +1,8 @@ /* eslint-disable n8n-nodes-base/node-filename-against-convention */ import type { Message } from 'amqplib'; import type { - IDataObject, IDeferredPromise, IExecuteResponsePromiseData, - INodeExecutionData, INodeProperties, INodeType, INodeTypeDescription, @@ -12,11 +10,12 @@ import type { ITriggerFunctions, ITriggerResponse, } from 'n8n-workflow'; -import { jsonParse, NodeConnectionType, NodeOperationError } from 'n8n-workflow'; +import { NodeConnectionType, NodeOperationError } from 'n8n-workflow'; import { rabbitDefaultOptions } from './DefaultOptions'; -import { MessageTracker, rabbitmqConnectQueue } from './GenericFunctions'; +import { MessageTracker, rabbitmqConnectQueue, parseMessage } from './GenericFunctions'; +import type { TriggerOptions } from './types'; export class RabbitMQTrigger implements INodeType { description: INodeTypeDescription = { @@ -206,34 +205,7 @@ export class RabbitMQTrigger implements INodeType { async trigger(this: ITriggerFunctions): Promise { const queue = this.getNodeParameter('queue') as string; - const options = this.getNodeParameter('options', {}) as IDataObject; - - const parseMessage = async (message: Message): Promise => { - if (options.contentIsBinary === true) { - const { content } = message; - message.content = undefined as unknown as Buffer; - return { - binary: { - data: await this.helpers.prepareBinaryData(content), - }, - json: message as unknown as IDataObject, - }; - } else { - const item: INodeExecutionData = { json: {} }; - let content: IDataObject | string = message.content.toString(); - if (options.jsonParseBody === true) { - content = jsonParse(content); - } - if (options.onlyContent === true) { - item.json = content as IDataObject; - } else { - message.content = content as unknown as Buffer; - item.json = message as unknown as IDataObject; - } - return item; - } - }; - + const options = this.getNodeParameter('options', {}) as TriggerOptions; const channel = await rabbitmqConnectQueue.call(this, queue, options); if (this.getMode() === 'manual') { @@ -243,11 +215,11 @@ export class RabbitMQTrigger implements INodeType { const processMessage = async (message: Message | null) => { if (message !== null) { - const item = await parseMessage(message); + const item = await parseMessage(message, options, this.helpers); channel.ack(message); this.emit([[item]]); } else { - // TODO: throw an error saying that rabbitmq closed the consumer + this.emitError(new Error('Connection got closed unexpectedly')); } }; @@ -268,19 +240,15 @@ export class RabbitMQTrigger implements INodeType { }; } - const parallelMessages = - options.parallelMessages !== undefined && options.parallelMessages !== -1 - ? parseInt(options.parallelMessages as string, 10) - : -1; - - if (parallelMessages === 0 || parallelMessages < -1) { + const parallelMessages = options.parallelMessages ?? -1; + if (isNaN(parallelMessages) || parallelMessages === 0 || parallelMessages < -1) { throw new NodeOperationError( this.getNode(), - 'Parallel message processing limit must be greater than zero (or -1 for no limit)', + 'Parallel message processing limit must be a number greater than zero (or -1 for no limit)', ); } - let acknowledgeMode = options.acknowledge ? options.acknowledge : 'immediately'; + let acknowledgeMode = options.acknowledge ?? 'immediately'; if (parallelMessages !== -1 && acknowledgeMode === 'immediately') { // If parallel message limit is set, then the default mode is "executionFinishes" @@ -309,7 +277,7 @@ export class RabbitMQTrigger implements INodeType { messageTracker.received(message); } - const item = await parseMessage(message); + const item = await parseMessage(message, options, this.helpers); let responsePromise: IDeferredPromise | undefined = undefined; let responsePromiseHook: IDeferredPromise | undefined = diff --git a/packages/nodes-base/nodes/RabbitMQ/types.ts b/packages/nodes-base/nodes/RabbitMQ/types.ts new file mode 100644 index 0000000000000..30ff5e4183042 --- /dev/null +++ b/packages/nodes-base/nodes/RabbitMQ/types.ts @@ -0,0 +1,69 @@ +type Argument = { + key: string; + value?: string; +}; + +type Binding = { + exchange: string; + routingKey: string; +}; + +type Header = { + key: string; + value?: string; +}; + +export type Options = { + autoDelete: boolean; + assertExchange: boolean; + assertQueue: boolean; + durable: boolean; + exclusive: boolean; + arguments: { + argument: Argument[]; + }; + headers: { + header: Header[]; + }; +}; + +type ContentOptions = + | { + contentIsBinary: true; + } + | { + contentIsBinary: false; + jsonParseBody: boolean; + onlyContent: boolean; + }; + +export type TriggerOptions = Options & { + acknowledge: + | 'executionFinishes' + | 'executionFinishesSuccessfully' + | 'immediately' + | 'laterMessageNode'; + parallelMessages: number; + binding: { + bindings: Binding[]; + }; +} & ContentOptions; + +export type RabbitMQCredentials = { + hostname: string; + port: number; + username: string; + password: string; + vhost: string; +} & ( + | { ssl: false } + | ({ ssl: true; ca: string } & ( + | { passwordless: false } + | { + passwordless: true; + cert: string; + key: string; + passphrase: string; + } + )) +); From adf85a466e6073475484bbdce2be105a5fe2cd7e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E0=A4=95=E0=A4=BE=E0=A4=B0=E0=A4=A4=E0=A5=8B=E0=A4=AB?= =?UTF-8?q?=E0=A5=8D=E0=A4=AB=E0=A5=87=E0=A4=B2=E0=A4=B8=E0=A5=8D=E0=A4=95?= =?UTF-8?q?=E0=A5=8D=E0=A4=B0=E0=A4=BF=E0=A4=AA=E0=A5=8D=E0=A4=9F=E2=84=A2?= Date: Wed, 4 Sep 2024 14:49:15 +0200 Subject: [PATCH 3/3] add some tests --- .../nodes/RabbitMQ/GenericFunctions.ts | 6 +- .../nodes/RabbitMQ/RabbitMQ.node.ts | 3 +- .../RabbitMQ/test/GenericFunctions.test.ts | 192 ++++++++++++++++++ packages/nodes-base/nodes/RabbitMQ/types.ts | 2 + 4 files changed, 198 insertions(+), 5 deletions(-) create mode 100644 packages/nodes-base/nodes/RabbitMQ/test/GenericFunctions.test.ts diff --git a/packages/nodes-base/nodes/RabbitMQ/GenericFunctions.ts b/packages/nodes-base/nodes/RabbitMQ/GenericFunctions.ts index ed283bb31c7ca..c551e61863b32 100644 --- a/packages/nodes-base/nodes/RabbitMQ/GenericFunctions.ts +++ b/packages/nodes-base/nodes/RabbitMQ/GenericFunctions.ts @@ -7,7 +7,7 @@ import type { import { jsonParse, sleep } from 'n8n-workflow'; import * as amqplib from 'amqplib'; import { formatPrivateKey } from '@utils/utilities'; -import type { Options, RabbitMQCredentials, TriggerOptions } from './types'; +import type { ExchangeType, Options, RabbitMQCredentials, TriggerOptions } from './types'; const credentialKeys = ['hostname', 'port', 'username', 'password', 'vhost'] as const; @@ -88,15 +88,15 @@ export async function rabbitmqConnectQueue( export async function rabbitmqConnectExchange( this: IExecuteFunctions | ITriggerFunctions, exchange: string, - type: string, options: Options | TriggerOptions, ): Promise { + const exchangeType = this.getNodeParameter('exchangeType', 0) as ExchangeType; const channel = await rabbitmqCreateChannel.call(this); return await new Promise(async (resolve, reject) => { try { if (options.assertExchange) { - await channel.assertExchange(exchange, type, options); + await channel.assertExchange(exchange, exchangeType, options); } else { await channel.checkExchange(exchange); } diff --git a/packages/nodes-base/nodes/RabbitMQ/RabbitMQ.node.ts b/packages/nodes-base/nodes/RabbitMQ/RabbitMQ.node.ts index 387eb2d8c4def..29475c1ace77d 100644 --- a/packages/nodes-base/nodes/RabbitMQ/RabbitMQ.node.ts +++ b/packages/nodes-base/nodes/RabbitMQ/RabbitMQ.node.ts @@ -466,12 +466,11 @@ export class RabbitMQ implements INodeType { await channel.connection.close(); } else if (mode === 'exchange') { const exchange = this.getNodeParameter('exchange', 0) as string; - const type = this.getNodeParameter('exchangeType', 0) as string; const routingKey = this.getNodeParameter('routingKey', 0) as string; const options = this.getNodeParameter('options', 0, {}) as Options; - channel = await rabbitmqConnectExchange.call(this, exchange, type, options); + channel = await rabbitmqConnectExchange.call(this, exchange, options); const sendInputData = this.getNodeParameter('sendInputData', 0) as boolean; diff --git a/packages/nodes-base/nodes/RabbitMQ/test/GenericFunctions.test.ts b/packages/nodes-base/nodes/RabbitMQ/test/GenericFunctions.test.ts new file mode 100644 index 0000000000000..0532e67ae4e72 --- /dev/null +++ b/packages/nodes-base/nodes/RabbitMQ/test/GenericFunctions.test.ts @@ -0,0 +1,192 @@ +import type { Channel, Connection, ConsumeMessage, Message } from 'amqplib'; +import { mock } from 'jest-mock-extended'; +import type { ITriggerFunctions } from 'n8n-workflow'; + +const mockChannel = mock(); +const mockConnection = mock({ createChannel: async () => mockChannel }); +mockChannel.connection = mockConnection; +const connect = jest.fn().mockReturnValue(mockConnection); +jest.mock('amqplib', () => ({ connect })); + +import type { TriggerOptions } from '../types'; +import { + parseMessage, + rabbitmqConnect, + rabbitmqConnectExchange, + rabbitmqConnectQueue, + rabbitmqCreateChannel, + MessageTracker, +} from '../GenericFunctions'; + +describe('RabbitMQ GenericFunctions', () => { + const credentials = { + hostname: 'some.host', + port: 5672, + username: 'user', + password: 'pass', + vhost: '/', + }; + const context = mock(); + + beforeEach(() => jest.clearAllMocks()); + + describe('parseMessage', () => { + const helpers = mock(); + + it('should handle binary data', async () => { + const message = mock(); + const content = Buffer.from('test'); + message.content = content; + const options = mock({ contentIsBinary: true }); + helpers.prepareBinaryData.mockResolvedValue(mock()); + + const item = await parseMessage(message, options, helpers); + expect(item.json).toBe(message); + expect(item.binary?.data).toBeDefined(); + expect(helpers.prepareBinaryData).toHaveBeenCalledWith(content); + expect(message.content).toBeUndefined(); + }); + + it('should handle JSON data', async () => { + const message = mock(); + const content = Buffer.from(JSON.stringify({ test: 'test' })); + message.content = content; + const options = mock({ + contentIsBinary: false, + jsonParseBody: true, + onlyContent: false, + }); + + const item = await parseMessage(message, options, helpers); + expect(item.json).toBe(message); + expect(item.binary).toBeUndefined(); + expect(helpers.prepareBinaryData).not.toHaveBeenCalled(); + expect(message.content).toEqual({ test: 'test' }); + }); + + it('should return only content, when requested', async () => { + const message = mock(); + const content = Buffer.from(JSON.stringify({ test: 'test' })); + message.content = content; + const options = mock({ + contentIsBinary: false, + jsonParseBody: false, + onlyContent: true, + }); + + const item = await parseMessage(message, options, helpers); + expect(item.json).toBe(content.toString()); + expect(item.binary).toBeUndefined(); + expect(helpers.prepareBinaryData).not.toHaveBeenCalled(); + expect(message.content).toEqual(content); + }); + }); + + describe('rabbitmqConnect', () => { + it('should connect to RabbitMQ', async () => { + const connection = await rabbitmqConnect({ ...credentials, ssl: false }); + expect(connect).toHaveBeenCalledWith(credentials, {}); + expect(connection).toBe(mockConnection); + }); + + it('should connect to RabbitMQ over SSL', async () => { + const connection = await rabbitmqConnect({ + ...credentials, + ssl: true, + ca: 'ca', + passwordless: false, + }); + expect(connect).toHaveBeenCalledWith( + { ...credentials, protocol: 'amqps' }, + { ca: [Buffer.from('ca')] }, + ); + expect(connection).toBe(mockConnection); + }); + }); + + describe('rabbitmqCreateChannel', () => { + it('should create a channel', async () => { + context.getCredentials.mockResolvedValue(credentials); + const channel = await rabbitmqCreateChannel.call(context); + expect(channel).toBe(mockChannel); + }); + }); + + describe('rabbitmqConnectQueue', () => { + it('should assert a queue', async () => { + context.getCredentials.mockResolvedValue(credentials); + const options = mock({ assertQueue: true }); + await rabbitmqConnectQueue.call(context, 'queue', options); + + expect(mockChannel.assertQueue).toHaveBeenCalledWith('queue', options); + expect(mockChannel.checkQueue).not.toHaveBeenCalled(); + expect(mockChannel.bindQueue).not.toHaveBeenCalled(); + }); + + it('should check a queue', async () => { + context.getCredentials.mockResolvedValue(credentials); + const options = mock({ assertQueue: false }); + await rabbitmqConnectQueue.call(context, 'queue', options); + + expect(mockChannel.assertQueue).not.toHaveBeenCalled(); + expect(mockChannel.checkQueue).toHaveBeenCalledWith('queue'); + expect(mockChannel.bindQueue).not.toHaveBeenCalled(); + }); + }); + + describe('rabbitmqConnectExchange', () => { + it('should assert a queue', async () => { + context.getCredentials.mockResolvedValue(credentials); + context.getNodeParameter.calledWith('exchangeType', 0).mockReturnValue('topic'); + const options = mock({ assertExchange: true }); + await rabbitmqConnectExchange.call(context, 'exchange', options); + + expect(mockChannel.assertExchange).toHaveBeenCalledWith('exchange', 'topic', options); + expect(mockChannel.checkExchange).not.toHaveBeenCalled(); + }); + + it('should check a queue', async () => { + context.getCredentials.mockResolvedValue(credentials); + const options = mock({ assertExchange: false }); + await rabbitmqConnectExchange.call(context, 'exchange', options); + + expect(mockChannel.assertExchange).not.toHaveBeenCalled(); + expect(mockChannel.checkExchange).toHaveBeenCalledWith('exchange'); + }); + }); + + describe('MessageTracker', () => { + let messageTracker: MessageTracker; + + beforeEach(() => { + messageTracker = new MessageTracker(); + }); + + it('should track received messages', () => { + const message = { fields: { deliveryTag: 1 } } as ConsumeMessage; + messageTracker.received(message); + expect(messageTracker.messages).toContain(1); + }); + + it('should track answered messages', () => { + const message = { fields: { deliveryTag: 1 } } as ConsumeMessage; + messageTracker.received(message); + messageTracker.answered(message); + expect(messageTracker.messages).not.toContain(1); + }); + + it('should return the number of unanswered messages', () => { + const message = { fields: { deliveryTag: 1 } } as ConsumeMessage; + messageTracker.received(message); + expect(messageTracker.unansweredMessages()).toBe(1); + }); + + it('should close the channel and connection', async () => { + await messageTracker.closeChannel(mockChannel, 'consumerTag'); + + expect(mockChannel.cancel).toHaveBeenCalledWith('consumerTag'); + expect(mockChannel.close).toHaveBeenCalled(); + expect(mockConnection.close).toHaveBeenCalled(); + }); + }); +}); diff --git a/packages/nodes-base/nodes/RabbitMQ/types.ts b/packages/nodes-base/nodes/RabbitMQ/types.ts index 30ff5e4183042..f70272d696bb5 100644 --- a/packages/nodes-base/nodes/RabbitMQ/types.ts +++ b/packages/nodes-base/nodes/RabbitMQ/types.ts @@ -67,3 +67,5 @@ export type RabbitMQCredentials = { } )) ); + +export type ExchangeType = 'direct' | 'topic' | 'headers' | 'fanout';