From 83e8037f349918f18f673fbca226b0abb583c278 Mon Sep 17 00:00:00 2001 From: taoqf Date: Wed, 2 Aug 2017 10:10:15 +0800 Subject: [PATCH] add support for weapp --- .gitignore | 1 + .jshintrc | 1 - README.md | 22 +++ lib/connect/index.js | 300 +++++++++++++++++----------------- lib/connect/wx.js | 126 ++++++++++++++ package.json | 4 +- test/browser/test.js | 2 +- test/browser/wx.js | 95 +++++++++++ test/helpers/wx.js | 28 ++++ test/mqtt.js | 29 +++- types/lib/client-options.d.ts | 2 +- 11 files changed, 459 insertions(+), 151 deletions(-) create mode 100644 lib/connect/wx.js create mode 100644 test/browser/wx.js create mode 100644 test/helpers/wx.js diff --git a/.gitignore b/.gitignore index efa540758..33bb22aeb 100644 --- a/.gitignore +++ b/.gitignore @@ -14,3 +14,4 @@ coverage test/typescript/.idea/* test/typescript/*.js test/typescript/*.map +package-lock.json diff --git a/.jshintrc b/.jshintrc index 2e6e265b0..cf7833626 100644 --- a/.jshintrc +++ b/.jshintrc @@ -17,6 +17,5 @@ "mocha": true, "indent": 2, "latedef": true, - "immed": true, "shadow": false } \ No newline at end of file diff --git a/README.md b/README.md index 28f031c4b..9a06048a7 100644 --- a/README.md +++ b/README.md @@ -16,6 +16,7 @@ in JavaScript for node.js and the browser. * [Command Line Tools](#cli) * [API](#api) * [Browser](#browser) +* [Weapp](#weapp) * [About QoS](#qos) * [TypeScript](#typescript) * [Contributing](#contributing) @@ -433,6 +434,27 @@ The MQTT.js bundle is available through http://unpkg.com, specifically at https://unpkg.com/mqtt/dist/mqtt.min.js. See http://unpkg.com for the full documentation on version ranges. + +## Wexin App +Surport [Weixin App](https://mp.weixin.qq.com/). See [Doc](https://mp.weixin.qq.com/debug/wxadoc/dev/api/network-socket.html). + +## Example +```js +var mqtt = require('mqtt') +var client = mqtt.connect('wxs://test.mosquitto.org') + +client.on('connect', function () { + client.subscribe('presence') + client.publish('presence', 'Hello mqtt') +}) + +client.on('message', function (topic, message) { + // message is Buffer + console.log(message.toString()) + client.end() +}) +``` + ### Browserify diff --git a/lib/connect/index.js b/lib/connect/index.js index 2d8debca7..0e671d84a 100644 --- a/lib/connect/index.js +++ b/lib/connect/index.js @@ -1,146 +1,154 @@ -'use strict' - -var MqttClient = require('../client') -var url = require('url') -var xtend = require('xtend') -var protocols = {} - -if (process.title !== 'browser') { - protocols.mqtt = require('./tcp') - protocols.tcp = require('./tcp') - protocols.ssl = require('./tls') - protocols.tls = require('./tls') - protocols.mqtts = require('./tls') -} - -protocols.ws = require('./ws') -protocols.wss = require('./ws') - -/** - * Parse the auth attribute and merge username and password in the options object. - * - * @param {Object} [opts] option object - */ -function parseAuthOptions (opts) { - var matches - if (opts.auth) { - matches = opts.auth.match(/^(.+):(.+)$/) - if (matches) { - opts.username = matches[1] - opts.password = matches[2] - } else { - opts.username = opts.auth - } - } -} - -/** - * connect - connect to an MQTT broker. - * - * @param {String} [brokerUrl] - url of the broker, optional - * @param {Object} opts - see MqttClient#constructor - */ -function connect (brokerUrl, opts) { - if ((typeof brokerUrl === 'object') && !opts) { - opts = brokerUrl - brokerUrl = null - } - - opts = opts || {} - - if (brokerUrl) { - var parsed = url.parse(brokerUrl, true) - if (parsed.port != null) { - parsed.port = Number(parsed.port) - } - - opts = xtend(parsed, opts) - - if (opts.protocol === null) { - throw new Error('Missing protocol') - } - opts.protocol = opts.protocol.replace(/:$/, '') - } - - // merge in the auth options if supplied - parseAuthOptions(opts) - - // support clientId passed in the query string of the url - if (opts.query && typeof opts.query.clientId === 'string') { - opts.clientId = opts.query.clientId - } - - if (opts.cert && opts.key) { - if (opts.protocol) { - if (['mqtts', 'wss'].indexOf(opts.protocol) === -1) { - /* - * jshint and eslint - * complains that break from default cannot be reached after throw - * it is a foced exit from a control structure - * maybe add a check after switch to see if it went through default - * and then throw the error - */ - /* jshint -W027 */ - /* eslint no-unreachable:1 */ - switch (opts.protocol) { - case 'mqtt': - opts.protocol = 'mqtts' - break - case 'ws': - opts.protocol = 'wss' - break - default: - throw new Error('Unknown protocol for secure connection: "' + opts.protocol + '"!') - break - } - /* eslint no-unreachable:0 */ - /* jshint +W027 */ - } - } else { - // don't know what protocol he want to use, mqtts or wss - throw new Error('Missing secure protocol key') - } - } - - if (!protocols[opts.protocol]) { - var isSecure = ['mqtts', 'wss'].indexOf(opts.protocol) !== -1 - opts.protocol = [ - 'mqtt', - 'mqtts', - 'ws', - 'wss' - ].filter(function (key, index) { - if (isSecure && index % 2 === 0) { - // Skip insecure protocols when requesting a secure one. - return false - } - return (typeof protocols[key] === 'function') - })[0] - } - - if (opts.clean === false && !opts.clientId) { - throw new Error('Missing clientId for unclean clients') - } - - function wrapper (client) { - if (opts.servers) { - if (!client._reconnectCount || client._reconnectCount === opts.servers.length) { - client._reconnectCount = 0 - } - - opts.host = opts.servers[client._reconnectCount].host - opts.port = opts.servers[client._reconnectCount].port - opts.hostname = opts.host - - client._reconnectCount++ - } - - return protocols[opts.protocol](client, opts) - } - - return new MqttClient(wrapper, opts) -} - -module.exports = connect -module.exports.connect = connect -module.exports.MqttClient = MqttClient +'use strict' + +var MqttClient = require('../client') +var url = require('url') +var xtend = require('xtend') +var protocols = {} + +if (process.title !== 'browser') { + protocols.mqtt = require('./tcp') + protocols.tcp = require('./tcp') + protocols.ssl = require('./tls') + protocols.tls = require('./tls') + protocols.mqtts = require('./tls') +} else { + protocols.wx = require('./wx') + protocols.wxs = require('./wx') +} + +protocols.ws = require('./ws') +protocols.wss = require('./ws') + +/** + * Parse the auth attribute and merge username and password in the options object. + * + * @param {Object} [opts] option object + */ +function parseAuthOptions (opts) { + var matches + if (opts.auth) { + matches = opts.auth.match(/^(.+):(.+)$/) + if (matches) { + opts.username = matches[1] + opts.password = matches[2] + } else { + opts.username = opts.auth + } + } +} + +/** + * connect - connect to an MQTT broker. + * + * @param {String} [brokerUrl] - url of the broker, optional + * @param {Object} opts - see MqttClient#constructor + */ +function connect (brokerUrl, opts) { + if ((typeof brokerUrl === 'object') && !opts) { + opts = brokerUrl + brokerUrl = null + } + + opts = opts || {} + + if (brokerUrl) { + var parsed = url.parse(brokerUrl, true) + if (parsed.port != null) { + parsed.port = Number(parsed.port) + } + + opts = xtend(parsed, opts) + + if (opts.protocol === null) { + throw new Error('Missing protocol') + } + opts.protocol = opts.protocol.replace(/:$/, '') + } + + // merge in the auth options if supplied + parseAuthOptions(opts) + + // support clientId passed in the query string of the url + if (opts.query && typeof opts.query.clientId === 'string') { + opts.clientId = opts.query.clientId + } + + if (opts.cert && opts.key) { + if (opts.protocol) { + if (['mqtts', 'wss', 'wxs'].indexOf(opts.protocol) === -1) { + /* + * jshint and eslint + * complains that break from default cannot be reached after throw + * it is a foced exit from a control structure + * maybe add a check after switch to see if it went through default + * and then throw the error + */ + /* jshint -W027 */ + /* eslint no-unreachable:1 */ + switch (opts.protocol) { + case 'mqtt': + opts.protocol = 'mqtts' + break + case 'ws': + opts.protocol = 'wss' + break + case 'wx': + opts.protocol = 'wxs' + break + default: + throw new Error('Unknown protocol for secure connection: "' + opts.protocol + '"!') + break + } + /* eslint no-unreachable:0 */ + /* jshint +W027 */ + } + } else { + // don't know what protocol he want to use, mqtts or wss + throw new Error('Missing secure protocol key') + } + } + + if (!protocols[opts.protocol]) { + var isSecure = ['mqtts', 'wss', 'wxs'].indexOf(opts.protocol) !== -1 + opts.protocol = [ + 'mqtt', + 'mqtts', + 'ws', + 'wss', + 'wx', + 'wxs' + ].filter(function (key, index) { + if (isSecure && index % 2 === 0) { + // Skip insecure protocols when requesting a secure one. + return false + } + return (typeof protocols[key] === 'function') + })[0] + } + + if (opts.clean === false && !opts.clientId) { + throw new Error('Missing clientId for unclean clients') + } + + function wrapper (client) { + if (opts.servers) { + if (!client._reconnectCount || client._reconnectCount === opts.servers.length) { + client._reconnectCount = 0 + } + + opts.host = opts.servers[client._reconnectCount].host + opts.port = opts.servers[client._reconnectCount].port + opts.hostname = opts.host + + client._reconnectCount++ + } + + return protocols[opts.protocol](client, opts) + } + + return new MqttClient(wrapper, opts) +} + +module.exports = connect +module.exports.connect = connect +module.exports.MqttClient = MqttClient diff --git a/lib/connect/wx.js b/lib/connect/wx.js new file mode 100644 index 000000000..daf406e68 --- /dev/null +++ b/lib/connect/wx.js @@ -0,0 +1,126 @@ +'use strict' + +/* global wx */ +var socketOpen = false +var socketMsgQueue = [] + +function sendSocketMessage (msg) { + if (socketOpen) { + wx.sendSocketMessage({ + data: msg + }) + } else { + socketMsgQueue.push(msg) + } +} + +function WebSocket (url, protocols) { + console.log('creating WebSocket...', arguments) + + var ws = { + OPEN: 1, + CLOSING: 2, + CLOSED: 3, + readyState: socketOpen ? 1 : 0, + send: sendSocketMessage, + close: wx.closeSocket, + onopen: null, + onmessage: null, + onclose: null, + onerror: null + } + + wx.connectSocket({ + url: url, + protocols: protocols + }) + wx.onSocketOpen(function (res) { + ws.readyState = ws.OPEN + socketOpen = true + for (var i = 0; i < socketMsgQueue.length; i++) { + sendSocketMessage(socketMsgQueue[i]) + } + socketMsgQueue = [] + + ws.onopen && ws.onopen.apply(ws, arguments) + }) + wx.onSocketMessage(function (res) { + ws.onmessage && ws.onmessage.apply(ws, arguments) + }) + wx.onSocketClose(function () { + ws.readyState = ws.CLOSED + ws.onclose && ws.onclose.apply(ws, arguments) + }) + wx.onSocketError(function () { + ws.onerror && ws.onerror.apply(ws, arguments) + }) + + return ws +} + +var websocket = require('websocket-stream') +var urlModule = require('url') + +function buildUrl (opts, client) { + var protocol = opts.protocol === 'wxs' ? 'wss' : 'ws' + var url = protocol + '://' + opts.hostname + ':' + opts.port + opts.path + if (typeof (opts.transformWsUrl) === 'function') { + url = opts.transformWsUrl(url, opts, client) + } + return url +} + +function setDefaultOpts (opts) { + if (!opts.hostname) { + opts.hostname = 'localhost' + } + if (!opts.port) { + if (opts.protocol === 'wss') { + opts.port = 443 + } else { + opts.port = 80 + } + } + if (!opts.path) { + opts.path = '/' + } + + if (!opts.wsOptions) { + opts.wsOptions = {} + } +} + +function createWebSocket (client, opts) { + var websocketSubProtocol = + (opts.protocolId === 'MQIsdp') && (opts.protocolVersion === 3) + ? 'mqttv3.1' + : 'mqtt' + + setDefaultOpts(opts) + var url = buildUrl(opts, client) + return websocket(WebSocket(url, [websocketSubProtocol])) +} + +function buildBuilder (client, opts) { + if (!opts.hostname) { + opts.hostname = opts.host + } + + if (!opts.hostname) { + // Throwing an error in a Web Worker if no `hostname` is given, because we + // can not determine the `hostname` automatically. If connecting to + // localhost, please supply the `hostname` as an argument. + if (typeof (document) === 'undefined') { + throw new Error('Could not determine host. Specify host manually.') + } + var parsed = urlModule.parse(document.URL) + opts.hostname = parsed.hostname + + if (!opts.port) { + opts.port = parsed.port + } + } + return createWebSocket(client, opts) +} + +module.exports = buildBuilder diff --git a/package.json b/package.json index b63d172fd..925415b86 100644 --- a/package.json +++ b/package.json @@ -29,7 +29,8 @@ "prepare": "nsp check && npm run browser-build", "browser-build": "rimraf dist/ && mkdirp dist/ && browserify mqtt.js -s mqtt > dist/mqtt.js && uglifyjs < dist/mqtt.js > dist/mqtt.min.js", "browser-test": "zuul --server test/browser/server.js --local --open test/browser/test.js", - "sauce-test": "zuul --server test/browser/server.js --tunnel ngrok -- test/browser/test.js", + "weapp-test": "zuul --server test/browser/server.js --local --open test/browser/wx.js", + "sauce-test": "zuul --no-coverage --server test/browser/server.js --tunnel ngrok -- test/browser/wx.js", "ci": "npm run tslint && npm run typescript-test && npm run test && codecov" }, "pre-commit": [ @@ -80,6 +81,7 @@ "@types/node": "*", "browserify": "^14.4.0", "codecov": "^2.0.0", + "global": "^4.3.2", "istanbul": "^0.4.5", "mkdirp": "^0.5.1", "mocha": "^3.2.0", diff --git a/test/browser/test.js b/test/browser/test.js index 048aad0a7..4d3bfe29f 100644 --- a/test/browser/test.js +++ b/test/browser/test.js @@ -60,7 +60,7 @@ function suiteFactory (configName, opts) { }) }) - if (parsed.host === 'localhost') { + if (parsed.hostname === 'localhost') { describe('specifying a port', function () { clientTests(function () { return mqtt.connect(setVersion({ protocol: protocol, port: port })) diff --git a/test/browser/wx.js b/test/browser/wx.js new file mode 100644 index 000000000..28c2bf9c8 --- /dev/null +++ b/test/browser/wx.js @@ -0,0 +1,95 @@ +'use strict' + +var mqtt = require('../../lib/connect') +var _URL = require('url') +var xtend = require('xtend') +var parsed = _URL.parse(document.URL) +var isHttps = parsed.protocol === 'https:' +var port = parsed.port || (isHttps ? 443 : 80) +var host = parsed.hostname +var protocol = isHttps ? 'wxs' : 'wx' +require('../helpers/wx') + +function clientTests (buildClient) { + var client + + beforeEach(function () { + client = buildClient() + client.on('offline', function () { + console.log('client offline') + }) + client.on('connect', function () { + console.log('client connect') + }) + client.on('reconnect', function () { + console.log('client reconnect') + }) + }) + + afterEach(function (done) { + client.once('close', function () { + done() + }) + client.end() + }) + + it('should connect', function (done) { + client.on('connect', function () { + done() + }) + }) + + it('should publish and subscribe', function (done) { + client.subscribe('hello', function () { + done() + }).publish('hello', 'world') + }) +} + +function suiteFactory (configName, opts) { + function setVersion (base) { + return xtend(base || {}, opts) + } + + var suiteName = 'MqttClient(' + configName + '=' + JSON.stringify(opts) + ')' + describe(suiteName, function () { + this.timeout(10000) + + describe('specifying nothing', function () { + clientTests(function () { + return mqtt.connect(setVersion()) + }) + }) + + if (parsed.hostname === 'localhost') { + describe('specifying a port', function () { + clientTests(function () { + return mqtt.connect(setVersion({ protocol: protocol, port: port })) + }) + }) + } + + describe('specifying a port and host', function () { + clientTests(function () { + return mqtt.connect(setVersion( + { protocol: protocol, port: port, host: host })) + }) + }) + + describe('specifying a URL', function () { + clientTests(function () { + return mqtt.connect(protocol + '://' + host + ':' + port, setVersion()) + }) + }) + + describe('specifying a URL with a path', function () { + clientTests(function () { + return mqtt.connect(protocol + '://' + host + ':' + port + '/mqtt', + setVersion()) + }) + }) + }) +} + +suiteFactory('v3', {protocolId: 'MQIsdp', protocolVersion: 3}) +suiteFactory('default', {}) diff --git a/test/helpers/wx.js b/test/helpers/wx.js new file mode 100644 index 000000000..9f6362dd1 --- /dev/null +++ b/test/helpers/wx.js @@ -0,0 +1,28 @@ +var global = require('global') + +var socket +global.wx = { + connectSocket: function (opts) { + if (!socket) { + socket = new global.WebSocket(opts.url, opts.protocols) + socket.binaryType = 'arraybuffer' + } + }, + onSocketOpen: function (callback) { + socket.onopen = callback + }, + onSocketMessage: function (callback) { + socket.onmessage = callback + }, + onSocketClose: function (callback) { + socket.onclose = callback + }, + onSocketError: function (callback) { + socket.onerror = callback + }, + sendSocketMessage: function (p) { + socket.send(p.data) + }, + closeSocket: function () { + } +} diff --git a/test/mqtt.js b/test/mqtt.js index 213c4297a..13cccb71f 100644 --- a/test/mqtt.js +++ b/test/mqtt.js @@ -3,6 +3,7 @@ var fs = require('fs') var path = require('path') var mqtt = require('../') +require('./helpers/wx') describe('mqtt', function () { describe('#connect', function () { @@ -106,6 +107,32 @@ describe('mqtt', function () { c.should.be.instanceOf(mqtt.MqttClient) }) + it.skip('should return an MqttClient when connect is called with wx:/ url', function () { + try { + var c = mqtt.connect('wx://localhost', sslOpts) + + c.options.should.have.property('protocol', 'wx') + + c.on('error', function () {}) + + c.should.be.instanceOf(mqtt.MqttClient) + } catch (e) { + } + }) + + it.skip('should return an MqttClient when connect is called with wxs:/ url', function () { + try { + var c = mqtt.connect('wxs://localhost', sslOpts) + + c.options.should.have.property('protocol', 'wxs') + + c.on('error', function () {}) + + c.should.be.instanceOf(mqtt.MqttClient) + } catch (e) { + } + }) + sslOpts2 = { key: fs.readFileSync(path.join(__dirname, 'helpers', 'private-key.pem')), cert: fs.readFileSync(path.join(__dirname, 'helpers', 'public-cert.pem')), @@ -120,7 +147,7 @@ describe('mqtt', function () { }).should.throw('Missing secure protocol key') }) - it('should throw an error when it is called with cert and key set and protocol other than allowed: mqtt,mqtts,ws,wss', function () { + it('should throw an error when it is called with cert and key set and protocol other than allowed: mqtt,mqtts,ws,wss,wx,wxs', function () { (function () { sslOpts2.protocol = 'UNKNOWNPROTOCOL' var c = mqtt.connect(sslOpts2) diff --git a/types/lib/client-options.d.ts b/types/lib/client-options.d.ts index c91e98ce8..45888667e 100644 --- a/types/lib/client-options.d.ts +++ b/types/lib/client-options.d.ts @@ -7,7 +7,7 @@ export interface IClientOptions extends ISecureClientOptions { host?: string // host does NOT include port hostname?: string path?: string - protocol?: 'wss' | 'ws' | 'mqtt' | 'mqtts' | 'tcp' | 'ssl' + protocol?: 'wss' | 'ws' | 'mqtt' | 'mqtts' | 'tcp' | 'ssl' | 'wx' | 'wxs' wsOptions?: { [x: string]: any;