diff --git a/src/pubsub.js b/src/pubsub.js index 4547de9bf2..8e33761a01 100644 --- a/src/pubsub.js +++ b/src/pubsub.js @@ -37,8 +37,11 @@ module.exports = (node) => { if (!node.isStarted() && !floodSub.started) { throw new Error(NOT_STARTED_YET) } - - floodSub.removeListener(topic, handler) + if (!handler && !callback) { + floodSub.removeAllListeners(topic) + } else { + floodSub.removeListener(topic, handler) + } if (floodSub.listenerCount(topic) === 0) { floodSub.unsubscribe(topic) diff --git a/test/pubsub.node.js b/test/pubsub.node.js index 5cc24db825..aaa2ba928b 100644 --- a/test/pubsub.node.js +++ b/test/pubsub.node.js @@ -87,6 +87,51 @@ describe('.pubsub', () => { expect(err).to.not.exist().mark() }) }) + it('start two nodes and send one message, then unsubscribe without handler', (done) => { + // Check the final series error, and the publish handler + expect(3).checks(done) + + let nodes + const data = Buffer.from('test') + const handler = (msg) => { + // verify the data is correct and mark the expect + expect(msg.data).to.eql(data).mark() + } + + series([ + // Start the nodes + (cb) => startTwo((err, _nodes) => { + nodes = _nodes + cb(err) + }), + // subscribe on the first + (cb) => nodes[0].pubsub.subscribe('pubsub', handler, cb), + // Wait a moment before publishing + (cb) => setTimeout(cb, 500), + // publish on the second + (cb) => nodes[1].pubsub.publish('pubsub', data, cb), + // Wait a moment before unsubscribing + (cb) => setTimeout(cb, 500), + // unsubscribe on the first + (cb) => { + nodes[0].pubsub.unsubscribe('pubsub') + // Wait a moment to make sure the ubsubscribe-from-all worked + setTimeout(cb, 500) + }, + // Verify unsubscribed + (cb) => { + nodes[0].pubsub.ls((err, topics) => { + expect(topics.length).to.eql(0).mark() + cb(err) + }) + }, + // Stop both nodes + (cb) => stopTwo(nodes, cb) + ], (err) => { + // Verify there was no error, and mark the expect + expect(err).to.not.exist().mark() + }) + }) }) describe('.pubsub off', () => {