Skip to content

Commit

Permalink
feat: Enable parallel processing on multiple queue nodes (#6295)
Browse files Browse the repository at this point in the history
* Add non-parallel execution

* Add parallel processing for MQTT

* Fix logic expression for trigger

* fixes

* remove unused import

* fix MQTT parallel processing

* fix AMQPTrigger node parallelProcessing

* MQTTTrigger node default parallelProcessing to true

* add AMQP credential test

* improve error handling

---------

Co-authored-by: Marcus <marcus@n8n.io>
  • Loading branch information
2 people authored and netroy committed Aug 17, 2023
1 parent f3b380d commit e45461a
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 6 deletions.
2 changes: 1 addition & 1 deletion packages/nodes-base/credentials/Amqp.credentials.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ export class Amqp implements ICredentialType {
name: 'transportType',
type: 'string',
default: '',
description: 'Optional Transport Type to use',
description: 'Optional Transport Type to use. Either tcp or tls.',
},
];
}
53 changes: 52 additions & 1 deletion packages/nodes-base/nodes/Amqp/Amqp.node.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { ContainerOptions, Dictionary, EventContext } from 'rhea';
import type { Connection, ContainerOptions, Dictionary, EventContext } from 'rhea';
import { create_container } from 'rhea';

import type {
Expand All @@ -7,6 +7,10 @@ import type {
INodeExecutionData,
INodeType,
INodeTypeDescription,
ICredentialTestFunctions,
INodeCredentialTestResult,
ICredentialsDecrypted,
ICredentialDataDecryptedObject,
} from 'n8n-workflow';
import { NodeOperationError } from 'n8n-workflow';

Expand All @@ -28,6 +32,7 @@ export class Amqp implements INodeType {
{
name: 'amqp',
required: true,
testedBy: 'amqpConnectionTest',
},
],
properties: [
Expand Down Expand Up @@ -95,6 +100,52 @@ export class Amqp implements INodeType {
],
};

methods = {
credentialTest: {
async amqpConnectionTest(
this: ICredentialTestFunctions,
credential: ICredentialsDecrypted,
): Promise<INodeCredentialTestResult> {
const credentials = credential.data as ICredentialDataDecryptedObject;
const connectOptions: ContainerOptions = {
reconnect: false,
host: credentials.hostname as string,
hostname: credentials.hostname as string,
port: credentials.port as number,
username: credentials.username ? (credentials.username as string) : undefined,
password: credentials.password ? (credentials.password as string) : undefined,
transport: credentials.transportType ? (credentials.transportType as string) : undefined,
};

let conn: Connection | undefined = undefined;
try {
const container = create_container();
await new Promise<void>((resolve, reject) => {
container.on('connection_open', function (_contex: EventContext) {
resolve();
});
container.on('disconnected', function (context: EventContext) {
reject(context.error ?? new Error('unknown error'));
});
conn = container.connect(connectOptions);
});
} catch (error) {
return {
status: 'Error',
message: (error as Error).message,
};
} finally {
if (conn) (conn as Connection).close();
}

return {
status: 'OK',
message: 'Connection successful!',
};
},
},
};

async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
try {
const credentials = await this.getCredentials('amqp');
Expand Down
23 changes: 21 additions & 2 deletions packages/nodes-base/nodes/Amqp/AmqpTrigger.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import type {
INodeType,
INodeTypeDescription,
ITriggerResponse,
IDeferredPromise,
IRun,
} from 'n8n-workflow';
import { deepCopy, jsonParse, NodeOperationError } from 'n8n-workflow';

Expand Down Expand Up @@ -100,6 +102,13 @@ export class AmqpTrigger implements INodeType {
default: false,
description: 'Whether to return only the body property',
},
{
displayName: 'Parallel Processing',
name: 'parallelProcessing',
type: 'boolean',
default: true,
description: 'Whether to process messages in parallel',
},
{
displayName: 'Reconnect',
name: 'reconnect',
Expand Down Expand Up @@ -133,6 +142,7 @@ export class AmqpTrigger implements INodeType {
const clientname = this.getNodeParameter('clientname', '') as string;
const subscription = this.getNodeParameter('subscription', '') as string;
const options = this.getNodeParameter('options', {}) as IDataObject;
const parallelProcessing = this.getNodeParameter('options.parallelProcessing', true) as boolean;
const pullMessagesNumber = (options.pullMessagesNumber as number) || 100;
const containerId = options.containerId as string;
const containerReconnect = (options.reconnect as boolean) || true;
Expand All @@ -156,7 +166,7 @@ export class AmqpTrigger implements INodeType {
context.receiver?.add_credit(pullMessagesNumber);
});

container.on('message', (context: EventContext) => {
container.on('message', async (context: EventContext) => {
// No message in the context
if (!context.message) {
return;
Expand Down Expand Up @@ -195,7 +205,16 @@ export class AmqpTrigger implements INodeType {
data = data.body;
}

this.emit([this.helpers.returnJsonArray([data as any])]);
let responsePromise: IDeferredPromise<IRun> | undefined = undefined;
if (!parallelProcessing) {
responsePromise = await this.helpers.createDeferredPromise();
}
if (responsePromise) {
this.emit([this.helpers.returnJsonArray([data as any])], undefined, responsePromise);
await responsePromise.promise();
} else {
this.emit([this.helpers.returnJsonArray([data as any])]);
}

if (!context.receiver?.has_credit()) {
setTimeout(() => {
Expand Down
23 changes: 21 additions & 2 deletions packages/nodes-base/nodes/MQTT/MqttTrigger.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import type {
INodeType,
INodeTypeDescription,
ITriggerResponse,
IDeferredPromise,
IRun,
} from 'n8n-workflow';
import { NodeOperationError } from 'n8n-workflow';

Expand Down Expand Up @@ -71,6 +73,14 @@ export class MqttTrigger implements INodeType {
default: false,
description: 'Whether to return only the message property',
},
{
displayName: 'Parallel Processing',
name: 'parallelProcessing',
type: 'boolean',
default: true,
description:
'Whether to process messages in parallel or by keeping the message in order',
},
],
},
],
Expand All @@ -89,6 +99,7 @@ export class MqttTrigger implements INodeType {
}

const options = this.getNodeParameter('options') as IDataObject;
const parallelProcessing = this.getNodeParameter('options.parallelProcessing', true) as boolean;

if (!topics) {
throw new NodeOperationError(this.getNode(), 'Topics are mandatory!');
Expand Down Expand Up @@ -147,7 +158,7 @@ export class MqttTrigger implements INodeType {
if (error) {
reject(error);
}
client.on('message', (topic: string, message: Buffer | string) => {
client.on('message', async (topic: string, message: Buffer | string) => {
let result: IDataObject = {};

message = message.toString();
Expand All @@ -165,7 +176,15 @@ export class MqttTrigger implements INodeType {
//@ts-ignore
result = [message as string];
}
this.emit([this.helpers.returnJsonArray(result)]);

let responsePromise: IDeferredPromise<IRun> | undefined;
if (!parallelProcessing) {
responsePromise = await this.helpers.createDeferredPromise();
}
this.emit([this.helpers.returnJsonArray([result])], undefined, responsePromise);
if (responsePromise) {
await responsePromise.promise();
}
resolve(true);
});
});
Expand Down

0 comments on commit e45461a

Please sign in to comment.