Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(RabbitMQ Trigger Node): Improve type-safety, add tests, and fix issues with manual triggers #10663

Merged
merged 3 commits into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions packages/nodes-base/credentials/RabbitMQ.credentials.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { ICredentialType, IDisplayOptions, INodeProperties } from 'n8n-workflow';
import type { ICredentialType, INodeProperties } from 'n8n-workflow';

export class RabbitMQ implements ICredentialType {
name = 'rabbitmq';
Expand Down Expand Up @@ -90,7 +90,7 @@ export class RabbitMQ implements ICredentialType {
ssl: [true],
passwordless: [true],
},
} as IDisplayOptions,
},
default: '',
description: 'SSL Client Certificate to use',
},
Expand Down
139 changes: 81 additions & 58 deletions packages/nodes-base/nodes/RabbitMQ/GenericFunctions.ts
Original file line number Diff line number Diff line change
@@ -1,64 +1,55 @@
import type { IDataObject, IExecuteFunctions, ITriggerFunctions } from 'n8n-workflow';
import { sleep } from 'n8n-workflow';
import type {
IDataObject,
IExecuteFunctions,
INodeExecutionData,
ITriggerFunctions,
} from 'n8n-workflow';
import { jsonParse, sleep } from 'n8n-workflow';
import * as amqplib from 'amqplib';
import { formatPrivateKey } from '@utils/utilities';
import type { ExchangeType, Options, RabbitMQCredentials, TriggerOptions } from './types';

export async function rabbitmqConnect(
this: IExecuteFunctions | ITriggerFunctions,
options: IDataObject,
): Promise<amqplib.Channel> {
const credentials = await this.getCredentials('rabbitmq');
const credentialKeys = ['hostname', 'port', 'username', 'password', 'vhost'] as const;

const credentialKeys = ['hostname', 'port', 'username', 'password', 'vhost'];

const credentialData: IDataObject = {};
credentialKeys.forEach((key) => {
credentialData[key] = credentials[key] === '' ? undefined : credentials[key];
});
export async function rabbitmqConnect(
credentials: RabbitMQCredentials,
): Promise<amqplib.Connection> {
const credentialData = credentialKeys.reduce((acc, key) => {
acc[key] = credentials[key] === '' ? undefined : credentials[key];
return acc;
}, {} as IDataObject) as amqplib.Options.Connect;

const optsData: IDataObject = {};
if (credentials.ssl === true) {
if (credentials.ssl) {
credentialData.protocol = 'amqps';

optsData.ca =
credentials.ca === '' ? undefined : [Buffer.from(formatPrivateKey(credentials.ca as string))];
if (credentials.passwordless === true) {
credentials.ca === '' ? undefined : [Buffer.from(formatPrivateKey(credentials.ca))];
if (credentials.passwordless) {
optsData.cert =
credentials.cert === ''
? undefined
: Buffer.from(formatPrivateKey(credentials.cert as string));
credentials.cert === '' ? undefined : Buffer.from(formatPrivateKey(credentials.cert));
optsData.key =
credentials.key === ''
? undefined
: Buffer.from(formatPrivateKey(credentials.key as string));
credentials.key === '' ? undefined : Buffer.from(formatPrivateKey(credentials.key));
optsData.passphrase = credentials.passphrase === '' ? undefined : credentials.passphrase;
optsData.credentials = amqplib.credentials.external();
}
}

return await amqplib.connect(credentialData, optsData);
}

export async function rabbitmqCreateChannel(
this: IExecuteFunctions | ITriggerFunctions,
): Promise<amqplib.Channel> {
const credentials = await this.getCredentials<RabbitMQCredentials>('rabbitmq');

return await new Promise(async (resolve, reject) => {
try {
const connection = await amqplib.connect(credentialData, optsData);

connection.on('error', (error: Error) => {
reject(error);
});

const channel = (await connection.createChannel().catch(console.warn)) as amqplib.Channel;

if (
options.arguments &&
((options.arguments as IDataObject).argument! as IDataObject[]).length
) {
const additionalArguments: IDataObject = {};
((options.arguments as IDataObject).argument as IDataObject[]).forEach(
(argument: IDataObject) => {
additionalArguments[argument.key as string] = argument.value;
},
);
options.arguments = additionalArguments;
}
const connection = await rabbitmqConnect(credentials);
// TODO: why is this error handler being added here?
connection.on('error', reject);

Comment on lines +49 to +50
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still don't know why this line exists, since

  1. the connection can emit an error event long after this promise has been resolved
  2. the connection isn't going to emit an error event when creating a channel. that error is thrown in the createChannel call itself.

const channel = await connection.createChannel();
resolve(channel);
} catch (error) {
reject(error);
Expand All @@ -69,9 +60,9 @@ export async function rabbitmqConnect(
export async function rabbitmqConnectQueue(
this: IExecuteFunctions | ITriggerFunctions,
queue: string,
options: IDataObject,
options: Options | TriggerOptions,
): Promise<amqplib.Channel> {
const channel = await rabbitmqConnect.call(this, options);
const channel = await rabbitmqCreateChannel.call(this);

return await new Promise(async (resolve, reject) => {
try {
Expand All @@ -81,16 +72,10 @@ export async function rabbitmqConnectQueue(
await channel.checkQueue(queue);
}

if (options.binding && ((options.binding as IDataObject).bindings! as IDataObject[]).length) {
((options.binding as IDataObject).bindings as IDataObject[]).forEach(
async (binding: IDataObject) => {
await channel.bindQueue(
queue,
binding.exchange as string,
binding.routingKey as string,
);
},
);
if ('binding' in options && options.binding?.bindings.length) {
options.binding.bindings.forEach(async (binding) => {
await channel.bindQueue(queue, binding.exchange, binding.routingKey);
});
}

resolve(channel);
Expand All @@ -103,15 +88,15 @@ export async function rabbitmqConnectQueue(
export async function rabbitmqConnectExchange(
this: IExecuteFunctions | ITriggerFunctions,
exchange: string,
type: string,
options: IDataObject,
options: Options | TriggerOptions,
): Promise<amqplib.Channel> {
const channel = await rabbitmqConnect.call(this, options);
const exchangeType = this.getNodeParameter('exchangeType', 0) as ExchangeType;
const channel = await rabbitmqCreateChannel.call(this);

return await new Promise(async (resolve, reject) => {
try {
if (options.assertExchange) {
await channel.assertExchange(exchange, type, options);
await channel.assertExchange(exchange, exchangeType, options);
} else {
await channel.checkExchange(exchange);
}
Expand Down Expand Up @@ -170,3 +155,41 @@ export class MessageTracker {
await channel.connection.close();
}
}

export const parsePublishArguments = (options: Options) => {
const additionalArguments: IDataObject = {};
if (options.arguments?.argument.length) {
options.arguments.argument.forEach((argument) => {
additionalArguments[argument.key] = argument.value;
});
}
return additionalArguments as amqplib.Options.Publish;
};

export const parseMessage = async (
message: amqplib.Message,
options: TriggerOptions,
helpers: ITriggerFunctions['helpers'],
): Promise<INodeExecutionData> => {
if (options.contentIsBinary) {
const { content } = message;
message.content = undefined as unknown as Buffer;
return {
binary: {
data: await helpers.prepareBinaryData(content),
},
json: message as unknown as IDataObject,
};
} else {
let content: IDataObject | string = message.content.toString();
if (options.jsonParseBody) {
content = jsonParse(content);
}
if (options.onlyContent) {
return { json: content as IDataObject };
} else {
message.content = content as unknown as Buffer;
return { json: message as unknown as IDataObject };
}
}
};
57 changes: 15 additions & 42 deletions packages/nodes-base/nodes/RabbitMQ/RabbitMQ.node.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
/* eslint-disable n8n-nodes-base/node-filename-against-convention */
import * as amqplib from 'amqplib';
import type { Options } from 'amqplib';
import type * as amqplib from 'amqplib';
import type {
IExecuteFunctions,
ICredentialsDecrypted,
Expand All @@ -14,8 +13,13 @@ import type {
} from 'n8n-workflow';
import { NodeApiError, NodeConnectionType, NodeOperationError } from 'n8n-workflow';

import { rabbitmqConnectExchange, rabbitmqConnectQueue } from './GenericFunctions';
import { formatPrivateKey } from '@utils/utilities';
import {
parsePublishArguments,
rabbitmqConnect,
rabbitmqConnectExchange,
rabbitmqConnectQueue,
} from './GenericFunctions';
import type { Options, RabbitMQCredentials } from './types';

export class RabbitMQ implements INodeType {
description: INodeTypeDescription = {
Expand Down Expand Up @@ -363,38 +367,8 @@ export class RabbitMQ implements INodeType {
this: ICredentialTestFunctions,
credential: ICredentialsDecrypted,
): Promise<INodeCredentialTestResult> {
const credentials = credential.data as IDataObject;
try {
const credentialKeys = ['hostname', 'port', 'username', 'password', 'vhost'];

const credentialData: IDataObject = {};
credentialKeys.forEach((key) => {
credentialData[key] = credentials[key] === '' ? undefined : credentials[key];
});

const optsData: IDataObject = {};
if (credentials.ssl === true) {
credentialData.protocol = 'amqps';

optsData.ca =
credentials.ca === ''
? undefined
: [Buffer.from(formatPrivateKey(credentials.ca as string))];
if (credentials.passwordless === true) {
optsData.cert =
credentials.cert === ''
? undefined
: Buffer.from(formatPrivateKey(credentials.cert as string));
optsData.key =
credentials.key === ''
? undefined
: Buffer.from(formatPrivateKey(credentials.key as string));
optsData.passphrase =
credentials.passphrase === '' ? undefined : credentials.passphrase;
optsData.credentials = amqplib.credentials.external();
}
}
const connection = await amqplib.connect(credentialData, optsData);
const connection = await rabbitmqConnect(credential.data as RabbitMQCredentials);
await connection.close();
} catch (error) {
return {
Expand All @@ -411,7 +385,7 @@ export class RabbitMQ implements INodeType {
};

async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
let channel, options: IDataObject;
let channel: amqplib.Channel | undefined;
try {
const items = this.getInputData();
const operation = this.getNodeParameter('operation', 0);
Expand All @@ -424,7 +398,7 @@ export class RabbitMQ implements INodeType {
if (mode === 'queue') {
const queue = this.getNodeParameter('queue', 0) as string;

options = this.getNodeParameter('options', 0, {});
const options = this.getNodeParameter('options', 0, {}) as Options;

channel = await rabbitmqConnectQueue.call(this, queue, options);

Expand Down Expand Up @@ -457,7 +431,7 @@ export class RabbitMQ implements INodeType {
queuePromises.push(
channel.sendToQueue(queue, Buffer.from(message), {
headers,
...(options.arguments ? (options.arguments as Options.Publish) : {}),
...parsePublishArguments(options),
}),
);
}
Expand Down Expand Up @@ -492,12 +466,11 @@ export class RabbitMQ implements INodeType {
await channel.connection.close();
} else if (mode === 'exchange') {
const exchange = this.getNodeParameter('exchange', 0) as string;
const type = this.getNodeParameter('exchangeType', 0) as string;
const routingKey = this.getNodeParameter('routingKey', 0) as string;

options = this.getNodeParameter('options', 0, {});
const options = this.getNodeParameter('options', 0, {}) as Options;

channel = await rabbitmqConnectExchange.call(this, exchange, type, options);
channel = await rabbitmqConnectExchange.call(this, exchange, options);

const sendInputData = this.getNodeParameter('sendInputData', 0) as boolean;

Expand Down Expand Up @@ -529,7 +502,7 @@ export class RabbitMQ implements INodeType {
exchangePromises.push(
channel.publish(exchange, routingKey, Buffer.from(message), {
headers,
...(options.arguments ? (options.arguments as Options.Publish) : {}),
...parsePublishArguments(options),
}),
);
}
Expand Down
Loading
Loading