From 20f5a475bc5d689c96fb746620ce281c4f8fb07e Mon Sep 17 00:00:00 2001 From: Yoseph Maguire Date: Thu, 19 Aug 2021 13:20:48 -0700 Subject: [PATCH 1/2] initial redesign --- lib/client.js | 1714 +--------------------- lib/connect/ali.js | 128 -- lib/connect/index.js | 164 --- lib/connect/tcp.js | 21 - lib/connect/tls.js | 45 - lib/connect/ws.js | 256 ---- lib/connect/wx.js | 134 -- lib/default-message-id-provider.js | 100 +- lib/errors.js | 47 + lib/handlers/auth.js | 3 + lib/handlers/connack.js | 3 + lib/handlers/connect.js | 6 + lib/handlers/disconnect.js | 3 + lib/handlers/index.js | 73 + lib/handlers/ping.js | 3 + lib/handlers/pingreq.js | 3 + lib/handlers/pingresp.js | 3 + lib/handlers/pub.js | 3 + lib/handlers/puback.js | 3 + lib/handlers/pubcomp.js | 3 + lib/handlers/pubrec.js | 3 + lib/handlers/pubrel.js | 3 + lib/handlers/sub.js | 3 + lib/handlers/suback.js | 3 + lib/handlers/unsub.js | 3 + lib/handlers/unsuback.js | 3 + lib/index.js | 13 + lib/unique-message-id-provider.js | 102 +- lib/{validations.js => validateTopic.js} | 8 +- mqtt.js | 21 - types/lib/client.d.ts | 2 +- 31 files changed, 334 insertions(+), 2545 deletions(-) delete mode 100644 lib/connect/ali.js delete mode 100644 lib/connect/index.js delete mode 100644 lib/connect/tcp.js delete mode 100644 lib/connect/tls.js delete mode 100644 lib/connect/ws.js delete mode 100644 lib/connect/wx.js create mode 100644 lib/errors.js create mode 100644 lib/handlers/auth.js create mode 100644 lib/handlers/connack.js create mode 100644 lib/handlers/connect.js create mode 100644 lib/handlers/disconnect.js create mode 100644 lib/handlers/index.js create mode 100644 lib/handlers/ping.js create mode 100644 lib/handlers/pingreq.js create mode 100644 lib/handlers/pingresp.js create mode 100644 lib/handlers/pub.js create mode 100644 lib/handlers/puback.js create mode 100644 lib/handlers/pubcomp.js create mode 100644 lib/handlers/pubrec.js create mode 100644 lib/handlers/pubrel.js create mode 100644 lib/handlers/sub.js create mode 100644 lib/handlers/suback.js create mode 100644 lib/handlers/unsub.js create mode 100644 lib/handlers/unsuback.js create mode 100644 lib/index.js rename lib/{validations.js => validateTopic.js} (90%) delete mode 100644 mqtt.js diff --git a/lib/client.js b/lib/client.js index c481e7a04..2d07df752 100644 --- a/lib/client.js +++ b/lib/client.js @@ -1,1682 +1,72 @@ 'use strict' -/** - * Module dependencies - */ -var EventEmitter = require('events').EventEmitter -var Store = require('./store') -var mqttPacket = require('mqtt-packet') -var DefaultMessageIdProvider = require('./default-message-id-provider') -var Writable = require('readable-stream').Writable -var inherits = require('inherits') -var reInterval = require('reinterval') -var validations = require('./validations') -var xtend = require('xtend') -var debug = require('debug')('mqttjs:client') -var nextTick = process ? process.nextTick : function (callback) { setTimeout(callback, 0) } -var setImmediate = global.setImmediate || function (callback) { - // works in node v0.8 - nextTick(callback) -} -var defaultConnectOptions = { - keepalive: 60, - reschedulePings: true, - protocolId: 'MQTT', - protocolVersion: 4, - reconnectPeriod: 1000, - connectTimeout: 30 * 1000, - clean: true, - resubscribe: true -} - -var socketErrors = [ - 'ECONNREFUSED', - 'EADDRINUSE', - 'ECONNRESET', - 'ENOTFOUND' -] - -// Other Socket Errors: EADDRINUSE, ECONNRESET, ENOTFOUND. - -var errors = { - 0: '', - 1: 'Unacceptable protocol version', - 2: 'Identifier rejected', - 3: 'Server unavailable', - 4: 'Bad username or password', - 5: 'Not authorized', - 16: 'No matching subscribers', - 17: 'No subscription existed', - 128: 'Unspecified error', - 129: 'Malformed Packet', - 130: 'Protocol Error', - 131: 'Implementation specific error', - 132: 'Unsupported Protocol Version', - 133: 'Client Identifier not valid', - 134: 'Bad User Name or Password', - 135: 'Not authorized', - 136: 'Server unavailable', - 137: 'Server busy', - 138: 'Banned', - 139: 'Server shutting down', - 140: 'Bad authentication method', - 141: 'Keep Alive timeout', - 142: 'Session taken over', - 143: 'Topic Filter invalid', - 144: 'Topic Name invalid', - 145: 'Packet identifier in use', - 146: 'Packet Identifier not found', - 147: 'Receive Maximum exceeded', - 148: 'Topic Alias invalid', - 149: 'Packet too large', - 150: 'Message rate too high', - 151: 'Quota exceeded', - 152: 'Administrative action', - 153: 'Payload format invalid', - 154: 'Retain not supported', - 155: 'QoS not supported', - 156: 'Use another server', - 157: 'Server moved', - 158: 'Shared Subscriptions not supported', - 159: 'Connection rate exceeded', - 160: 'Maximum connect time', - 161: 'Subscription Identifiers not supported', - 162: 'Wildcard Subscriptions not supported' -} - -function defaultId () { - return 'mqttjs_' + Math.random().toString(16).substr(2, 8) -} - -function sendPacket (client, packet, cb) { - debug('sendPacket :: packet: %O', packet) - debug('sendPacket :: emitting `packetsend`') - client.emit('packetsend', packet) - - debug('sendPacket :: writing to stream') - var result = mqttPacket.writeToStream(packet, client.stream, client.options) - debug('sendPacket :: writeToStream result %s', result) - if (!result && cb) { - debug('sendPacket :: handle events on `drain` once through callback.') - client.stream.once('drain', cb) - } else if (cb) { - debug('sendPacket :: invoking cb') - cb() - } -} - -function flush (queue) { - if (queue) { - debug('flush: queue exists? %b', !!(queue)) - Object.keys(queue).forEach(function (messageId) { - if (typeof queue[messageId].cb === 'function') { - queue[messageId].cb(new Error('Connection closed')) - delete queue[messageId] - } - }) - } -} - -function flushVolatile (queue) { - if (queue) { - debug('flushVolatile :: deleting volatile messages from the queue and setting their callbacks as error function') - Object.keys(queue).forEach(function (messageId) { - if (queue[messageId].volatile && typeof queue[messageId].cb === 'function') { - queue[messageId].cb(new Error('Connection closed')) - delete queue[messageId] - } - }) - } -} - -function storeAndSend (client, packet, cb, cbStorePut) { - debug('storeAndSend :: store packet with cmd %s to outgoingStore', packet.cmd) - client.outgoingStore.put(packet, function storedPacket (err) { - if (err) { - return cb && cb(err) - } - cbStorePut() - sendPacket(client, packet, cb) - }) -} - -function nop (error) { - debug('nop ::', error) -} - -/** - * MqttClient constructor - * - * @param {Stream} stream - stream - * @param {Object} [options] - connection options - * (see Connection#connect) - */ -function MqttClient (streamBuilder, options) { - var k - var that = this - - if (!(this instanceof MqttClient)) { - return new MqttClient(streamBuilder, options) - } - - this.options = options || {} - - // Defaults - for (k in defaultConnectOptions) { - if (typeof this.options[k] === 'undefined') { - this.options[k] = defaultConnectOptions[k] - } else { - this.options[k] = options[k] - } - } - - debug('MqttClient :: options.protocol', options.protocol) - debug('MqttClient :: options.protocolVersion', options.protocolVersion) - debug('MqttClient :: options.username', options.username) - debug('MqttClient :: options.keepalive', options.keepalive) - debug('MqttClient :: options.reconnectPeriod', options.reconnectPeriod) - debug('MqttClient :: options.rejectUnauthorized', options.rejectUnauthorized) - - this.options.clientId = (typeof options.clientId === 'string') ? options.clientId : defaultId() - - debug('MqttClient :: clientId', this.options.clientId) - - this.options.customHandleAcks = (options.protocolVersion === 5 && options.customHandleAcks) ? options.customHandleAcks : function () { arguments[3](0) } - - this.streamBuilder = streamBuilder - - this.messageIdProvider = (typeof this.options.messageIdProvider === 'undefined') ? new DefaultMessageIdProvider() : this.options.messageIdProvider - - // Inflight message storages - this.outgoingStore = options.outgoingStore || new Store() - this.incomingStore = options.incomingStore || new Store() - - // Should QoS zero messages be queued when the connection is broken? - this.queueQoSZero = options.queueQoSZero === undefined ? true : options.queueQoSZero - - // map of subscribed topics to support reconnection - this._resubscribeTopics = {} +const mqtt = require('mqtt-packet') +// const eventEmitter = require('events') +const handle = require('handlers') +// const mqttErrors = require('errors') - // map of a subscribe messageId and a topic - this.messageIdToTopic = {} +// const logger = require('pino')() - // Ping timer, setup in _setupPingTimer - this.pingTimer = null - // Is the client connected? - this.connected = false - // Are we disconnecting? - this.disconnecting = false - // Packet queue - this.queue = [] - // connack timer - this.connackTimer = null - // Reconnect timer - this.reconnectTimer = null - // Is processing store? - this._storeProcessing = false - // Packet Ids are put into the store during store processing - this._packetIdsDuringStoreProcessing = {} - // Store processing queue - this._storeProcessingQueue = [] - - // Inflight callbacks - this.outgoing = {} - - // True if connection is first time. - this._firstConnection = true - - // Send queued packets - this.on('connect', function () { - var queue = this.queue - - function deliver () { - var entry = queue.shift() - debug('deliver :: entry %o', entry) - var packet = null - - if (!entry) { - return - } - - packet = entry.packet - debug('deliver :: call _sendPacket for %o', packet) - var send = true - if (packet.messageId && packet.messageId !== 0) { - if (!that.messageIdProvider.register(packet.messageId)) { - packet.messageeId = that.messageIdProvider.allocate() - if (packet.messageId === null) { - send = false - } - } - } - if (send) { - that._sendPacket( - packet, - function (err) { - if (entry.cb) { - entry.cb(err) - } - deliver() - } - ) - } else { - debug('messageId: %d has already used.', packet.messageId) - deliver() - } - } - - debug('connect :: sending queued packets') - deliver() - }) - - this.on('close', function () { - debug('close :: connected set to `false`') +export class Client { + constructor (options) { + this.closed = false + this.connecting = false this.connected = false - - debug('close :: clearing connackTimer') - clearTimeout(this.connackTimer) - - debug('close :: clearing ping timer') - if (that.pingTimer !== null) { - that.pingTimer.clear() - that.pingTimer = null - } - - debug('close :: calling _setupReconnect') - this._setupReconnect() - }) - EventEmitter.call(this) - - debug('MqttClient :: setting up stream') - this._setupStream() -} -inherits(MqttClient, EventEmitter) - -/** - * setup the event handlers in the inner stream. - * - * @api private - */ -MqttClient.prototype._setupStream = function () { - var connectPacket - var that = this - var writable = new Writable() - var parser = mqttPacket.parser(this.options) - var completeParse = null - var packets = [] - - debug('_setupStream :: calling method to clear reconnect') - this._clearReconnect() - - debug('_setupStream :: using streamBuilder provided to client to create stream') - this.stream = this.streamBuilder(this) - - parser.on('packet', function (packet) { - debug('parser :: on packet push to packets array.') - packets.push(packet) - }) - - function nextTickWork () { - if (packets.length) { - nextTick(work) - } else { - var done = completeParse - completeParse = null - done() - } - } - - function work () { - debug('work :: getting next packet in queue') - var packet = packets.shift() - - if (packet) { - debug('work :: packet pulled from queue') - that._handlePacket(packet, nextTickWork) - } else { - debug('work :: no packets in queue') - var done = completeParse - completeParse = null - debug('work :: done flag is %s', !!(done)) - if (done) done() - } - } - - writable._write = function (buf, enc, done) { - completeParse = done - debug('writable stream :: parsing buffer') - parser.parse(buf) - work() - } - - function streamErrorHandler (error) { - debug('streamErrorHandler :: error', error.message) - if (socketErrors.includes(error.code)) { - // handle error - debug('streamErrorHandler :: emitting error') - that.emit('error', error) - } else { - nop(error) - } - } - - debug('_setupStream :: pipe stream to writable stream') - this.stream.pipe(writable) - - // Suppress connection errors - this.stream.on('error', streamErrorHandler) - - // Echo stream close - this.stream.on('close', function () { - debug('(%s)stream :: on close', that.options.clientId) - flushVolatile(that.outgoing) - debug('stream: emit close to MqttClient') - that.emit('close') - }) - - // Send a connect packet - debug('_setupStream: sending packet `connect`') - connectPacket = Object.create(this.options) - connectPacket.cmd = 'connect' - // avoid message queue - sendPacket(this, connectPacket) - - // Echo connection errors - parser.on('error', this.emit.bind(this, 'error')) - - // auth - if (this.options.properties) { - if (!this.options.properties.authenticationMethod && this.options.properties.authenticationData) { - that.end(() => - this.emit('error', new Error('Packet has no Authentication Method') - )) - return this - } - if (this.options.properties.authenticationMethod && this.options.authPacket && typeof this.options.authPacket === 'object') { - var authPacket = xtend({cmd: 'auth', reasonCode: 0}, this.options.authPacket) - sendPacket(this, authPacket) - } - } - - // many drain listeners are needed for qos 1 callbacks if the connection is intermittent - this.stream.setMaxListeners(1000) - - clearTimeout(this.connackTimer) - this.connackTimer = setTimeout(function () { - debug('!!connectTimeout hit!! Calling _cleanUp with force `true`') - that._cleanUp(true) - }, this.options.connectTimeout) -} - -MqttClient.prototype._handlePacket = function (packet, done) { - var options = this.options - - if (options.protocolVersion === 5 && options.properties && options.properties.maximumPacketSize && options.properties.maximumPacketSize < packet.length) { - this.emit('error', new Error('exceeding packets size ' + packet.cmd)) - this.end({reasonCode: 149, properties: { reasonString: 'Maximum packet size was exceeded' }}) - return this - } - debug('_handlePacket :: emitting packetreceive') - this.emit('packetreceive', packet) - - switch (packet.cmd) { - case 'publish': - this._handlePublish(packet, done) - break - case 'puback': - case 'pubrec': - case 'pubcomp': - case 'suback': - case 'unsuback': - this._handleAck(packet) - done() - break - case 'pubrel': - this._handlePubrel(packet, done) - break - case 'connack': - this._handleConnack(packet) - done() - break - case 'pingresp': - this._handlePingresp(packet) - done() - break - case 'disconnect': - this._handleDisconnect(packet) - done() - break - default: - // do nothing - // maybe we should do an error handling - // or just log it - break - } -} - -MqttClient.prototype._checkDisconnecting = function (callback) { - if (this.disconnecting) { - if (callback) { - callback(new Error('client disconnecting')) - } else { - this.emit('error', new Error('client disconnecting')) - } - } - return this.disconnecting -} - -/** - * publish - publish to - * - * @param {String} topic - topic to publish to - * @param {String, Buffer} message - message to publish - * @param {Object} [opts] - publish options, includes: - * {Number} qos - qos level to publish on - * {Boolean} retain - whether or not to retain the message - * {Boolean} dup - whether or not mark a message as duplicate - * {Function} cbStorePut - function(){} called when message is put into `outgoingStore` - * @param {Function} [callback] - function(err){} - * called when publish succeeds or fails - * @returns {MqttClient} this - for chaining - * @api public - * - * @example client.publish('topic', 'message'); - * @example - * client.publish('topic', 'message', {qos: 1, retain: true, dup: true}); - * @example client.publish('topic', 'message', console.log); - */ -MqttClient.prototype.publish = function (topic, message, opts, callback) { - debug('publish :: message `%s` to topic `%s`', message, topic) - var packet - var options = this.options - - // .publish(topic, payload, cb); - if (typeof opts === 'function') { - callback = opts - opts = null - } - - // default opts - var defaultOpts = {qos: 0, retain: false, dup: false} - opts = xtend(defaultOpts, opts) - - if (this._checkDisconnecting(callback)) { - return this - } - - var that = this - var publishProc = function () { - var messageId = 0 - if (opts.qos === 1 || opts.qos === 2) { - messageId = that._nextId() - if (messageId === null) { - debug('No messageId left') - return false - } - } - packet = { - cmd: 'publish', - topic: topic, - payload: message, - qos: opts.qos, - retain: opts.retain, - messageId: messageId, - dup: opts.dup - } - - if (options.protocolVersion === 5) { - packet.properties = opts.properties - if ((!options.properties && packet.properties && packet.properties.topicAlias) || ((opts.properties && options.properties) && - ((opts.properties.topicAlias && options.properties.topicAliasMaximum && opts.properties.topicAlias > options.properties.topicAliasMaximum) || - (!options.properties.topicAliasMaximum && opts.properties.topicAlias)))) { - /* - if we are don`t setup topic alias or - topic alias maximum less than topic alias or - server don`t give topic alias maximum, - we are removing topic alias from packet - */ - delete packet.properties.topicAlias - } - } - - debug('publish :: qos', opts.qos) - switch (opts.qos) { - case 1: - case 2: - // Add to callbacks - that.outgoing[packet.messageId] = { - volatile: false, - cb: callback || nop - } - debug('MqttClient:publish: packet cmd: %s', packet.cmd) - that._sendPacket(packet, undefined, opts.cbStorePut) - break - default: - debug('MqttClient:publish: packet cmd: %s', packet.cmd) - that._sendPacket(packet, callback, opts.cbStorePut) - break - } - return true - } - - if (this._storeProcessing || this._storeProcessingQueue.length > 0) { - this._storeProcessingQueue.push( - { - 'invoke': publishProc, - 'cbStorePut': opts.cbStorePut, - 'callback': callback - } - ) - } else { - publishProc() - } - return this -} - -/** - * subscribe - subscribe to - * - * @param {String, Array, Object} topic - topic(s) to subscribe to, supports objects in the form {'topic': qos} - * @param {Object} [opts] - optional subscription options, includes: - * {Number} qos - subscribe qos level - * @param {Function} [callback] - function(err, granted){} where: - * {Error} err - subscription error (none at the moment!) - * {Array} granted - array of {topic: 't', qos: 0} - * @returns {MqttClient} this - for chaining - * @api public - * @example client.subscribe('topic'); - * @example client.subscribe('topic', {qos: 1}); - * @example client.subscribe({'topic': {qos: 0}, 'topic2': {qos: 1}}, console.log); - * @example client.subscribe('topic', console.log); - */ -MqttClient.prototype.subscribe = function () { - var that = this - var args = new Array(arguments.length) - for (var i = 0; i < arguments.length; i++) { - args[i] = arguments[i] - } - var subs = [] - var obj = args.shift() - var resubscribe = obj.resubscribe - var callback = args.pop() || nop - var opts = args.pop() - var version = this.options.protocolVersion - - delete obj.resubscribe - - if (typeof obj === 'string') { - obj = [obj] - } - - if (typeof callback !== 'function') { - opts = callback - callback = nop - } - - var invalidTopic = validations.validateTopics(obj) - if (invalidTopic !== null) { - setImmediate(callback, new Error('Invalid topic ' + invalidTopic)) - return this - } - - if (this._checkDisconnecting(callback)) { - debug('subscribe: discconecting true') - return this - } - - var defaultOpts = { - qos: 0 - } - if (version === 5) { - defaultOpts.nl = false - defaultOpts.rap = false - defaultOpts.rh = 0 - } - opts = xtend(defaultOpts, opts) - - if (Array.isArray(obj)) { - obj.forEach(function (topic) { - debug('subscribe: array topic %s', topic) - if (!that._resubscribeTopics.hasOwnProperty(topic) || - that._resubscribeTopics[topic].qos < opts.qos || - resubscribe) { - var currentOpts = { - topic: topic, - qos: opts.qos - } - if (version === 5) { - currentOpts.nl = opts.nl - currentOpts.rap = opts.rap - currentOpts.rh = opts.rh - currentOpts.properties = opts.properties - } - debug('subscribe: pushing topic `%s` and qos `%s` to subs list', currentOpts.topic, currentOpts.qos) - subs.push(currentOpts) - } - }) - } else { - Object - .keys(obj) - .forEach(function (k) { - debug('subscribe: object topic %s', k) - if (!that._resubscribeTopics.hasOwnProperty(k) || - that._resubscribeTopics[k].qos < obj[k].qos || - resubscribe) { - var currentOpts = { - topic: k, - qos: obj[k].qos - } - if (version === 5) { - currentOpts.nl = obj[k].nl - currentOpts.rap = obj[k].rap - currentOpts.rh = obj[k].rh - currentOpts.properties = opts.properties - } - debug('subscribe: pushing `%s` to subs list', currentOpts) - subs.push(currentOpts) - } - }) - } - - if (!subs.length) { - callback(null, []) - return this - } - - var subscribeProc = function () { - var messageId = that._nextId() - if (messageId === null) { - debug('No messageId left') - return false - } - - var packet = { - cmd: 'subscribe', - subscriptions: subs, - qos: 1, - retain: false, - dup: false, - messageId: messageId - } - - if (opts.properties) { - packet.properties = opts.properties - } - - // subscriptions to resubscribe to in case of disconnect - if (that.options.resubscribe) { - debug('subscribe :: resubscribe true') - var topics = [] - subs.forEach(function (sub) { - if (that.options.reconnectPeriod > 0) { - var topic = { qos: sub.qos } - if (version === 5) { - topic.nl = sub.nl || false - topic.rap = sub.rap || false - topic.rh = sub.rh || 0 - topic.properties = sub.properties - } - that._resubscribeTopics[sub.topic] = topic - topics.push(sub.topic) - } - }) - that.messageIdToTopic[packet.messageId] = topics - } - - that.outgoing[packet.messageId] = { - volatile: true, - cb: function (err, packet) { - if (!err) { - var granted = packet.granted - for (var i = 0; i < granted.length; i += 1) { - subs[i].qos = granted[i] - } - } - - callback(err, subs) - } - } - debug('subscribe :: call _sendPacket') - that._sendPacket(packet) + this.errored = false + this.id = null + this.clean = true + this.version = null + + this._disconnected = false + this._authorized = false + this._parser = mqtt.parser() + this._defaultConnectOptions = { + keepalive: 60, + reschedulePings: true, + protocolId: 'MQTT', + protocolVersion: 4, + reconnectPeriod: 1000, + connectTimeout: 30 * 1000, + clean: true, + resubscribe: true + } + + this._options = options || {...this._defaultConnectOptions} + this._options.clientId = options.clientId || `mqttjs_ ${Math.random().toString(16).substr(2, 8)}` + this._parser.client = this + this._parser._queue = [] + this._parser.on('packet', this.enqueue) + this.once('connected', this.dequeue) + // TBD + } + + enqueue () { return true } - if (this._storeProcessing || this._storeProcessingQueue.length > 0) { - this._storeProcessingQueue.push( - { - 'invoke': subscribeProc, - 'callback': callback - } - ) - } else { - subscribeProc() - } - - return this -} - -/** - * unsubscribe - unsubscribe from topic(s) - * - * @param {String, Array} topic - topics to unsubscribe from - * @param {Object} [opts] - optional subscription options, includes: - * {Object} properties - properties of unsubscribe packet - * @param {Function} [callback] - callback fired on unsuback - * @returns {MqttClient} this - for chaining - * @api public - * @example client.unsubscribe('topic'); - * @example client.unsubscribe('topic', console.log); - */ -MqttClient.prototype.unsubscribe = function () { - var that = this - var args = new Array(arguments.length) - for (var i = 0; i < arguments.length; i++) { - args[i] = arguments[i] - } - var topic = args.shift() - var callback = args.pop() || nop - var opts = args.pop() - if (typeof topic === 'string') { - topic = [topic] - } - - if (typeof callback !== 'function') { - opts = callback - callback = nop - } - - var invalidTopic = validations.validateTopics(topic) - if (invalidTopic !== null) { - setImmediate(callback, new Error('Invalid topic ' + invalidTopic)) - return this - } - - if (that._checkDisconnecting(callback)) { - return this - } - - var unsubscribeProc = function () { - var messageId = that._nextId() - if (messageId === null) { - debug('No messageId left') - return false - } - var packet = { - cmd: 'unsubscribe', - qos: 1, - messageId: messageId - } - - if (typeof topic === 'string') { - packet.unsubscriptions = [topic] - } else if (Array.isArray(topic)) { - packet.unsubscriptions = topic - } - - if (that.options.resubscribe) { - packet.unsubscriptions.forEach(function (topic) { - delete that._resubscribeTopics[topic] - }) - } - - if (typeof opts === 'object' && opts.properties) { - packet.properties = opts.properties - } - - that.outgoing[packet.messageId] = { - volatile: true, - cb: callback - } - - debug('unsubscribe: call _sendPacket') - that._sendPacket(packet) - + dequeue () { return true } - if (this._storeProcessing || this._storeProcessingQueue.length > 0) { - this._storeProcessingQueue.push( - { - 'invoke': unsubscribeProc, - 'callback': callback - } - ) - } else { - unsubscribeProc() - } - - return this -} - -/** - * end - close connection - * - * @returns {MqttClient} this - for chaining - * @param {Boolean} force - do not wait for all in-flight messages to be acked - * @param {Object} opts - added to the disconnect packet - * @param {Function} cb - called when the client has been closed - * - * @api public - */ -MqttClient.prototype.end = function (force, opts, cb) { - var that = this - - debug('end :: (%s)', this.options.clientId) - - if (force == null || typeof force !== 'boolean') { - cb = opts || nop - opts = force - force = false - if (typeof opts !== 'object') { - cb = opts - opts = null - if (typeof cb !== 'function') { - cb = nop - } - } - } - - if (typeof opts !== 'object') { - cb = opts - opts = null - } - - debug('end :: cb? %s', !!cb) - cb = cb || nop - - function closeStores () { - debug('end :: closeStores: closing incoming and outgoing stores') - that.disconnected = true - that.incomingStore.close(function (e1) { - that.outgoingStore.close(function (e2) { - debug('end :: closeStores: emitting end') - that.emit('end') - if (cb) { - let err = e1 || e2 - debug('end :: closeStores: invoking callback with args') - cb(err) - } - }) - }) - if (that._deferredReconnect) { - that._deferredReconnect() - } - } - - function finish () { - // defer closesStores of an I/O cycle, - // just to make sure things are - // ok for websockets - debug('end :: (%s) :: finish :: calling _cleanUp with force %s', that.options.clientId, force) - that._cleanUp(force, () => { - debug('end :: finish :: calling process.nextTick on closeStores') - // var boundProcess = nextTick.bind(null, closeStores) - nextTick(closeStores.bind(that)) - }, opts) - } - - if (this.disconnecting) { - cb() - return this - } - - this._clearReconnect() - - this.disconnecting = true - - if (!force && Object.keys(this.outgoing).length > 0) { - // wait 10ms, just to be sure we received all of it - debug('end :: (%s) :: calling finish in 10ms once outgoing is empty', that.options.clientId) - this.once('outgoingEmpty', setTimeout.bind(null, finish, 10)) - } else { - debug('end :: (%s) :: immediately calling finish', that.options.clientId) - finish() - } - - return this -} - -/** - * removeOutgoingMessage - remove a message in outgoing store - * the outgoing callback will be called withe Error('Message removed') if the message is removed - * - * @param {Number} messageId - messageId to remove message - * @returns {MqttClient} this - for chaining - * @api public - * - * @example client.removeOutgoingMessage(client.getLastAllocated()); - */ -MqttClient.prototype.removeOutgoingMessage = function (messageId) { - var cb = this.outgoing[messageId] ? this.outgoing[messageId].cb : null - delete this.outgoing[messageId] - this.outgoingStore.del({messageId: messageId}, function () { - cb(new Error('Message removed')) - }) - return this -} - -/** - * reconnect - connect again using the same options as connect() - * - * @param {Object} [opts] - optional reconnect options, includes: - * {Store} incomingStore - a store for the incoming packets - * {Store} outgoingStore - a store for the outgoing packets - * if opts is not given, current stores are used - * @returns {MqttClient} this - for chaining - * - * @api public - */ -MqttClient.prototype.reconnect = function (opts) { - debug('client reconnect') - var that = this - var f = function () { - if (opts) { - that.options.incomingStore = opts.incomingStore - that.options.outgoingStore = opts.outgoingStore - } else { - that.options.incomingStore = null - that.options.outgoingStore = null - } - that.incomingStore = that.options.incomingStore || new Store() - that.outgoingStore = that.options.outgoingStore || new Store() - that.disconnecting = false - that.disconnected = false - that._deferredReconnect = null - that._reconnect() - } - - if (this.disconnecting && !this.disconnected) { - this._deferredReconnect = f - } else { - f() - } - return this -} - -/** - * _reconnect - implement reconnection - * @api privateish - */ -MqttClient.prototype._reconnect = function () { - debug('_reconnect: emitting reconnect to client') - this.emit('reconnect') - if (this.connected) { - this.end(() => { this._setupStream() }) - debug('client already connected. disconnecting first.') - } else { - debug('_reconnect: calling _setupStream') - this._setupStream() - } -} - -/** - * _setupReconnect - setup reconnect timer - */ -MqttClient.prototype._setupReconnect = function () { - var that = this - - if (!that.disconnecting && !that.reconnectTimer && (that.options.reconnectPeriod > 0)) { - if (!this.reconnecting) { - debug('_setupReconnect :: emit `offline` state') - this.emit('offline') - debug('_setupReconnect :: set `reconnecting` to `true`') - this.reconnecting = true - } - debug('_setupReconnect :: setting reconnectTimer for %d ms', that.options.reconnectPeriod) - that.reconnectTimer = setInterval(function () { - debug('reconnectTimer :: reconnect triggered!') - that._reconnect() - }, that.options.reconnectPeriod) - } else { - debug('_setupReconnect :: doing nothing...') - } -} - -/** - * _clearReconnect - clear the reconnect timer - */ -MqttClient.prototype._clearReconnect = function () { - debug('_clearReconnect : clearing reconnect timer') - if (this.reconnectTimer) { - clearInterval(this.reconnectTimer) - this.reconnectTimer = null - } -} - -/** - * _cleanUp - clean up on connection end - * @api private - */ -MqttClient.prototype._cleanUp = function (forced, done) { - var opts = arguments[2] - if (done) { - debug('_cleanUp :: done callback provided for on stream close') - this.stream.on('close', done) - } - - debug('_cleanUp :: forced? %s', forced) - if (forced) { - if ((this.options.reconnectPeriod === 0) && this.options.clean) { - flush(this.outgoing) - } - debug('_cleanUp :: (%s) :: destroying stream', this.options.clientId) - this.stream.destroy() - } else { - var packet = xtend({ cmd: 'disconnect' }, opts) - debug('_cleanUp :: (%s) :: call _sendPacket with disconnect packet', this.options.clientId) - this._sendPacket( - packet, - setImmediate.bind( - null, - this.stream.end.bind(this.stream) - ) - ) - } - - if (!this.disconnecting) { - debug('_cleanUp :: client not disconnecting. Clearing and resetting reconnect.') - this._clearReconnect() - this._setupReconnect() - } - - if (this.pingTimer !== null) { - debug('_cleanUp :: clearing pingTimer') - this.pingTimer.clear() - this.pingTimer = null - } - - if (done && !this.connected) { - debug('_cleanUp :: (%s) :: removing stream `done` callback `close` listener', this.options.clientId) - this.stream.removeListener('close', done) - done() - } -} - -/** - * _sendPacket - send or queue a packet - * @param {Object} packet - packet options - * @param {Function} cb - callback when the packet is sent - * @param {Function} cbStorePut - called when message is put into outgoingStore - * @api private - */ -MqttClient.prototype._sendPacket = function (packet, cb, cbStorePut) { - debug('_sendPacket :: (%s) :: start', this.options.clientId) - cbStorePut = cbStorePut || nop - - if (!this.connected) { - debug('_sendPacket :: client not connected. Storing packet offline.') - this._storePacket(packet, cb, cbStorePut) - return - } - - // When sending a packet, reschedule the ping timer - this._shiftPingInterval() - - switch (packet.cmd) { - case 'publish': - break - case 'pubrel': - storeAndSend(this, packet, cb, cbStorePut) - return - default: - sendPacket(this, packet, cb) - return - } - - switch (packet.qos) { - case 2: - case 1: - storeAndSend(this, packet, cb, cbStorePut) - break - /** - * no need of case here since it will be caught by default - * and jshint comply that before default it must be a break - * anyway it will result in -1 evaluation - */ - case 0: - /* falls through */ - default: - sendPacket(this, packet, cb) - break - } - debug('_sendPacket :: (%s) :: end', this.options.clientId) -} - -/** - * _storePacket - queue a packet - * @param {Object} packet - packet options - * @param {Function} cb - callback when the packet is sent - * @param {Function} cbStorePut - called when message is put into outgoingStore - * @api private - */ -MqttClient.prototype._storePacket = function (packet, cb, cbStorePut) { - debug('_storePacket :: packet: %o', packet) - debug('_storePacket :: cb? %s', !!cb) - cbStorePut = cbStorePut || nop - - // check that the packet is not a qos of 0, or that the command is not a publish - if (((packet.qos || 0) === 0 && this.queueQoSZero) || packet.cmd !== 'publish') { - this.queue.push({ packet: packet, cb: cb }) - } else if (packet.qos > 0) { - cb = this.outgoing[packet.messageId] ? this.outgoing[packet.messageId].cb : null - this.outgoingStore.put(packet, function (err) { - if (err) { - return cb && cb(err) - } - cbStorePut() - }) - } else if (cb) { - cb(new Error('No connection to broker')) - } -} - -/** - * _setupPingTimer - setup the ping timer - * - * @api private - */ -MqttClient.prototype._setupPingTimer = function () { - debug('_setupPingTimer :: keepalive %d (seconds)', this.options.keepalive) - var that = this - - if (!this.pingTimer && this.options.keepalive) { - this.pingResp = true - this.pingTimer = reInterval(function () { - that._checkPing() - }, this.options.keepalive * 1000) - } -} - -/** - * _shiftPingInterval - reschedule the ping interval - * - * @api private - */ -MqttClient.prototype._shiftPingInterval = function () { - if (this.pingTimer && this.options.keepalive && this.options.reschedulePings) { - this.pingTimer.reschedule(this.options.keepalive * 1000) - } -} -/** - * _checkPing - check if a pingresp has come back, and ping the server again - * - * @api private - */ -MqttClient.prototype._checkPing = function () { - debug('_checkPing :: checking ping...') - if (this.pingResp) { - debug('_checkPing :: ping response received. Clearing flag and sending `pingreq`') - this.pingResp = false - this._sendPacket({ cmd: 'pingreq' }) - } else { - // do a forced cleanup since socket will be in bad shape - debug('_checkPing :: calling _cleanUp with force true') - this._cleanUp(true) - } -} - -/** - * _handlePingresp - handle a pingresp - * - * @api private - */ -MqttClient.prototype._handlePingresp = function () { - this.pingResp = true -} - -/** - * _handleConnack - * - * @param {Object} packet - * @api private - */ -MqttClient.prototype._handleConnack = function (packet) { - debug('_handleConnack') - var options = this.options - var version = options.protocolVersion - var rc = version === 5 ? packet.reasonCode : packet.returnCode - - clearTimeout(this.connackTimer) - - if (packet.properties) { - if (packet.properties.topicAliasMaximum) { - if (!options.properties) { options.properties = {} } - options.properties.topicAliasMaximum = packet.properties.topicAliasMaximum - } - if (packet.properties.serverKeepAlive && options.keepalive) { - options.keepalive = packet.properties.serverKeepAlive - this._shiftPingInterval() - } - if (packet.properties.maximumPacketSize) { - if (!options.properties) { options.properties = {} } - options.properties.maximumPacketSize = packet.properties.maximumPacketSize - } - } - - if (rc === 0) { - this.reconnecting = false - this._onConnect(packet) - } else if (rc > 0) { - var err = new Error('Connection refused: ' + errors[rc]) - err.code = rc - this.emit('error', err) - } -} - -/** - * _handlePublish - * - * @param {Object} packet - * @api private - */ -/* -those late 2 case should be rewrite to comply with coding style: - -case 1: -case 0: - // do not wait sending a puback - // no callback passed - if (1 === qos) { - this._sendPacket({ - cmd: 'puback', - messageId: messageId - }); - } - // emit the message event for both qos 1 and 0 - this.emit('message', topic, message, packet); - this.handleMessage(packet, done); - break; -default: - // do nothing but every switch mus have a default - // log or throw an error about unknown qos - break; - -for now i just suppressed the warnings -*/ -MqttClient.prototype._handlePublish = function (packet, done) { - debug('_handlePublish: packet %o', packet) - done = typeof done !== 'undefined' ? done : nop - var topic = packet.topic.toString() - var message = packet.payload - var qos = packet.qos - var messageId = packet.messageId - var that = this - var options = this.options - var validReasonCodes = [0, 16, 128, 131, 135, 144, 145, 151, 153] - debug('_handlePublish: qos %d', qos) - switch (qos) { - case 2: { - options.customHandleAcks(topic, message, packet, function (error, code) { - if (!(error instanceof Error)) { - code = error - error = null - } - if (error) { return that.emit('error', error) } - if (validReasonCodes.indexOf(code) === -1) { return that.emit('error', new Error('Wrong reason code for pubrec')) } - if (code) { - that._sendPacket({cmd: 'pubrec', messageId: messageId, reasonCode: code}, done) - } else { - that.incomingStore.put(packet, function () { - that._sendPacket({cmd: 'pubrec', messageId: messageId}, done) - }) - } - }) - break - } - case 1: { - // emit the message event - options.customHandleAcks(topic, message, packet, function (error, code) { - if (!(error instanceof Error)) { - code = error - error = null - } - if (error) { return that.emit('error', error) } - if (validReasonCodes.indexOf(code) === -1) { return that.emit('error', new Error('Wrong reason code for puback')) } - if (!code) { that.emit('message', topic, message, packet) } - that.handleMessage(packet, function (err) { - if (err) { - return done && done(err) - } - that._sendPacket({cmd: 'puback', messageId: messageId, reasonCode: code}, done) - }) - }) - break - } - case 0: - // emit the message event - this.emit('message', topic, message, packet) - this.handleMessage(packet, done) - break - default: - // do nothing - debug('_handlePublish: unknown QoS. Doing nothing.') - // log or throw an error about unknown qos - break - } -} - -/** - * Handle messages with backpressure support, one at a time. - * Override at will. - * - * @param Packet packet the packet - * @param Function callback call when finished - * @api public - */ -MqttClient.prototype.handleMessage = function (packet, callback) { - callback() -} - -/** - * _handleAck - * - * @param {Object} packet - * @api private - */ - -MqttClient.prototype._handleAck = function (packet) { - /* eslint no-fallthrough: "off" */ - var messageId = packet.messageId - var type = packet.cmd - var response = null - var cb = this.outgoing[messageId] ? this.outgoing[messageId].cb : null - var that = this - var err - - if (!cb) { - debug('_handleAck :: Server sent an ack in error. Ignoring.') - // Server sent an ack in error, ignore it. - return - } - - // Process - debug('_handleAck :: packet type', type) - switch (type) { - case 'pubcomp': - // same thing as puback for QoS 2 - case 'puback': - var pubackRC = packet.reasonCode - // Callback - we're done - if (pubackRC && pubackRC > 0 && pubackRC !== 16) { - err = new Error('Publish error: ' + errors[pubackRC]) - err.code = pubackRC - cb(err, packet) - } - delete this.outgoing[messageId] - this.outgoingStore.del(packet, cb) - this.messageIdProvider.deallocate(messageId) - this._invokeStoreProcessingQueue() - break - case 'pubrec': - response = { - cmd: 'pubrel', - qos: 2, - messageId: messageId - } - var pubrecRC = packet.reasonCode - - if (pubrecRC && pubrecRC > 0 && pubrecRC !== 16) { - err = new Error('Publish error: ' + errors[pubrecRC]) - err.code = pubrecRC - cb(err, packet) - } else { - this._sendPacket(response) - } - break - case 'suback': - delete this.outgoing[messageId] - this.messageIdProvider.deallocate(messageId) - for (var grantedI = 0; grantedI < packet.granted.length; grantedI++) { - if ((packet.granted[grantedI] & 0x80) !== 0) { - // suback with Failure status - var topics = this.messageIdToTopic[messageId] - if (topics) { - topics.forEach(function (topic) { - delete that._resubscribeTopics[topic] - }) - } - } - } - this._invokeStoreProcessingQueue() - cb(null, packet) - break - case 'unsuback': - delete this.outgoing[messageId] - this.messageIdProvider.deallocate(messageId) - this._invokeStoreProcessingQueue() - cb(null) - break - default: - that.emit('error', new Error('unrecognized packet type')) - } - - if (this.disconnecting && - Object.keys(this.outgoing).length === 0) { - this.emit('outgoingEmpty') - } -} - -/** - * _handlePubrel - * - * @param {Object} packet - * @api private - */ -MqttClient.prototype._handlePubrel = function (packet, callback) { - debug('handling pubrel packet') - callback = typeof callback !== 'undefined' ? callback : nop - var messageId = packet.messageId - var that = this - - var comp = {cmd: 'pubcomp', messageId: messageId} - - that.incomingStore.get(packet, function (err, pub) { - if (!err) { - that.emit('message', pub.topic, pub.payload, pub) - that.handleMessage(pub, function (err) { - if (err) { - return callback(err) - } - that.incomingStore.del(pub, nop) - that._sendPacket(comp, callback) - }) - } else { - that._sendPacket(comp, callback) - } - }) -} - -/** - * _handleDisconnect - * - * @param {Object} packet - * @api private - */ -MqttClient.prototype._handleDisconnect = function (packet) { - this.emit('disconnect', packet) -} - -/** - * _nextId - * @return unsigned int - */ -MqttClient.prototype._nextId = function () { - return this.messageIdProvider.allocate() -} - -/** - * getLastMessageId - * @return unsigned int - */ -MqttClient.prototype.getLastMessageId = function () { - return this.messageIdProvider.getLastAllocated() -} - -/** - * _resubscribe - * @api private - */ -MqttClient.prototype._resubscribe = function (connack) { - debug('_resubscribe') - var _resubscribeTopicsKeys = Object.keys(this._resubscribeTopics) - if (!this._firstConnection && - (this.options.clean || (this.options.protocolVersion === 5 && !connack.sessionPresent)) && - _resubscribeTopicsKeys.length > 0) { - if (this.options.resubscribe) { - if (this.options.protocolVersion === 5) { - debug('_resubscribe: protocolVersion 5') - for (var topicI = 0; topicI < _resubscribeTopicsKeys.length; topicI++) { - var resubscribeTopic = {} - resubscribeTopic[_resubscribeTopicsKeys[topicI]] = this._resubscribeTopics[_resubscribeTopicsKeys[topicI]] - resubscribeTopic.resubscribe = true - this.subscribe(resubscribeTopic, {properties: resubscribeTopic[_resubscribeTopicsKeys[topicI]].properties}) - } - } else { - this._resubscribeTopics.resubscribe = true - this.subscribe(this._resubscribeTopics) - } - } else { - this._resubscribeTopics = {} - } + static async connect (options) { + return new Client(options) } - this._firstConnection = false -} - -/** - * _onConnect - * - * @api private - */ -MqttClient.prototype._onConnect = function (packet) { - if (this.disconnected) { - this.emit('connect', packet) - return + async publish (topic, message, opts) { + const result = await handle.publish(this, message) + return result } - var that = this - - this.messageIdProvider.clear() - this._setupPingTimer() - this._resubscribe(packet) - - this.connected = true - - function startStreamProcess () { - var outStore = that.outgoingStore.createStream() - - function clearStoreProcessing () { - that._storeProcessing = false - that._packetIdsDuringStoreProcessing = {} + async subscribe (packet) { + if (!packet.subscriptions) { + packet = {subscriptions: Array.isArray(packet) ? packet : [packet]} } - - that.once('close', remove) - outStore.on('error', function (err) { - clearStoreProcessing() - that._flushStoreProcessingQueue() - that.removeListener('close', remove) - that.emit('error', err) - }) - - function remove () { - outStore.destroy() - outStore = null - that._flushStoreProcessingQueue() - clearStoreProcessing() - } - - function storeDeliver () { - // edge case, we wrapped this twice - if (!outStore) { - return - } - that._storeProcessing = true - - var packet = outStore.read(1) - - var cb - - if (!packet) { - // read when data is available in the future - outStore.once('readable', storeDeliver) - return - } - - // Skip already processed store packets - if (that._packetIdsDuringStoreProcessing[packet.messageId]) { - storeDeliver() - return - } - - // Avoid unnecessary stream read operations when disconnected - if (!that.disconnecting && !that.reconnectTimer) { - cb = that.outgoing[packet.messageId] ? that.outgoing[packet.messageId].cb : null - that.outgoing[packet.messageId] = { - volatile: false, - cb: function (err, status) { - // Ensure that the original callback passed in to publish gets invoked - if (cb) { - cb(err, status) - } - - storeDeliver() - } - } - that._packetIdsDuringStoreProcessing[packet.messageId] = true - if (that.messageIdProvider.register(packet.messageId)) { - that._sendPacket(packet) - } else { - debug('messageId: %d has already used.', packet.messageId) - } - } else if (outStore.destroy) { - outStore.destroy() - } - } - - outStore.on('end', function () { - var allProcessed = true - for (var id in that._packetIdsDuringStoreProcessing) { - if (!that._packetIdsDuringStoreProcessing[id]) { - allProcessed = false - break - } - } - if (allProcessed) { - clearStoreProcessing() - that.removeListener('close', remove) - that._invokeAllStoreProcessingQueue() - that.emit('connect', packet) - } else { - startStreamProcess() - } - }) - storeDeliver() + const result = await handle.subscribe(this, packet) + return result } - // start flowing - startStreamProcess() -} -MqttClient.prototype._invokeStoreProcessingQueue = function () { - if (this._storeProcessingQueue.length > 0) { - var f = this._storeProcessingQueue[0] - if (f && f.invoke()) { - this._storeProcessingQueue.shift() - return true - } + async unsubscribe (packet) { + const result = await handle.unsubscribe(this, packet) + return result } - return false -} - -MqttClient.prototype._invokeAllStoreProcessingQueue = function () { - while (this._invokeStoreProcessingQueue()) {} } - -MqttClient.prototype._flushStoreProcessingQueue = function () { - for (var f of this._storeProcessingQueue) { - if (f.cbStorePut) f.cbStorePut(new Error('Connection closed')) - if (f.callback) f.callback(new Error('Connection closed')) - } - this._storeProcessingQueue.splice(0) -} - -module.exports = MqttClient diff --git a/lib/connect/ali.js b/lib/connect/ali.js deleted file mode 100644 index e7fe6a3c5..000000000 --- a/lib/connect/ali.js +++ /dev/null @@ -1,128 +0,0 @@ -'use strict' - -var Transform = require('readable-stream').Transform -var duplexify = require('duplexify') - -/* global FileReader */ -var my -var proxy -var stream -var isInitialized = false - -function buildProxy () { - var proxy = new Transform() - proxy._write = function (chunk, encoding, next) { - my.sendSocketMessage({ - data: chunk.buffer, - success: function () { - next() - }, - fail: function () { - next(new Error()) - } - }) - } - proxy._flush = function socketEnd (done) { - my.closeSocket({ - success: function () { - done() - } - }) - } - - return proxy -} - -function setDefaultOpts (opts) { - if (!opts.hostname) { - opts.hostname = 'localhost' - } - if (!opts.path) { - opts.path = '/' - } - - if (!opts.wsOptions) { - opts.wsOptions = {} - } -} - -function buildUrl (opts, client) { - var protocol = opts.protocol === 'alis' ? 'wss' : 'ws' - var url = protocol + '://' + opts.hostname + opts.path - if (opts.port && opts.port !== 80 && opts.port !== 443) { - url = protocol + '://' + opts.hostname + ':' + opts.port + opts.path - } - if (typeof (opts.transformWsUrl) === 'function') { - url = opts.transformWsUrl(url, opts, client) - } - return url -} - -function bindEventHandler () { - if (isInitialized) return - - isInitialized = true - - my.onSocketOpen(function () { - stream.setReadable(proxy) - stream.setWritable(proxy) - stream.emit('connect') - }) - - my.onSocketMessage(function (res) { - if (typeof res.data === 'string') { - var buffer = Buffer.from(res.data, 'base64') - proxy.push(buffer) - } else { - var reader = new FileReader() - reader.addEventListener('load', function () { - var data = reader.result - - if (data instanceof ArrayBuffer) data = Buffer.from(data) - else data = Buffer.from(data, 'utf8') - proxy.push(data) - }) - reader.readAsArrayBuffer(res.data) - } - }) - - my.onSocketClose(function () { - stream.end() - stream.destroy() - }) - - my.onSocketError(function (res) { - stream.destroy(res) - }) -} - -function buildStream (client, opts) { - opts.hostname = opts.hostname || opts.host - - if (!opts.hostname) { - throw new Error('Could not determine host. Specify host manually.') - } - - var websocketSubProtocol = - (opts.protocolId === 'MQIsdp') && (opts.protocolVersion === 3) - ? 'mqttv3.1' - : 'mqtt' - - setDefaultOpts(opts) - - var url = buildUrl(opts, client) - my = opts.my - my.connectSocket({ - url: url, - protocols: websocketSubProtocol - }) - - proxy = buildProxy() - stream = duplexify.obj() - - bindEventHandler() - - return stream -} - -module.exports = buildStream diff --git a/lib/connect/index.js b/lib/connect/index.js deleted file mode 100644 index 97e7b4c15..000000000 --- a/lib/connect/index.js +++ /dev/null @@ -1,164 +0,0 @@ -'use strict' - -var MqttClient = require('../client') -var Store = require('../store') -var url = require('url') -var xtend = require('xtend') -var debug = require('debug')('mqttjs') - -var protocols = {} - -// eslint-disable-next-line camelcase -if ((typeof process !== 'undefined' && process.title !== 'browser') || typeof __webpack_require__ !== 'function') { - protocols.mqtt = require('./tcp') - protocols.tcp = require('./tcp') - protocols.ssl = require('./tls') - protocols.tls = require('./tls') - protocols.mqtts = require('./tls') -} else { - protocols.wx = require('./wx') - protocols.wxs = require('./wx') - - protocols.ali = require('./ali') - protocols.alis = require('./ali') -} - -protocols.ws = require('./ws') -protocols.wss = require('./ws') - -/** - * Parse the auth attribute and merge username and password in the options object. - * - * @param {Object} [opts] option object - */ -function parseAuthOptions (opts) { - var matches - if (opts.auth) { - matches = opts.auth.match(/^(.+):(.+)$/) - if (matches) { - opts.username = matches[1] - opts.password = matches[2] - } else { - opts.username = opts.auth - } - } -} - -/** - * connect - connect to an MQTT broker. - * - * @param {String} [brokerUrl] - url of the broker, optional - * @param {Object} opts - see MqttClient#constructor - */ -function connect (brokerUrl, opts) { - debug('connecting to an MQTT broker...') - if ((typeof brokerUrl === 'object') && !opts) { - opts = brokerUrl - brokerUrl = null - } - - opts = opts || {} - - if (brokerUrl) { - var parsed = url.parse(brokerUrl, true) - if (parsed.port != null) { - parsed.port = Number(parsed.port) - } - - opts = xtend(parsed, opts) - - if (opts.protocol === null) { - throw new Error('Missing protocol') - } - - opts.protocol = opts.protocol.replace(/:$/, '') - } - - // merge in the auth options if supplied - parseAuthOptions(opts) - - // support clientId passed in the query string of the url - if (opts.query && typeof opts.query.clientId === 'string') { - opts.clientId = opts.query.clientId - } - - if (opts.cert && opts.key) { - if (opts.protocol) { - if (['mqtts', 'wss', 'wxs', 'alis'].indexOf(opts.protocol) === -1) { - switch (opts.protocol) { - case 'mqtt': - opts.protocol = 'mqtts' - break - case 'ws': - opts.protocol = 'wss' - break - case 'wx': - opts.protocol = 'wxs' - break - case 'ali': - opts.protocol = 'alis' - break - default: - throw new Error('Unknown protocol for secure connection: "' + opts.protocol + '"!') - } - } - } else { - // A cert and key was provided, however no protocol was specified, so we will throw an error. - throw new Error('Missing secure protocol key') - } - } - - if (!protocols[opts.protocol]) { - var isSecure = ['mqtts', 'wss'].indexOf(opts.protocol) !== -1 - opts.protocol = [ - 'mqtt', - 'mqtts', - 'ws', - 'wss', - 'wx', - 'wxs', - 'ali', - 'alis' - ].filter(function (key, index) { - if (isSecure && index % 2 === 0) { - // Skip insecure protocols when requesting a secure one. - return false - } - return (typeof protocols[key] === 'function') - })[0] - } - - if (opts.clean === false && !opts.clientId) { - throw new Error('Missing clientId for unclean clients') - } - - if (opts.protocol) { - opts.defaultProtocol = opts.protocol - } - - function wrapper (client) { - if (opts.servers) { - if (!client._reconnectCount || client._reconnectCount === opts.servers.length) { - client._reconnectCount = 0 - } - - opts.host = opts.servers[client._reconnectCount].host - opts.port = opts.servers[client._reconnectCount].port - opts.protocol = (!opts.servers[client._reconnectCount].protocol ? opts.defaultProtocol : opts.servers[client._reconnectCount].protocol) - opts.hostname = opts.host - - client._reconnectCount++ - } - - debug('calling streambuilder for', opts.protocol) - return protocols[opts.protocol](client, opts) - } - var client = new MqttClient(wrapper, opts) - client.on('error', function () { /* Automatically set up client error handling */ }) - return client -} - -module.exports = connect -module.exports.connect = connect -module.exports.MqttClient = MqttClient -module.exports.Store = Store diff --git a/lib/connect/tcp.js b/lib/connect/tcp.js deleted file mode 100644 index 9912102eb..000000000 --- a/lib/connect/tcp.js +++ /dev/null @@ -1,21 +0,0 @@ -'use strict' -var net = require('net') -var debug = require('debug')('mqttjs:tcp') - -/* - variables port and host can be removed since - you have all required information in opts object -*/ -function streamBuilder (client, opts) { - var port, host - opts.port = opts.port || 1883 - opts.hostname = opts.hostname || opts.host || 'localhost' - - port = opts.port - host = opts.hostname - - debug('port %d and host %s', port, host) - return net.createConnection(port, host) -} - -module.exports = streamBuilder diff --git a/lib/connect/tls.js b/lib/connect/tls.js deleted file mode 100644 index aac296666..000000000 --- a/lib/connect/tls.js +++ /dev/null @@ -1,45 +0,0 @@ -'use strict' -var tls = require('tls') -var debug = require('debug')('mqttjs:tls') - -function buildBuilder (mqttClient, opts) { - var connection - opts.port = opts.port || 8883 - opts.host = opts.hostname || opts.host || 'localhost' - opts.servername = opts.host - - opts.rejectUnauthorized = opts.rejectUnauthorized !== false - - delete opts.path - - debug('port %d host %s rejectUnauthorized %b', opts.port, opts.host, opts.rejectUnauthorized) - - connection = tls.connect(opts) - /* eslint no-use-before-define: [2, "nofunc"] */ - connection.on('secureConnect', function () { - if (opts.rejectUnauthorized && !connection.authorized) { - connection.emit('error', new Error('TLS not authorized')) - } else { - connection.removeListener('error', handleTLSerrors) - } - }) - - function handleTLSerrors (err) { - // How can I get verify this error is a tls error? - if (opts.rejectUnauthorized) { - mqttClient.emit('error', err) - } - - // close this connection to match the behaviour of net - // otherwise all we get is an error from the connection - // and close event doesn't fire. This is a work around - // to enable the reconnect code to work the same as with - // net.createConnection - connection.end() - } - - connection.on('error', handleTLSerrors) - return connection -} - -module.exports = buildBuilder diff --git a/lib/connect/ws.js b/lib/connect/ws.js deleted file mode 100644 index 5c1d2c691..000000000 --- a/lib/connect/ws.js +++ /dev/null @@ -1,256 +0,0 @@ -'use strict' - -const WS = require('ws') -const debug = require('debug')('mqttjs:ws') -const duplexify = require('duplexify') -const Transform = require('readable-stream').Transform - -let WSS_OPTIONS = [ - 'rejectUnauthorized', - 'ca', - 'cert', - 'key', - 'pfx', - 'passphrase' -] -// eslint-disable-next-line camelcase -const IS_BROWSER = (typeof process !== 'undefined' && process.title === 'browser') || typeof __webpack_require__ === 'function' -function buildUrl (opts, client) { - let url = opts.protocol + '://' + opts.hostname + ':' + opts.port + opts.path - if (typeof (opts.transformWsUrl) === 'function') { - url = opts.transformWsUrl(url, opts, client) - } - return url -} - -function setDefaultOpts (opts) { - let options = opts - if (!opts.hostname) { - options.hostname = 'localhost' - } - if (!opts.port) { - if (opts.protocol === 'wss') { - options.port = 443 - } else { - options.port = 80 - } - } - if (!opts.path) { - options.path = '/' - } - - if (!opts.wsOptions) { - options.wsOptions = {} - } - if (!IS_BROWSER && opts.protocol === 'wss') { - // Add cert/key/ca etc options - WSS_OPTIONS.forEach(function (prop) { - if (opts.hasOwnProperty(prop) && !opts.wsOptions.hasOwnProperty(prop)) { - options.wsOptions[prop] = opts[prop] - } - }) - } - - return options -} - -function setDefaultBrowserOpts (opts) { - let options = setDefaultOpts(opts) - - if (!options.hostname) { - options.hostname = options.host - } - - if (!options.hostname) { - // Throwing an error in a Web Worker if no `hostname` is given, because we - // can not determine the `hostname` automatically. If connecting to - // localhost, please supply the `hostname` as an argument. - if (typeof (document) === 'undefined') { - throw new Error('Could not determine host. Specify host manually.') - } - const parsed = new URL(document.URL) - options.hostname = parsed.hostname - - if (!options.port) { - options.port = parsed.port - } - } - - // objectMode should be defined for logic - if (options.objectMode === undefined) { - options.objectMode = !(options.binary === true || options.binary === undefined) - } - - return options -} - -function createWebSocket (client, url, opts) { - debug('createWebSocket') - debug('protocol: ' + opts.protocolId + ' ' + opts.protocolVersion) - const websocketSubProtocol = - (opts.protocolId === 'MQIsdp') && (opts.protocolVersion === 3) - ? 'mqttv3.1' - : 'mqtt' - - debug('creating new Websocket for url: ' + url + ' and protocol: ' + websocketSubProtocol) - let socket = new WS(url, [websocketSubProtocol], opts.wsOptions) - return socket -} - -function createBrowserWebSocket (client, opts) { - const websocketSubProtocol = - (opts.protocolId === 'MQIsdp') && (opts.protocolVersion === 3) - ? 'mqttv3.1' - : 'mqtt' - - let url = buildUrl(opts, client) - /* global WebSocket */ - let socket = new WebSocket(url, [websocketSubProtocol]) - socket.binaryType = 'arraybuffer' - return socket -} - -function streamBuilder (client, opts) { - debug('streamBuilder') - let options = setDefaultOpts(opts) - const url = buildUrl(options, client) - let socket = createWebSocket(client, url, options) - let webSocketStream = WS.createWebSocketStream(socket, options.wsOptions) - webSocketStream.url = url - socket.on('close', () => { webSocketStream.destroy() }) - return webSocketStream -} - -function browserStreamBuilder (client, opts) { - debug('browserStreamBuilder') - let stream - let options = setDefaultBrowserOpts(opts) - // sets the maximum socket buffer size before throttling - const bufferSize = options.browserBufferSize || 1024 * 512 - - const bufferTimeout = opts.browserBufferTimeout || 1000 - - const coerceToBuffer = !opts.objectMode - - let socket = createBrowserWebSocket(client, opts) - - let proxy = buildProxy(opts, socketWriteBrowser, socketEndBrowser) - - if (!opts.objectMode) { - proxy._writev = writev - } - proxy.on('close', () => { socket.close() }) - - const eventListenerSupport = (typeof socket.addEventListener !== 'undefined') - - // was already open when passed in - if (socket.readyState === socket.OPEN) { - stream = proxy - } else { - stream = stream = duplexify(undefined, undefined, opts) - if (!opts.objectMode) { - stream._writev = writev - } - - if (eventListenerSupport) { - socket.addEventListener('open', onopen) - } else { - socket.onopen = onopen - } - } - - stream.socket = socket - - if (eventListenerSupport) { - socket.addEventListener('close', onclose) - socket.addEventListener('error', onerror) - socket.addEventListener('message', onmessage) - } else { - socket.onclose = onclose - socket.onerror = onerror - socket.onmessage = onmessage - } - - // methods for browserStreamBuilder - - function buildProxy (options, socketWrite, socketEnd) { - let proxy = new Transform({ - objectModeMode: options.objectMode - }) - - proxy._write = socketWrite - proxy._flush = socketEnd - - return proxy - } - - function onopen () { - stream.setReadable(proxy) - stream.setWritable(proxy) - stream.emit('connect') - } - - function onclose () { - stream.end() - stream.destroy() - } - - function onerror (err) { - stream.destroy(err) - } - - function onmessage (event) { - let data = event.data - if (data instanceof ArrayBuffer) data = Buffer.from(data) - else data = Buffer.from(data, 'utf8') - proxy.push(data) - } - - // this is to be enabled only if objectMode is false - function writev (chunks, cb) { - const buffers = new Array(chunks.length) - for (let i = 0; i < chunks.length; i++) { - if (typeof chunks[i].chunk === 'string') { - buffers[i] = Buffer.from(chunks[i], 'utf8') - } else { - buffers[i] = chunks[i].chunk - } - } - - this._write(Buffer.concat(buffers), 'binary', cb) - } - - function socketWriteBrowser (chunk, enc, next) { - if (socket.bufferedAmount > bufferSize) { - // throttle data until buffered amount is reduced. - setTimeout(socketWriteBrowser, bufferTimeout, chunk, enc, next) - } - - if (coerceToBuffer && typeof chunk === 'string') { - chunk = Buffer.from(chunk, 'utf8') - } - - try { - socket.send(chunk) - } catch (err) { - return next(err) - } - - next() - } - - function socketEndBrowser (done) { - socket.close() - done() - } - - // end methods for browserStreamBuilder - - return stream -} - -if (IS_BROWSER) { - module.exports = browserStreamBuilder -} else { - module.exports = streamBuilder -} diff --git a/lib/connect/wx.js b/lib/connect/wx.js deleted file mode 100644 index b9c7a0705..000000000 --- a/lib/connect/wx.js +++ /dev/null @@ -1,134 +0,0 @@ -'use strict' - -var Transform = require('readable-stream').Transform -var duplexify = require('duplexify') - -/* global wx */ -var socketTask -var proxy -var stream - -function buildProxy () { - var proxy = new Transform() - proxy._write = function (chunk, encoding, next) { - socketTask.send({ - data: chunk.buffer, - success: function () { - next() - }, - fail: function (errMsg) { - next(new Error(errMsg)) - } - }) - } - proxy._flush = function socketEnd (done) { - socketTask.close({ - success: function () { - done() - } - }) - } - - return proxy -} - -function setDefaultOpts (opts) { - if (!opts.hostname) { - opts.hostname = 'localhost' - } - if (!opts.path) { - opts.path = '/' - } - - if (!opts.wsOptions) { - opts.wsOptions = {} - } -} - -function buildUrl (opts, client) { - var protocol = opts.protocol === 'wxs' ? 'wss' : 'ws' - var url = protocol + '://' + opts.hostname + opts.path - if (opts.port && opts.port !== 80 && opts.port !== 443) { - url = protocol + '://' + opts.hostname + ':' + opts.port + opts.path - } - if (typeof (opts.transformWsUrl) === 'function') { - url = opts.transformWsUrl(url, opts, client) - } - return url -} - -function bindEventHandler () { - socketTask.onOpen(function () { - stream.setReadable(proxy) - stream.setWritable(proxy) - stream.emit('connect') - }) - - socketTask.onMessage(function (res) { - var data = res.data - - if (data instanceof ArrayBuffer) data = Buffer.from(data) - else data = Buffer.from(data, 'utf8') - proxy.push(data) - }) - - socketTask.onClose(function () { - stream.end() - stream.destroy() - }) - - socketTask.onError(function (res) { - stream.destroy(new Error(res.errMsg)) - }) -} - -function buildStream (client, opts) { - opts.hostname = opts.hostname || opts.host - - if (!opts.hostname) { - throw new Error('Could not determine host. Specify host manually.') - } - - var websocketSubProtocol = - (opts.protocolId === 'MQIsdp') && (opts.protocolVersion === 3) - ? 'mqttv3.1' - : 'mqtt' - - setDefaultOpts(opts) - - var url = buildUrl(opts, client) - socketTask = wx.connectSocket({ - url: url, - protocols: [websocketSubProtocol] - }) - - proxy = buildProxy() - stream = duplexify.obj() - stream._destroy = function (err, cb) { - socketTask.close({ - success: function () { - cb && cb(err) - } - }) - } - - var destroyRef = stream.destroy - stream.destroy = function () { - stream.destroy = destroyRef - - var self = this - setTimeout(function () { - socketTask.close({ - fail: function () { - self._destroy(new Error()) - } - }) - }, 0) - }.bind(stream) - - bindEventHandler() - - return stream -} - -module.exports = buildStream diff --git a/lib/default-message-id-provider.js b/lib/default-message-id-provider.js index c0a953f3f..bbe532ae8 100644 --- a/lib/default-message-id-provider.js +++ b/lib/default-message-id-provider.js @@ -4,66 +4,62 @@ * DefaultMessageAllocator constructor * @constructor */ -function DefaultMessageIdProvider () { - if (!(this instanceof DefaultMessageIdProvider)) { - return new DefaultMessageIdProvider() - } - +export class DefaultMessageIdProvider { /** * MessageIDs starting with 1 * ensure that nextId is min. 1, see https://github.com/mqttjs/MQTT.js/issues/810 */ - this.nextId = Math.max(1, Math.floor(Math.random() * 65535)) -} + constructor () { + this.nextId = Math.max(1, Math.floor(Math.random() * 65535)) + } -/** - * allocate - * - * Get the next messageId. - * @return unsigned int - */ -DefaultMessageIdProvider.prototype.allocate = function () { - // id becomes current state of this.nextId and increments afterwards - var id = this.nextId++ - // Ensure 16 bit unsigned int (max 65535, nextId got one higher) - if (this.nextId === 65536) { - this.nextId = 1 + /** + * allocate + * + * Get the next messageId. + * @return unsigned int + */ + allocate () { + // id becomes current state of this.nextId and increments afterwards + var id = this.nextId++ + // Ensure 16 bit unsigned int (max 65535, nextId got one higher) + if (this.nextId === 65536) { + this.nextId = 1 + } + return id } - return id -} -/** - * getLastAllocated - * Get the last allocated messageId. - * @return unsigned int - */ -DefaultMessageIdProvider.prototype.getLastAllocated = function () { - return (this.nextId === 1) ? 65535 : (this.nextId - 1) -} + /** + * getLastAllocated + * Get the last allocated messageId. + * @return unsigned int + */ + getLastAllocated () { + return (this.nextId === 1) ? 65535 : (this.nextId - 1) + } -/** - * register - * Register messageId. If success return true, otherwise return false. - * @param { unsigned int } - messageId to register, - * @return boolean - */ -DefaultMessageIdProvider.prototype.register = function (messageId) { - return true -} + /** + * register + * Register messageId. If success return true, otherwise return false. + * @param { unsigned int } - messageId to register, + * @return boolean + */ + register (messageId) { + return true + } -/** - * deallocate - * Deallocate messageId. - * @param { unsigned int } - messageId to deallocate, - */ -DefaultMessageIdProvider.prototype.deallocate = function (messageId) { -} + /** + * deallocate + * Deallocate messageId. + * @param { unsigned int } - messageId to deallocate, + */ + deallocate (messageId) { + } -/** - * clear - * Deallocate all messageIds. - */ -DefaultMessageIdProvider.prototype.clear = function () { + /** + * clear + * Deallocate all messageIds. + */ + clear () { + } } - -module.exports = DefaultMessageIdProvider diff --git a/lib/errors.js b/lib/errors.js new file mode 100644 index 000000000..7fb11ed00 --- /dev/null +++ b/lib/errors.js @@ -0,0 +1,47 @@ +'use strict' + +export var errors = { + 0: '', + 1: 'Unacceptable protocol version', + 2: 'Identifier rejected', + 3: 'Server unavailable', + 4: 'Bad username or password', + 5: 'Not authorized', + 16: 'No matching subscribers', + 17: 'No subscription existed', + 128: 'Unspecified error', + 129: 'Malformed Packet', + 130: 'Protocol Error', + 131: 'Implementation specific error', + 132: 'Unsupported Protocol Version', + 133: 'Client Identifier not valid', + 134: 'Bad User Name or Password', + 135: 'Not authorized', + 136: 'Server unavailable', + 137: 'Server busy', + 138: 'Banned', + 139: 'Server shutting down', + 140: 'Bad authentication method', + 141: 'Keep Alive timeout', + 142: 'Session taken over', + 143: 'Topic Filter invalid', + 144: 'Topic Name invalid', + 145: 'Packet identifier in use', + 146: 'Packet Identifier not found', + 147: 'Receive Maximum exceeded', + 148: 'Topic Alias invalid', + 149: 'Packet too large', + 150: 'Message rate too high', + 151: 'Quota exceeded', + 152: 'Administrative action', + 153: 'Payload format invalid', + 154: 'Retain not supported', + 155: 'QoS not supported', + 156: 'Use another server', + 157: 'Server moved', + 158: 'Shared Subscriptions not supported', + 159: 'Connection rate exceeded', + 160: 'Maximum connect time', + 161: 'Subscription Identifiers not supported', + 162: 'Wildcard Subscriptions not supported' +} diff --git a/lib/handlers/auth.js b/lib/handlers/auth.js new file mode 100644 index 000000000..25176c9da --- /dev/null +++ b/lib/handlers/auth.js @@ -0,0 +1,3 @@ +export async function handleAuth () { + return true +} diff --git a/lib/handlers/connack.js b/lib/handlers/connack.js new file mode 100644 index 000000000..234f4b241 --- /dev/null +++ b/lib/handlers/connack.js @@ -0,0 +1,3 @@ +export async function handleConnAck () { + return true +} diff --git a/lib/handlers/connect.js b/lib/handlers/connect.js new file mode 100644 index 000000000..a89e5c072 --- /dev/null +++ b/lib/handlers/connect.js @@ -0,0 +1,6 @@ + +export async function handleConnect (client, packet) { + client.connecting = true + // Connection logic + return true +} diff --git a/lib/handlers/disconnect.js b/lib/handlers/disconnect.js new file mode 100644 index 000000000..2bed1ca5b --- /dev/null +++ b/lib/handlers/disconnect.js @@ -0,0 +1,3 @@ +export async function handleDisconnect () { + return true +} diff --git a/lib/handlers/index.js b/lib/handlers/index.js new file mode 100644 index 000000000..705b741b1 --- /dev/null +++ b/lib/handlers/index.js @@ -0,0 +1,73 @@ +import {handleConnect} from './connect' +import {handleConnAck} from './connack' +import {handleDisconnect} from './disconnect' +import {handlePing} from './ping' +import {handlePingReq} from './pingreq' +import {handlePingResp} from './pingresp' +import {handlePub} from './pub' +import {handlePubRec} from './pubrec' +import {handlePubRel} from './pubrel' +import {handlePubComp} from './pubcomp' +import {handlePubAck} from './puback' +import {handleSub} from './sub' +import {handleSubAck} from './suback' +import {handleUnsub} from './unsub' +import {handleUnsubAck} from './unsuback' +import { handleAuth } from './auth' + +export async function handle (client, packet) { + let result + switch (packet.cmd) { + case 'auth': + result = await handleAuth(client, packet) + break + case 'connect': + result = await handleConnect(client, packet) + break + case 'connack': + result = await handleConnAck(client, packet) + break + case 'publish': + result = await handlePub(client, packet) + break + case 'subscribe': + result = await handleSub(client, packet) + break + case 'suback': + result = await handleSubAck(client, packet) + break + case 'unsubscribe': + result = await handleUnsub(client, packet) + break + case 'unsuback': + result = await handleUnsubAck(client, packet) + break + case 'pubcomp': + result = await handlePubComp(client, packet) + break + case 'puback': + result = await handlePubAck(client, packet) + break + case 'pubrel': + result = await handlePubRel(client, packet) + break + case 'pubrec': + result = await handlePubRec(client, packet) + break + case 'ping': + result = await handlePing(client, packet) + break + case 'pingreq': + result = await handlePingReq(client, packet) + break + case 'pingresp': + result = await handlePingResp(client, packet) + break + case 'disconnect': + result = await handleDisconnect(client, packet) + client._disconnected = true + break + } + + return result +} diff --git a/lib/handlers/ping.js b/lib/handlers/ping.js new file mode 100644 index 000000000..cfede6bd8 --- /dev/null +++ b/lib/handlers/ping.js @@ -0,0 +1,3 @@ +export async function handlePing () { + return true +} diff --git a/lib/handlers/pingreq.js b/lib/handlers/pingreq.js new file mode 100644 index 000000000..98e97a870 --- /dev/null +++ b/lib/handlers/pingreq.js @@ -0,0 +1,3 @@ +export async function handlePingReq () { + return true +} diff --git a/lib/handlers/pingresp.js b/lib/handlers/pingresp.js new file mode 100644 index 000000000..db82e662f --- /dev/null +++ b/lib/handlers/pingresp.js @@ -0,0 +1,3 @@ +export async function handlePingResp () { + return true +} diff --git a/lib/handlers/pub.js b/lib/handlers/pub.js new file mode 100644 index 000000000..269f53d84 --- /dev/null +++ b/lib/handlers/pub.js @@ -0,0 +1,3 @@ +export async function handlePub () { + return true +} diff --git a/lib/handlers/puback.js b/lib/handlers/puback.js new file mode 100644 index 000000000..b3e83976c --- /dev/null +++ b/lib/handlers/puback.js @@ -0,0 +1,3 @@ +export async function handlePubAck () { + return true +} diff --git a/lib/handlers/pubcomp.js b/lib/handlers/pubcomp.js new file mode 100644 index 000000000..f88556a74 --- /dev/null +++ b/lib/handlers/pubcomp.js @@ -0,0 +1,3 @@ +export async function handlePubComp () { + return true +} diff --git a/lib/handlers/pubrec.js b/lib/handlers/pubrec.js new file mode 100644 index 000000000..b312f60e3 --- /dev/null +++ b/lib/handlers/pubrec.js @@ -0,0 +1,3 @@ +export async function handlePubRec () { + return true +} diff --git a/lib/handlers/pubrel.js b/lib/handlers/pubrel.js new file mode 100644 index 000000000..ae73babd8 --- /dev/null +++ b/lib/handlers/pubrel.js @@ -0,0 +1,3 @@ +export async function handlePubRel () { + return true +} diff --git a/lib/handlers/sub.js b/lib/handlers/sub.js new file mode 100644 index 000000000..c5af630ea --- /dev/null +++ b/lib/handlers/sub.js @@ -0,0 +1,3 @@ +export async function handleSub () { + return true +} diff --git a/lib/handlers/suback.js b/lib/handlers/suback.js new file mode 100644 index 000000000..0049e3f92 --- /dev/null +++ b/lib/handlers/suback.js @@ -0,0 +1,3 @@ +export async function handleSubAck () { + return true +} diff --git a/lib/handlers/unsub.js b/lib/handlers/unsub.js new file mode 100644 index 000000000..0ffb6af9f --- /dev/null +++ b/lib/handlers/unsub.js @@ -0,0 +1,3 @@ +export async function handleUnsub () { + return true +} diff --git a/lib/handlers/unsuback.js b/lib/handlers/unsuback.js new file mode 100644 index 000000000..a21275ede --- /dev/null +++ b/lib/handlers/unsuback.js @@ -0,0 +1,3 @@ +export async function handleUnsubAck () { + return true +} diff --git a/lib/index.js b/lib/index.js new file mode 100644 index 000000000..15a0b3aac --- /dev/null +++ b/lib/index.js @@ -0,0 +1,13 @@ +/* + * Copyright (c) 2015-2015 MQTT.js contributors. + * Copyright (c) 2011-2014 Adam Rudd. + * + * See LICENSE for more information + */ + +import { connect } from './client' +import { Store } from './store' +import { DefaultMessageIdProvider } from './default-message-id-provider' +import { UniqueMessageIdProvider } from './unique-message-id-provider' + +export {connect, Store, DefaultMessageIdProvider, UniqueMessageIdProvider} diff --git a/lib/unique-message-id-provider.js b/lib/unique-message-id-provider.js index 6ffd4bde6..bf30c226e 100644 --- a/lib/unique-message-id-provider.js +++ b/lib/unique-message-id-provider.js @@ -1,65 +1,57 @@ 'use strict' -var NumberAllocator = require('number-allocator').NumberAllocator +import { NumberAllocator } from 'number-allocator' -/** - * UniqueMessageAllocator constructor - * @constructor - */ -function UniqueMessageIdProvider () { - if (!(this instanceof UniqueMessageIdProvider)) { - return new UniqueMessageIdProvider() +export class UniqueMessageIdProvider { + constructor () { + this.numberAllocator = new NumberAllocator(1, 65535) } - this.numberAllocator = new NumberAllocator(1, 65535) -} - -/** - * allocate - * - * Get the next messageId. - * @return if messageId is fully allocated then return null, - * otherwise return the smallest usable unsigned int messageId. - */ -UniqueMessageIdProvider.prototype.allocate = function () { - this.lastId = this.numberAllocator.alloc() - return this.lastId -} + /** + * allocate + * + * Get the next messageId. + * @return if messageId is fully allocated then return null, + * otherwise return the smallest usable unsigned int messageId. + */ + allocate () { + this.lastId = this.numberAllocator.alloc() + return this.lastId + } -/** - * getLastAllocated - * Get the last allocated messageId. - * @return unsigned int - */ -UniqueMessageIdProvider.prototype.getLastAllocated = function () { - return this.lastId -} + /** + * getLastAllocated + * Get the last allocated messageId. + * @return unsigned int + */ + getLastAllocated () { + return this.lastId + } -/** - * register - * Register messageId. If success return true, otherwise return false. - * @param { unsigned int } - messageId to register, - * @return boolean - */ -UniqueMessageIdProvider.prototype.register = function (messageId) { - return this.numberAllocator.use(messageId) -} + /** + * register + * Register messageId. If success return true, otherwise return false. + * @param { unsigned int } - messageId to register, + * @return boolean + */ + register (messageId) { + return this.numberAllocator.use(messageId) + } -/** - * deallocate - * Deallocate messageId. - * @param { unsigned int } - messageId to deallocate, - */ -UniqueMessageIdProvider.prototype.deallocate = function (messageId) { - this.numberAllocator.free(messageId) -} + /** + * deallocate + * Deallocate messageId. + * @param { unsigned int } - messageId to deallocate, + */ + deallocate (messageId) { + this.numberAllocator.free(messageId) + } -/** - * clear - * Deallocate all messageIds. - */ -UniqueMessageIdProvider.prototype.clear = function () { - this.numberAllocator.clear() + /** + * clear + * Deallocate all messageIds. + */ + clear () { + this.numberAllocator.clear() + } } - -module.exports = UniqueMessageIdProvider diff --git a/lib/validations.js b/lib/validateTopic.js similarity index 90% rename from lib/validations.js rename to lib/validateTopic.js index 1a3277901..e8c6d62dc 100644 --- a/lib/validations.js +++ b/lib/validateTopic.js @@ -9,7 +9,7 @@ * @param {String} topic - A topic * @returns {Boolean} If the topic is valid, returns true. Otherwise, returns false. */ -function validateTopic (topic) { +export function validateTopic (topic) { var parts = topic.split('/') for (var i = 0; i < parts.length; i++) { @@ -35,7 +35,7 @@ function validateTopic (topic) { * @param {Array} topics - Array of topics * @returns {String} If the topics is valid, returns null. Otherwise, returns the invalid one */ -function validateTopics (topics) { +export function validateTopics (topics) { if (topics.length === 0) { return 'empty_topic_list' } @@ -46,7 +46,3 @@ function validateTopics (topics) { } return null } - -module.exports = { - validateTopics: validateTopics -} diff --git a/mqtt.js b/mqtt.js deleted file mode 100644 index c8b94fda1..000000000 --- a/mqtt.js +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Copyright (c) 2015-2015 MQTT.js contributors. - * Copyright (c) 2011-2014 Adam Rudd. - * - * See LICENSE for more information - */ - -var MqttClient = require('./lib/client') -var connect = require('./lib/connect') -var Store = require('./lib/store') -var DefaultMessageIdProvider = require('./lib/default-message-id-provider') -var UniqueMessageIdProvider = require('./lib/unique-message-id-provider') - -module.exports.connect = connect - -// Expose MqttClient -module.exports.MqttClient = MqttClient -module.exports.Client = MqttClient -module.exports.Store = Store -module.exports.DefaultMessageIdProvider = DefaultMessageIdProvider -module.exports.UniqueMessageIdProvider = UniqueMessageIdProvider diff --git a/types/lib/client.d.ts b/types/lib/client.d.ts index 7821a96d7..94657b6a5 100644 --- a/types/lib/client.d.ts +++ b/types/lib/client.d.ts @@ -204,7 +204,7 @@ export declare class MqttClient extends events.EventEmitter { * @returns {MqttClient} this - for chaining * @api public * - * @example client.removeOutgoingMessage(client.getLastMessageId()); + * @example client.removeOutgoingMessage(client.getLastMessageId()) */ public removeOutgoingMessage (mid: number): this From 8e71d0136cb07fb79f8a63336f1bb8e81d76b9c2 Mon Sep 17 00:00:00 2001 From: Yoseph Maguire Date: Thu, 26 Aug 2021 10:11:52 -0700 Subject: [PATCH 2/2] updates --- .eslintrc.json | 16 + lib/client.js | 653 ++++++++++++++++++++++++++++++++++++- lib/handlers/connack.js | 37 ++- lib/handlers/connect.js | 108 +++++- lib/handlers/disconnect.js | 4 +- lib/handlers/index.js | 32 +- lib/handlers/ping.js | 26 +- lib/handlers/pingresp.js | 4 + lib/handlers/pub.js | 92 +++++- lib/handlers/puback.js | 83 ++++- lib/handlers/pubcomp.js | 83 ++++- lib/handlers/pubrec.js | 82 ++++- lib/handlers/pubrel.js | 24 +- lib/handlers/sub.js | 181 +++++++++- lib/handlers/suback.js | 83 ++++- lib/handlers/unsub.js | 92 +++++- lib/handlers/unsuback.js | 83 ++++- lib/index.js | 61 ++++ package.json | 67 ++-- tslint.json | 3 - 20 files changed, 1718 insertions(+), 96 deletions(-) create mode 100644 .eslintrc.json delete mode 100644 tslint.json diff --git a/.eslintrc.json b/.eslintrc.json new file mode 100644 index 000000000..ac3f85860 --- /dev/null +++ b/.eslintrc.json @@ -0,0 +1,16 @@ +{ + "env": { + "browser": true, + "es2021": true, + "node": true + }, + "extends": [ + "standard" + ], + "parserOptions": { + "ecmaVersion": 12, + "sourceType": "module" + }, + "rules": { + } +} diff --git a/lib/client.js b/lib/client.js index 2d07df752..ba7c91339 100644 --- a/lib/client.js +++ b/lib/client.js @@ -1,11 +1,14 @@ 'use strict' -const mqtt = require('mqtt-packet') +import { mqtt } from 'mqtt-packet' +import { handle } from 'handlers' +import net from 'net' +import WebSocket, { createWebSocketStream } from 'ws' + // const eventEmitter = require('events') -const handle = require('handlers') // const mqttErrors = require('errors') -// const logger = require('pino')() +const logger = require('pino')() export class Client { constructor (options) { @@ -16,6 +19,19 @@ export class Client { this.id = null this.clean = true this.version = null + // eslint-disable-next-line camelcase + // TODO: _isBrowser should be a global value and should be standardized.... + this._isBrowser = (typeof process !== 'undefined' && process.title === 'browser') || typeof __webpack_require__ === 'function' + + // Connect Information + this.protocol = null + this.port = null + this.hostname = null + this.rejectUnauthorized = null + + this.stream = this._streamBuilder() + + this._reconnectCount = 0 this._disconnected = false this._authorized = false @@ -31,25 +47,368 @@ export class Client { resubscribe: true } - this._options = options || {...this._defaultConnectOptions} + this._options = options || { ...this._defaultConnectOptions } this._options.clientId = options.clientId || `mqttjs_ ${Math.random().toString(16).substr(2, 8)}` - this._parser.client = this this._parser._queue = [] this._parser.on('packet', this.enqueue) this.once('connected', this.dequeue) - // TBD + this.on('connect', this._sendQueuedPackets()) + this.on('close', this._closeClient()) + } + + _streamBuilder (opts) { + switch (this.protocol) { + case 'tcp': { + return net.createConnection(port, host) + } + case 'tls': { + // TODO: This needs to have options passed down to it. + // We should probably have the whole options object just + // passed down to tls.connect, right? + + function handleTLSErrors () { + // How can I get verify this error is a tls error? + if (opts.rejectUnauthorized) { + mqttClient.emit('error', err) + } + + // close this connection to match the behaviour of net + // otherwise all we get is an error from the connection + // and close event doesn't fire. This is a work around + // to enable the reconnect code to work the same as with + // net.createConnection + connection.end() + } + connection = tls.connect(opts) + connection.on('secureConnect', function () { + if (opts.rejectUnauthorized && !connection.authorized) { + connection.emit('error', new Error('TLS not authorized')) + } else { + connection.removeListener('error', handleTLSErrors) + } + }) + + connection.on('error', handleTLSErrors) + return connection + } + case 'ws': { + if (this._isBrowser) { + this._buildWebSocketStreamBrowser(opts) + } else { + this._buildWebSocketStream(opts) + } + } + } + } + + // To consider : do we want to have this in the main code, + // or do we want to have a browser shim? + _buildWebSocketStreamBrowser (opts) { + const options = opts + if (!opts.hostname) { + options.hostname = 'localhost' + } + if (!opts.port) { + if (opts.protocol === 'wss') { + options.port = 443 + } else { + options.port = 80 + } + } + if (!opts.path) { + options.path = '/' + } + + if (!opts.wsOptions) { + options.wsOptions = {} + } + if (!IS_BROWSER && opts.protocol === 'wss') { + // Add cert/key/ca etc options + WSS_OPTIONS.forEach(function (prop) { + if (opts.hasOwnProperty(prop) && !opts.wsOptions.hasOwnProperty(prop)) { + options.wsOptions[prop] = opts[prop] + } + }) + } + + if (!options.hostname) { + options.hostname = options.host + } + + if (!options.hostname) { + // Throwing an error in a Web Worker if no `hostname` is given, because we + // can not determine the `hostname` automatically. If connecting to + // localhost, please supply the `hostname` as an argument. + if (typeof (document) === 'undefined') { + throw new Error('Could not determine host. Specify host manually.') + } + const parsed = new URL(document.URL) + options.hostname = parsed.hostname + + if (!options.port) { + options.port = parsed.port + } + } + + // objectMode should be defined for logic + if (options.objectMode === undefined) { + options.objectMode = !(options.binary === true || options.binary === undefined) + } + const websocketSubProtocol = + (opts.protocolId === 'MQIsdp') && (opts.protocolVersion === 3) + ? 'mqttv3.1' + : 'mqtt' + + const url = buildUrl(opts, client) + /* global WebSocket */ + const socket = new WebSocket(url, [websocketSubProtocol]) + socket.binaryType = 'arraybuffer' + logger('browserStreamBuilder') + let stream + // sets the maximum socket buffer size before throttling + const bufferSize = options.browserBufferSize || 1024 * 512 + + const bufferTimeout = opts.browserBufferTimeout || 1000 + + const coerceToBuffer = !opts.objectMode + + const proxy = buildProxy(opts, socketWriteBrowser, socketEndBrowser) + + if (!opts.objectMode) { + proxy._writev = writev + } + proxy.on('close', () => { socket.close() }) + + const eventListenerSupport = (typeof socket.addEventListener !== 'undefined') + + // was already open when passed in + if (socket.readyState === socket.OPEN) { + stream = proxy + } else { + stream = stream = duplexify(undefined, undefined, opts) + if (!opts.objectMode) { + stream._writev = writev + } + + if (eventListenerSupport) { + socket.addEventListener('open', onopen) + } else { + socket.onopen = onopen + } + } + + stream.socket = socket + + if (eventListenerSupport) { + socket.addEventListener('close', onclose) + socket.addEventListener('error', onerror) + socket.addEventListener('message', onmessage) + } else { + socket.onclose = onclose + socket.onerror = onerror + socket.onmessage = onmessage + } + + // methods for browserStreamBuilder + + function buildProxy (options, socketWrite, socketEnd) { + const proxy = new Transform({ + objectModeMode: options.objectMode + }) + + proxy._write = socketWrite + proxy._flush = socketEnd + + return proxy + } + + function onopen () { + stream.setReadable(proxy) + stream.setWritable(proxy) + stream.emit('connect') + } + + function onclose () { + stream.end() + stream.destroy() + } + + function onerror (err) { + stream.destroy(err) + } + + function onmessage (event) { + let data = event.data + if (data instanceof ArrayBuffer) data = Buffer.from(data) + else data = Buffer.from(data, 'utf8') + proxy.push(data) + } + + // this is to be enabled only if objectMode is false + function writev (chunks, cb) { + const buffers = new Array(chunks.length) + for (let i = 0; i < chunks.length; i++) { + if (typeof chunks[i].chunk === 'string') { + buffers[i] = Buffer.from(chunks[i], 'utf8') + } else { + buffers[i] = chunks[i].chunk + } + } + + this._write(Buffer.concat(buffers), 'binary', cb) + } + + function socketWriteBrowser (chunk, enc, next) { + if (socket.bufferedAmount > bufferSize) { + // throttle data until buffered amount is reduced. + setTimeout(socketWriteBrowser, bufferTimeout, chunk, enc, next) + } + + if (coerceToBuffer && typeof chunk === 'string') { + chunk = Buffer.from(chunk, 'utf8') + } + + try { + socket.send(chunk) + } catch (err) { + return next(err) + } + + next() + } + + function socketEndBrowser (done) { + socket.close() + done() + } + + // end methods for browserStreamBuilder + + return stream + } + + _buildWebSocketStream (opts) { + const options = opts + if (!opts.hostname) { + options.hostname = 'localhost' + } + if (!opts.port) { + if (opts.protocol === 'wss') { + options.port = 443 + } else { + options.port = 80 + } + } + if (!opts.path) { + options.path = '/' + } + + if (!opts.wsOptions) { + options.wsOptions = {} + } + if (!IS_BROWSER && opts.protocol === 'wss') { + // Add cert/key/ca etc options + WSS_OPTIONS.forEach(function (prop) { + if (opts.hasOwnProperty(prop) && !opts.wsOptions.hasOwnProperty(prop)) { + options.wsOptions[prop] = opts[prop] + } + }) + } + const url = buildUrl(options, client) + const websocketSubProtocol = + (opts.protocolId === 'MQIsdp') && (opts.protocolVersion === 3) + ? 'mqttv3.1' + : 'mqtt' + + const socket = new WebSocket(url, [websocketSubProtocol], opts.wsOptions) + const webSocketStream = createWebSocketStream(socket, options.wsOptions) + webSocketStream.url = url + socket.on('close', () => { webSocketStream.destroy() }) + return webSocketStream + } + + async enqueue (packet) { + this._parsingBatch++ + // already connected or it's the first packet + if (this.connackSent || this._parsingBatch === 1) { + await handle(this, packet, this._nextBatch) + } else { + if (this._parser._queue.length < this._queueLimit) { + this._parser._queue.push(packet) + } else { + this.emit('error', new Error('Client queue limit reached')) + } + } } - enqueue () { - return true + async dequeue () { + const q = this._parser._queue + if (q) { + for (let i = 0, len = q.length; i < len; i++) { + await handle(this, q[i], this._nextBatch) + } + + this._parser._queue = null + } } - dequeue () { - return true + _closeClient () { + logger('close :: connected set to `false`') + this.connected = false + + logger('close :: clearing connackTimer') + clearTimeout(this.connackTimer) + + logger('close :: clearing ping timer') + if (that.pingTimer !== null) { + that.pingTimer.clear() + that.pingTimer = null + } + + logger('close :: calling _setupReconnect') + this._setupReconnect() } - static async connect (options) { - return new Client(options) + _sendQueuedPackets () { + const queue = this.queue + + function deliver () { + const entry = queue.shift() + logger('deliver :: entry %o', entry) + let packet = null + + if (!entry) { + return + } + + packet = entry.packet + logger('deliver :: call _sendPacket for %o', packet) + let send = true + if (packet.messageId && packet.messageId !== 0) { + if (!that.messageIdProvider.register(packet.messageId)) { + packet.messageeId = that.messageIdProvider.allocate() + if (packet.messageId === null) { + send = false + } + } + } + if (send) { + that._sendPacket( + packet, + function (err) { + if (entry.cb) { + entry.cb(err) + } + deliver() + } + ) + } else { + logger('messageId: %d has already used.', packet.messageId) + deliver() + } + } + + deliver() } async publish (topic, message, opts) { @@ -59,7 +418,7 @@ export class Client { async subscribe (packet) { if (!packet.subscriptions) { - packet = {subscriptions: Array.isArray(packet) ? packet : [packet]} + packet = { subscriptions: Array.isArray(packet) ? packet : [packet] } } const result = await handle.subscribe(this, packet) return result @@ -69,4 +428,272 @@ export class Client { const result = await handle.unsubscribe(this, packet) return result } + + async end (force, opts) { + const that = this + + logger('end :: (%s)', this.options.clientId) + + if (force == null || typeof force !== 'boolean') { + cb = opts || nop + opts = force + force = false + if (typeof opts !== 'object') { + cb = opts + opts = null + if (typeof cb !== 'function') { + cb = nop + } + } + } + + if (typeof opts !== 'object') { + cb = opts + opts = null + } + + logger('end :: cb? %s', !!cb) + cb = cb || nop + + function closeStores () { + logger('end :: closeStores: closing incoming and outgoing stores') + that.disconnected = true + that.incomingStore.close(function (e1) { + that.outgoingStore.close(function (e2) { + logger('end :: closeStores: emitting end') + that.emit('end') + if (cb) { + const err = e1 || e2 + logger('end :: closeStores: invoking callback with args') + cb(err) + } + }) + }) + if (that._deferredReconnect) { + that._deferredReconnect() + } + } + + function finish () { + // defer closesStores of an I/O cycle, + // just to make sure things are + // ok for websockets + logger('end :: (%s) :: finish :: calling _cleanUp with force %s', that.options.clientId, force) + that._cleanUp(force, () => { + logger('end :: finish :: calling process.nextTick on closeStores') + // var boundProcess = nextTick.bind(null, closeStores) + nextTick(closeStores.bind(that)) + }, opts) + } + + if (this.disconnecting) { + cb() + return this + } + + this._clearReconnect() + + this.disconnecting = true + + if (!force && Object.keys(this.outgoing).length > 0) { + // wait 10ms, just to be sure we received all of it + logger('end :: (%s) :: calling finish in 10ms once outgoing is empty', that.options.clientId) + this.once('outgoingEmpty', setTimeout.bind(null, finish, 10)) + } else { + logger('end :: (%s) :: immediately calling finish', that.options.clientId) + finish() + } + + return this + } + + /** + * removeOutgoingMessage - remove a message in outgoing store + * the outgoing callback will be called withe Error('Message removed') if the message is removed + * + * @param {Number} messageId - messageId to remove message + * @returns {MqttClient} this - for chaining + * @api public + * + * @example client.removeOutgoingMessage(client.getLastAllocated()); + */ + removeOutgoingMessage (messageId) { + const cb = this.outgoing[messageId] ? this.outgoing[messageId].cb : null + delete this.outgoing[messageId] + this.outgoingStore.del({ messageId: messageId }, function () { + cb(new Error('Message removed')) + }) + return this + } + + reconnect (opts) { + logger('client reconnect') + const that = this + const f = function () { + if (opts) { + that.options.incomingStore = opts.incomingStore + that.options.outgoingStore = opts.outgoingStore + } else { + that.options.incomingStore = null + that.options.outgoingStore = null + } + that.incomingStore = that.options.incomingStore || new Store() + that.outgoingStore = that.options.outgoingStore || new Store() + that.disconnecting = false + that.disconnected = false + that._deferredReconnect = null + that._reconnect() + } + + if (this.disconnecting && !this.disconnected) { + this._deferredReconnect = f + } else { + f() + } + return this + } + + _reconnect () { + logger('_reconnect: emitting reconnect to client') + this.emit('reconnect') + if (this.connected) { + this.end(() => { this._setupStream() }) + logger('client already connected. disconnecting first.') + } else { + logger('_reconnect: calling _setupStream') + this._setupStream() + } + } + + _setupReconnect () { + if (!this.disconnecting && !this.reconnectTimer && (this.options.reconnectPeriod > 0)) { + if (!this.reconnecting) { + logger('_setupReconnect :: emit `offline` state') + this.emit('offline') + logger('_setupReconnect :: set `reconnecting` to `true`') + this.reconnecting = true + } + logger('_setupReconnect :: setting reconnectTimer for %d ms', this.options.reconnectPeriod) + this.reconnectTimer = setInterval(() => { + logger('reconnectTimer :: reconnect triggered!') + this._reconnect() + }, this.options.reconnectPeriod) + } else { + logger('_setupReconnect :: doing nothing...') + } + } + + _clearReconnect () { + logger('_clearReconnect : clearing reconnect timer') + if (this.reconnectTimer) { + clearInterval(this.reconnectTimer) + this.reconnectTimer = null + } + } + + async _cleanUp (forced) { + const opts = arguments[2] + if (done) { + logger('_cleanUp :: done callback provided for on stream close') + this.stream.on('close', done) + } + + logger('_cleanUp :: forced? %s', forced) + if (forced) { + if ((this.options.reconnectPeriod === 0) && this.options.clean) { + flush(this.outgoing) + } + logger('_cleanUp :: (%s) :: destroying stream', this.options.clientId) + this.stream.destroy() + } else { + const packet = xtend({ cmd: 'disconnect' }, opts) + logger('_cleanUp :: (%s) :: call _sendPacket with disconnect packet', this.options.clientId) + this._sendPacket( + packet, + setImmediate.bind( + null, + this.stream.end.bind(this.stream) + ) + ) + } + + if (!this.disconnecting) { + logger('_cleanUp :: client not disconnecting. Clearing and resetting reconnect.') + this._clearReconnect() + this._setupReconnect() + } + + if (this.pingTimer !== null) { + logger('_cleanUp :: clearing pingTimer') + this.pingTimer.clear() + this.pingTimer = null + } + + if (done && !this.connected) { + logger('_cleanUp :: (%s) :: removing stream `done` callback `close` listener', this.options.clientId) + this.stream.removeListener('close', done) + done() + } + } + + async _sendPacket (packet) { + logger('_sendPacket :: (%s) :: start', this.options.clientId) + cbStorePut = cbStorePut || nop + + if (!this.connected) { + logger('_sendPacket :: client not connected. Storing packet offline.') + this._storePacket(packet, cb, cbStorePut) + return + } + + // When sending a packet, reschedule the ping timer + this._shiftPingInterval() + + switch (packet.cmd) { + case 'publish': + break + case 'pubrel': + storeAndSend(this, packet, cb, cbStorePut) + return + default: + sendPacket(this, packet, cb) + return + } + + switch (packet.qos) { + case 2: + case 1: + storeAndSend(this, packet, cb, cbStorePut) + break + /** + * no need of case here since it will be caught by default + * and jshint comply that before default it must be a break + * anyway it will result in -1 evaluation + */ + case 0: + /* falls through */ + default: + sendPacket(this, packet, cb) + break + } + } + + _storePacket (packet) { + cbStorePut = cbStorePut || nop + + // check that the packet is not a qos of 0, or that the command is not a publish + if (((packet.qos || 0) === 0 && this.queueQoSZero) || packet.cmd !== 'publish') { + this.queue.push({ packet: packet, cb: cb }) + } else if (packet.qos > 0) { + cb = this.outgoing[packet.messageId] ? this.outgoing[packet.messageId].cb : null + this.outgoingStore.put(packet, function (err) { + if (err) { + return cb && cb(err) + } + cbStorePut() + }) + } else if (cb) { + cb(new Error('No connection to broker')) + } + } } diff --git a/lib/handlers/connack.js b/lib/handlers/connack.js index 234f4b241..c036953bf 100644 --- a/lib/handlers/connack.js +++ b/lib/handlers/connack.js @@ -1,3 +1,36 @@ -export async function handleConnAck () { - return true +export async function handleConnAck (packet) { + this.connackTimer = setTimeout(function () { + debug('!!connectTimeout hit!! Calling _cleanUp with force `true`') + that._cleanUp(true) + }, this.options.connectTimeout) + + var options = this.options + var version = options.protocolVersion + var rc = version === 5 ? packet.reasonCode : packet.returnCode + + clearTimeout(this.connackTimer) + + if (packet.properties) { + if (packet.properties.topicAliasMaximum) { + if (!options.properties) { options.properties = {} } + options.properties.topicAliasMaximum = packet.properties.topicAliasMaximum + } + if (packet.properties.serverKeepAlive && options.keepalive) { + options.keepalive = packet.properties.serverKeepAlive + this._shiftPingInterval() + } + if (packet.properties.maximumPacketSize) { + if (!options.properties) { options.properties = {} } + options.properties.maximumPacketSize = packet.properties.maximumPacketSize + } + } + + if (rc === 0) { + this.reconnecting = false + this._onConnect(packet) + } else if (rc > 0) { + var err = new Error('Connection refused: ' + errors[rc]) + err.code = rc + this.emit('error', err) + } } diff --git a/lib/handlers/connect.js b/lib/handlers/connect.js index a89e5c072..29b43b1ca 100644 --- a/lib/handlers/connect.js +++ b/lib/handlers/connect.js @@ -1,6 +1,108 @@ export async function handleConnect (client, packet) { - client.connecting = true - // Connection logic - return true + if (this.disconnected) { + this.emit('connect', packet) + return + } + + var that = this + + this.messageIdProvider.clear() + this._setupPingTimer() + this._resubscribe(packet) + + this.connected = true + + function startStreamProcess () { + var outStore = that.outgoingStore.createStream() + + function clearStoreProcessing () { + that._storeProcessing = false + that._packetIdsDuringStoreProcessing = {} + } + + that.once('close', remove) + outStore.on('error', function (err) { + clearStoreProcessing() + that._flushStoreProcessingQueue() + that.removeListener('close', remove) + that.emit('error', err) + }) + + function remove () { + outStore.destroy() + outStore = null + that._flushStoreProcessingQueue() + clearStoreProcessing() + } + + function storeDeliver () { + // edge case, we wrapped this twice + if (!outStore) { + return + } + that._storeProcessing = true + + var packet = outStore.read(1) + + var cb + + if (!packet) { + // read when data is available in the future + outStore.once('readable', storeDeliver) + return + } + + // Skip already processed store packets + if (that._packetIdsDuringStoreProcessing[packet.messageId]) { + storeDeliver() + return + } + + // Avoid unnecessary stream read operations when disconnected + if (!that.disconnecting && !that.reconnectTimer) { + cb = that.outgoing[packet.messageId] ? that.outgoing[packet.messageId].cb : null + that.outgoing[packet.messageId] = { + volatile: false, + cb: function (err, status) { + // Ensure that the original callback passed in to publish gets invoked + if (cb) { + cb(err, status) + } + + storeDeliver() + } + } + that._packetIdsDuringStoreProcessing[packet.messageId] = true + if (that.messageIdProvider.register(packet.messageId)) { + that._sendPacket(packet) + } else { + debug('messageId: %d has already used.', packet.messageId) + } + } else if (outStore.destroy) { + outStore.destroy() + } + } + + outStore.on('end', function () { + var allProcessed = true + for (var id in that._packetIdsDuringStoreProcessing) { + if (!that._packetIdsDuringStoreProcessing[id]) { + allProcessed = false + break + } + } + if (allProcessed) { + clearStoreProcessing() + that.removeListener('close', remove) + that._invokeAllStoreProcessingQueue() + that.emit('connect', packet) + } else { + startStreamProcess() + } + }) + storeDeliver() + } + // start flowing + startStreamProcess() } diff --git a/lib/handlers/disconnect.js b/lib/handlers/disconnect.js index 2bed1ca5b..cae2553c4 100644 --- a/lib/handlers/disconnect.js +++ b/lib/handlers/disconnect.js @@ -1,3 +1,3 @@ -export async function handleDisconnect () { - return true +export async function handleDisconnect (client) { + client.emit('disconnect', packet) } diff --git a/lib/handlers/index.js b/lib/handlers/index.js index 705b741b1..32138753e 100644 --- a/lib/handlers/index.js +++ b/lib/handlers/index.js @@ -1,18 +1,20 @@ -import {handleConnect} from './connect' -import {handleConnAck} from './connack' -import {handleDisconnect} from './disconnect' -import {handlePing} from './ping' -import {handlePingReq} from './pingreq' -import {handlePingResp} from './pingresp' -import {handlePub} from './pub' -import {handlePubRec} from './pubrec' -import {handlePubRel} from './pubrel' -import {handlePubComp} from './pubcomp' -import {handlePubAck} from './puback' -import {handleSub} from './sub' -import {handleSubAck} from './suback' -import {handleUnsub} from './unsub' -import {handleUnsubAck} from './unsuback' +'use strict' + +import { handleConnect } from './connect' +import { handleConnAck } from './connack' +import { handleDisconnect } from './disconnect' +import { handlePing } from './ping' +import { handlePingReq } from './pingreq' +import { handlePingResp } from './pingresp' +import { handlePub } from './pub' +import { handlePubRec } from './pubrec' +import { handlePubRel } from './pubrel' +import { handlePubComp } from './pubcomp' +import { handlePubAck } from './puback' +import { handleSub } from './sub' +import { handleSubAck } from './suback' +import { handleUnsub } from './unsub' +import { handleUnsubAck } from './unsuback' import { handleAuth } from './auth' export async function handle (client, packet) { diff --git a/lib/handlers/ping.js b/lib/handlers/ping.js index cfede6bd8..c80e22e68 100644 --- a/lib/handlers/ping.js +++ b/lib/handlers/ping.js @@ -1,3 +1,27 @@ + export async function handlePing () { - return true + debug('_setupPingTimer :: keepalive %d (seconds)', this.options.keepalive) + + if (!this.pingTimer && this.options.keepalive) { + this.pingResp = true + this.pingTimer = reInterval(() => { + checkPing() + }, this.options.keepalive * 1000) + } } + +export async function shiftPingInterval () { + if (this.pingTimer && this.options.keepalive && this.options.reschedulePings) { + this.pingTimer.reschedule(this.options.keepalive * 1000) + } +} + +function checkPing () { + if (this.pingResp) { + this.pingResp = false + this._sendPacket({ cmd: 'pingreq' }) + } else { + // do a forced cleanup since socket will be in bad shape + this._cleanUp(true) + } +} \ No newline at end of file diff --git a/lib/handlers/pingresp.js b/lib/handlers/pingresp.js index db82e662f..3d555804e 100644 --- a/lib/handlers/pingresp.js +++ b/lib/handlers/pingresp.js @@ -1,3 +1,7 @@ export async function handlePingResp () { + // TODO: In old implementation, pingResp is a state in + // the client that is toggled to true when the pingResp + // is received. How does aedes do this? is there a + // better way than a boolean toggle? return true } diff --git a/lib/handlers/pub.js b/lib/handlers/pub.js index 269f53d84..e2c690850 100644 --- a/lib/handlers/pub.js +++ b/lib/handlers/pub.js @@ -1,3 +1,89 @@ -export async function handlePub () { - return true -} + +/** + * _handlePublish + * + * @param {Object} packet + * @api private + */ +/* +those late 2 case should be rewrite to comply with coding style: +case 1: +case 0: + // do not wait sending a puback + // no callback passed + if (1 === qos) { + this._sendPacket({ + cmd: 'puback', + messageId: messageId + }); + } + // emit the message event for both qos 1 and 0 + this.emit('message', topic, message, packet); + this.handleMessage(packet, done); + break; +default: + // do nothing but every switch mus have a default + // log or throw an error about unknown qos + break; +for now i just suppressed the warnings +*/ +export async function handlePub (packet) { + debug('_handlePublish: packet %o', packet) + done = typeof done !== 'undefined' ? done : nop + var topic = packet.topic.toString() + var message = packet.payload + var qos = packet.qos + var messageId = packet.messageId + var that = this + var options = this.options + var validReasonCodes = [0, 16, 128, 131, 135, 144, 145, 151, 153] + debug('_handlePublish: qos %d', qos) + switch (qos) { + case 2: { + options.customHandleAcks(topic, message, packet, function (error, code) { + if (!(error instanceof Error)) { + code = error + error = null + } + if (error) { return that.emit('error', error) } + if (validReasonCodes.indexOf(code) === -1) { return that.emit('error', new Error('Wrong reason code for pubrec')) } + if (code) { + that._sendPacket({cmd: 'pubrec', messageId: messageId, reasonCode: code}, done) + } else { + that.incomingStore.put(packet, function () { + that._sendPacket({cmd: 'pubrec', messageId: messageId}, done) + }) + } + }) + break + } + case 1: { + // emit the message event + options.customHandleAcks(topic, message, packet, function (error, code) { + if (!(error instanceof Error)) { + code = error + error = null + } + if (error) { return that.emit('error', error) } + if (validReasonCodes.indexOf(code) === -1) { return that.emit('error', new Error('Wrong reason code for puback')) } + if (!code) { that.emit('message', topic, message, packet) } + that.handleMessage(packet, function (err) { + if (err) { + return done && done(err) + } + that._sendPacket({cmd: 'puback', messageId: messageId, reasonCode: code}, done) + }) + }) + break + } + case 0: + // emit the message event + this.emit('message', topic, message, packet) + this.handleMessage(packet, done) + break + default: + // do nothing + debug('_handlePublish: unknown QoS. Doing nothing.') + // log or throw an error about unknown qos + break + } diff --git a/lib/handlers/puback.js b/lib/handlers/puback.js index b3e83976c..66a903ef6 100644 --- a/lib/handlers/puback.js +++ b/lib/handlers/puback.js @@ -1,3 +1,80 @@ -export async function handlePubAck () { - return true -} +export async function handlePubAck (packet) { + /* eslint no-fallthrough: "off" */ + var messageId = packet.messageId + var type = packet.cmd + var response = null + var cb = this.outgoing[messageId] ? this.outgoing[messageId].cb : null + var that = this + var err + + if (!cb) { + debug('_handleAck :: Server sent an ack in error. Ignoring.') + // Server sent an ack in error, ignore it. + return + } + + // Process + debug('_handleAck :: packet type', type) + switch (type) { + case 'pubcomp': + // same thing as puback for QoS 2 + case 'puback': + var pubackRC = packet.reasonCode + // Callback - we're done + if (pubackRC && pubackRC > 0 && pubackRC !== 16) { + err = new Error('Publish error: ' + errors[pubackRC]) + err.code = pubackRC + cb(err, packet) + } + delete this.outgoing[messageId] + this.outgoingStore.del(packet, cb) + this.messageIdProvider.deallocate(messageId) + this._invokeStoreProcessingQueue() + break + case 'pubrec': + response = { + cmd: 'pubrel', + qos: 2, + messageId: messageId + } + var pubrecRC = packet.reasonCode + + if (pubrecRC && pubrecRC > 0 && pubrecRC !== 16) { + err = new Error('Publish error: ' + errors[pubrecRC]) + err.code = pubrecRC + cb(err, packet) + } else { + this._sendPacket(response) + } + break + case 'suback': + delete this.outgoing[messageId] + this.messageIdProvider.deallocate(messageId) + for (var grantedI = 0; grantedI < packet.granted.length; grantedI++) { + if ((packet.granted[grantedI] & 0x80) !== 0) { + // suback with Failure status + var topics = this.messageIdToTopic[messageId] + if (topics) { + topics.forEach(function (topic) { + delete that._resubscribeTopics[topic] + }) + } + } + } + this._invokeStoreProcessingQueue() + cb(null, packet) + break + case 'unsuback': + delete this.outgoing[messageId] + this.messageIdProvider.deallocate(messageId) + this._invokeStoreProcessingQueue() + cb(null) + break + default: + that.emit('error', new Error('unrecognized packet type')) + } + + if (this.disconnecting && + Object.keys(this.outgoing).length === 0) { + this.emit('outgoingEmpty') + }} diff --git a/lib/handlers/pubcomp.js b/lib/handlers/pubcomp.js index f88556a74..350d65326 100644 --- a/lib/handlers/pubcomp.js +++ b/lib/handlers/pubcomp.js @@ -1,3 +1,80 @@ -export async function handlePubComp () { - return true -} +export async function handlePubComp (packet) { + /* eslint no-fallthrough: "off" */ + var messageId = packet.messageId + var type = packet.cmd + var response = null + var cb = this.outgoing[messageId] ? this.outgoing[messageId].cb : null + var that = this + var err + + if (!cb) { + debug('_handleAck :: Server sent an ack in error. Ignoring.') + // Server sent an ack in error, ignore it. + return + } + + // Process + debug('_handleAck :: packet type', type) + switch (type) { + case 'pubcomp': + // same thing as puback for QoS 2 + case 'puback': + var pubackRC = packet.reasonCode + // Callback - we're done + if (pubackRC && pubackRC > 0 && pubackRC !== 16) { + err = new Error('Publish error: ' + errors[pubackRC]) + err.code = pubackRC + cb(err, packet) + } + delete this.outgoing[messageId] + this.outgoingStore.del(packet, cb) + this.messageIdProvider.deallocate(messageId) + this._invokeStoreProcessingQueue() + break + case 'pubrec': + response = { + cmd: 'pubrel', + qos: 2, + messageId: messageId + } + var pubrecRC = packet.reasonCode + + if (pubrecRC && pubrecRC > 0 && pubrecRC !== 16) { + err = new Error('Publish error: ' + errors[pubrecRC]) + err.code = pubrecRC + cb(err, packet) + } else { + this._sendPacket(response) + } + break + case 'suback': + delete this.outgoing[messageId] + this.messageIdProvider.deallocate(messageId) + for (var grantedI = 0; grantedI < packet.granted.length; grantedI++) { + if ((packet.granted[grantedI] & 0x80) !== 0) { + // suback with Failure status + var topics = this.messageIdToTopic[messageId] + if (topics) { + topics.forEach(function (topic) { + delete that._resubscribeTopics[topic] + }) + } + } + } + this._invokeStoreProcessingQueue() + cb(null, packet) + break + case 'unsuback': + delete this.outgoing[messageId] + this.messageIdProvider.deallocate(messageId) + this._invokeStoreProcessingQueue() + cb(null) + break + default: + that.emit('error', new Error('unrecognized packet type')) + } + + if (this.disconnecting && + Object.keys(this.outgoing).length === 0) { + this.emit('outgoingEmpty') + }} diff --git a/lib/handlers/pubrec.js b/lib/handlers/pubrec.js index b312f60e3..8f9a1666b 100644 --- a/lib/handlers/pubrec.js +++ b/lib/handlers/pubrec.js @@ -1,3 +1,81 @@ -export async function handlePubRec () { - return true +export async function handlePubRec (packet) { + /* eslint no-fallthrough: "off" */ + var messageId = packet.messageId + var type = packet.cmd + var response = null + var cb = this.outgoing[messageId] ? this.outgoing[messageId].cb : null + var that = this + var err + + if (!cb) { + debug('_handleAck :: Server sent an ack in error. Ignoring.') + // Server sent an ack in error, ignore it. + return + } + + // Process + debug('_handleAck :: packet type', type) + switch (type) { + case 'pubcomp': + // same thing as puback for QoS 2 + case 'puback': + var pubackRC = packet.reasonCode + // Callback - we're done + if (pubackRC && pubackRC > 0 && pubackRC !== 16) { + err = new Error('Publish error: ' + errors[pubackRC]) + err.code = pubackRC + cb(err, packet) + } + delete this.outgoing[messageId] + this.outgoingStore.del(packet, cb) + this.messageIdProvider.deallocate(messageId) + this._invokeStoreProcessingQueue() + break + case 'pubrec': + response = { + cmd: 'pubrel', + qos: 2, + messageId: messageId + } + var pubrecRC = packet.reasonCode + + if (pubrecRC && pubrecRC > 0 && pubrecRC !== 16) { + err = new Error('Publish error: ' + errors[pubrecRC]) + err.code = pubrecRC + cb(err, packet) + } else { + this._sendPacket(response) + } + break + case 'suback': + delete this.outgoing[messageId] + this.messageIdProvider.deallocate(messageId) + for (var grantedI = 0; grantedI < packet.granted.length; grantedI++) { + if ((packet.granted[grantedI] & 0x80) !== 0) { + // suback with Failure status + var topics = this.messageIdToTopic[messageId] + if (topics) { + topics.forEach(function (topic) { + delete that._resubscribeTopics[topic] + }) + } + } + } + this._invokeStoreProcessingQueue() + cb(null, packet) + break + case 'unsuback': + delete this.outgoing[messageId] + this.messageIdProvider.deallocate(messageId) + this._invokeStoreProcessingQueue() + cb(null) + break + default: + that.emit('error', new Error('unrecognized packet type')) + } + + if (this.disconnecting && + Object.keys(this.outgoing).length === 0) { + this.emit('outgoingEmpty') + } } diff --git a/lib/handlers/pubrel.js b/lib/handlers/pubrel.js index ae73babd8..77bcfc7d2 100644 --- a/lib/handlers/pubrel.js +++ b/lib/handlers/pubrel.js @@ -1,3 +1,23 @@ -export async function handlePubRel () { - return true +export async function handlePubRel (packet) { + debug('handling pubrel packet') + callback = typeof callback !== 'undefined' ? callback : nop + var messageId = packet.messageId + var that = this + + var comp = {cmd: 'pubcomp', messageId: messageId} + + that.incomingStore.get(packet, function (err, pub) { + if (!err) { + that.emit('message', pub.topic, pub.payload, pub) + that.handleMessage(pub, function (err) { + if (err) { + return callback(err) + } + that.incomingStore.del(pub, nop) + that._sendPacket(comp, callback) + }) + } else { + that._sendPacket(comp, callback) + } + }) } diff --git a/lib/handlers/sub.js b/lib/handlers/sub.js index c5af630ea..c913e7098 100644 --- a/lib/handlers/sub.js +++ b/lib/handlers/sub.js @@ -1,3 +1,182 @@ + +/** + * subscribe - subscribe to + * + * @param {String, Array, Object} topic - topic(s) to subscribe to, supports objects in the form {'topic': qos} + * @param {Object} [opts] - optional subscription options, includes: + * {Number} qos - subscribe qos level + * @param {Function} [callback] - function(err, granted){} where: + * {Error} err - subscription error (none at the moment!) + * {Array} granted - array of {topic: 't', qos: 0} + * @returns {MqttClient} this - for chaining + * @api public + * @example client.subscribe('topic'); + * @example client.subscribe('topic', {qos: 1}); + * @example client.subscribe({'topic': {qos: 0}, 'topic2': {qos: 1}}, console.log); + * @example client.subscribe('topic', console.log); + */ export async function handleSub () { - return true + var that = this + var args = new Array(arguments.length) + for (var i = 0; i < arguments.length; i++) { + args[i] = arguments[i] + } + var subs = [] + var obj = args.shift() + var resubscribe = obj.resubscribe + var callback = args.pop() || nop + var opts = args.pop() + var version = this.options.protocolVersion + + delete obj.resubscribe + + if (typeof obj === 'string') { + obj = [obj] + } + + if (typeof callback !== 'function') { + opts = callback + callback = nop + } + + var invalidTopic = validations.validateTopics(obj) + if (invalidTopic !== null) { + setImmediate(callback, new Error('Invalid topic ' + invalidTopic)) + return this + } + + if (this._checkDisconnecting(callback)) { + debug('subscribe: discconecting true') + return this + } + + var defaultOpts = { + qos: 0 + } + if (version === 5) { + defaultOpts.nl = false + defaultOpts.rap = false + defaultOpts.rh = 0 + } + opts = xtend(defaultOpts, opts) + + if (Array.isArray(obj)) { + obj.forEach(function (topic) { + debug('subscribe: array topic %s', topic) + if (!that._resubscribeTopics.hasOwnProperty(topic) || + that._resubscribeTopics[topic].qos < opts.qos || + resubscribe) { + var currentOpts = { + topic: topic, + qos: opts.qos + } + if (version === 5) { + currentOpts.nl = opts.nl + currentOpts.rap = opts.rap + currentOpts.rh = opts.rh + currentOpts.properties = opts.properties + } + debug('subscribe: pushing topic `%s` and qos `%s` to subs list', currentOpts.topic, currentOpts.qos) + subs.push(currentOpts) + } + }) + } else { + Object + .keys(obj) + .forEach(function (k) { + debug('subscribe: object topic %s', k) + if (!that._resubscribeTopics.hasOwnProperty(k) || + that._resubscribeTopics[k].qos < obj[k].qos || + resubscribe) { + var currentOpts = { + topic: k, + qos: obj[k].qos + } + if (version === 5) { + currentOpts.nl = obj[k].nl + currentOpts.rap = obj[k].rap + currentOpts.rh = obj[k].rh + currentOpts.properties = opts.properties + } + debug('subscribe: pushing `%s` to subs list', currentOpts) + subs.push(currentOpts) + } + }) + } + + if (!subs.length) { + callback(null, []) + return this + } + + var subscribeProc = function () { + var messageId = that._nextId() + if (messageId === null) { + debug('No messageId left') + return false + } + + var packet = { + cmd: 'subscribe', + subscriptions: subs, + qos: 1, + retain: false, + dup: false, + messageId: messageId + } + + if (opts.properties) { + packet.properties = opts.properties + } + + // subscriptions to resubscribe to in case of disconnect + if (that.options.resubscribe) { + debug('subscribe :: resubscribe true') + var topics = [] + subs.forEach(function (sub) { + if (that.options.reconnectPeriod > 0) { + var topic = { qos: sub.qos } + if (version === 5) { + topic.nl = sub.nl || false + topic.rap = sub.rap || false + topic.rh = sub.rh || 0 + topic.properties = sub.properties + } + that._resubscribeTopics[sub.topic] = topic + topics.push(sub.topic) + } + }) + that.messageIdToTopic[packet.messageId] = topics + } + + that.outgoing[packet.messageId] = { + volatile: true, + cb: function (err, packet) { + if (!err) { + var granted = packet.granted + for (var i = 0; i < granted.length; i += 1) { + subs[i].qos = granted[i] + } + } + + callback(err, subs) + } + } + debug('subscribe :: call _sendPacket') + that._sendPacket(packet) + return true + } + + if (this._storeProcessing || this._storeProcessingQueue.length > 0) { + this._storeProcessingQueue.push( + { + 'invoke': subscribeProc, + 'callback': callback + } + ) + } else { + subscribeProc() + } + + return this } diff --git a/lib/handlers/suback.js b/lib/handlers/suback.js index 0049e3f92..2f9cee01f 100644 --- a/lib/handlers/suback.js +++ b/lib/handlers/suback.js @@ -1,3 +1,80 @@ -export async function handleSubAck () { - return true -} +export async function handleSubAck (packet) { + /* eslint no-fallthrough: "off" */ + var messageId = packet.messageId + var type = packet.cmd + var response = null + var cb = this.outgoing[messageId] ? this.outgoing[messageId].cb : null + var that = this + var err + + if (!cb) { + debug('_handleAck :: Server sent an ack in error. Ignoring.') + // Server sent an ack in error, ignore it. + return + } + + // Process + debug('_handleAck :: packet type', type) + switch (type) { + case 'pubcomp': + // same thing as puback for QoS 2 + case 'puback': + var pubackRC = packet.reasonCode + // Callback - we're done + if (pubackRC && pubackRC > 0 && pubackRC !== 16) { + err = new Error('Publish error: ' + errors[pubackRC]) + err.code = pubackRC + cb(err, packet) + } + delete this.outgoing[messageId] + this.outgoingStore.del(packet, cb) + this.messageIdProvider.deallocate(messageId) + this._invokeStoreProcessingQueue() + break + case 'pubrec': + response = { + cmd: 'pubrel', + qos: 2, + messageId: messageId + } + var pubrecRC = packet.reasonCode + + if (pubrecRC && pubrecRC > 0 && pubrecRC !== 16) { + err = new Error('Publish error: ' + errors[pubrecRC]) + err.code = pubrecRC + cb(err, packet) + } else { + this._sendPacket(response) + } + break + case 'suback': + delete this.outgoing[messageId] + this.messageIdProvider.deallocate(messageId) + for (var grantedI = 0; grantedI < packet.granted.length; grantedI++) { + if ((packet.granted[grantedI] & 0x80) !== 0) { + // suback with Failure status + var topics = this.messageIdToTopic[messageId] + if (topics) { + topics.forEach(function (topic) { + delete that._resubscribeTopics[topic] + }) + } + } + } + this._invokeStoreProcessingQueue() + cb(null, packet) + break + case 'unsuback': + delete this.outgoing[messageId] + this.messageIdProvider.deallocate(messageId) + this._invokeStoreProcessingQueue() + cb(null) + break + default: + that.emit('error', new Error('unrecognized packet type')) + } + + if (this.disconnecting && + Object.keys(this.outgoing).length === 0) { + this.emit('outgoingEmpty') + }} diff --git a/lib/handlers/unsub.js b/lib/handlers/unsub.js index 0ffb6af9f..db4691927 100644 --- a/lib/handlers/unsub.js +++ b/lib/handlers/unsub.js @@ -1,3 +1,93 @@ +/** + * unsubscribe - unsubscribe from topic(s) + * + * @param {String, Array} topic - topics to unsubscribe from + * @param {Object} [opts] - optional subscription options, includes: + * {Object} properties - properties of unsubscribe packet + * @param {Function} [callback] - callback fired on unsuback + * @returns {MqttClient} this - for chaining + * @api public + * @example client.unsubscribe('topic'); + * @example client.unsubscribe('topic', console.log); + */ + export async function handleUnsub () { - return true + var that = this + var args = new Array(arguments.length) + for (var i = 0; i < arguments.length; i++) { + args[i] = arguments[i] + } + var topic = args.shift() + var callback = args.pop() || nop + var opts = args.pop() + if (typeof topic === 'string') { + topic = [topic] + } + + if (typeof callback !== 'function') { + opts = callback + callback = nop + } + + var invalidTopic = validations.validateTopics(topic) + if (invalidTopic !== null) { + setImmediate(callback, new Error('Invalid topic ' + invalidTopic)) + return this + } + + if (that._checkDisconnecting(callback)) { + return this + } + + var unsubscribeProc = function () { + var messageId = that._nextId() + if (messageId === null) { + debug('No messageId left') + return false + } + var packet = { + cmd: 'unsubscribe', + qos: 1, + messageId: messageId + } + + if (typeof topic === 'string') { + packet.unsubscriptions = [topic] + } else if (Array.isArray(topic)) { + packet.unsubscriptions = topic + } + + if (that.options.resubscribe) { + packet.unsubscriptions.forEach(function (topic) { + delete that._resubscribeTopics[topic] + }) + } + + if (typeof opts === 'object' && opts.properties) { + packet.properties = opts.properties + } + + that.outgoing[packet.messageId] = { + volatile: true, + cb: callback + } + + debug('unsubscribe: call _sendPacket') + that._sendPacket(packet) + + return true + } + + if (this._storeProcessing || this._storeProcessingQueue.length > 0) { + this._storeProcessingQueue.push( + { + 'invoke': unsubscribeProc, + 'callback': callback + } + ) + } else { + unsubscribeProc() + } + + return this } diff --git a/lib/handlers/unsuback.js b/lib/handlers/unsuback.js index a21275ede..06fcf1f1e 100644 --- a/lib/handlers/unsuback.js +++ b/lib/handlers/unsuback.js @@ -1,3 +1,80 @@ -export async function handleUnsubAck () { - return true -} +export async function handleUnsubAck (packet) { + /* eslint no-fallthrough: "off" */ + var messageId = packet.messageId + var type = packet.cmd + var response = null + var cb = this.outgoing[messageId] ? this.outgoing[messageId].cb : null + var that = this + var err + + if (!cb) { + debug('_handleAck :: Server sent an ack in error. Ignoring.') + // Server sent an ack in error, ignore it. + return + } + + // Process + debug('_handleAck :: packet type', type) + switch (type) { + case 'pubcomp': + // same thing as puback for QoS 2 + case 'puback': + var pubackRC = packet.reasonCode + // Callback - we're done + if (pubackRC && pubackRC > 0 && pubackRC !== 16) { + err = new Error('Publish error: ' + errors[pubackRC]) + err.code = pubackRC + cb(err, packet) + } + delete this.outgoing[messageId] + this.outgoingStore.del(packet, cb) + this.messageIdProvider.deallocate(messageId) + this._invokeStoreProcessingQueue() + break + case 'pubrec': + response = { + cmd: 'pubrel', + qos: 2, + messageId: messageId + } + var pubrecRC = packet.reasonCode + + if (pubrecRC && pubrecRC > 0 && pubrecRC !== 16) { + err = new Error('Publish error: ' + errors[pubrecRC]) + err.code = pubrecRC + cb(err, packet) + } else { + this._sendPacket(response) + } + break + case 'suback': + delete this.outgoing[messageId] + this.messageIdProvider.deallocate(messageId) + for (var grantedI = 0; grantedI < packet.granted.length; grantedI++) { + if ((packet.granted[grantedI] & 0x80) !== 0) { + // suback with Failure status + var topics = this.messageIdToTopic[messageId] + if (topics) { + topics.forEach(function (topic) { + delete that._resubscribeTopics[topic] + }) + } + } + } + this._invokeStoreProcessingQueue() + cb(null, packet) + break + case 'unsuback': + delete this.outgoing[messageId] + this.messageIdProvider.deallocate(messageId) + this._invokeStoreProcessingQueue() + cb(null) + break + default: + that.emit('error', new Error('unrecognized packet type')) + } + + if (this.disconnecting && + Object.keys(this.outgoing).length === 0) { + this.emit('outgoingEmpty') + }} diff --git a/lib/index.js b/lib/index.js index 15a0b3aac..521677e73 100644 --- a/lib/index.js +++ b/lib/index.js @@ -10,4 +10,65 @@ import { Store } from './store' import { DefaultMessageIdProvider } from './default-message-id-provider' import { UniqueMessageIdProvider } from './unique-message-id-provider' + +/** + * connect() + * Connect will: + * 1) Validate the options provided by the user. + * 2) Instantiate a new client. + * 3) Return the client to the user. + */ +function connect (options) { + _validateProtocol(opts) +} + +function _validateProtocol(opts) { + if (opts.cert && opts.key) { + if (opts.protocol) { + if (['mqtts', 'wss', 'wxs', 'alis'].indexOf(opts.protocol) === -1) { + switch (opts.protocol) { + case 'mqtt': + opts.protocol = 'mqtts' + break + case 'ws': + opts.protocol = 'wss' + break + case 'wx': + opts.protocol = 'wxs' + break + case 'ali': + opts.protocol = 'alis' + break + default: + throw new Error('Unknown protocol for secure connection: "' + opts.protocol + '"!') + } + } + } else { + // A cert and key was provided, however no protocol was specified, so we will throw an error. + throw new Error('Missing secure protocol key') + } + } + + if (!protocols[opts.protocol]) { + var isSecure = ['mqtts', 'wss'].indexOf(opts.protocol) !== -1 + // I don't like that we are manipulating the opts object here. + opts.protocol = [ + 'mqtt', + 'mqtts', + 'ws', + 'wss', + 'wx', + 'wxs', + 'ali', + 'alis' + ].filter(function (key, index) { + if (isSecure && index % 2 === 0) { + // Skip insecure protocols when requesting a secure one. + return false + } + return (typeof protocols[key] === 'function') + })[0] + } +} + export {connect, Store, DefaultMessageIdProvider, UniqueMessageIdProvider} diff --git a/package.json b/package.json index 3be7ba77e..3f2ff2f24 100644 --- a/package.json +++ b/package.json @@ -24,20 +24,11 @@ "scripts": { "test": "node_modules/.bin/nyc --reporter=lcov --reporter=text ./node_modules/mocha/bin/_mocha", "pretest": "standard | snazzy", - "tslint": "tslint types/**/*.d.ts", - "typescript-compile-test": "tsc -p test/typescript/tsconfig.json", - "typescript-compile-execute": "node test/typescript/*.js", - "typescript-test": "npm run typescript-compile-test && npm run typescript-compile-execute", "browser-build": "rimraf dist/ && mkdirp dist/ && browserify mqtt.js --standalone mqtt > dist/mqtt.js && uglifyjs dist/mqtt.js --compress --mangle --output dist/mqtt.min.js", - "prepare": "npm run browser-build", "browser-test": "airtap --server test/browser/server.js --local --open test/browser/test.js", "sauce-test": "airtap --server test/browser/server.js -- test/browser/test.js", - "ci": "npm run tslint && npm run typescript-compile-test && npm run test && codecov" + "ci": "npm run test && codecov" }, - "pre-commit": [ - "pretest", - "tslint" - ], "bin": { "mqtt_pub": "./bin/pub.js", "mqtt_sub": "./bin/sub.js", @@ -62,44 +53,48 @@ "net": false }, "dependencies": { - "commist": "^1.0.0", + "commist": "^2.0.0", "concat-stream": "^2.0.0", - "debug": "^4.1.1", - "duplexify": "^4.1.1", + "debug": "^4.3.2", + "duplexify": "^4.1.2", "help-me": "^3.0.0", - "inherits": "^2.0.3", + "inherits": "^2.0.4", "minimist": "^1.2.5", - "mqtt-packet": "^6.8.0", + "mqtt-packet": "^7.0.0", "number-allocator": "^1.0.7", "pump": "^3.0.0", "readable-stream": "^3.6.0", "reinterval": "^1.1.0", - "split2": "^3.1.0", - "ws": "^7.5.0", + "split2": "^3.2.2", + "ws": "^8.2.0", "xtend": "^4.0.2" }, "devDependencies": { - "@types/node": "^10.0.0", - "airtap": "^3.0.0", - "browserify": "^16.5.0", - "chai": "^4.2.0", - "codecov": "^3.0.4", - "end-of-stream": "^1.4.1", - "global": "^4.3.2", - "aedes": "^0.42.5", - "mkdirp": "^0.5.1", - "mocha": "^4.1.0", - "mqtt-connection": "^4.0.0", - "nyc": "^15.0.1", - "pre-commit": "^1.2.2", + "@types/node": "^16.7.1", + "aedes": "^0.46.1", + "airtap": "^4.0.3", + "browserify": "^17.0.0", + "chai": "^4.3.4", + "codecov": "^3.8.3", + "end-of-stream": "^1.4.4", + "eslint": "^7.32.0", + "eslint-config-standard": "^16.0.3", + "eslint-plugin-import": "^2.24.1", + "eslint-plugin-node": "^11.1.0", + "eslint-plugin-promise": "^5.1.0", + "global": "^4.4.0", + "mkdirp": "^1.0.4", + "mocha": "^9.1.0", + "mqtt-connection": "^4.1.0", + "nyc": "^15.1.0", "rimraf": "^3.0.2", - "should": "^13.2.1", - "sinon": "^9.0.0", - "snazzy": "^8.0.0", - "standard": "^11.0.1", - "tslint": "^5.11.0", + "should": "^13.2.3", + "sinon": "^11.1.2", + "snazzy": "^9.0.0", + "standard": "^16.0.3", + "tslint": "^5.20.1", "tslint-config-standard": "^8.0.1", - "typescript": "^3.2.2", + "typescript": "^4.3.5", "uglify-es": "^3.3.9" }, "standard": { diff --git a/tslint.json b/tslint.json deleted file mode 100644 index 45052adc8..000000000 --- a/tslint.json +++ /dev/null @@ -1,3 +0,0 @@ -{ - "extends": "tslint-config-standard" -}