This Project is no more maintained as of July 2023.
Node js Rabbit MQ client which has connection management backed into it. This project is written on top of amqp-connection-manager.
- Automatically reconnect when your amqplib broker dies in a fire.
- Round-robin connections between multiple brokers in a cluster.
- If messages are sent while the broker is unavailable, queues messages in memory until we reconnect.
- Very un-opinionated library - a thin wrapper around amqplib.
Using yarn: yarn add node-rabbitmq-client
Using npm: npm install node-rabbitmq-client
const RabbitMQClient = require('node-rabbitmq-client');
/**
* options is the object which is passed to consume at the time of initialization
*
* options = {
* queue: {
* name: 'some-queue-name'
* }
* }
*/
const { publish, consume, purgeQueue, ackAll } = RabbitMQClient;
/* to publish a message */
publish({ queue: { name: 'some name' } }, data);
`data` is JS object
/* to consume from a queue */
consume({ queue: { name: 'some name' } }, promiseHandler);
/* to purge a queue */
purge({ queue: { name: 'some name' } });
/* to ack all messages */
ackAll();
promiseHandler
for consume should always return a resolved Promise even if some operations on the received message fails.- When returning a resolved Promise, parameters need not be passed to it.If passed, these are simply ignored.
- Best practice is to implement a catch handler for the
promiseHandler
and push to some other queue and return a resolved Promise from there. - If parsing the JSON message fails while consuming, this will try to push this error to another queue
parsingErrors
. So, if this failure is to be handled and noted, provideparsingErrors
queue in the same config. (This is optional. Whether or not queue is provided, the error will be logged); promiseHandler
gets the message and the options that were passed to consume intially
/**
*
options is the object which is passed to consume at the time of initialization
{
queue: {
name: 'some-queue-name',
messagePriority: message priority (1-10), // set if the queue is a priority queue. It is optional
options: {
arguments: {
'x-max-priority': queue priority (1- 10) // set to make the queue a priority queue. It is optional
}
}
}
}
*/
promiseFunction(message, options)
.then(data => {
/* once processing the message is successful, return resolved promise */
/* if status queue is provided and success should be recorded */
if (statusQueue && recordSuccess) {
publish(statusQueue, {
status: 'success',
queueName,
message: data
});
}
/* this is needed to ack to the channel regarding this message */
return Promise.resolve();
})
.catch(error => {
if (statusQueue && recordError) {
/* if status queue is provided and failure should be recorded */
publish(statusQueue, {
status: 'error',
queueName,
error,
message
}).then(() => channel.ack(msg));
}
logger.log('error', {
note: `Error while processing the message from ${queueName}`,
error: error
});
/* return resolved Promise from here */
return Promise.resolve();
});
By default, this looks at /config/env/${NODE_ENV}
file for rabbitMQ configuration
If configuration is not found here, then it looks at /src/config/env/${NODE_ENV}
config = {
rabbitMQ: {
host: process.env.PUBSUB_RABBITMQ_SERVICE_HOST,
port: process.env.PUBSUB_RABBITMQ_SERVICE_PORT_AMQP || 5672,
username: process.env.RABBITMQ_USERNAME,
password: process.env.RABBITMQ_PASSWORD,
prefetch: process.env.PREFETCH_JOBS || 2,
vhost: process.env.VHOST || '/',
heartbeatInterval: process.env.HEARTBEAT || 5,
reconnectTime: process.env.RECONNECT_TIME || 10,
protocol: process.env.RABBITMQ_PROTOCOL || 'amqp',
defaultQueueFeatures: { durable: true }
}
}