From 481e560f0f3b560d8e71ed3834fe24fd3644226a Mon Sep 17 00:00:00 2001 From: Takatoshi Kondo Date: Mon, 27 May 2019 17:08:14 +0900 Subject: [PATCH] Fixed #952. (#953) * Fixed #952. Conditional flush `outgoing` on close. Scenario: 1. The client connect to the server. 2. The client sends subscribe to the server. 3. The server destroys the client connection before suback sending. 4. The client detect `close` event, then reconnects to the server. At the step4, `outgoing` still stored the callback for subscribe. However, it has never called because server doen't send corresponding suback. The same thing happens on unsubscribe. So I defined subscribe/unsubscribe as volatile. The volatile type of `outgoing` entries should be cleared when `close` from the server is detected. On the contrary, QoS1 and QoS2 publish is not volatile. Because they are resent after reconnection. And then, callback in the `store` is called. This behavior shouldn't be changed. So I added `volatile` flag to `outgoing` element. * Fixed cb accessing code. If `outgoing[mid]` doesn't match, then accessing `outgoing[mid].cb` causes `Cannot read property` error. So added checking code. Fixed outgoing assignment at `_onConnect` function. --- lib/client.js | 66 ++++++++++++++++++++++++++++------------- test/abstract_client.js | 51 +++++++++++++++++++++++++++++++ 2 files changed, 96 insertions(+), 21 deletions(-) diff --git a/lib/client.js b/lib/client.js index a434f31c9..ca003bb6b 100644 --- a/lib/client.js +++ b/lib/client.js @@ -90,8 +90,19 @@ function sendPacket (client, packet, cb) { function flush (queue) { if (queue) { Object.keys(queue).forEach(function (messageId) { - if (typeof queue[messageId] === 'function') { - queue[messageId](new Error('Connection closed')) + if (typeof queue[messageId].cb === 'function') { + queue[messageId].cb(new Error('Connection closed')) + delete queue[messageId] + } + }) + } +} + +function flushVolatile (queue) { + if (queue) { + Object.keys(queue).forEach(function (messageId) { + if (queue[messageId].volatile && typeof queue[messageId].cb === 'function') { + queue[messageId].cb(new Error('Connection closed')) delete queue[messageId] } }) @@ -290,6 +301,7 @@ MqttClient.prototype._setupStream = function () { // Echo stream close this.stream.on('close', function () { + flushVolatile(that.outgoing) that.emit('close') }) @@ -447,7 +459,10 @@ MqttClient.prototype.publish = function (topic, message, opts, callback) { case 1: case 2: // Add to callbacks - this.outgoing[packet.messageId] = callback || nop + this.outgoing[packet.messageId] = { + volatile: false, + cb: callback || nop + } if (this._storeProcessing) { this._packetIdsDuringStoreProcessing[packet.messageId] = false this._storePacket(packet, undefined, opts.cbStorePut) @@ -606,15 +621,18 @@ MqttClient.prototype.subscribe = function () { that.messageIdToTopic[packet.messageId] = topics } - this.outgoing[packet.messageId] = function (err, packet) { - if (!err) { - var granted = packet.granted - for (var i = 0; i < granted.length; i += 1) { - subs[i].qos = granted[i] + this.outgoing[packet.messageId] = { + volatile: true, + cb: function (err, packet) { + if (!err) { + var granted = packet.granted + for (var i = 0; i < granted.length; i += 1) { + subs[i].qos = granted[i] + } } - } - callback(err, subs) + callback(err, subs) + } } this._sendPacket(packet) @@ -678,7 +696,10 @@ MqttClient.prototype.unsubscribe = function () { packet.properties = opts.properties } - this.outgoing[packet.messageId] = callback + this.outgoing[packet.messageId] = { + volatile: true, + cb: callback + } this._sendPacket(packet) @@ -772,7 +793,7 @@ MqttClient.prototype.end = function () { * @example client.removeOutgoingMessage(client.getLastMessageId()); */ MqttClient.prototype.removeOutgoingMessage = function (mid) { - var cb = this.outgoing[mid] + var cb = this.outgoing[mid] ? this.outgoing[mid].cb : null delete this.outgoing[mid] this.outgoingStore.del({messageId: mid}, function () { cb(new Error('Message removed')) @@ -957,7 +978,7 @@ MqttClient.prototype._storePacket = function (packet, cb, cbStorePut) { if (((packet.qos || 0) === 0 && this.queueQoSZero) || packet.cmd !== 'publish') { this.queue.push({ packet: packet, cb: cb }) } else if (packet.qos > 0) { - cb = this.outgoing[packet.messageId] + cb = this.outgoing[packet.messageId] ? this.outgoing[packet.messageId].cb : null this.outgoingStore.put(packet, function (err) { if (err) { return cb && cb(err) @@ -1172,7 +1193,7 @@ MqttClient.prototype._handleAck = function (packet) { var mid = packet.messageId var type = packet.cmd var response = null - var cb = this.outgoing[mid] + var cb = this.outgoing[mid] ? this.outgoing[mid].cb : null var that = this var err @@ -1395,14 +1416,17 @@ MqttClient.prototype._onConnect = function (packet) { // Avoid unnecessary stream read operations when disconnected if (!that.disconnecting && !that.reconnectTimer) { - cb = that.outgoing[packet.messageId] - that.outgoing[packet.messageId] = function (err, status) { - // Ensure that the original callback passed in to publish gets invoked - if (cb) { - cb(err, status) + cb = that.outgoing[packet.messageId] ? that.outgoing[packet.messageId].cb : null + that.outgoing[packet.messageId] = { + volatile: false, + cb: function (err, status) { + // Ensure that the original callback passed in to publish gets invoked + if (cb) { + cb(err, status) + } + + storeDeliver() } - - storeDeliver() } that._packetIdsDuringStoreProcessing[packet.messageId] = true that._sendPacket(packet) diff --git a/test/abstract_client.js b/test/abstract_client.js index 814923a7d..017a2e377 100644 --- a/test/abstract_client.js +++ b/test/abstract_client.js @@ -2682,6 +2682,57 @@ module.exports = function (server, config) { }) }) + it('should clear outgoing if close from server', function (done) { + var reconnect = false + var client = {} + var server2 = new Server(function (c) { + c.on('connect', function (packet) { + c.connack({returnCode: 0}) + }) + c.on('subscribe', function (packet) { + if (reconnect) { + c.suback({ + messageId: packet.messageId, + granted: packet.subscriptions.map(function (e) { + return e.qos + }) + }) + } else { + c.destroy() + } + }) + }) + + server2.listen(port + 50, function () { + client = mqtt.connect({ + port: port + 50, + host: 'localhost', + clean: true, + clientId: 'cid1', + reconnectPeriod: 0 + }) + + client.on('connect', function () { + client.subscribe('test', {qos: 2}, function (e) { + if (!e) { + client.end() + } + }) + }) + + client.on('close', function () { + if (reconnect) { + server2.close() + done() + } else { + Object.keys(client.outgoing).length.should.equal(0) + reconnect = true + client.reconnect() + } + }) + }) + }) + it('should resend in-flight QoS 1 publish messages from the client if clean is false', function (done) { var reconnect = false var client = {}