diff --git a/.aegir.js b/.aegir.js deleted file mode 100644 index 1cbcf2b..0000000 --- a/.aegir.js +++ /dev/null @@ -1,71 +0,0 @@ -'use strict' - -const pull = require('pull-stream') -const parallel = require('async/parallel') -const WebSocketStarRendezvous = require('libp2p-websocket-star-rendezvous') - -const Node = require('./test/utils/nodejs-bundle.js') -const { - getPeerRelay, - WS_RENDEZVOUS_MULTIADDR -} = require('./test/utils/constants') - -let wsRendezvous -let node - -const before = (done) => { - parallel([ - (cb) => { - WebSocketStarRendezvous.start({ - port: WS_RENDEZVOUS_MULTIADDR.nodeAddress().port, - refreshPeerListIntervalMS: 1000, - strictMultiaddr: false, - cryptoChallenge: true - }, (err, _server) => { - if (err) { - return cb(err) - } - wsRendezvous = _server - cb() - }) - }, - (cb) => { - getPeerRelay((err, peerInfo) => { - if (err) { - return done(err) - } - - node = new Node({ - peerInfo, - config: { - relay: { - enabled: true, - hop: { - enabled: true, - active: true - } - } - } - }) - - node.handle('/echo/1.0.0', (_, conn) => pull(conn, conn)) - node.start(cb) - }) - } - ], done) -} - -const after = (done) => { - setTimeout(() => - parallel( - [node, wsRendezvous].map((s) => (cb) => s.stop(cb)), - done), - 2000) -} - -module.exports = { - hooks: { - pre: before, - post: after - } -} diff --git a/LICENSE b/LICENSE index 7789aab..58b2056 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ -MIT License +The MIT License (MIT) -Copyright (c) 2016 libp2p +Copyright (c) 2019 Protocol Labs, Inc. Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal @@ -9,13 +9,13 @@ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. \ No newline at end of file diff --git a/README.md b/README.md index 707c73c..1823e50 100644 --- a/README.md +++ b/README.md @@ -22,6 +22,7 @@ js-libp2p-floodsub - [Install](#install) - [Usage](#usage) - [API](#api) +- [Events](#events) - [Contribute](#contribute) - [License](#license) @@ -31,26 +32,39 @@ js-libp2p-floodsub > npm install libp2p-floodsub ``` -## Examples +## Usage ```JavaScript const FloodSub = require('libp2p-floodsub') -const fsub = new FloodSub(node) +// registrar is provided by libp2p +const fsub = new FloodSub(peerInfo, registrar, options) -fsub.start((err) => { - if (err) { - console.log('Upsy', err) - } - fsub.on('fruit', (data) => { - console.log(data) - }) - fsub.subscribe('fruit') +await fsub.start() - fsub.publish('fruit', new Buffer('banana')) +fsub.on('fruit', (data) => { + console.log(data) }) +fsub.subscribe('fruit') + +fsub.publish('fruit', new Buffer('banana')) +``` + +## API + +### Create a floodsub implementation + +```js +const options = {…} +const floodsub = new Floodsub(peerInfo, registrar, options) ``` +Options is an optional object with the following key-value pairs: + +* **`emitSelf`**: boolean identifying whether the node should emit to self on publish, in the event of the topic being subscribed (defaults to **false**). + +For the remaining API, see https://github.com/libp2p/js-libp2p-pubsub + ## Events Floodsub emits two kinds of events: @@ -68,28 +82,12 @@ Floodsub emits two kinds of events: - `changes`: an array of `{ topicID: , subscribe: }` eg `[ { topicID: 'fruit', subscribe: true }, { topicID: 'vegetables': false } ]` - -## API - -### Create a floodsub implementation - -```js -const options = {…} -const floodsub = new Floodsub(libp2pNode, options) -``` - -Options is an optional object with the following key-value pairs: - -* **`emitSelf`**: boolean identifying whether the node should emit to self on publish, in the event of the topic being subscribed (defaults to **false**). - -For more, see https://libp2p.github.io/js-libp2p-floodsub - ## Contribute -PRs are welcome! +Feel free to join in. All welcome. Open an [issue](https://github.com/libp2p/js-libp2p-pubsub/issues)! -Small note: If editing the Readme, please conform to the [standard-readme](https://github.com/RichardLitt/standard-readme) specification. +This repository falls under the IPFS [Code of Conduct](https://github.com/ipfs/community/blob/master/code-of-conduct.md). ## License -MIT © David Dias +Copyright (c) Protocol Labs, Inc. under the **MIT License**. See [LICENSE file](./LICENSE) for details. diff --git a/benchmarks/index.js b/benchmarks/index.js index 475b355..2ad540f 100644 --- a/benchmarks/index.js +++ b/benchmarks/index.js @@ -2,81 +2,78 @@ const Benchmark = require('benchmark') const crypto = require('crypto') -const map = require('async/map') -const parallel = require('async/parallel') -const series = require('async/series') -const PSG = require('../src') -const utils = require('../test/utils') +const DuplexPair = require('it-pair/duplex') + +const Floodsub = require('../src') +const { multicodec } = require('../src') +const { createPeerInfo } = require('../test/utils') const suite = new Benchmark.Suite('pubsub') // Simple benchmark, how many messages can we send from // one node to another. -map([0, 1], (i, cb) => { - utils.createNode((err, node) => { - if (err) { - return cb(err) +;(async () => { + const registrarRecordA = {} + const registrarRecordB = {} + + const registrar = (registrarRecord) => ({ + register: (multicodec, handlers) => { + registrarRecord[multicodec] = handlers + }, + unregister: (multicodec) => { + delete registrarRecord[multicodec] } + }) - const ps = new PSG(node) + const [peerInfoA, peerInfoB] = await Promise.all([ + createPeerInfo(), + createPeerInfo() + ]) - series([ - (cb) => node.start(cb), - (cb) => ps.start(cb) - ], (err) => { - if (err) { - return cb(err) - } + const fsA = new Floodsub(peerInfoA, registrar(registrarRecordA)) + const fsB = new Floodsub(peerInfoB, registrar(registrarRecordB)) - cb(null, { - libp2p: node, - ps - }) - }) - }) -}, (err, peers) => { - if (err) { - throw err - } - - parallel([ - (cb) => peers[0].libp2p.dial(peers[1].libp2p.peerInfo, cb), - (cb) => setTimeout(() => { - peers[0].ps.subscribe('Z', () => {}, () => {}) - peers[1].ps.subscribe('Z', () => {}, () => {}) - cb(null, peers) - }, 200) - ], (err, res) => { - if (err) { - throw err - } + // Start pubsub + await Promise.all([ + fsA.start(), + fsB.start() + ]) - const peers = res[1] + // Connect floodsub nodes + const onConnectA = registrarRecordA[multicodec].onConnect + const onConnectB = registrarRecordB[multicodec].onConnect - suite.add('publish and receive', (deferred) => { - const onMsg = (msg) => { - deferred.resolve() - peers[1].ps.removeListener('Z', onMsg) - } + // Notice peers of connection + const [d0, d1] = DuplexPair() + onConnectA(peerInfoB, d0) + onConnectB(peerInfoA, d1) - peers[1].ps.on('Z', onMsg) + fsA.subscribe('Z') + fsB.subscribe('Z') - peers[0].ps.publish('Z', crypto.randomBytes(1024)) - }, { - defer: true - }) + suite.add('publish and receive', (deferred) => { + const onMsg = (msg) => { + deferred.resolve() + fsB.removeListener('Z', onMsg) + } + + fsB.on('Z', onMsg) - suite - .on('cycle', (event) => { - console.log(String(event.target)) // eslint-disable-line - }) - .on('complete', () => { - process.exit() - }) - .run({ - async: true - }) + fsA.publish('Z', crypto.randomBytes(1024)) + }, { + defer: true }) -}) + + suite + .on('cycle', (event) => { + console.log(String(event.target)) // eslint-disable-line + }) + .on('complete', () => { + process.exit() + }) + .run({ + async: true + }) +})() diff --git a/examples/pub-sub-1-topic/publisher.js b/examples/pub-sub-1-topic/publisher.js deleted file mode 100644 index 8fd5fb1..0000000 --- a/examples/pub-sub-1-topic/publisher.js +++ /dev/null @@ -1,54 +0,0 @@ -'use strict' - -const PeerId = require('peer-id') -const PeerInfo = require('peer-info') -const multiaddr = require('multiaddr') -const Node = require('libp2p-ipfs-nodejs') -const FloodSub = require('../../src') -const series = require('async/series') - -const privateKey = 'CAASpgkwggSiAgEAAoIBAQC2SKo/HMFZeBml1AF3XijzrxrfQXdJzjePBZAbdxqKR1Mc6juRHXij6HXYPjlAk01BhF1S3Ll4Lwi0cAHhggf457sMg55UWyeGKeUv0ucgvCpBwlR5cQ020i0MgzjPWOLWq1rtvSbNcAi2ZEVn6+Q2EcHo3wUvWRtLeKz+DZSZfw2PEDC+DGPJPl7f8g7zl56YymmmzH9liZLNrzg/qidokUv5u1pdGrcpLuPNeTODk0cqKB+OUbuKj9GShYECCEjaybJDl9276oalL9ghBtSeEv20kugatTvYy590wFlJkkvyl+nPxIH0EEYMKK9XRWlu9XYnoSfboiwcv8M3SlsjAgMBAAECggEAZtju/bcKvKFPz0mkHiaJcpycy9STKphorpCT83srBVQi59CdFU6Mj+aL/xt0kCPMVigJw8P3/YCEJ9J+rS8BsoWE+xWUEsJvtXoT7vzPHaAtM3ci1HZd302Mz1+GgS8Epdx+7F5p80XAFLDUnELzOzKftvWGZmWfSeDnslwVONkL/1VAzwKy7Ce6hk4SxRE7l2NE2OklSHOzCGU1f78ZzVYKSnS5Ag9YrGjOAmTOXDbKNKN/qIorAQ1bovzGoCwx3iGIatQKFOxyVCyO1PsJYT7JO+kZbhBWRRE+L7l+ppPER9bdLFxs1t5CrKc078h+wuUr05S1P1JjXk68pk3+kQKBgQDeK8AR11373Mzib6uzpjGzgNRMzdYNuExWjxyxAzz53NAR7zrPHvXvfIqjDScLJ4NcRO2TddhXAfZoOPVH5k4PJHKLBPKuXZpWlookCAyENY7+Pd55S8r+a+MusrMagYNljb5WbVTgN8cgdpim9lbbIFlpN6SZaVjLQL3J8TWH6wKBgQDSChzItkqWX11CNstJ9zJyUE20I7LrpyBJNgG1gtvz3ZMUQCn3PxxHtQzN9n1P0mSSYs+jBKPuoSyYLt1wwe10/lpgL4rkKWU3/m1Myt0tveJ9WcqHh6tzcAbb/fXpUFT/o4SWDimWkPkuCb+8j//2yiXk0a/T2f36zKMuZvujqQKBgC6B7BAQDG2H2B/ijofp12ejJU36nL98gAZyqOfpLJ+FeMz4TlBDQ+phIMhnHXA5UkdDapQ+zA3SrFk+6yGk9Vw4Hf46B+82SvOrSbmnMa+PYqKYIvUzR4gg34rL/7AhwnbEyD5hXq4dHwMNsIDq+l2elPjwm/U9V0gdAl2+r50HAoGALtsKqMvhv8HucAMBPrLikhXP/8um8mMKFMrzfqZ+otxfHzlhI0L08Bo3jQrb0Z7ByNY6M8epOmbCKADsbWcVre/AAY0ZkuSZK/CaOXNX/AhMKmKJh8qAOPRY02LIJRBCpfS4czEdnfUhYV/TYiFNnKRj57PPYZdTzUsxa/yVTmECgYBr7slQEjb5Onn5mZnGDh+72BxLNdgwBkhO0OCdpdISqk0F0Pxby22DFOKXZEpiyI9XYP1C8wPiJsShGm2yEwBPWXnrrZNWczaVuCbXHrZkWQogBDG3HGXNdU4MAWCyiYlyinIBpPpoAJZSzpGLmWbMWh28+RJS6AQX6KHrK1o2uw==' - -let nodePublisher -let psPublisher - -function bootNode (next) { - PeerId.createFromPrivKey(privateKey, (err, idPublisher) => { - if (err) { - throw err - } - const peerPublisher = new PeerInfo(idPublisher) - peerPublisher.multiaddr.add(multiaddr('/ip4/0.0.0.0/tcp/12345')) - nodePublisher = new Node(peerPublisher) - nodePublisher.start((err) => { - console.log('Publisher listening on:') - - peerPublisher.multiaddrs.forEach((ma) => { - console.log(ma.toString() + '/ipfs/' + idPublisher.toB58String()) - }) - next(err) - }) - }) -} - -function setUpPS (next) { - console.log('attaching pubsub') - psPublisher = new FloodSub(nodePublisher) - psPublisher.start(next) -} - -function publishMsg (err) { - if (err) { - throw err - } - - setInterval(() => { - process.stdout.write('.') - psPublisher.publish('interop', Buffer.from('hey, how is it going?')) - }, 300) -} - -series([ - bootNode, - setUpPS -], publishMsg) diff --git a/examples/pub-sub-1-topic/subscriber.js b/examples/pub-sub-1-topic/subscriber.js deleted file mode 100644 index ccacec3..0000000 --- a/examples/pub-sub-1-topic/subscriber.js +++ /dev/null @@ -1 +0,0 @@ -'use strict' diff --git a/package.json b/package.json index 85ea9f7..45ba8f3 100644 --- a/package.json +++ b/package.json @@ -17,9 +17,6 @@ "coverage": "aegir coverage", "coverage-publish": "aegir coverage --provider coveralls" }, - "browser": { - "test/utils/nodejs-bundle": "./test/utils/browser-bundle.js" - }, "files": [ "src", "dist" @@ -45,35 +42,29 @@ }, "homepage": "https://github.com/libp2p/js-libp2p-floodsub#readme", "devDependencies": { - "aegir": "^18.2.1", + "aegir": "^20.4.1", "benchmark": "^2.1.4", "chai": "^4.2.0", "chai-spies": "^1.0.0", "detect-node": "^2.0.4", "dirty-chai": "^2.0.1", - "libp2p": "~0.24.4", - "libp2p-secio": "~0.11.1", - "libp2p-spdy": "~0.13.3", - "libp2p-tcp": "~0.13.0", - "libp2p-websocket-star": "~0.10.2", - "libp2p-websocket-star-rendezvous": "~0.3.0", - "lodash": "^4.17.11", - "multiaddr": "^6.0.6", - "peer-id": "~0.12.2", - "peer-info": "~0.15.1", - "sinon": "^7.3.2" + "it-pair": "^1.0.0", + "lodash": "^4.17.15", + "multiaddr": "^7.1.0", + "p-defer": "^3.0.0", + "peer-id": "~0.13.3", + "peer-info": "~0.17.0", + "sinon": "^7.5.0" }, "dependencies": { - "async": "^2.6.2", - "bs58": "^4.0.1", + "async.nexttick": "^0.5.2", "debug": "^4.1.1", - "length-prefixed-stream": "^2.0.0", - "libp2p-crypto": "~0.16.1", - "libp2p-pubsub": "~0.2.0", + "it-length-prefixed": "^2.0.0", + "it-pipe": "^1.0.1", + "libp2p-pubsub": "~0.3.0", + "p-map": "^3.0.0", "protons": "^1.0.1", - "pull-length-prefixed": "^1.3.2", - "pull-pushable": "^2.2.0", - "pull-stream": "^3.6.9" + "time-cache": "^0.3.0" }, "contributors": [ "Alan Shaw ", diff --git a/src/index.js b/src/index.js index e5d5e7f..62f7dbf 100644 --- a/src/index.js +++ b/src/index.js @@ -1,18 +1,23 @@ 'use strict' -const pull = require('pull-stream') -const lp = require('pull-length-prefixed') const assert = require('assert') - +const debug = require('debug') +const debugName = 'libp2p:floodsub' +const log = debug(debugName) +log.error = debug(`${debugName}:error`) + +const pipe = require('it-pipe') +const lp = require('it-length-prefixed') +const pMap = require('p-map') +const TimeCache = require('time-cache') +const nextTick = require('async.nexttick') + +const PeerInfo = require('peer-info') const BaseProtocol = require('libp2p-pubsub') const { message, utils } = require('libp2p-pubsub') -const config = require('./config') +const { multicodec } = require('./config') -const multicodec = config.multicodec const ensureArray = utils.ensureArray -const setImmediate = require('async/setImmediate') -const asyncMap = require('async/map') -const noop = () => {} /** * FloodSub (aka dumbsub is an implementation of pubsub focused on @@ -21,13 +26,30 @@ const noop = () => {} */ class FloodSub extends BaseProtocol { /** - * @param {Object} libp2p an instance of Libp2p + * @param {PeerInfo} peerInfo instance of the peer's PeerInfo + * @param {Object} registrar + * @param {function} registrar.handle + * @param {function} registrar.register + * @param {function} registrar.unregister * @param {Object} [options] * @param {boolean} options.emitSelf if publish should emit to self, if subscribed, defaults to false * @constructor */ - constructor (libp2p, options = {}) { - super('libp2p:floodsub', multicodec, libp2p, options) + constructor (peerInfo, registrar, options = {}) { + assert(PeerInfo.isPeerInfo(peerInfo), 'peer info must be an instance of `peer-info`') + + // registrar handling + assert(registrar && typeof registrar.handle === 'function', 'a handle function must be provided in registrar') + assert(registrar && typeof registrar.register === 'function', 'a register function must be provided in registrar') + assert(registrar && typeof registrar.unregister === 'function', 'a unregister function must be provided in registrar') + + super({ + debugName: debugName, + multicodecs: multicodec, + peerInfo: peerInfo, + registrar: registrar, + ...options + }) /** * List of our subscriptions @@ -35,6 +57,13 @@ class FloodSub extends BaseProtocol { */ this.subscriptions = new Set() + /** + * Cache of seen messages + * + * @type {TimeCache} + */ + this.seenCache = new TimeCache() + /** * Pubsub options */ @@ -42,26 +71,26 @@ class FloodSub extends BaseProtocol { emitSelf: false, ...options } + + this._onRpc = this._onRpc.bind(this) } /** - * Dial a received peer. + * Peer connected successfully with pubsub protocol. * @override * @param {PeerInfo} peerInfo peer info * @param {Connection} conn connection to the peer - * @param {function} callback + * @returns {Promise} */ - _onDial (peerInfo, conn, callback) { - super._onDial(peerInfo, conn, (err) => { - if (err) return callback(err) - const idB58Str = peerInfo.id.toB58String() - const peer = this.peers.get(idB58Str) - if (peer && peer.isWritable) { - // Immediately send my own subscriptions to the newly established conn - peer.sendSubscriptions(this.subscriptions) - } - setImmediate(() => callback()) - }) + async _onPeerConnected (peerInfo, conn) { + await super._onPeerConnected(peerInfo, conn) + const idB58Str = peerInfo.id.toB58String() + const peer = this.peers.get(idB58Str) + + if (peer && peer.isWritable) { + // Immediately send my own subscriptions to the newly established conn + peer.sendSubscriptions(this.subscriptions) + } } /** @@ -71,19 +100,26 @@ class FloodSub extends BaseProtocol { * @param {string} idB58Str peer id string in base58 * @param {Connection} conn connection * @param {PeerInfo} peer peer info - * @returns {undefined} + * @returns {void} * */ - _processConnection (idB58Str, conn, peer) { - pull( - conn, - lp.decode(), - pull.map((data) => message.rpc.RPC.decode(data)), - pull.drain( - (rpc) => this._onRpc(idB58Str, rpc), - (err) => this._onConnectionEnd(idB58Str, peer, err) + async _processMessages (idB58Str, conn, peer) { + const onRpcFunc = this._onRpc + try { + await pipe( + conn, + lp.decode(), + async function (source) { + for await (const data of source) { + const rpc = Buffer.isBuffer(data) ? data : data.slice() + + onRpcFunc(idB58Str, message.rpc.RPC.decode(rpc)) + } + } ) - ) + } catch (err) { + this._onPeerDisconnected(peer, err) + } } /** @@ -97,20 +133,19 @@ class FloodSub extends BaseProtocol { return } - this.log('rpc from', idB58Str) + log('rpc from', idB58Str) const subs = rpc.subscriptions const msgs = rpc.msgs if (msgs && msgs.length) { - rpc.msgs.forEach((msg) => this._processRpcMessage(msg)) + msgs.forEach((msg) => this._processRpcMessage(msg)) } - if (subs && subs.length) { - const peer = this.peers.get(idB58Str) - if (peer) { - peer.updateSubscriptions(subs) - this.emit('floodsub:subscription-change', peer.info, peer.topics, subs) - } + const peer = this.peers.get(idB58Str) + + if (peer && subs && subs.length) { + peer.updateSubscriptions(subs) + this.emit('floodsub:subscription-change', peer.info, peer.topics, subs) } } @@ -119,7 +154,7 @@ class FloodSub extends BaseProtocol { * @param {rpc.RPC.Message} message The message to process * @returns {void} */ - _processRpcMessage (message) { + async _processRpcMessage (message) { const msg = utils.normalizeInRpcMessage(message) const seqno = utils.msgId(msg.from, msg.seqno) // 1. check if I've seen the message, if yes, ignore @@ -128,19 +163,27 @@ class FloodSub extends BaseProtocol { } this.seenCache.put(seqno) + // 2. validate the message (signature verification) - this.validate(message, (err, isValid) => { - if (err || !isValid) { - this.log('Message could not be validated, dropping it. isValid=%s', isValid, err) - return - } + let isValid + let error - // 3. if message is valid, emit to self - this._emitMessages(msg.topicIDs, [msg]) + try { + isValid = await this.validate(message) + } catch (err) { + error = err + } - // 4. if message is valid, propagate msg to others - this._forwardMessages(msg.topicIDs, [msg]) - }) + if (error || !isValid) { + log('Message could not be validated, dropping it. isValid=%s', isValid, error) + return + } + + // 3. if message is valid, emit to self + this._emitMessages(msg.topicIDs, [msg]) + + // 4. if message is valid, propagate msg to others + this._forwardMessages(msg.topicIDs, [msg]) } _emitMessages (topics, messages) { @@ -163,23 +206,19 @@ class FloodSub extends BaseProtocol { peer.sendMessages(utils.normalizeOutRpcMessages(messages)) - this.log('publish msgs on topics', topics, peer.info.id.toB58String()) + log('publish msgs on topics', topics, peer.info.id.toB58String()) }) } /** * Unmounts the floodsub protocol and shuts down every connection * @override - * @param {Function} callback - * @returns {undefined} - * + * @returns {Promise} */ - stop (callback) { - super.stop((err) => { - if (err) return callback(err) - this.subscriptions = new Set() - callback() - }) + async stop () { + await super.stop() + + this.subscriptions = new Set() } /** @@ -187,22 +226,19 @@ class FloodSub extends BaseProtocol { * @override * @param {Array|string} topics * @param {Array|any} messages - * @param {function(Error)} callback - * @returns {undefined} - * + * @returns {Promise} */ - publish (topics, messages, callback) { + async publish (topics, messages) { assert(this.started, 'FloodSub is not started') - callback = callback || noop - this.log('publish', topics, messages) + log('publish', topics, messages) topics = ensureArray(topics) messages = ensureArray(messages) - const from = this.libp2p.peerInfo.id.toB58String() + const from = this.peerInfo.id.toB58String() - const buildMessage = (msg, cb) => { + const buildMessage = (msg) => { const seqno = utils.randomSeqno() this.seenCache.put(utils.msgId(from, seqno)) @@ -216,33 +252,29 @@ class FloodSub extends BaseProtocol { // Emit to self if I'm interested and it is enabled this._options.emitSelf && this._emitMessages(topics, [message]) - this._buildMessage(message, cb) + return this._buildMessage(message) } - asyncMap(messages, buildMessage, (err, msgObjects) => { - if (err) return callback(err) + const msgObjects = await pMap(messages, buildMessage) - // send to all the other peers - this._forwardMessages(topics, msgObjects) - - callback(null) - }) + // send to all the other peers + this._forwardMessages(topics, msgObjects) } /** * Subscribe to the given topic(s). * @override * @param {Array|string} topics - * @returns {undefined} + * @returns {void} */ subscribe (topics) { assert(this.started, 'FloodSub is not started') topics = ensureArray(topics) - topics.forEach((topic) => this.subscriptions.add(topic)) this.peers.forEach((peer) => sendSubscriptionsOnceReady(peer)) + // make sure that FloodSub is already mounted function sendSubscriptionsOnceReady (peer) { if (peer && peer.isWritable) { @@ -261,13 +293,10 @@ class FloodSub extends BaseProtocol { * Unsubscribe from the given topic(s). * @override * @param {Array|string} topics - * @returns {undefined} + * @returns {void} */ unsubscribe (topics) { - // Avoid race conditions, by quietly ignoring unsub when shutdown. - if (!this.started) { - return - } + assert(this.started, 'FloodSub is not started') topics = ensureArray(topics) @@ -279,10 +308,21 @@ class FloodSub extends BaseProtocol { if (peer && peer.isWritable) { peer.sendUnsubscriptions(topics) } else { - setImmediate(checkIfReady.bind(peer)) + nextTick(checkIfReady.bind(peer)) } } } + + /** + * Get the list of topics which the peer is subscribed to. + * @override + * @returns {Array} + */ + getTopics () { + assert(this.started, 'FloodSub is not started') + + return Array.from(this.subscriptions) + } } module.exports = FloodSub diff --git a/test/2-nodes.spec.js b/test/2-nodes.spec.js index da898b8..456d0e7 100644 --- a/test/2-nodes.spec.js +++ b/test/2-nodes.spec.js @@ -6,76 +6,84 @@ const chai = require('chai') chai.use(require('dirty-chai')) chai.use(require('chai-spies')) const expect = chai.expect -const parallel = require('async/parallel') -const series = require('async/series') + +const pDefer = require('p-defer') const times = require('lodash/times') const FloodSub = require('../src') -const utils = require('./utils') -const first = utils.first -const createNode = utils.createNode -const expectSet = utils.expectSet +const { multicodec } = require('../src') +const { + defOptions, + first, + createPeerInfo, + createMockRegistrar, + expectSet, + ConnectionPair +} = require('./utils') + +function shouldNotHappen (_) { + expect.fail() +} describe('basics between 2 nodes', () => { describe('fresh nodes', () => { - let nodeA - let nodeB - let fsA - let fsB - - before((done) => { - series([ - (cb) => createNode(cb), - (cb) => createNode(cb) - ], (err, nodes) => { - if (err) { - return done(err) - } - nodeA = nodes[0] - nodeB = nodes[1] - done() + let peerInfoA, peerInfoB + let fsA, fsB + + const registrarRecordA = {} + const registrarRecordB = {} + + // Mount pubsub protocol + before(async () => { + [peerInfoA, peerInfoB] = await Promise.all([ + createPeerInfo(), + createPeerInfo() + ]) + + fsA = new FloodSub(peerInfoA, createMockRegistrar(registrarRecordA), defOptions) + fsB = new FloodSub(peerInfoB, createMockRegistrar(registrarRecordB), defOptions) + + expect(fsA.peers.size).to.be.eql(0) + expect(fsA.subscriptions.size).to.eql(0) + expect(fsB.peers.size).to.be.eql(0) + expect(fsB.subscriptions.size).to.eql(0) + }) + + // Start pubsub + before(() => Promise.all([ + fsA.start(), + fsB.start() + ])) + + // Connect floodsub nodes + before(async () => { + const onConnectA = registrarRecordA[multicodec].onConnect + const handleB = registrarRecordB[multicodec].handler + + // Notice peers of connection + const [c0, c1] = ConnectionPair() + await onConnectA(peerInfoB, c0) + + await handleB({ + protocol: multicodec, + stream: c1.stream, + remotePeer: peerInfoA.id }) - }) - - after((done) => { - parallel([ - (cb) => nodeA.stop(cb), - (cb) => nodeB.stop(cb) - ], done) - }) - - it('Mount the pubsub protocol', (done) => { - fsA = new FloodSub(nodeA, { emitSelf: true }) - fsB = new FloodSub(nodeB, { emitSelf: true }) - setTimeout(() => { - expect(fsA.peers.size).to.be.eql(0) - expect(fsA.subscriptions.size).to.eql(0) - expect(fsB.peers.size).to.be.eql(0) - expect(fsB.subscriptions.size).to.eql(0) - done() - }, 50) + expect(fsA.peers.size).to.be.eql(1) + expect(fsB.peers.size).to.be.eql(1) }) - it('start both FloodSubs', (done) => { - parallel([ - (cb) => fsA.start(cb), - (cb) => fsB.start(cb) - ], done) + after(() => { + return Promise.all([ + fsA.started && fsA.stop(), + fsB.started && fsB.stop() + ]) }) - it('Dial from nodeA to nodeB', (done) => { - series([ - (cb) => nodeA.dial(nodeB.peerInfo, cb), - (cb) => setTimeout(() => { - expect(fsA.peers.size).to.equal(1) - expect(fsB.peers.size).to.equal(1) - cb() - }, 1000) - ], done) - }) + it('Subscribe to a topic:Z in nodeA', () => { + const defer = pDefer() - it('Subscribe to a topic:Z in nodeA', (done) => { fsA.subscribe('Z') fsB.once('floodsub:subscription-change', (changedPeerInfo, changedTopics, changedSubs) => { expectSet(fsA.subscriptions, ['Z']) @@ -84,23 +92,31 @@ describe('basics between 2 nodes', () => { expect(changedPeerInfo.id.toB58String()).to.equal(first(fsB.peers).info.id.toB58String()) expectSet(changedTopics, ['Z']) expect(changedSubs).to.be.eql([{ topicID: 'Z', subscribe: true }]) - done() + defer.resolve() }) + + return defer.promise }) - it('Publish to a topic:Z in nodeA', (done) => { + it('Publish to a topic:Z in nodeA', () => { + const defer = pDefer() + fsA.once('Z', (msg) => { expect(msg.data.toString()).to.equal('hey') fsB.removeListener('Z', shouldNotHappen) - done() + defer.resolve() }) fsB.once('Z', shouldNotHappen) fsA.publish('Z', Buffer.from('hey')) + + return defer.promise }) - it('Publish to a topic:Z in nodeB', (done) => { + it('Publish to a topic:Z in nodeB', () => { + const defer = pDefer() + fsA.once('Z', (msg) => { fsA.once('Z', shouldNotHappen) expect(msg.data.toString()).to.equal('banana') @@ -108,64 +124,74 @@ describe('basics between 2 nodes', () => { setTimeout(() => { fsA.removeListener('Z', shouldNotHappen) fsB.removeListener('Z', shouldNotHappen) - done() + + defer.resolve() }, 100) }) fsB.once('Z', shouldNotHappen) fsB.publish('Z', Buffer.from('banana')) + + return defer.promise }) - it('Publish 10 msg to a topic:Z in nodeB', (done) => { + it('Publish 10 msg to a topic:Z in nodeB', () => { + const defer = pDefer() let counter = 0 fsB.once('Z', shouldNotHappen) - fsA.on('Z', receivedMsg) function receivedMsg (msg) { expect(msg.data.toString()).to.equal('banana') - expect(msg.from).to.be.eql(fsB.libp2p.peerInfo.id.toB58String()) + expect(msg.from).to.be.eql(fsB.peerInfo.id.toB58String()) expect(Buffer.isBuffer(msg.seqno)).to.be.true() expect(msg.topicIDs).to.be.eql(['Z']) if (++counter === 10) { fsA.removeListener('Z', receivedMsg) fsB.removeListener('Z', shouldNotHappen) - done() + + defer.resolve() } } - times(10, () => fsB.publish('Z', Buffer.from('banana'))) + + return defer.promise }) - it('Publish 10 msg to a topic:Z in nodeB as array', (done) => { + it('Publish 10 msg to a topic:Z in nodeB as array', () => { + const defer = pDefer() let counter = 0 fsB.once('Z', shouldNotHappen) - fsA.on('Z', receivedMsg) function receivedMsg (msg) { expect(msg.data.toString()).to.equal('banana') - expect(msg.from).to.be.eql(fsB.libp2p.peerInfo.id.toB58String()) + expect(msg.from).to.be.eql(fsB.peerInfo.id.toB58String()) expect(Buffer.isBuffer(msg.seqno)).to.be.true() expect(msg.topicIDs).to.be.eql(['Z']) if (++counter === 10) { fsA.removeListener('Z', receivedMsg) fsB.removeListener('Z', shouldNotHappen) - done() + + defer.resolve() } } - let msgs = [] + const msgs = [] times(10, () => msgs.push(Buffer.from('banana'))) fsB.publish('Z', msgs) + + return defer.promise }) - it('Unsubscribe from topic:Z in nodeA', (done) => { + it('Unsubscribe from topic:Z in nodeA', () => { + const defer = pDefer() + fsA.unsubscribe('Z') expect(fsA.subscriptions.size).to.equal(0) @@ -175,415 +201,99 @@ describe('basics between 2 nodes', () => { expect(changedPeerInfo.id.toB58String()).to.equal(first(fsB.peers).info.id.toB58String()) expectSet(changedTopics, []) expect(changedSubs).to.be.eql([{ topicID: 'Z', subscribe: false }]) - done() + + defer.resolve() }) + + return defer.promise }) - it('Publish to a topic:Z in nodeA nodeB', (done) => { + it('Publish to a topic:Z in nodeA nodeB', () => { + const defer = pDefer() + fsA.once('Z', shouldNotHappen) fsB.once('Z', shouldNotHappen) setTimeout(() => { fsA.removeListener('Z', shouldNotHappen) fsB.removeListener('Z', shouldNotHappen) - done() + defer.resolve() }, 100) fsB.publish('Z', Buffer.from('banana')) fsA.publish('Z', Buffer.from('banana')) - }) - it('stop both FloodSubs', (done) => { - parallel([ - (cb) => fsA.stop(cb), - (cb) => fsB.stop(cb) - ], done) + return defer.promise }) }) describe('nodes send state on connection', () => { - let nodeA - let nodeB - let fsA - let fsB - - before((done) => { - parallel([ - (cb) => createNode(cb), - (cb) => createNode(cb) - ], (err, nodes) => { - expect(err).to.not.exist() - - nodeA = nodes[0] - nodeB = nodes[1] - - fsA = new FloodSub(nodeA) - fsB = new FloodSub(nodeB) - - parallel([ - (cb) => fsA.start(cb), - (cb) => fsB.start(cb) - ], next) - - function next () { - fsA.subscribe('Za') - fsB.subscribe('Zb') - - expect(fsA.peers.size).to.equal(0) - expectSet(fsA.subscriptions, ['Za']) - expect(fsB.peers.size).to.equal(0) - expectSet(fsB.subscriptions, ['Zb']) - done() - } - }) - }) - - after((done) => { - parallel([ - (cb) => nodeA.stop(cb), - (cb) => nodeB.stop(cb) - ], done) - }) - - it('existing subscriptions are sent upon peer connection', (done) => { - parallel([ - cb => fsA.once('floodsub:subscription-change', () => cb()), - cb => fsB.once('floodsub:subscription-change', () => cb()) - ], () => { - expect(fsA.peers.size).to.equal(1) - expect(fsB.peers.size).to.equal(1) - - expectSet(fsA.subscriptions, ['Za']) - expect(fsB.peers.size).to.equal(1) - expectSet(first(fsB.peers).topics, ['Za']) - - expectSet(fsB.subscriptions, ['Zb']) - expect(fsA.peers.size).to.equal(1) - expectSet(first(fsA.peers).topics, ['Zb']) - - done() - }) - - nodeA.dial(nodeB.peerInfo, (err) => { - expect(err).to.not.exist() - }) - }) - - it('stop both FloodSubs', (done) => { - parallel([ - (cb) => fsA.stop(cb), - (cb) => fsB.stop(cb) - ], done) - }) - }) - - describe('nodes handle connection errors', () => { - let nodeA - let nodeB - let fsA - let fsB - - before((done) => { - series([ - (cb) => createNode(cb), - (cb) => createNode(cb) - ], (cb, nodes) => { - nodeA = nodes[0] - nodeB = nodes[1] - - fsA = new FloodSub(nodeA) - fsB = new FloodSub(nodeB) - - parallel([ - (cb) => fsA.start(cb), - (cb) => fsB.start(cb) - ], next) - - function next () { - fsA.subscribe('Za') - fsB.subscribe('Zb') - - expect(fsA.peers.size).to.equal(0) - expectSet(fsA.subscriptions, ['Za']) - expect(fsB.peers.size).to.equal(0) - expectSet(fsB.subscriptions, ['Zb']) - done() - } - }) - }) - - // TODO understand why this test is failing - it.skip('peer is removed from the state when connection ends', (done) => { - nodeA.dial(nodeB.peerInfo, (err) => { - expect(err).to.not.exist() - setTimeout(() => { - expect(first(fsA.peers)._references).to.equal(2) - expect(first(fsB.peers)._references).to.equal(2) - - fsA.stop(() => setTimeout(() => { - expect(first(fsB.peers)._references).to.equal(1) - done() - }, 1000)) - }, 1000) - }) - }) - - it('stop one node', (done) => { - parallel([ - (cb) => nodeA.stop(cb), - (cb) => nodeB.stop(cb) - ], done) - }) - - it('nodes don\'t have peers in it', (done) => { - setTimeout(() => { - expect(fsA.peers.size).to.equal(0) - expect(fsB.peers.size).to.equal(0) - done() - }, 1000) - }) - }) - - describe('dial the pubsub protocol on mount', () => { - let nodeA - let nodeB - let fsA - let fsB - - before((done) => { - series([ - (cb) => createNode(cb), - (cb) => createNode(cb) - ], (cb, nodes) => { - nodeA = nodes[0] - nodeB = nodes[1] - nodeA.dial(nodeB.peerInfo, () => setTimeout(done, 1000)) - }) - }) - - after((done) => { - parallel([ - (cb) => nodeA.stop(cb), - (cb) => nodeB.stop(cb) - ], done) - }) + let peerInfoA, peerInfoB + let fsA, fsB - it('dial on floodsub on mount', (done) => { - fsA = new FloodSub(nodeA, { emitSelf: true }) - fsB = new FloodSub(nodeB, { emitSelf: true }) + const registrarRecordA = {} + const registrarRecordB = {} - parallel([ - (cb) => fsA.start(cb), - (cb) => fsB.start(cb) - ], next) - - function next () { - expect(fsA.peers.size).to.equal(1) - expect(fsB.peers.size).to.equal(1) - done() - } - }) + // Mount pubsub protocol + before(async () => { + [peerInfoA, peerInfoB] = await Promise.all([ + createPeerInfo(), + createPeerInfo() + ]) - it('stop both FloodSubs', (done) => { - parallel([ - (cb) => fsA.stop(cb), - (cb) => fsB.stop(cb) - ], done) + fsA = new FloodSub(peerInfoA, createMockRegistrar(registrarRecordA), defOptions) + fsB = new FloodSub(peerInfoB, createMockRegistrar(registrarRecordB), defOptions) }) - }) - - describe('prevent concurrent dials', () => { - let sandbox - let nodeA - let nodeB - let fsA - let fsB - - before((done) => { - sandbox = chai.spy.sandbox() - - series([ - (cb) => createNode(cb), - (cb) => createNode(cb) - ], (err, nodes) => { - if (err) return done(err) - nodeA = nodes[0] - nodeB = nodes[1] + // Start pubsub + before(() => Promise.all([ + fsA.start(), + fsB.start() + ])) - // Put node B in node A's peer book - nodeA.peerBook.put(nodeB.peerInfo) + // Make subscriptions prior to new nodes + before(() => { + fsA.subscribe('Za') + fsB.subscribe('Zb') - fsA = new FloodSub(nodeA) - fsB = new FloodSub(nodeB) - - fsB.start(done) - }) + expect(fsA.peers.size).to.equal(0) + expectSet(fsA.subscriptions, ['Za']) + expect(fsB.peers.size).to.equal(0) + expectSet(fsB.subscriptions, ['Zb']) }) - after((done) => { - sandbox.restore() - - parallel([ - (cb) => nodeA.stop(cb), - (cb) => nodeB.stop(cb) - ], (ignoreErr) => { - done() - }) + after(() => { + return Promise.all([ + fsA.started && fsA.stop(), + fsB.started && fsB.stop() + ]) }) - it('does not dial twice to same peer', (done) => { - sandbox.on(fsA, ['_onDial']) + it('existing subscriptions are sent upon peer connection', async () => { + const dial = async () => { + const onConnectA = registrarRecordA[multicodec].onConnect + const onConnectB = registrarRecordB[multicodec].onConnect - // When node A starts, it will dial all peers in its peer book, which - // is just peer B - fsA.start(startComplete) - - // Simulate a connection coming in from peer B at the same time. This - // causes floodsub to dial peer B - nodeA.emit('peer:connect', nodeB.peerInfo) - - function startComplete () { - // Check that only one dial was made - setTimeout(() => { - expect(fsA._onDial).to.have.been.called.once() - done() - }, 1000) + // Notice peers of connection + const [c0, c1] = ConnectionPair() + await onConnectA(peerInfoB, c0) + await onConnectB(peerInfoA, c1) } - }) - }) - - describe('allow dials even after error', () => { - let sandbox - let nodeA - let nodeB - let fsA - let fsB - - before((done) => { - sandbox = chai.spy.sandbox() - - series([ - (cb) => createNode(cb), - (cb) => createNode(cb) - ], (err, nodes) => { - if (err) return done(err) - nodeA = nodes[0] - nodeB = nodes[1] + await Promise.all([ + dial(), + new Promise((resolve) => fsA.once('floodsub:subscription-change', resolve)), + new Promise((resolve) => fsB.once('floodsub:subscription-change', resolve)) + ]) - // Put node B in node A's peer book - nodeA.peerBook.put(nodeB.peerInfo) + expect(fsA.peers.size).to.equal(1) + expect(fsB.peers.size).to.equal(1) - fsA = new FloodSub(nodeA) - fsB = new FloodSub(nodeB) + expectSet(fsA.subscriptions, ['Za']) + expectSet(first(fsB.peers).topics, ['Za']) - fsB.start(done) - }) - }) - - after((done) => { - sandbox.restore() - - parallel([ - (cb) => nodeA.stop(cb), - (cb) => nodeB.stop(cb) - ], (ignoreErr) => { - done() - }) - }) - - it('can dial again after error', (done) => { - let firstTime = true - const dialProtocol = fsA.libp2p.dialProtocol.bind(fsA.libp2p) - sandbox.on(fsA.libp2p, 'dialProtocol', (peerInfo, multicodec, cb) => { - // Return an error for the first dial - if (firstTime) { - firstTime = false - return cb(new Error('dial error')) - } - - // Subsequent dials proceed as normal - dialProtocol(peerInfo, multicodec, cb) - }) - - // When node A starts, it will dial all peers in its peer book, which - // is just peer B - fsA.start(startComplete) - - function startComplete () { - // Simulate a connection coming in from peer B. This causes floodsub - // to dial peer B - nodeA.emit('peer:connect', nodeB.peerInfo) - - // Check that both dials were made - setTimeout(() => { - expect(fsA.libp2p.dialProtocol).to.have.been.called.twice() - done() - }, 1000) - } - }) - }) - - describe('prevent processing dial after stop', () => { - let sandbox - let nodeA - let nodeB - let fsA - let fsB - - before((done) => { - sandbox = chai.spy.sandbox() - - series([ - (cb) => createNode(cb), - (cb) => createNode(cb) - ], (err, nodes) => { - if (err) return done(err) - - nodeA = nodes[0] - nodeB = nodes[1] - - fsA = new FloodSub(nodeA) - fsB = new FloodSub(nodeB) - - parallel([ - (cb) => fsA.start(cb), - (cb) => fsB.start(cb) - ], done) - }) - }) - - after((done) => { - sandbox.restore() - - parallel([ - (cb) => nodeA.stop(cb), - (cb) => nodeB.stop(cb) - ], (ignoreErr) => { - done() - }) - }) - - it('does not process dial after stop', (done) => { - sandbox.on(fsA, ['_onDial']) - - // Simulate a connection coming in from peer B at the same time. This - // causes floodsub to dial peer B - nodeA.emit('peer:connect', nodeB.peerInfo) - - // Stop floodsub before the dial can complete - fsA.stop(() => { - // Check that the dial was not processed - setTimeout(() => { - expect(fsA._onDial).to.not.have.been.called() - done() - }, 1000) - }) + expectSet(fsB.subscriptions, ['Zb']) + expectSet(first(fsA.peers).topics, ['Zb']) }) }) }) - -function shouldNotHappen (msg) { - expect.fail() -} diff --git a/test/emit-self.spec.js b/test/emit-self.spec.js index 2907ab6..cb99062 100644 --- a/test/emit-self.spec.js +++ b/test/emit-self.spec.js @@ -1,100 +1,68 @@ /* eslint-env mocha */ -/* eslint max-nested-callbacks: ["error", 5] */ 'use strict' const chai = require('chai') chai.use(require('dirty-chai')) chai.use(require('chai-spies')) const expect = chai.expect -const series = require('async/series') const FloodSub = require('../src') const { - createNode + createPeerInfo, mockRegistrar } = require('./utils') const shouldNotHappen = (_) => expect.fail() describe('emit self', () => { + let floodsub + let peerInfo const topic = 'Z' describe('enabled', () => { - let nodeA - let fsA - - before((done) => { - createNode((err, node) => { - if (err) { - return done(err) - } - nodeA = node - nodeA.start(done) - }) + before(async () => { + peerInfo = await createPeerInfo() + floodsub = new FloodSub(peerInfo, mockRegistrar, { emitSelf: true }) }) - before((done) => { - fsA = new FloodSub(nodeA, { emitSelf: true }) - fsA.start(done) - }) + before(async () => { + await floodsub.start() - before(() => { - fsA.subscribe(topic) + floodsub.subscribe(topic) }) - after((done) => { - series([ - (cb) => fsA.stop(cb), - (cb) => nodeA.stop(cb) - ], done) - }) + after(() => floodsub.stop()) - it('should emit to self on publish', async () => { - const promise = new Promise((resolve) => fsA.once(topic, resolve)) + it('should emit to self on publish', () => { + const promise = new Promise((resolve) => floodsub.once(topic, resolve)) - fsA.publish(topic, Buffer.from('hey')) + floodsub.publish(topic, Buffer.from('hey')) - await promise + return promise }) }) describe('disabled', () => { - let nodeA - let fsA - - before((done) => { - createNode((err, node) => { - if (err) { - return done(err) - } - nodeA = node - nodeA.start(done) - }) + before(async () => { + peerInfo = await createPeerInfo() + floodsub = new FloodSub(peerInfo, mockRegistrar, { emitSelf: false }) }) - before((done) => { - fsA = new FloodSub(nodeA, { emitSelf: false }) - fsA.start(done) - }) + before(async () => { + await floodsub.start() - before(() => { - fsA.subscribe(topic) + floodsub.subscribe(topic) }) - after((done) => { - series([ - (cb) => fsA.stop(cb), - (cb) => nodeA.stop(cb) - ], done) - }) + after(() => floodsub.stop()) - it('should emit to self on publish', async () => { - fsA.once(topic, (m) => shouldNotHappen) + it('should emit to self on publish', () => { + floodsub.once(topic, (m) => shouldNotHappen) - fsA.publish(topic, Buffer.from('hey')) + floodsub.publish(topic, Buffer.from('hey')) // Wait 1 second to guarantee that self is not noticed - await new Promise((resolve) => setTimeout(() => resolve(), 1000)) + return new Promise((resolve) => setTimeout(() => resolve(), 1000)) }) }) }) diff --git a/test/fixtures/test-peer.json b/test/fixtures/test-peer.json deleted file mode 100644 index 105046a..0000000 --- a/test/fixtures/test-peer.json +++ /dev/null @@ -1,5 +0,0 @@ -{ - "id": "Qmex1SSsueWFsUfjdkugJ5zhcnjddAt8TxcnDLUXKD9Sx7", - "privKey": "CAASqAkwggSkAgEAAoIBAQCXzV127CvVHOGMzvsn/U+/32JM58KA6k0FSCCeNFzNowiDS/vV5eezGN5AFoxsF6icWLoaczz7l9RdVD+I/t6PEt9X7XUdrDCtSS8WmAcCgvZWSSf7yAd3jT4GSZDUIgIEeRZsERDt/yVqTLwsZ1G9dMIeh8sbf2zwjTXZIWaRM6o4lq3DYFfzLvJUXlJodxPogU7l7nLkITPUv+yQAMcVHizbNwJvwiETKYeUj73/m/wEPAlnFESexDstxNiIwE/FH8Ao50QPZRO6E6Jb0hhYSI/4CLRdrzDFm/Vzplei3Wr2DokSROaNyeG37VAueyA+pDqn84um+L9uXLwbv5FbAgMBAAECggEAdBUzV/GaQ0nmoQrWvOnUxmFIho7kCjkh1NwnNVPNc+Msa1r7pcI9wJNPwap8j1w4L/cZuYhOJgcg+o2mWFiuULKZ4F9Ro/M89gZ038457g2/2pPu43c/Xoi/2YcAHXg0Gr+OCe2zCIyITBWKAFqyAzL6DubAxrJW2Ezj1LrZ+EZgMyzbh/go/eEGSJaaGkINeAkY144DqDWWWvzyhKhryipsGkZGEkVy9xJgMEI3ipVvuPez2XAvoyyeuinBBLe+Z2vY5G50XXzbIMhIQGLncHf9MwTv6wt1ilyOSLOXK0BoQbB76J3R3is5dSULXXP9r8VocjLBEkmBuf4FXAKzoQKBgQDNNS4F1XE1gxD8LPkL+aB/hi6eVHVPhr+w0I/9ATikcLGeUfBM2Gd6cZRPFtNVrv1p6ZF1D1UyGDknGbDBSQd9wLUgb0fDoo3jKYMGWq6G+VvaP5rzWQeBV8YV2EhSmUk1i6kiYe2ZE8WyrPie7iwpQIY60e2A8Ly0GKZiBZUcHQKBgQC9YDAVsGnEHFVFkTDpvw5HwEzCgTb2A3NgkGY3rTYZ7L6AFjqCYmUwFB8Fmbyc4kdFWNh8wfmq5Qrvl49NtaeukiqWKUUlB8uPdztB1P0IahA2ks0owStZlRifmwfgYyMd4xE17lhaOgQQJZZPxmP0F6mdOvb3YJafNURCdMS51wKBgEvvIM+h0tmFXXSjQ6kNvzlRMtD92ccKysYn9xAdMpOO6/r0wSH+dhQWEVZO0PcE4NsfReb2PIVj90ojtIdhebcr5xpQc1LORQjJJKXmSmzBux6AqNrhl+hhzXfp56FA/Zkly/lgGWaqrV5XqUxOP+Mn8EO1yNgMvRc7g94DyNB1AoGBAKLBuXHalXwDsdHBUB2Eo3xNLGt6bEcRfia+0+sEBdxQGQWylQScFkU09dh1YaIf44sZKa5HdBFJGpYCVxo9hmjFnK5Dt/Z0daHOonIY4INLzLVqg8KECoLKXkhGEIXsDjFQhukn+G1LMVTDSSU055DQiWjlVX4UWD9qo0jOXIkvAoGBAMP50p2X6PsWWZUuuR7i1JOJHRyQZPWdHh9p8SSLnCtEpHYZfJr4INXNmhnSiB/3TUnHix2vVKjosjMTCk/CjfzXV2H41WPOLZ2/Pi3SxCicWIRj4kCcWhkEuIF2jGkg1+jmNiCl/zNMaBOAIP3QbDPtqOWbYlPd2YIzdj6WQ6R4", - "pubKey": "CAASpgIwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCXzV127CvVHOGMzvsn/U+/32JM58KA6k0FSCCeNFzNowiDS/vV5eezGN5AFoxsF6icWLoaczz7l9RdVD+I/t6PEt9X7XUdrDCtSS8WmAcCgvZWSSf7yAd3jT4GSZDUIgIEeRZsERDt/yVqTLwsZ1G9dMIeh8sbf2zwjTXZIWaRM6o4lq3DYFfzLvJUXlJodxPogU7l7nLkITPUv+yQAMcVHizbNwJvwiETKYeUj73/m/wEPAlnFESexDstxNiIwE/FH8Ao50QPZRO6E6Jb0hhYSI/4CLRdrzDFm/Vzplei3Wr2DokSROaNyeG37VAueyA+pDqn84um+L9uXLwbv5FbAgMBAAE=" -} \ No newline at end of file diff --git a/test/multiple-nodes.spec.js b/test/multiple-nodes.spec.js index f47b590..522c875 100644 --- a/test/multiple-nodes.spec.js +++ b/test/multiple-nodes.spec.js @@ -5,15 +5,25 @@ const chai = require('chai') chai.use(require('dirty-chai')) const expect = chai.expect -const parallel = require('async/parallel') -const isNode = require('detect-node') +const pDefer = require('p-defer') const FloodSub = require('../src') -const utils = require('./utils') -const first = utils.first -const createNode = utils.createNode -const expectSet = utils.expectSet +const { multicodec } = require('../src') +const { + createPeerInfo, + createMockRegistrar, + first, + expectSet, + ConnectionPair +} = require('./utils') + +async function spawnPubSubNode (peerInfo, reg) { + const ps = new FloodSub(peerInfo, reg, { emitSelf: true }) + + await ps.start() + return ps +} describe('multiple nodes (more than 2)', () => { describe('every peer subscribes to the topic', () => { @@ -21,110 +31,117 @@ describe('multiple nodes (more than 2)', () => { // line // ◉────◉────◉ // a b c - let a - let b - let c - - before((done) => { - parallel([ - (cb) => spawnPubSubNode(cb), - (cb) => spawnPubSubNode(cb), - (cb) => spawnPubSubNode(cb) - ], (err, nodes) => { - if (err) { - return done(err) - } - a = nodes[0] - b = nodes[1] - c = nodes[2] - - done() - }) + let psA, psB, psC + let peerInfoA, peerInfoB, peerInfoC + + const registrarRecordA = {} + const registrarRecordB = {} + const registrarRecordC = {} + + before(async () => { + [peerInfoA, peerInfoB, peerInfoC] = await Promise.all([ + createPeerInfo(), + createPeerInfo(), + createPeerInfo() + ]); + + [psA, psB, psC] = await Promise.all([ + spawnPubSubNode(peerInfoA, createMockRegistrar(registrarRecordA)), + spawnPubSubNode(peerInfoB, createMockRegistrar(registrarRecordB)), + spawnPubSubNode(peerInfoC, createMockRegistrar(registrarRecordC)) + ]) }) - after((done) => { - // note: setTimeout to avoid the tests finishing - // before swarm does its dials - setTimeout(() => { - parallel([ - (cb) => a.libp2p.stop(cb), - (cb) => b.libp2p.stop(cb), - (cb) => c.libp2p.stop(cb) - ], done) - }, 1000) - }) + // connect nodes + before(async () => { + const onConnectA = registrarRecordA[multicodec].onConnect + const onConnectB = registrarRecordB[multicodec].onConnect + const onConnectC = registrarRecordC[multicodec].onConnect - it('establish the connections', (done) => { - parallel([ - (cb) => a.libp2p.dial(b.libp2p.peerInfo, cb), - (cb) => b.libp2p.dial(c.libp2p.peerInfo, cb) - ], (err) => { - expect(err).to.not.exist() - // wait for the pubsub pipes to be established - setTimeout(done, 1000) - }) + // Notice peers of connection + const [d0, d1] = ConnectionPair() + await onConnectA(peerInfoB, d0) + await onConnectB(peerInfoA, d1) + + const [d2, d3] = ConnectionPair() + await onConnectB(peerInfoC, d2) + await onConnectC(peerInfoB, d3) }) - it('subscribe to the topic on node a', (done) => { - a.ps.subscribe('Z') - expectSet(a.ps.subscriptions, ['Z']) + after(() => Promise.all([ + psA.stop(), + psB.stop(), + psC.stop() + ])) - b.ps.once('floodsub:subscription-change', () => { - expect(b.ps.peers.size).to.equal(2) - const aPeerId = a.libp2p.peerInfo.id.toB58String() - const topics = b.ps.peers.get(aPeerId).topics + it('subscribe to the topic on node a', () => { + const defer = pDefer() + + psA.subscribe('Z') + expectSet(psA.subscriptions, ['Z']) + + psB.once('floodsub:subscription-change', () => { + expect(psB.peers.size).to.equal(2) + const aPeerId = psA.peerInfo.id.toB58String() + const topics = psB.peers.get(aPeerId).topics expectSet(topics, ['Z']) - expect(c.ps.peers.size).to.equal(1) - expectSet(first(c.ps.peers).topics, []) + expect(psC.peers.size).to.equal(1) + expectSet(first(psC.peers).topics, []) - done() + defer.resolve() }) + + return defer.promise }) - it('subscribe to the topic on node b', (done) => { - b.ps.subscribe('Z') - expectSet(b.ps.subscriptions, ['Z']) + it('subscribe to the topic on node b', async () => { + psB.subscribe('Z') + expectSet(psB.subscriptions, ['Z']) - parallel([ - cb => a.ps.once('floodsub:subscription-change', () => cb()), - cb => c.ps.once('floodsub:subscription-change', () => cb()) - ], () => { - expect(a.ps.peers.size).to.equal(1) - expectSet(first(a.ps.peers).topics, ['Z']) + await Promise.all([ + new Promise((resolve) => psA.once('floodsub:subscription-change', resolve)), + new Promise((resolve) => psC.once('floodsub:subscription-change', resolve)) + ]) - expect(c.ps.peers.size).to.equal(1) - expectSet(first(c.ps.peers).topics, ['Z']) + expect(psA.peers.size).to.equal(1) + expectSet(first(psA.peers).topics, ['Z']) - done() - }) + expect(psC.peers.size).to.equal(1) + expectSet(first(psC.peers).topics, ['Z']) }) - it('subscribe to the topic on node c', (done) => { - c.ps.subscribe('Z') - expectSet(c.ps.subscriptions, ['Z']) + it('subscribe to the topic on node c', () => { + const defer = pDefer() + + psC.subscribe('Z') + expectSet(psC.subscriptions, ['Z']) - b.ps.once('floodsub:subscription-change', () => { - expect(a.ps.peers.size).to.equal(1) - expectSet(first(a.ps.peers).topics, ['Z']) + psB.once('floodsub:subscription-change', () => { + expect(psA.peers.size).to.equal(1) + expectSet(first(psA.peers).topics, ['Z']) - expect(b.ps.peers.size).to.equal(2) - b.ps.peers.forEach((peer) => { + expect(psB.peers.size).to.equal(2) + psB.peers.forEach((peer) => { expectSet(peer.topics, ['Z']) }) - done() + defer.resolve() }) + + return defer.promise }) - it('publish on node a', (done) => { + it('publish on node a', () => { + const defer = pDefer() + let counter = 0 - a.ps.on('Z', incMsg) - b.ps.on('Z', incMsg) - c.ps.on('Z', incMsg) + psA.on('Z', incMsg) + psB.on('Z', incMsg) + psC.on('Z', incMsg) - a.ps.publish('Z', Buffer.from('hey')) + psA.publish('Z', Buffer.from('hey')) function incMsg (msg) { expect(msg.data.toString()).to.equal('hey') @@ -133,22 +150,25 @@ describe('multiple nodes (more than 2)', () => { function check () { if (++counter === 3) { - a.ps.removeListener('Z', incMsg) - b.ps.removeListener('Z', incMsg) - c.ps.removeListener('Z', incMsg) - done() + psA.removeListener('Z', incMsg) + psB.removeListener('Z', incMsg) + psC.removeListener('Z', incMsg) + defer.resolve() } } + + return defer.promise }) - it('publish array on node a', (done) => { + it('publish array on node a', () => { + const defer = pDefer() let counter = 0 - a.ps.on('Z', incMsg) - b.ps.on('Z', incMsg) - c.ps.on('Z', incMsg) + psA.on('Z', incMsg) + psB.on('Z', incMsg) + psC.on('Z', incMsg) - a.ps.publish('Z', [Buffer.from('hey'), Buffer.from('hey')]) + psA.publish('Z', [Buffer.from('hey'), Buffer.from('hey')]) function incMsg (msg) { expect(msg.data.toString()).to.equal('hey') @@ -157,12 +177,14 @@ describe('multiple nodes (more than 2)', () => { function check () { if (++counter === 6) { - a.ps.removeListener('Z', incMsg) - b.ps.removeListener('Z', incMsg) - c.ps.removeListener('Z', incMsg) - done() + psA.removeListener('Z', incMsg) + psB.removeListener('Z', incMsg) + psC.removeListener('Z', incMsg) + defer.resolve() } } + + return defer.promise }) // since the topology is the same, just the publish @@ -174,14 +196,15 @@ describe('multiple nodes (more than 2)', () => { // ◉─┘ └─◉ // a c - it('publish on node b', (done) => { + it('publish on node b', () => { + const defer = pDefer() let counter = 0 - a.ps.on('Z', incMsg) - b.ps.on('Z', incMsg) - c.ps.on('Z', incMsg) + psA.on('Z', incMsg) + psB.on('Z', incMsg) + psC.on('Z', incMsg) - b.ps.publish('Z', Buffer.from('hey')) + psB.publish('Z', Buffer.from('hey')) function incMsg (msg) { expect(msg.data.toString()).to.equal('hey') @@ -190,125 +213,132 @@ describe('multiple nodes (more than 2)', () => { function check () { if (++counter === 3) { - a.ps.removeListener('Z', incMsg) - b.ps.removeListener('Z', incMsg) - c.ps.removeListener('Z', incMsg) - done() + psA.removeListener('Z', incMsg) + psB.removeListener('Z', incMsg) + psC.removeListener('Z', incMsg) + defer.resolve() } } + + return defer.promise }) }) }) - if (isNode) { - // TODO enable for browser - describe('2 level tree', () => { - // 2 levels tree - // ┌◉┐ - // │c│ - // ┌◉─┘ └─◉┐ - // │b d│ - // ◉─┘ └─◉ - // a e - - let a - let b - let c - let d - let e - - before((done) => { - parallel([ - (cb) => spawnPubSubNode(cb), - (cb) => spawnPubSubNode(cb), - (cb) => spawnPubSubNode(cb), - (cb) => spawnPubSubNode(cb), - (cb) => spawnPubSubNode(cb) - ], (err, nodes) => { - if (err) { - return done(err) - } - a = nodes[0] - b = nodes[1] - c = nodes[2] - d = nodes[3] - e = nodes[4] - - done() - }) - }) + describe('2 level tree', () => { + // 2 levels tree + // ┌◉┐ + // │c│ + // ┌◉─┘ └─◉┐ + // │b d│ + // ◉─┘ └─◉ + // a + let psA, psB, psC, psD, psE + let peerInfoA, peerInfoB, peerInfoC, peerInfoD, peerInfoE + + const registrarRecordA = {} + const registrarRecordB = {} + const registrarRecordC = {} + const registrarRecordD = {} + const registrarRecordE = {} + + before(async () => { + [peerInfoA, peerInfoB, peerInfoC, peerInfoD, peerInfoE] = await Promise.all([ + createPeerInfo(), + createPeerInfo(), + createPeerInfo(), + createPeerInfo(), + createPeerInfo() + ]); + + [psA, psB, psC, psD, psE] = await Promise.all([ + spawnPubSubNode(peerInfoA, createMockRegistrar(registrarRecordA)), + spawnPubSubNode(peerInfoB, createMockRegistrar(registrarRecordB)), + spawnPubSubNode(peerInfoC, createMockRegistrar(registrarRecordC)), + spawnPubSubNode(peerInfoD, createMockRegistrar(registrarRecordD)), + spawnPubSubNode(peerInfoE, createMockRegistrar(registrarRecordE)) + ]) + }) - after((done) => { - // note: setTimeout to avoid the tests finishing - // before swarm does its dials - setTimeout(() => { - parallel([ - (cb) => a.libp2p.stop(cb), - (cb) => b.libp2p.stop(cb), - (cb) => c.libp2p.stop(cb), - (cb) => d.libp2p.stop(cb), - (cb) => e.libp2p.stop(cb) - ], done) - }, 1000) - }) + // connect nodes + before(async () => { + const onConnectA = registrarRecordA[multicodec].onConnect + const onConnectB = registrarRecordB[multicodec].onConnect + const onConnectC = registrarRecordC[multicodec].onConnect + const onConnectD = registrarRecordD[multicodec].onConnect + const onConnectE = registrarRecordE[multicodec].onConnect + + // Notice peers of connection + const [d0, d1] = ConnectionPair() // A <-> B + await onConnectA(peerInfoB, d0) + await onConnectB(peerInfoA, d1) + + const [d2, d3] = ConnectionPair() // B <-> C + await onConnectB(peerInfoC, d2) + await onConnectC(peerInfoB, d3) + + const [d4, d5] = ConnectionPair() // C <-> D + await onConnectC(peerInfoD, d4) + await onConnectD(peerInfoC, d5) + + const [d6, d7] = ConnectionPair() // C <-> D + await onConnectD(peerInfoE, d6) + await onConnectE(peerInfoD, d7) + }) - it('establish the connections', function (done) { - this.timeout(30 * 1000) - parallel([ - (cb) => a.libp2p.dial(b.libp2p.peerInfo, cb), - (cb) => b.libp2p.dial(c.libp2p.peerInfo, cb), - (cb) => c.libp2p.dial(d.libp2p.peerInfo, cb), - (cb) => d.libp2p.dial(e.libp2p.peerInfo, cb) - ], (err) => { - expect(err).to.not.exist() - // wait for the pubsub pipes to be established - setTimeout(done, 10000) - }) - }) + after(() => Promise.all([ + psA.stop(), + psB.stop(), + psC.stop(), + psD.stop(), + psE.stop() + ])) + + it('subscribes', () => { + psA.subscribe('Z') + expectSet(psA.subscriptions, ['Z']) + psB.subscribe('Z') + expectSet(psB.subscriptions, ['Z']) + psC.subscribe('Z') + expectSet(psC.subscriptions, ['Z']) + psD.subscribe('Z') + expectSet(psD.subscriptions, ['Z']) + psE.subscribe('Z') + expectSet(psE.subscriptions, ['Z']) + }) - it('subscribes', () => { - a.ps.subscribe('Z') - expectSet(a.ps.subscriptions, ['Z']) - b.ps.subscribe('Z') - expectSet(b.ps.subscriptions, ['Z']) - c.ps.subscribe('Z') - expectSet(c.ps.subscriptions, ['Z']) - d.ps.subscribe('Z') - expectSet(d.ps.subscriptions, ['Z']) - e.ps.subscribe('Z') - expectSet(e.ps.subscriptions, ['Z']) - }) + it('publishes from c', function () { + this.timeout(30 * 1000) + const defer = pDefer() + let counter = 0 - it('publishes from c', function (done) { - this.timeout(30 * 1000) - let counter = 0 + psA.on('Z', incMsg) + psB.on('Z', incMsg) + psC.on('Z', incMsg) + psD.on('Z', incMsg) + psE.on('Z', incMsg) - a.ps.on('Z', incMsg) - b.ps.on('Z', incMsg) - c.ps.on('Z', incMsg) - d.ps.on('Z', incMsg) - e.ps.on('Z', incMsg) + psC.publish('Z', Buffer.from('hey from c')) - c.ps.publish('Z', Buffer.from('hey from c')) + function incMsg (msg) { + expect(msg.data.toString()).to.equal('hey from c') + check() + } - function incMsg (msg) { - expect(msg.data.toString()).to.equal('hey from c') - check() + function check () { + if (++counter === 5) { + psA.removeListener('Z', incMsg) + psB.removeListener('Z', incMsg) + psC.removeListener('Z', incMsg) + psD.removeListener('Z', incMsg) + psE.removeListener('Z', incMsg) + defer.resolve() } + } - function check () { - if (++counter === 5) { - a.ps.removeListener('Z', incMsg) - b.ps.removeListener('Z', incMsg) - c.ps.removeListener('Z', incMsg) - d.ps.removeListener('Z', incMsg) - e.ps.removeListener('Z', incMsg) - done() - } - } - }) + return defer.promise }) - } + }) }) describe('only some nodes subscribe the networks', () => { @@ -317,8 +347,8 @@ describe('multiple nodes (more than 2)', () => { // ◉────◎────◉ // a b c - before((done) => {}) - after((done) => {}) + before(() => { }) + after(() => { }) }) describe('1 level tree', () => { @@ -328,8 +358,8 @@ describe('multiple nodes (more than 2)', () => { // ◎─┘ └─◉ // a c - before((done) => {}) - after((done) => {}) + before(() => { }) + after(() => { }) }) describe('2 level tree', () => { @@ -341,26 +371,8 @@ describe('multiple nodes (more than 2)', () => { // ◉─┘ └─◎ // a e - before((done) => {}) - after((done) => {}) + before(() => { }) + after(() => { }) }) }) }) - -function spawnPubSubNode (callback) { - createNode((err, node) => { - if (err) { - return callback(err) - } - const ps = new FloodSub(node, { emitSelf: true }) - ps.start((err) => { - if (err) { - return callback(err) - } - callback(null, { - libp2p: node, - ps: ps - }) - }) - }) -} diff --git a/test/pubsub.spec.js b/test/pubsub.spec.js index ce4734e..3c10460 100644 --- a/test/pubsub.spec.js +++ b/test/pubsub.spec.js @@ -6,92 +6,83 @@ const chai = require('chai') chai.use(require('dirty-chai')) const expect = chai.expect const sinon = require('sinon') -const nextTick = require('async/nextTick') const Floodsub = require('../src') -const { createNode } = require('./utils') +const { createPeerInfo, mockRegistrar } = require('./utils') const { utils } = require('libp2p-pubsub') +const defOptions = { + emitSelf: true +} + describe('pubsub', () => { let floodsub - let libp2p + let peerInfo - before((done) => { + before(async () => { expect(Floodsub.multicodec).to.exist() - createNode((err, node) => { - expect(err).to.not.exist() - libp2p = node - floodsub = new Floodsub(libp2p, { emitSelf: true }) - done(err) - }) + peerInfo = await createPeerInfo() + floodsub = new Floodsub(peerInfo, mockRegistrar, defOptions) }) - beforeEach(done => { - floodsub.start(done) + beforeEach(() => { + return floodsub.start() }) - afterEach(done => { + afterEach(() => { sinon.restore() - floodsub.stop(done) + return floodsub.stop() }) describe('publish', () => { - it('should emit non normalized messages', (done) => { + it('should emit non normalized messages', async () => { sinon.spy(floodsub, '_emitMessages') sinon.spy(utils, 'randomSeqno') const topic = 'my-topic' const message = Buffer.from('a neat message') - floodsub.publish(topic, message, (err) => { - expect(err).to.not.exist() - expect(floodsub._emitMessages.callCount).to.eql(1) - - const [topics, messages] = floodsub._emitMessages.getCall(0).args - expect(topics).to.eql([topic]) - expect(messages).to.eql([{ - from: libp2p.peerInfo.id.toB58String(), - data: message, - seqno: utils.randomSeqno.getCall(0).returnValue, - topicIDs: topics - }]) - done() - }) + await floodsub.publish(topic, message) + expect(floodsub._emitMessages.callCount).to.eql(1) + + const [topics, messages] = floodsub._emitMessages.getCall(0).args + expect(topics).to.eql([topic]) + expect(messages).to.eql([{ + from: peerInfo.id.toB58String(), + data: message, + seqno: utils.randomSeqno.getCall(0).returnValue, + topicIDs: topics + }]) }) - it('should forward normalized messages', (done) => { + it('should forward normalized messages', async () => { sinon.spy(floodsub, '_forwardMessages') sinon.spy(utils, 'randomSeqno') const topic = 'my-topic' const message = Buffer.from('a neat message') - floodsub.publish(topic, message, (err) => { - expect(err).to.not.exist() - expect(floodsub._forwardMessages.callCount).to.eql(1) - const [topics, messages] = floodsub._forwardMessages.getCall(0).args - - floodsub._buildMessage({ - from: libp2p.peerInfo.id.toB58String(), - data: message, - seqno: utils.randomSeqno.getCall(0).returnValue, - topicIDs: topics - }, (err, expected) => { - expect(err).to.not.exist() - - expect(topics).to.eql([topic]) - expect(messages).to.eql([ - expected - ]) - done() - }) + await floodsub.publish(topic, message) + expect(floodsub._forwardMessages.callCount).to.eql(1) + const [topics, messages] = floodsub._forwardMessages.getCall(0).args + + const expected = await floodsub._buildMessage({ + from: peerInfo.id.toB58String(), + data: message, + seqno: utils.randomSeqno.getCall(0).returnValue, + topicIDs: topics }) + + expect(topics).to.eql([topic]) + expect(messages).to.eql([ + expected + ]) }) }) describe('validate', () => { - it('should drop unsigned messages', (done) => { + it('should drop unsigned messages', () => { sinon.spy(floodsub, '_emitMessages') sinon.spy(floodsub, '_forwardMessages') sinon.spy(floodsub, 'validate') @@ -100,7 +91,7 @@ describe('pubsub', () => { const rpc = { subscriptions: [], msgs: [{ - from: libp2p.peerInfo.id.id, + from: peerInfo.id.id, data: Buffer.from('an unsigned message'), seqno: utils.randomSeqno(), topicIDs: [topic] @@ -109,16 +100,18 @@ describe('pubsub', () => { floodsub._onRpc('QmAnotherPeer', rpc) - nextTick(() => { - expect(floodsub.validate.callCount).to.eql(1) - expect(floodsub._emitMessages.called).to.eql(false) - expect(floodsub._forwardMessages.called).to.eql(false) + return new Promise((resolve) => { + setTimeout(() => { + expect(floodsub.validate.callCount).to.eql(1) + expect(floodsub._emitMessages.called).to.eql(false) + expect(floodsub._forwardMessages.called).to.eql(false) - done() + resolve() + }, 50) }) }) - it('should not drop unsigned messages if strict signing is disabled', (done) => { + it('should not drop unsigned messages if strict signing is disabled', () => { sinon.spy(floodsub, '_emitMessages') sinon.spy(floodsub, '_forwardMessages') sinon.spy(floodsub, 'validate') @@ -128,7 +121,7 @@ describe('pubsub', () => { const rpc = { subscriptions: [], msgs: [{ - from: libp2p.peerInfo.id.id, + from: peerInfo.id.id, data: Buffer.from('an unsigned message'), seqno: utils.randomSeqno(), topicIDs: [topic] @@ -137,12 +130,14 @@ describe('pubsub', () => { floodsub._onRpc('QmAnotherPeer', rpc) - nextTick(() => { - expect(floodsub.validate.callCount).to.eql(1) - expect(floodsub._emitMessages.called).to.eql(true) - expect(floodsub._forwardMessages.called).to.eql(true) + return new Promise((resolve) => { + setTimeout(() => { + expect(floodsub.validate.callCount).to.eql(1) + expect(floodsub._emitMessages.called).to.eql(true) + expect(floodsub._forwardMessages.called).to.eql(true) - done() + resolve() + }, 50) }) }) }) diff --git a/test/utils/browser-bundle.js b/test/utils/browser-bundle.js deleted file mode 100644 index 117e0a0..0000000 --- a/test/utils/browser-bundle.js +++ /dev/null @@ -1,31 +0,0 @@ -'use strict' - -const WebSocketStar = require('libp2p-websocket-star') -const spdy = require('libp2p-spdy') -const secio = require('libp2p-secio') -const libp2p = require('libp2p') - -const { WS_STAR_MULTIADDR } = require('./constants') - -class Node extends libp2p { - constructor ({ peerInfo, peerBook }) { - const starOpts = { id: peerInfo.id } - const wsStar = new WebSocketStar(starOpts) - - peerInfo.multiaddrs.add(WS_STAR_MULTIADDR) - - const modules = { - transport: [wsStar], - streamMuxer: [spdy], - connEncryption: [secio] - } - - super({ - modules, - peerInfo, - peerBook - }) - } -} - -module.exports = Node diff --git a/test/utils/constants.js b/test/utils/constants.js deleted file mode 100644 index a836a03..0000000 --- a/test/utils/constants.js +++ /dev/null @@ -1,40 +0,0 @@ -'use strict' - -const PeerId = require('peer-id') -const PeerInfo = require('peer-info') -const nextTick = require('async/nextTick') -const peerJSON = require('../fixtures/test-peer') -const multiaddr = require('multiaddr') - -let peerRelay = null - -/** - * Creates a `PeerInfo` that can be used across testing. Once the - * relay `PeerInfo` has been requested, it will be reused for each - * additional request. - * - * This is currently being used to create a relay on test bootstrapping - * so that it can be used by browser nodes during their test suite. This - * is necessary for running a TCP node during browser tests. - * @private - * @param {function(error, PeerInfo)} callback - * @returns {void} - */ -module.exports.getPeerRelay = (callback) => { - if (peerRelay) return nextTick(callback, null, peerRelay) - - PeerId.createFromJSON(peerJSON, (err, peerId) => { - if (err) { - return callback(err) - } - peerRelay = new PeerInfo(peerId) - - peerRelay.multiaddrs.add('/ip4/127.0.0.1/tcp/9200/ws') - peerRelay.multiaddrs.add('/ip4/127.0.0.1/tcp/9245') - - callback(null, peerRelay) - }) -} - -module.exports.WS_STAR_MULTIADDR = multiaddr('/ip4/127.0.0.1/tcp/14444/ws/p2p-websocket-star/') -module.exports.WS_RENDEZVOUS_MULTIADDR = multiaddr('/ip4/127.0.0.1/tcp/14444/wss') diff --git a/test/utils/index.js b/test/utils/index.js index 5b61800..f864e63 100644 --- a/test/utils/index.js +++ b/test/utils/index.js @@ -2,10 +2,9 @@ const PeerId = require('peer-id') const PeerInfo = require('peer-info') -const Node = require('./nodejs-bundle') +const DuplexPair = require('it-pair/duplex') -const waterfall = require('async/waterfall') -const expect = require('chai').expect +const { expect } = require('chai') exports.first = (map) => map.values().next().value @@ -13,13 +12,58 @@ exports.expectSet = (set, subs) => { expect(Array.from(set.values())).to.eql(subs) } -exports.createNode = (callback) => { - waterfall([ - (cb) => PeerId.create({ bits: 1024 }, cb), - (id, cb) => PeerInfo.create(id, cb), - (peerInfo, cb) => { - cb(null, new Node({ peerInfo })) +exports.createPeerInfo = async () => { + const peerId = await PeerId.create({ bits: 1024 }) + + return PeerInfo.create(peerId) +} + +exports.mockRegistrar = { + handle: () => {}, + register: () => {}, + unregister: () => {} +} + +exports.createMockRegistrar = (registrarRecord) => ({ + handle: (multicodecs, handler) => { + const rec = registrarRecord[multicodecs[0]] || {} + + registrarRecord[multicodecs[0]] = { + ...rec, + handler + } + }, + register: ({ multicodecs, _onConnect, _onDisconnect }) => { + const rec = registrarRecord[multicodecs[0]] || {} + + registrarRecord[multicodecs[0]] = { + ...rec, + onConnect: _onConnect, + onDisconnect: _onDisconnect + } + + return multicodecs[0] + }, + unregister: (id) => { + delete registrarRecord[id] + } +}) + +exports.ConnectionPair = () => { + const [d0, d1] = DuplexPair() + + return [ + { + stream: d0, + newStream: () => Promise.resolve({ stream: d0 }) }, - (node, cb) => node.start((err) => cb(err, node)) - ], callback) + { + stream: d1, + newStream: () => Promise.resolve({ stream: d1 }) + } + ] +} + +exports.defOptions = { + emitSelf: true } diff --git a/test/utils/nodejs-bundle.js b/test/utils/nodejs-bundle.js deleted file mode 100644 index ef572d1..0000000 --- a/test/utils/nodejs-bundle.js +++ /dev/null @@ -1,26 +0,0 @@ -'use strict' - -const TCP = require('libp2p-tcp') -const spdy = require('libp2p-spdy') -const secio = require('libp2p-secio') -const libp2p = require('libp2p') - -class Node extends libp2p { - constructor ({ peerInfo, peerBook }) { - const modules = { - transport: [TCP], - streamMuxer: [spdy], - connEncryption: [secio] - } - - peerInfo.multiaddrs.add('/ip4/127.0.0.1/tcp/0') - - super({ - modules, - peerInfo, - peerBook - }) - } -} - -module.exports = Node