Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed #952. #953

Merged
merged 2 commits into from
May 27, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 45 additions & 21 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,19 @@ function sendPacket (client, packet, cb) {
function flush (queue) {
if (queue) {
Object.keys(queue).forEach(function (messageId) {
if (typeof queue[messageId] === 'function') {
queue[messageId](new Error('Connection closed'))
if (typeof queue[messageId].cb === 'function') {
queue[messageId].cb(new Error('Connection closed'))
delete queue[messageId]
}
})
}
}

function flushVolatile (queue) {
if (queue) {
Object.keys(queue).forEach(function (messageId) {
if (queue[messageId].volatile && typeof queue[messageId].cb === 'function') {
queue[messageId].cb(new Error('Connection closed'))
delete queue[messageId]
}
})
Expand Down Expand Up @@ -290,6 +301,7 @@ MqttClient.prototype._setupStream = function () {

// Echo stream close
this.stream.on('close', function () {
flushVolatile(that.outgoing)
that.emit('close')
})

Expand Down Expand Up @@ -447,7 +459,10 @@ MqttClient.prototype.publish = function (topic, message, opts, callback) {
case 1:
case 2:
// Add to callbacks
this.outgoing[packet.messageId] = callback || nop
this.outgoing[packet.messageId] = {
volatile: false,
cb: callback || nop
}
if (this._storeProcessing) {
this._packetIdsDuringStoreProcessing[packet.messageId] = false
this._storePacket(packet, undefined, opts.cbStorePut)
Expand Down Expand Up @@ -606,15 +621,18 @@ MqttClient.prototype.subscribe = function () {
that.messageIdToTopic[packet.messageId] = topics
}

this.outgoing[packet.messageId] = function (err, packet) {
if (!err) {
var granted = packet.granted
for (var i = 0; i < granted.length; i += 1) {
subs[i].qos = granted[i]
this.outgoing[packet.messageId] = {
volatile: true,
cb: function (err, packet) {
if (!err) {
var granted = packet.granted
for (var i = 0; i < granted.length; i += 1) {
subs[i].qos = granted[i]
}
}
}

callback(err, subs)
callback(err, subs)
}
}

this._sendPacket(packet)
Expand Down Expand Up @@ -678,7 +696,10 @@ MqttClient.prototype.unsubscribe = function () {
packet.properties = opts.properties
}

this.outgoing[packet.messageId] = callback
this.outgoing[packet.messageId] = {
volatile: true,
cb: callback
}

this._sendPacket(packet)

Expand Down Expand Up @@ -772,7 +793,7 @@ MqttClient.prototype.end = function () {
* @example client.removeOutgoingMessage(client.getLastMessageId());
*/
MqttClient.prototype.removeOutgoingMessage = function (mid) {
var cb = this.outgoing[mid]
var cb = this.outgoing[mid] ? this.outgoing[mid].cb : null
delete this.outgoing[mid]
this.outgoingStore.del({messageId: mid}, function () {
cb(new Error('Message removed'))
Expand Down Expand Up @@ -957,7 +978,7 @@ MqttClient.prototype._storePacket = function (packet, cb, cbStorePut) {
if (((packet.qos || 0) === 0 && this.queueQoSZero) || packet.cmd !== 'publish') {
this.queue.push({ packet: packet, cb: cb })
} else if (packet.qos > 0) {
cb = this.outgoing[packet.messageId]
cb = this.outgoing[packet.messageId] ? this.outgoing[packet.messageId].cb : null
this.outgoingStore.put(packet, function (err) {
if (err) {
return cb && cb(err)
Expand Down Expand Up @@ -1172,7 +1193,7 @@ MqttClient.prototype._handleAck = function (packet) {
var mid = packet.messageId
var type = packet.cmd
var response = null
var cb = this.outgoing[mid]
var cb = this.outgoing[mid] ? this.outgoing[mid].cb : null
var that = this
var err

Expand Down Expand Up @@ -1395,14 +1416,17 @@ MqttClient.prototype._onConnect = function (packet) {

// Avoid unnecessary stream read operations when disconnected
if (!that.disconnecting && !that.reconnectTimer) {
cb = that.outgoing[packet.messageId]
that.outgoing[packet.messageId] = function (err, status) {
// Ensure that the original callback passed in to publish gets invoked
if (cb) {
cb(err, status)
cb = that.outgoing[packet.messageId] ? that.outgoing[packet.messageId].cb : null
that.outgoing[packet.messageId] = {
volatile: false,
cb: function (err, status) {
// Ensure that the original callback passed in to publish gets invoked
if (cb) {
cb(err, status)
}

storeDeliver()
}

storeDeliver()
}
that._packetIdsDuringStoreProcessing[packet.messageId] = true
that._sendPacket(packet)
Expand Down
51 changes: 51 additions & 0 deletions test/abstract_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -2682,6 +2682,57 @@ module.exports = function (server, config) {
})
})

it('should clear outgoing if close from server', function (done) {
var reconnect = false
var client = {}
var server2 = new Server(function (c) {
c.on('connect', function (packet) {
c.connack({returnCode: 0})
})
c.on('subscribe', function (packet) {
if (reconnect) {
c.suback({
messageId: packet.messageId,
granted: packet.subscriptions.map(function (e) {
return e.qos
})
})
} else {
c.destroy()
}
})
})

server2.listen(port + 50, function () {
client = mqtt.connect({
port: port + 50,
host: 'localhost',
clean: true,
clientId: 'cid1',
reconnectPeriod: 0
})

client.on('connect', function () {
client.subscribe('test', {qos: 2}, function (e) {
if (!e) {
client.end()
}
})
})

client.on('close', function () {
if (reconnect) {
server2.close()
done()
} else {
Object.keys(client.outgoing).length.should.equal(0)
reconnect = true
client.reconnect()
}
})
})
})

it('should resend in-flight QoS 1 publish messages from the client if clean is false', function (done) {
var reconnect = false
var client = {}
Expand Down