diff --git a/docs/topology.md b/docs/topology.md index fda563a..add748c 100644 --- a/docs/topology.md +++ b/docs/topology.md @@ -100,6 +100,12 @@ The unique option has 3 different possible values, each with its own behavior: > Note: the concept of queue recovery is that the same queue name will be generated in the event of a process restart. If using `hash` or `id`, the pid is used and a different queue name will be generated each time the process starts. +You can specify unique queues by their friendly-name when handling and subscribing. To get the actual assigned queue name (which you should not need), you can use: + +```js +const realQueueName = rabbot.getQueue('friendly-q-name').uniqueName; +``` + ## `rabbot.bindExchange( sourceExchange, targetExchange, [routingKeys], [connectionName] )` Binds the target exchange to the source exchange. Messages flow from source to target. diff --git a/spec/integration/queueSpecificHandle.spec.js b/spec/integration/queueSpecificHandle.spec.js index 306e14b..42e5f7f 100644 --- a/spec/integration/queueSpecificHandle.spec.js +++ b/spec/integration/queueSpecificHandle.spec.js @@ -8,71 +8,148 @@ const config = require('./configuration'); of the bound fanout queue's. */ describe('Queue Specific Handler', function () { - var harness; + describe('with standard queues', function () { + var harness; - before(function (done) { - rabbit.configure({ - connection: config.connection, - exchanges: [ - { - name: 'rabbot-ex.fanout', - type: 'fanout', - autoDelete: true - } - ], - queues: [ - { - name: 'rabbot-q.general1', - autoDelete: true, - subscribe: true - }, - { - name: 'rabbot-q.general2', - noAck: true, - autoDelete: true, - subscribe: true - } - ], - bindings: [ - { - exchange: 'rabbot-ex.fanout', - target: 'rabbot-q.general1', - keys: [] - }, - { - exchange: 'rabbot-ex.fanout', - target: 'rabbot-q.general2', - keys: [] - } - ] - }).then(() => { - rabbit.publish('rabbot-ex.fanout', { type: '', routingKey: '', body: 'one' }); - rabbit.publish('rabbot-ex.fanout', { type: '', routingKey: '', body: 'two' }); - rabbit.publish('rabbot-ex.fanout', { type: '', routingKey: '', body: 'three' }); + before(function (done) { + rabbit.configure({ + connection: config.connection, + exchanges: [ + { + name: 'rabbot-ex.fanout', + type: 'fanout', + autoDelete: true + } + ], + queues: [ + { + name: 'rabbot-q.general1', + autoDelete: true, + subscribe: true + }, + { + name: 'rabbot-q.general2', + noAck: true, + autoDelete: true, + subscribe: true + } + ], + bindings: [ + { + exchange: 'rabbot-ex.fanout', + target: 'rabbot-q.general1', + keys: [] + }, + { + exchange: 'rabbot-ex.fanout', + target: 'rabbot-q.general2', + keys: [] + } + ] + }).then(() => { + rabbit.publish('rabbot-ex.fanout', { type: '', routingKey: '', body: 'one' }); + rabbit.publish('rabbot-ex.fanout', { type: '', routingKey: '', body: 'two' }); + rabbit.publish('rabbot-ex.fanout', { type: '', routingKey: '', body: 'three' }); + }); + + harness = harnessFactory(rabbit, done, 6); + harness.handle('', undefined, 'rabbot-q.general1'); }); - harness = harnessFactory(rabbit, done, 6); - harness.handle('', undefined, 'rabbot-q.general1'); - }); + it('should only handle messages for the specified queue', function () { + const results = harness.received.map((m) => ({ + body: m.body, + queue: m.queue + })); + sortBy(results, 'body').should.eql( + [ + { body: 'one', queue: 'rabbot-q.general1' }, + { body: 'three', queue: 'rabbot-q.general1' }, + { body: 'two', queue: 'rabbot-q.general1' } + ]); + }); - it('should only handle messages for the specified queue', function () { - const results = harness.received.map((m) => ({ - body: m.body, - queue: m.queue - })); - sortBy(results, 'body').should.eql( - [ - { body: 'one', queue: 'rabbot-q.general1' }, - { body: 'three', queue: 'rabbot-q.general1' }, - { body: 'two', queue: 'rabbot-q.general1' } - ]); - }); + it('should show the other messages as unhandled', function () { + harness.unhandled.length.should.eql(3); + }); - it('should show the other messages as unhandled', function () { - harness.unhandled.length.should.eql(3); + after(function () { + return harness.clean('default'); + }); }); - after(function () { - return harness.clean('default'); + describe('with unique queue', function () { + var harness; + + before(function (done) { + rabbit.configure({ + connection: config.connection, + exchanges: [ + { + name: 'rabbot-ex.topic', + type: 'topic', + autoDelete: true + } + ], + queues: [ + { + name: 'rabbot-q.general1', + autoDelete: true, + unique: 'hash', + subscribe: true + }, + { + name: 'rabbot-q.general2', + noAck: true, + autoDelete: true, + subscribe: true + } + ], + bindings: [ + { + exchange: 'rabbot-ex.topic', + target: 'rabbot-q.general1', + keys: ['a'] + }, + { + exchange: 'rabbot-ex.topic', + target: 'rabbot-q.general2', + keys: ['b'] + } + ] + }).then(() => { + rabbit.publish('rabbot-ex.topic', { type: 'a', body: 'one' }); + rabbit.publish('rabbot-ex.topic', { type: 'b', body: 'two' }); + rabbit.publish('rabbot-ex.topic', { type: 'a', body: 'three' }); + rabbit.publish('rabbot-ex.topic', { type: 'b', body: 'four' }); + rabbit.publish('rabbot-ex.topic', { type: 'a', body: 'five' }); + rabbit.publish('rabbot-ex.topic', { type: 'b', body: 'six' }); + }); + + harness = harnessFactory(rabbit, done, 6); + harness.handle('a', undefined, 'rabbot-q.general1'); + }); + + it('should only handle messages for the specified queue', function () { + const uniqueName = rabbit.getQueue('rabbot-q.general1').uniqueName + const results = harness.received.map((m) => ({ + body: m.body, + queue: m.queue + })); + sortBy(results, 'body').should.eql( + [ + { body: 'five', queue: uniqueName }, + { body: 'one', queue: uniqueName }, + { body: 'three', queue: uniqueName } + ]); + }); + + it('should show the other messages as unhandled', function () { + harness.unhandled.length.should.eql(3); + }); + + after(function () { + return harness.clean('default'); + }); }); }); diff --git a/src/amqp/queue.js b/src/amqp/queue.js index 55daa2c..5c9de09 100644 --- a/src/amqp/queue.js +++ b/src/amqp/queue.js @@ -351,7 +351,7 @@ function subscribe (channelName, channel, topology, serializers, messages, optio options.exclusive = true; } raw.queue = channelName; - var parts = [ channelName.replace(/[.]/g, '-') ]; + var parts = [ options.name.replace(/[.]/g, '-') ]; if (raw.type) { parts.push(raw.type); } diff --git a/src/index.js b/src/index.js index 9cdd4cc..3252f99 100644 --- a/src/index.js +++ b/src/index.js @@ -267,7 +267,7 @@ Broker.prototype.getExchange = function (name, connectionName = DEFAULT) { }; Broker.prototype.getQueue = function (name, connectionName = DEFAULT) { - return this.connections[ connectionName ].channels[ 'queue:' + name ]; + return this.connections[ connectionName ].channels[ `queue:${name}` ]; }; Broker.prototype.handle = function (messageType, handler, queueName, context) { diff --git a/src/queueFsm.js b/src/queueFsm.js index 62a9ceb..134b82c 100644 --- a/src/queueFsm.js +++ b/src/queueFsm.js @@ -28,6 +28,7 @@ var Factory = function (options, connection, topology, serializers, queueFn) { var Fsm = machina.Fsm.extend({ name: options.name, + uniqueName: options.uniqueName, responseSubscriptions: {}, signalSubscription: undefined, subscribed: false,