Wrapper for amqplib which simplifies its usage for publishing and subscribing
npm i --save amqpator
npx install-peerdeps --save amqpator # Optionally
You should have running RabbitMQ instance with management plugin installed.
npm run test:jest # Run Jest with coverage collection
npm run test:coverage # Send coverage to .coveralls.io
npm run test # npm run test:jest && npm run test:coverage
Also, you may set connection params for this instance via environment variables:
export AMQPATOR_HOST = 'localhost'
export AMQPATOR_PORT = 5672
export AMQPATOR_USERNAME = 'guest'
export AMQPATOR_PASSWORD = 'guest'
export AMQPATOR_VHOST = '/'
export AMQPATOR_PORT_HTTP = 15672
// Simple echo pub/sub
const Amqpator = require('amqpator')
const amqp = new Amqpator(/* {
host = 'localhost',
username = 'guest',
logger = console,
onConnectionClose = () => {},
onConnectionError = () => {},
connectionOptions = {},
password = 'guest',
port = 5672,
query = { heartbeat: 30 },
reconnect = true,
reconnectAttempts = 10,
reconnectInterval = 299,
vhost = '/',
} */)
const exchange = 'echo_exchange'
const routingKey = 'echo_exchange_routing_key'
amqp.getSub({
exchange,
exchangeOptions: {
autoDelete: true,
},
routingKey,
queue: 'echo_queue',
queueOptions: {
autoDelete: true,
exclusive: true,
},
onQueueMsg: ({ echo }, fields, properties) => {
console.log(echo, fields, properties)
},
}).then(
_ => _.subscribe()
)
amqp.getPub({
exchange,
exchangeOptions: {
autoDelete: true,
},
routingKey,
}).then(
_ => _.publish({
message: { echo: 'Echo' },
messageOptions: {
// expiration (string)
// userId (string)
// CC (string or array of string)
// priority (positive integer)
// persistent (boolean)
// deliveryMode (boolean or numeric)
// mandatory (boolean)
// BCC (string or array of string)
// immediate (boolean)
// contentType (string)
// contentEncoding (string)
// headers (object)
// correlationId (string)
// replyTo (string)
// messageId (string)
// timestamp (positive number)
// type (string)
// appId (string)
},
)
)