Skip to content

Commit

Permalink
refactor(RabbitMQ Trigger Node): Improve type-safety, add tests, and …
Browse files Browse the repository at this point in the history
…fix issues with manual triggers (#10663)
  • Loading branch information
netroy authored and riascho committed Sep 23, 2024
1 parent 7ad909b commit 9b8d954
Show file tree
Hide file tree
Showing 6 changed files with 465 additions and 210 deletions.
4 changes: 2 additions & 2 deletions packages/nodes-base/credentials/RabbitMQ.credentials.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { ICredentialType, IDisplayOptions, INodeProperties } from 'n8n-workflow';
import type { ICredentialType, INodeProperties } from 'n8n-workflow';

export class RabbitMQ implements ICredentialType {
name = 'rabbitmq';
Expand Down Expand Up @@ -90,7 +90,7 @@ export class RabbitMQ implements ICredentialType {
ssl: [true],
passwordless: [true],
},
} as IDisplayOptions,
},
default: '',
description: 'SSL Client Certificate to use',
},
Expand Down
139 changes: 81 additions & 58 deletions packages/nodes-base/nodes/RabbitMQ/GenericFunctions.ts
Original file line number Diff line number Diff line change
@@ -1,64 +1,55 @@
import type { IDataObject, IExecuteFunctions, ITriggerFunctions } from 'n8n-workflow';
import { sleep } from 'n8n-workflow';
import type {
IDataObject,
IExecuteFunctions,
INodeExecutionData,
ITriggerFunctions,
} from 'n8n-workflow';
import { jsonParse, sleep } from 'n8n-workflow';
import * as amqplib from 'amqplib';
import { formatPrivateKey } from '@utils/utilities';
import type { ExchangeType, Options, RabbitMQCredentials, TriggerOptions } from './types';

export async function rabbitmqConnect(
this: IExecuteFunctions | ITriggerFunctions,
options: IDataObject,
): Promise<amqplib.Channel> {
const credentials = await this.getCredentials('rabbitmq');
const credentialKeys = ['hostname', 'port', 'username', 'password', 'vhost'] as const;

const credentialKeys = ['hostname', 'port', 'username', 'password', 'vhost'];

const credentialData: IDataObject = {};
credentialKeys.forEach((key) => {
credentialData[key] = credentials[key] === '' ? undefined : credentials[key];
});
export async function rabbitmqConnect(
credentials: RabbitMQCredentials,
): Promise<amqplib.Connection> {
const credentialData = credentialKeys.reduce((acc, key) => {
acc[key] = credentials[key] === '' ? undefined : credentials[key];
return acc;
}, {} as IDataObject) as amqplib.Options.Connect;

const optsData: IDataObject = {};
if (credentials.ssl === true) {
if (credentials.ssl) {
credentialData.protocol = 'amqps';

optsData.ca =
credentials.ca === '' ? undefined : [Buffer.from(formatPrivateKey(credentials.ca as string))];
if (credentials.passwordless === true) {
credentials.ca === '' ? undefined : [Buffer.from(formatPrivateKey(credentials.ca))];
if (credentials.passwordless) {
optsData.cert =
credentials.cert === ''
? undefined
: Buffer.from(formatPrivateKey(credentials.cert as string));
credentials.cert === '' ? undefined : Buffer.from(formatPrivateKey(credentials.cert));
optsData.key =
credentials.key === ''
? undefined
: Buffer.from(formatPrivateKey(credentials.key as string));
credentials.key === '' ? undefined : Buffer.from(formatPrivateKey(credentials.key));
optsData.passphrase = credentials.passphrase === '' ? undefined : credentials.passphrase;
optsData.credentials = amqplib.credentials.external();
}
}

return await amqplib.connect(credentialData, optsData);
}

export async function rabbitmqCreateChannel(
this: IExecuteFunctions | ITriggerFunctions,
): Promise<amqplib.Channel> {
const credentials = await this.getCredentials<RabbitMQCredentials>('rabbitmq');

return await new Promise(async (resolve, reject) => {
try {
const connection = await amqplib.connect(credentialData, optsData);

connection.on('error', (error: Error) => {
reject(error);
});

const channel = (await connection.createChannel().catch(console.warn)) as amqplib.Channel;

if (
options.arguments &&
((options.arguments as IDataObject).argument! as IDataObject[]).length
) {
const additionalArguments: IDataObject = {};
((options.arguments as IDataObject).argument as IDataObject[]).forEach(
(argument: IDataObject) => {
additionalArguments[argument.key as string] = argument.value;
},
);
options.arguments = additionalArguments;
}
const connection = await rabbitmqConnect(credentials);
// TODO: why is this error handler being added here?
connection.on('error', reject);

const channel = await connection.createChannel();
resolve(channel);
} catch (error) {
reject(error);
Expand All @@ -69,9 +60,9 @@ export async function rabbitmqConnect(
export async function rabbitmqConnectQueue(
this: IExecuteFunctions | ITriggerFunctions,
queue: string,
options: IDataObject,
options: Options | TriggerOptions,
): Promise<amqplib.Channel> {
const channel = await rabbitmqConnect.call(this, options);
const channel = await rabbitmqCreateChannel.call(this);

return await new Promise(async (resolve, reject) => {
try {
Expand All @@ -81,16 +72,10 @@ export async function rabbitmqConnectQueue(
await channel.checkQueue(queue);
}

if (options.binding && ((options.binding as IDataObject).bindings! as IDataObject[]).length) {
((options.binding as IDataObject).bindings as IDataObject[]).forEach(
async (binding: IDataObject) => {
await channel.bindQueue(
queue,
binding.exchange as string,
binding.routingKey as string,
);
},
);
if ('binding' in options && options.binding?.bindings.length) {
options.binding.bindings.forEach(async (binding) => {
await channel.bindQueue(queue, binding.exchange, binding.routingKey);
});
}

resolve(channel);
Expand All @@ -103,15 +88,15 @@ export async function rabbitmqConnectQueue(
export async function rabbitmqConnectExchange(
this: IExecuteFunctions | ITriggerFunctions,
exchange: string,
type: string,
options: IDataObject,
options: Options | TriggerOptions,
): Promise<amqplib.Channel> {
const channel = await rabbitmqConnect.call(this, options);
const exchangeType = this.getNodeParameter('exchangeType', 0) as ExchangeType;
const channel = await rabbitmqCreateChannel.call(this);

return await new Promise(async (resolve, reject) => {
try {
if (options.assertExchange) {
await channel.assertExchange(exchange, type, options);
await channel.assertExchange(exchange, exchangeType, options);
} else {
await channel.checkExchange(exchange);
}
Expand Down Expand Up @@ -170,3 +155,41 @@ export class MessageTracker {
await channel.connection.close();
}
}

export const parsePublishArguments = (options: Options) => {
const additionalArguments: IDataObject = {};
if (options.arguments?.argument.length) {
options.arguments.argument.forEach((argument) => {
additionalArguments[argument.key] = argument.value;
});
}
return additionalArguments as amqplib.Options.Publish;
};

export const parseMessage = async (
message: amqplib.Message,
options: TriggerOptions,
helpers: ITriggerFunctions['helpers'],
): Promise<INodeExecutionData> => {
if (options.contentIsBinary) {
const { content } = message;
message.content = undefined as unknown as Buffer;
return {
binary: {
data: await helpers.prepareBinaryData(content),
},
json: message as unknown as IDataObject,
};
} else {
let content: IDataObject | string = message.content.toString();
if (options.jsonParseBody) {
content = jsonParse(content);
}
if (options.onlyContent) {
return { json: content as IDataObject };
} else {
message.content = content as unknown as Buffer;
return { json: message as unknown as IDataObject };
}
}
};
57 changes: 15 additions & 42 deletions packages/nodes-base/nodes/RabbitMQ/RabbitMQ.node.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
/* eslint-disable n8n-nodes-base/node-filename-against-convention */
import * as amqplib from 'amqplib';
import type { Options } from 'amqplib';
import type * as amqplib from 'amqplib';
import type {
IExecuteFunctions,
ICredentialsDecrypted,
Expand All @@ -14,8 +13,13 @@ import type {
} from 'n8n-workflow';
import { NodeApiError, NodeConnectionType, NodeOperationError } from 'n8n-workflow';

import { rabbitmqConnectExchange, rabbitmqConnectQueue } from './GenericFunctions';
import { formatPrivateKey } from '@utils/utilities';
import {
parsePublishArguments,
rabbitmqConnect,
rabbitmqConnectExchange,
rabbitmqConnectQueue,
} from './GenericFunctions';
import type { Options, RabbitMQCredentials } from './types';

export class RabbitMQ implements INodeType {
description: INodeTypeDescription = {
Expand Down Expand Up @@ -363,38 +367,8 @@ export class RabbitMQ implements INodeType {
this: ICredentialTestFunctions,
credential: ICredentialsDecrypted,
): Promise<INodeCredentialTestResult> {
const credentials = credential.data as IDataObject;
try {
const credentialKeys = ['hostname', 'port', 'username', 'password', 'vhost'];

const credentialData: IDataObject = {};
credentialKeys.forEach((key) => {
credentialData[key] = credentials[key] === '' ? undefined : credentials[key];
});

const optsData: IDataObject = {};
if (credentials.ssl === true) {
credentialData.protocol = 'amqps';

optsData.ca =
credentials.ca === ''
? undefined
: [Buffer.from(formatPrivateKey(credentials.ca as string))];
if (credentials.passwordless === true) {
optsData.cert =
credentials.cert === ''
? undefined
: Buffer.from(formatPrivateKey(credentials.cert as string));
optsData.key =
credentials.key === ''
? undefined
: Buffer.from(formatPrivateKey(credentials.key as string));
optsData.passphrase =
credentials.passphrase === '' ? undefined : credentials.passphrase;
optsData.credentials = amqplib.credentials.external();
}
}
const connection = await amqplib.connect(credentialData, optsData);
const connection = await rabbitmqConnect(credential.data as RabbitMQCredentials);
await connection.close();
} catch (error) {
return {
Expand All @@ -411,7 +385,7 @@ export class RabbitMQ implements INodeType {
};

async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
let channel, options: IDataObject;
let channel: amqplib.Channel | undefined;
try {
const items = this.getInputData();
const operation = this.getNodeParameter('operation', 0);
Expand All @@ -424,7 +398,7 @@ export class RabbitMQ implements INodeType {
if (mode === 'queue') {
const queue = this.getNodeParameter('queue', 0) as string;

options = this.getNodeParameter('options', 0, {});
const options = this.getNodeParameter('options', 0, {}) as Options;

channel = await rabbitmqConnectQueue.call(this, queue, options);

Expand Down Expand Up @@ -457,7 +431,7 @@ export class RabbitMQ implements INodeType {
queuePromises.push(
channel.sendToQueue(queue, Buffer.from(message), {
headers,
...(options.arguments ? (options.arguments as Options.Publish) : {}),
...parsePublishArguments(options),
}),
);
}
Expand Down Expand Up @@ -492,12 +466,11 @@ export class RabbitMQ implements INodeType {
await channel.connection.close();
} else if (mode === 'exchange') {
const exchange = this.getNodeParameter('exchange', 0) as string;
const type = this.getNodeParameter('exchangeType', 0) as string;
const routingKey = this.getNodeParameter('routingKey', 0) as string;

options = this.getNodeParameter('options', 0, {});
const options = this.getNodeParameter('options', 0, {}) as Options;

channel = await rabbitmqConnectExchange.call(this, exchange, type, options);
channel = await rabbitmqConnectExchange.call(this, exchange, options);

const sendInputData = this.getNodeParameter('sendInputData', 0) as boolean;

Expand Down Expand Up @@ -529,7 +502,7 @@ export class RabbitMQ implements INodeType {
exchangePromises.push(
channel.publish(exchange, routingKey, Buffer.from(message), {
headers,
...(options.arguments ? (options.arguments as Options.Publish) : {}),
...parsePublishArguments(options),
}),
);
}
Expand Down
Loading

0 comments on commit 9b8d954

Please sign in to comment.