diff --git a/lib/client.js b/lib/client.js index c1757aaf6..0601dd2e7 100644 --- a/lib/client.js +++ b/lib/client.js @@ -18,9 +18,10 @@ const xtend = require('xtend') const debug = require('debug')('mqttjs:client') const nextTick = process ? process.nextTick : function (callback) { setTimeout(callback, 0) } const setImmediate = global.setImmediate || function (callback) { - // works in node v0.8 - nextTick(callback) + const args = arguments.slice(1) + process.nextTick(callback.bind(null, ...args)) } + const defaultConnectOptions = { keepalive: 60, reschedulePings: true, @@ -89,11 +90,11 @@ const errors = { 162: 'Wildcard Subscriptions not supported' } -function defaultId () { +function defaultId() { return 'mqttjs_' + Math.random().toString(16).substr(2, 8) } -function applyTopicAlias (client, packet) { +function applyTopicAlias(client, packet) { if (client.options.protocolVersion === 5) { if (packet.cmd === 'publish') { let alias @@ -143,7 +144,7 @@ function applyTopicAlias (client, packet) { } } -function removeTopicAliasAndRecoverTopicName (client, packet) { +function removeTopicAliasAndRecoverTopicName(client, packet) { let alias if (packet.properties) { alias = packet.properties.topicAlias @@ -168,7 +169,7 @@ function removeTopicAliasAndRecoverTopicName (client, packet) { } } -function sendPacket (client, packet, cb) { +function sendPacket(client, packet, cb) { debug('sendPacket :: packet: %O', packet) debug('sendPacket :: emitting `packetsend`') @@ -186,7 +187,7 @@ function sendPacket (client, packet, cb) { } } -function flush (queue) { +function flush(queue) { if (queue) { debug('flush: queue exists? %b', !!(queue)) Object.keys(queue).forEach(function (messageId) { @@ -200,7 +201,7 @@ function flush (queue) { } } -function flushVolatile (queue) { +function flushVolatile(queue) { if (queue) { debug('flushVolatile :: deleting volatile messages from the queue and setting their callbacks as error function') Object.keys(queue).forEach(function (messageId) { @@ -212,7 +213,7 @@ function flushVolatile (queue) { } } -function storeAndSend (client, packet, cb, cbStorePut) { +function storeAndSend(client, packet, cb, cbStorePut) { debug('storeAndSend :: store packet with cmd %s to outgoingStore', packet.cmd) let storePacket = packet let err @@ -226,7 +227,7 @@ function storeAndSend (client, packet, cb, cbStorePut) { return cb && cb(err) } } - client.outgoingStore.put(storePacket, function storedPacket (err) { + client.outgoingStore.put(storePacket, function storedPacket(err) { if (err) { return cb && cb(err) } @@ -235,7 +236,7 @@ function storeAndSend (client, packet, cb, cbStorePut) { }) } -function nop (error) { +function nop(error) { debug('nop ::', error) } @@ -246,7 +247,7 @@ function nop (error) { * @param {Object} [options] - connection options * (see Connection#connect) */ -function MqttClient (streamBuilder, options) { +function MqttClient(streamBuilder, options) { let k const that = this @@ -338,7 +339,7 @@ function MqttClient (streamBuilder, options) { this.on('connect', function () { const queue = that.queue - function deliver () { + function deliver() { const entry = queue.shift() debug('deliver :: entry %o', entry) let packet = null @@ -426,7 +427,7 @@ MqttClient.prototype._setupStream = function () { packets.push(packet) }) - function nextTickWork () { + function nextTickWork() { if (packets.length) { nextTick(work) } else { @@ -436,7 +437,7 @@ MqttClient.prototype._setupStream = function () { } } - function work () { + function work() { debug('work :: getting next packet in queue') const packet = packets.shift() @@ -459,7 +460,7 @@ MqttClient.prototype._setupStream = function () { work() } - function streamErrorHandler (error) { + function streamErrorHandler(error) { debug('streamErrorHandler :: error', error.message) if (socketErrors.includes(error.code)) { // handle error @@ -747,7 +748,7 @@ MqttClient.prototype.subscribe = function () { debug('subscribe: array topic %s', topic) if (!Object.prototype.hasOwnProperty.call(that._resubscribeTopics, topic) || that._resubscribeTopics[topic].qos < opts.qos || - resubscribe) { + resubscribe) { const currentOpts = { topic: topic, qos: opts.qos @@ -769,7 +770,7 @@ MqttClient.prototype.subscribe = function () { debug('subscribe: object topic %s', k) if (!Object.prototype.hasOwnProperty.call(that._resubscribeTopics, k) || that._resubscribeTopics[k].qos < obj[k].qos || - resubscribe) { + resubscribe) { const currentOpts = { topic: k, qos: obj[k].qos @@ -988,7 +989,7 @@ MqttClient.prototype.end = function (force, opts, cb) { debug('end :: cb? %s', !!cb) cb = cb || nop - function closeStores () { + function closeStores() { debug('end :: closeStores: closing incoming and outgoing stores') that.disconnected = true that.incomingStore.close(function (e1) { @@ -1007,7 +1008,7 @@ MqttClient.prototype.end = function (force, opts, cb) { } } - function finish () { + function finish() { // defer closesStores of an I/O cycle, // just to make sure things are // ok for websockets @@ -1263,7 +1264,7 @@ MqttClient.prototype._sendPacket = function (packet, cb, cbStorePut, noStore) { * anyway it will result in -1 evaluation */ case 0: - /* falls through */ + /* falls through */ default: sendPacket(this, packet, cb) break @@ -1622,7 +1623,7 @@ MqttClient.prototype._handleAck = function (packet) { debug('_handleAck :: packet type', type) switch (type) { case 'pubcomp': - // same thing as puback for QoS 2 + // same thing as puback for QoS 2 case 'puback': { const pubackRC = packet.reasonCode // Callback - we're done @@ -1685,7 +1686,7 @@ MqttClient.prototype._handleAck = function (packet) { } if (this.disconnecting && - Object.keys(this.outgoing).length === 0) { + Object.keys(this.outgoing).length === 0) { this.emit('outgoingEmpty') } } @@ -1754,8 +1755,8 @@ MqttClient.prototype._resubscribe = function () { debug('_resubscribe') const _resubscribeTopicsKeys = Object.keys(this._resubscribeTopics) if (!this._firstConnection && - (this.options.clean || (this.options.protocolVersion === 5 && !this.connackPacket.sessionPresent)) && - _resubscribeTopicsKeys.length > 0) { + (this.options.clean || (this.options.protocolVersion === 5 && !this.connackPacket.sessionPresent)) && + _resubscribeTopicsKeys.length > 0) { if (this.options.resubscribe) { if (this.options.protocolVersion === 5) { debug('_resubscribe: protocolVersion 5') @@ -1796,10 +1797,10 @@ MqttClient.prototype._onConnect = function (packet) { this.connected = true - function startStreamProcess () { + function startStreamProcess() { let outStore = that.outgoingStore.createStream() - function clearStoreProcessing () { + function clearStoreProcessing() { that._storeProcessing = false that._packetIdsDuringStoreProcessing = {} } @@ -1812,14 +1813,14 @@ MqttClient.prototype._onConnect = function (packet) { that.emit('error', err) }) - function remove () { + function remove() { outStore.destroy() outStore = null that._flushStoreProcessingQueue() clearStoreProcessing() } - function storeDeliver () { + function storeDeliver() { // edge case, we wrapped this twice if (!outStore) { return