Skip to content

Commit

Permalink
Implement _removeOutgoingAndStoreMessage to summarize common processes
Browse files Browse the repository at this point in the history
  • Loading branch information
ogis-yamazaki committed Jul 13, 2023
1 parent caca723 commit 79a5789
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 23 deletions.
56 changes: 35 additions & 21 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -1047,11 +1047,12 @@ class MqttClient extends EventEmitter {
* @example client.removeOutgoingMessage(client.getLastAllocated());
*/
removeOutgoingMessage (messageId) {
const cb = this.outgoing[messageId] ? this.outgoing[messageId].cb : null
delete this.outgoing[messageId]
this.outgoingStore.del({ messageId }, function () {
cb(new Error('Message removed'))
})
if (this.outgoing[messageId]) {
const cb = this.outgoing[messageId].cb
this._removeOutgoingAndStoreMessage(messageId, function () {
cb(new Error('Message removed'))
})
}
return this
}

Expand Down Expand Up @@ -1593,7 +1594,7 @@ class MqttClient extends EventEmitter {
const messageId = packet.messageId
const type = packet.cmd
let response = null
let cb = this.outgoing[messageId] ? this.outgoing[messageId].cb : null
const cb = this.outgoing[messageId] ? this.outgoing[messageId].cb : null
const that = this
let err

Expand Down Expand Up @@ -1625,14 +1626,13 @@ class MqttClient extends EventEmitter {
if (pubackRC && pubackRC > 0 && pubackRC !== 16) {
err = new Error('Publish error: ' + errors[pubackRC])
err.code = pubackRC
cb(err, packet)
// prevent invoking callback twice when deleting message from store #1511
cb = null
this._removeOutgoingAndStoreMessage(messageId, function () {
cb(err, packet)
})
} else {
this._removeOutgoingAndStoreMessage(messageId, cb)
}
delete this.outgoing[messageId]
this.outgoingStore.del(packet, cb)
this.messageIdProvider.deallocate(messageId)
this._invokeStoreProcessingQueue()

break
}
case 'pubrec': {
Expand All @@ -1646,13 +1646,9 @@ class MqttClient extends EventEmitter {
if (pubrecRC && pubrecRC > 0 && pubrecRC !== 16) {
err = new Error('Publish error: ' + errors[pubrecRC])
err.code = pubrecRC
cb(err, packet)
// prevent invoking callback twice when deleting message from store #1511
cb = null
delete this.outgoing[messageId]
this.outgoingStore.del(packet, cb)
this.messageIdProvider.deallocate(messageId)
this._invokeStoreProcessingQueue()
this._removeOutgoingAndStoreMessage(messageId, function () {
cb(err, packet)
})
} else {
this._sendPacket(response)
}
Expand Down Expand Up @@ -1896,7 +1892,9 @@ class MqttClient extends EventEmitter {
}

_invokeStoreProcessingQueue () {
if (this._storeProcessingQueue.length > 0) {
// If _storeProcessing is true, the message is resending.
// During resend, processing is skipped to prevent new messages from interrupting. #1635
if (!this._storeProcessing && this._storeProcessingQueue.length > 0) {
const f = this._storeProcessingQueue[0]
if (f && f.invoke()) {
this._storeProcessingQueue.shift()
Expand All @@ -1917,6 +1915,22 @@ class MqttClient extends EventEmitter {
}
this._storeProcessingQueue.splice(0)
}

/**
* _removeOutgoingAndStoreMessage
* @param {Number} messageId - messageId to remove message
* @param {Function} cb - called when the message removed
* @api private
*/
_removeOutgoingAndStoreMessage (messageId, cb) {
const self = this
delete this.outgoing[messageId]
self.outgoingStore.del({ messageId }, function (err, packet) {
cb(err, packet)
self.messageIdProvider.deallocate(messageId)
self._invokeStoreProcessingQueue()
})
}
}

module.exports = MqttClient
2 changes: 1 addition & 1 deletion lib/store.js
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class Store {
packet = this._inflights.get(packet.messageId)
if (packet) {
this._inflights.delete(packet.messageId)
if (cb) cb(null, packet)
cb(null, packet)
} else if (cb) {
cb(new Error('missing packet'))
}
Expand Down
2 changes: 1 addition & 1 deletion test/abstract_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -1076,7 +1076,7 @@ module.exports = function (server, config) {
})
})

it.only('should fire a callback (qos 2) on error', function (done) {
it('should fire a callback (qos 2) on error', function (done) {
// 145 = Packet Identifier in use
const pubrecReasonCode = 145
const pubOpts = { qos: 2 }
Expand Down

0 comments on commit 79a5789

Please sign in to comment.