From b8556eccfa51c4d7110090f40fd6c934621aa8cf Mon Sep 17 00:00:00 2001 From: Alex Potsides Date: Fri, 24 Jan 2020 18:12:53 +0000 Subject: [PATCH] fix: update to async iterators (#73) * chore: update to latest js-IPFS rc * refactor: updates to latest async iterator version of ipfs and libp2p Needs `ipfs.id` to return a `CID` before tests will pass. * chore: only build master branch and prs * refactor: ipfs will return peer ids as strings instead * refactor: remove ipfs and just use libp2p * fix: fix linting --- .aegir.js | 37 ++++++++ .travis.yml | 44 ++++++++++ README.md | 77 +++++++++-------- circle.yml | 15 ---- package.json | 22 ++--- src/connection.js | 138 +++++++++++++++++++----------- src/direct-connection-handler.js | 39 ++++----- src/index.js | 123 +++++++++----------------- src/libp2p.js | 5 -- src/peer-id.js | 8 -- test/concurrent-rooms.spec.js | 117 ++++++++++--------------- test/room.spec.js | 112 ++++++++++-------------- test/same-node.spec.js | 46 +++------- test/utils/clean.js | 15 ---- test/utils/create-libp2p.js | 68 +++++++++++++++ test/utils/create-repo-browser.js | 27 ------ test/utils/create-repo-node.js | 34 -------- test/utils/testCommon.js | 79 ----------------- 18 files changed, 449 insertions(+), 557 deletions(-) create mode 100644 .aegir.js create mode 100644 .travis.yml delete mode 100644 circle.yml delete mode 100644 src/libp2p.js delete mode 100644 src/peer-id.js delete mode 100644 test/utils/clean.js create mode 100644 test/utils/create-libp2p.js delete mode 100644 test/utils/create-repo-browser.js delete mode 100644 test/utils/create-repo-node.js delete mode 100644 test/utils/testCommon.js diff --git a/.aegir.js b/.aegir.js new file mode 100644 index 0000000..70e1748 --- /dev/null +++ b/.aegir.js @@ -0,0 +1,37 @@ +'use strict' + +const Libp2p = require('libp2p') +const PeerInfo = require('peer-info') +const { config } = require('./test/utils/create-libp2p') + +let relay + +module.exports = { + hooks: { + pre: async () => { + const peerInfo = await PeerInfo.create() + peerInfo.multiaddrs.add('/ip4/127.0.0.1/tcp/24642/ws') + + const defaultConfig = await config() + + relay = new Libp2p({ + ...defaultConfig, + peerInfo, + config: { + ...defaultConfig.config, + relay: { + enabled: true, + hop: { + enabled: true + } + } + } + }) + + await relay.start() + }, + post: async () => { + await relay.stop() + } + } +} diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..5c40af9 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,44 @@ +language: node_js +branches: + only: + - master + - /^release\/.*$/ +cache: npm +stages: + - check + - test + - cov + +node_js: + - '12' + +os: + - linux + - osx + - windows + +script: npx nyc -s npm run test:node -- --bail +after_success: npx nyc report --reporter=text-lcov > coverage.lcov && npx codecov + +jobs: + include: + - stage: check + script: + - npx aegir commitlint --travis + - npx aegir dep-check + - npm run lint + + - stage: test + name: chrome + addons: + chrome: stable + script: npx aegir test -t browser -t webworker + + - stage: test + name: firefox + addons: + firefox: latest + script: npx aegir test -t browser -t webworker -- --browsers FirefoxHeadless + +notifications: + email: false diff --git a/README.md b/README.md index 17f07c0..fa3d41c 100644 --- a/README.md +++ b/README.md @@ -3,16 +3,12 @@ [![made by Protocol Labs](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](https://protocol.ai) [![Freenode](https://img.shields.io/badge/freenode-%23ipfs-blue.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23ipfs) -[![Build Status](https://travis-ci.org/ipfs-shipyard/ipfs-pubsub-room.svg?branch=master)](https://travis-ci.org/ipfs-shipyard/ipfs-pubsub-room) +[![Build Status](https://travis-ci.com/ipfs-shipyard/ipfs-pubsub-room.svg?branch=master)](https://travis-ci.com/ipfs-shipyard/ipfs-pubsub-room) -> Creates a room based on an IPFS pub-sub channel. Emits membership events, listens for messages, broadcast and direct messeges to peers. +> Creates a room based on a LibP2P pub-sub channel. Emits membership events, listens for messages, broadcast and direct messeges to peers. ([Demo video](https://t.co/HNYQGE4D4P)) -## js-ipfs - -This package has been tested with js-ipfs version __0.32.0__. - ## Install ```bash @@ -21,59 +17,64 @@ $ npm install ipfs-pubsub-room ## Use +Creating a pubsub room from a LibP2P node + +```js +const Room = require('ipfs-pubsub-room') +const Libp2p = require('libp2p') + +const libp2p = new Libp2p({ ... }) +await libp2p.start() + +// libp2p node is ready, so we can start using ipfs-pubsub-room +const room = Room(libp2p, 'room-name') +``` + +Creating a pubsub room from an IPFS node + ```js const Room = require('ipfs-pubsub-room') const IPFS = require('ipfs') -const ipfs = new IPFS({ - EXPERIMENTAL: { - pubsub: true - }, - config: { - Addresses: { - Swarm: [ - '/dns4/ws-star.discovery.libp2p.io/tcp/443/wss/p2p-websocket-star' - ] - } - } -}) -// IPFS node is ready, so we can start using ipfs-pubsub-room -ipfs.on('ready', () => { - const room = Room(ipfs, 'room-name') +const ipfs = await IPFS.create({ ... }) +const room = Room(ipfs.libp2p, 'room-name') +``` + +Once we have a room we can listen for messages - room.on('peer joined', (peer) => { - console.log('Peer joined the room', peer) - }) +```js +room.on('peer joined', (peer) => { + console.log('Peer joined the room', peer) +}) - room.on('peer left', (peer) => { - console.log('Peer left...', peer) - }) +room.on('peer left', (peer) => { + console.log('Peer left...', peer) +}) - // now started to listen to room - room.on('subscribed', () => { - console.log('Now connected!') - }) +// now started to listen to room +room.on('subscribed', () => { + console.log('Now connected!') }) ``` ## API -### Room (ipfs:IPFS, roomName:string, options:object) +### Room (libp2p:LibP2P, roomName:string, options:object) -* `ipfs`: IPFS object. Must have pubsub activated +* `libp2p`: LibP2P node. Must have pubsub activated * `roomName`: string, global identifier for the room * `options`: object: * `pollInterval`: interval for polling the pubsub peers, in ms. Defaults to 1000. ```js -const room = Room(ipfs, 'some-room-name') +const room = Room(libp2p, 'some-room-name') ``` ## room.broadcast(message) Broacasts message (string or buffer). -## room.sendTo(peer, message) +## room.sendTo(cid, message) Sends message (string or buffer) to peer. @@ -85,7 +86,7 @@ Leaves room, stopping everything. Returns an array of peer identifiers (strings). -## room.hasPeer(peer) +## room.hasPeer(cid) Returns a boolean indicating if the given peer is present in the room. @@ -96,11 +97,11 @@ Listens for messages. A `message` is an object containing the following properti * `from` (string): peer id * `data` (Buffer): message content -## room.on('peer joined', (peer) => {}) +## room.on('peer joined', (cid) => {}) Once a peer has joined the room. -## room.on('peer left', (peer) => {}) +## room.on('peer left', (cid) => {}) Once a peer has left the room. diff --git a/circle.yml b/circle.yml deleted file mode 100644 index 0009693..0000000 --- a/circle.yml +++ /dev/null @@ -1,15 +0,0 @@ -# Warning: This file is automatically synced from https://github.com/ipfs/ci-sync so if you want to change it, please change it there and ask someone to sync all repositories. -machine: - node: - version: stable - -dependencies: - pre: - - google-chrome --version - - curl -L -o google-chrome.deb https://dl.google.com/linux/direct/google-chrome-stable_current_amd64.deb - - sudo dpkg -i google-chrome.deb || true - - sudo apt-get update - - sudo apt-get install -f - - sudo apt-get install --only-upgrade lsb-base - - sudo dpkg -i google-chrome.deb - - google-chrome --version diff --git a/package.json b/package.json index 71745af..f7595c6 100644 --- a/package.json +++ b/package.json @@ -7,7 +7,7 @@ "lint": "aegir lint", "test": "aegir test", "test:node": "aegir test -t node", - "test:browser": "aegir test - browser" + "test:browser": "aegir test -t browser" }, "repository": { "type": "git", @@ -28,18 +28,20 @@ "homepage": "https://github.com/ipfs-shipyard/ipfs-pubsub-room#readme", "dependencies": { "hyperdiff": "^2.0.5", - "lodash.clonedeep": "^4.5.0", - "pull-pushable": "^2.2.0", - "pull-stream": "^3.6.9" + "it-pipe": "^1.1.0", + "lodash.clonedeep": "^4.5.0" }, "devDependencies": { - "async": "^2.6.1", + "aegir": "^20.5.1", "chai": "^4.2.0", + "delay": "^4.3.0", "dirty-chai": "^2.0.1", - "aegir": "^18.0.3", - "ipfs": "~0.34.4" - }, - "browser": { - "./test/utils/create-repo-node.js": "./test/utils/create-repo-browser.js" + "libp2p": "0.27.0-rc.0", + "libp2p-gossipsub": "0.2.1", + "libp2p-mplex": "^0.9.3", + "libp2p-secio": "^0.12.2", + "libp2p-websockets": "^0.13.2", + "peer-info": "^0.17.1", + "rimraf": "^3.0.0" } } diff --git a/src/connection.js b/src/connection.js index 62bb3d6..e20ec42 100644 --- a/src/connection.js +++ b/src/connection.js @@ -1,19 +1,16 @@ 'use strict' const EventEmitter = require('events') -const pull = require('pull-stream') -const Pushable = require('pull-pushable') +const pipe = require('it-pipe') const PROTOCOL = require('./protocol') const encoding = require('./encoding') -const getPeerId = require('./peer-id') -const libp2p = require('./libp2p') module.exports = class Connection extends EventEmitter { - constructor (id, ipfs, room) { + constructor (remoteId, libp2p, room) { super() - this._id = id - this._ipfs = ipfs + this._remoteId = remoteId + this._libp2p = libp2p this._room = room this._connection = null this._connecting = false @@ -22,11 +19,16 @@ module.exports = class Connection extends EventEmitter { push (message) { if (this._connection) { this._connection.push(encoding(message)) - } else { - this.once('connect', () => this.push(message)) - if (!this._connecting) { - this._getConnection() - } + + return + } + + this.once('connect', () => { + this.push(message) + }) + + if (!this._connecting) { + this._connect() } } @@ -36,52 +38,92 @@ module.exports = class Connection extends EventEmitter { } } - _getConnection () { + async _connect () { this._connecting = true - this._getPeerAddresses(this._id, (err, peerAddresses) => { - if (err) { - this.emit('error', err) - return // early - } - if (!peerAddresses.length) { + if (!this._isConnectedToRemote()) { + this.emit('disconnect') + this._connecting = false + return // early + } + + const peerInfo = this._libp2p.peerStore.get(this._remoteId) + const { stream } = await this._libp2p.dialProtocol(peerInfo, PROTOCOL) + this._connection = new FiFoMessageQueue() + + pipe(this._connection, stream, async (source) => { + this._connecting = false + this.emit('connect', this._connection) + + for await (const message of source) { + this.emit('message', message) + } + }) + .then(() => { this.emit('disconnect') - return // early + }, (err) => { + this.emit('error', err) + }) + } + + _isConnectedToRemote () { + for (const peerId of this._libp2p.connections.keys()) { + if (peerId === this._remoteId) { + return true } + } + } +} + +class FiFoMessageQueue { + constructor () { + this._queue = [] + } + + [Symbol.asyncIterator] () { + return this + } - libp2p(this._ipfs).dialProtocol(peerAddresses[0], PROTOCOL, (err, conn) => { - if (err) { - this.emit('disconnect') - return // early - } - this._connecting = false - const pushable = Pushable() - this._connection = pushable - pull( - pushable, - conn, - pull.onEnd(() => { - delete this._connection - this.emit('disconnect') - }) - ) - this.emit('connect', pushable) + push (message) { + if (this._ended) { + throw new Error('Message queue ended') + } + + if (this._resolve) { + return this._resolve({ + done: false, + value: message }) - }) + } + + this._queue.push(message) + } + + end () { + this._ended = true + if (this._resolve) { + this._resolve({ + done: true + }) + } } - _getPeerAddresses (peerId, callback) { - this._ipfs.swarm.peers((err, peersAddresses) => { - if (err) { - callback(err) - return // early + next () { + if (this._ended) { + return { + done: true } + } + + if (this._queue.length) { + return { + done: false, + value: this._queue.shift() + } + } - callback( - null, - peersAddresses - .filter((peerAddress) => getPeerId(peerAddress.peer) === peerId) - .map(peerAddress => peerAddress.peer)) + return new Promise((resolve) => { + this._resolve = resolve }) } } diff --git a/src/direct-connection-handler.js b/src/direct-connection-handler.js index e6633fd..1e14b00 100644 --- a/src/direct-connection-handler.js +++ b/src/direct-connection-handler.js @@ -1,39 +1,35 @@ 'use strict' -const pull = require('pull-stream') const EventEmitter = require('events') +const pipe = require('it-pipe') const emitter = new EventEmitter() -function handler (protocol, conn) { - conn.getPeerInfo((err, peerInfo) => { - if (err) { - console.log(err) - return - } - - const peerId = peerInfo.id.toB58String() +function handler ({ connection, stream }) { + const peerId = connection.remotePeer.toB58String() - pull( - conn, - pull.map((message) => { + pipe( + stream, + async function (source) { + for await (const message of source) { let msg + try { msg = JSON.parse(message.toString()) } catch (err) { emitter.emit('warning', err.message) - return // early + continue // early } - if (peerId !== msg.from) { + if (peerId !== msg.from.toString()) { emitter.emit('warning', 'no peerid match ' + msg.from) - return // early + continue // early } const topicIDs = msg.topicIDs if (!Array.isArray(topicIDs)) { emitter.emit('warning', 'no topic IDs') - return // early + continue // early } msg.data = Buffer.from(msg.data, 'hex') @@ -42,14 +38,9 @@ function handler (protocol, conn) { topicIDs.forEach((topic) => { emitter.emit(topic, msg) }) - - return msg - }), - pull.onEnd(() => { - // do nothing - }) - ) - }) + } + } + ) } exports = module.exports = { diff --git a/src/index.js b/src/index.js index 4ff9559..bf31114 100644 --- a/src/index.js +++ b/src/index.js @@ -2,45 +2,46 @@ const diff = require('hyperdiff') const EventEmitter = require('events') -const timers = require('timers') const clone = require('lodash.clonedeep') const PROTOCOL = require('./protocol') const Connection = require('./connection') const encoding = require('./encoding') const directConnection = require('./direct-connection-handler') -const libp2p = require('./libp2p') const DEFAULT_OPTIONS = { pollInterval: 1000 } -module.exports = (ipfs, topic, options) => { - return new PubSubRoom(ipfs, topic, options) -} +let index = 0 class PubSubRoom extends EventEmitter { - constructor (ipfs, topic, options) { + constructor (libp2p, topic, options) { super() - this._ipfs = ipfs + this._libp2p = libp2p this._topic = topic this._options = Object.assign({}, clone(DEFAULT_OPTIONS), clone(options)) this._peers = [] this._connections = {} this._handleDirectMessage = this._handleDirectMessage.bind(this) + this._handleMessage = this._onMessage.bind(this) - if (!this._ipfs.pubsub) { - throw new Error('This IPFS node does not have pubsub.') + if (!this._libp2p.pubsub) { + throw new Error('pubsub has not been configured') } - if (this._ipfs.isOnline()) { - this._start() - } else { - this._ipfs.on('ready', this._start.bind(this)) - } + this._interval = setInterval( + this._pollPeers.bind(this), + this._options.pollInterval + ) + + this._libp2p.handle(PROTOCOL, directConnection.handler) + directConnection.emitter.on(this._topic, this._handleDirectMessage) + + this._libp2p.pubsub.subscribe(this._topic, this._handleMessage) - this._ipfs.on('stop', this.leave.bind(this)) + this._idx = index++ } getPeers () { @@ -48,41 +49,35 @@ class PubSubRoom extends EventEmitter { } hasPeer (peer) { - return this._peers.indexOf(peer) >= 0 + return Boolean(this._peers.find(p => p.toString() === peer.toString())) } - leave () { - return new Promise((resolve, reject) => { - timers.clearInterval(this._interval) - Object.keys(this._connections).forEach((peer) => { - this._connections[peer].stop() - }) - directConnection.emitter.removeListener(this._topic, this._handleDirectMessage) - this.once('stopped', () => resolve()) - this.emit('stopping') + async leave () { + clearInterval(this._interval) + Object.keys(this._connections).forEach((peer) => { + this._connections[peer].stop() }) + directConnection.emitter.removeListener(this._topic, this._handleDirectMessage) + this._libp2p.unhandle(PROTOCOL, directConnection.handler) + await this._libp2p.pubsub.unsubscribe(this._topic, this._handleMessage) } - broadcast (_message) { - let message = encoding(_message) + async broadcast (_message) { + const message = encoding(_message) - this._ipfs.pubsub.publish(this._topic, message, (err) => { - if (err) { - this.emit('error', err) - } - }) + await this._libp2p.pubsub.publish(this._topic, message) } sendTo (peer, message) { let conn = this._connections[peer] if (!conn) { - conn = new Connection(peer, this._ipfs, this) + conn = new Connection(peer, this._libp2p, this) conn.on('error', (err) => this.emit('error', err)) this._connections[peer] = conn conn.once('disconnect', () => { delete this._connections[peer] - this._peers = this._peers.filter((p) => p !== peer) + this._peers = this._peers.filter((p) => p.toString() !== peer.toString()) this.emit('peer left', peer) }) } @@ -97,65 +92,29 @@ class PubSubRoom extends EventEmitter { const msg = { to: peer, - from: this._ipfs._peerInfo.id.toB58String(), + from: this._libp2p.peerInfo.id.toB58String(), data: Buffer.from(message).toString('hex'), seqno: seqno.toString('hex'), - topicIDs: [ this._topic ], - topicCIDs: [ this._topic ] + topicIDs: [this._topic], + topicCIDs: [this._topic] } conn.push(Buffer.from(JSON.stringify(msg))) } - _start () { - this._interval = timers.setInterval( - this._pollPeers.bind(this), - this._options.pollInterval) - - const listener = this._onMessage.bind(this) - this._ipfs.pubsub.subscribe(this._topic, listener, {}, (err) => { - if (err) { - this.emit('error', err) - } else { - this.emit('subscribed', this._topic) - } - }) - - this.once('stopping', () => { - this._ipfs.pubsub.unsubscribe(this._topic, listener, (err) => { - if (err) { - this.emit('error', err) - } else { - this.emit('stopped') - } - }) - }) - - libp2p(this._ipfs).handle(PROTOCOL, directConnection.handler) - - directConnection.emitter.on(this._topic, this._handleDirectMessage) - } - - _pollPeers () { - this._ipfs.pubsub.peers(this._topic, (err, _newPeers) => { - if (err) { - this.emit('error', err) - return // early - } - - const newPeers = _newPeers.sort() + async _pollPeers () { + const newPeers = (await this._libp2p.pubsub.getSubscribers(this._topic)).sort() - if (this._emitChanges(newPeers)) { - this._peers = newPeers - } - }) + if (this._emitChanges(newPeers)) { + this._peers = newPeers + } } _emitChanges (newPeers) { const differences = diff(this._peers, newPeers) - differences.added.forEach((addedPeer) => this.emit('peer joined', addedPeer)) - differences.removed.forEach((removedPeer) => this.emit('peer left', removedPeer)) + differences.added.forEach((peer) => this.emit('peer joined', peer)) + differences.removed.forEach((peer) => this.emit('peer left', peer)) return differences.added.length > 0 || differences.removed.length > 0 } @@ -165,10 +124,12 @@ class PubSubRoom extends EventEmitter { } _handleDirectMessage (message) { - if (message.to === this._ipfs._peerInfo.id.toB58String()) { + if (message.to.toString() === this._libp2p.peerInfo.id.toB58String()) { const m = Object.assign({}, message) delete m.to this.emit('message', m) } } } + +module.exports = PubSubRoom diff --git a/src/libp2p.js b/src/libp2p.js deleted file mode 100644 index 2391985..0000000 --- a/src/libp2p.js +++ /dev/null @@ -1,5 +0,0 @@ -'use strict' - -module.exports = (ipfs) => { - return ipfs._libp2pNode || ipfs.libp2p -} diff --git a/src/peer-id.js b/src/peer-id.js deleted file mode 100644 index 9b368be..0000000 --- a/src/peer-id.js +++ /dev/null @@ -1,8 +0,0 @@ -'use strict' - -module.exports = (peer) => { - if (peer.id && (typeof peer.id.toB58String === 'function')) { - peer = peer.id - } - return peer.toB58String() -} diff --git a/test/concurrent-rooms.spec.js b/test/concurrent-rooms.spec.js index 0fc5da7..69c151b 100644 --- a/test/concurrent-rooms.spec.js +++ b/test/concurrent-rooms.spec.js @@ -6,80 +6,52 @@ const chai = require('chai') chai.use(require('dirty-chai')) const expect = chai.expect -const IPFS = require('ipfs') -const each = require('async/each') -const clone = require('lodash.clonedeep') +const delay = require('delay') -const Room = require('../') -const createRepo = require('./utils/create-repo-node') +const PubSubRoom = require('../') +const createLibp2p = require('./utils/create-libp2p') const topic = 'pubsub-room-concurrency-test-' + Date.now() + '-' + Math.random() -const ipfsOptions = { - EXPERIMENTAL: { - pubsub: true - }, - config: { - Addresses: { - Swarm: [ - '/dns4/ws-star.discovery.libp2p.io/tcp/443/wss/p2p-websocket-star' - ] - } - } -} - describe('concurrent rooms', function () { this.timeout(30000) - const repos = [] let node1, node2 let id1, id2 let room1A, room1B, room2A, room2B const topicA = topic + '-A' const topicB = topic + '-B' - before((done) => { - const repo = createRepo() - repos.push(repo) - const options = Object.assign({}, clone(ipfsOptions), { - repo: repo - }) - node1 = new IPFS(options) - node1.once('ready', () => { - node1.id((err, info) => { - expect(err).to.not.exist() - id1 = info.id - done() - }) - }) + before(async () => { + node1 = await createLibp2p() + id1 = node1.peerInfo.id.toB58String() }) - before((done) => { - const repo = createRepo() - repos.push(repo) - const options = Object.assign({}, clone(ipfsOptions), { - repo: repo - }) - node2 = new IPFS(options) - node2.once('ready', () => { - node2.id((err, info) => { - expect(err).to.not.exist() - id2 = info.id - done() - }) - }) + before(async () => { + node2 = await createLibp2p(node1) + id2 = node2.peerInfo.id.toB58String() + }) + + after(() => { + return Promise.all([ + room1A.leave(), + room1B.leave(), + room2A.leave(), + room2B.leave() + ]) }) - after((done) => each(repos, (repo, cb) => { repo.teardown(cb) }, done)) + after(() => { + return Promise.all([ + node1.stop(), + node2.stop() + ]) + }) - it('can create a room, and they find each other', (done) => { - room1A = Room(node1, topicA) - room2A = Room(node2, topicA) - room1B = Room(node1, topicB) - room2B = Room(node2, topicB) - room1A.on('warning', console.log) - room2A.on('warning', console.log) - room1B.on('warning', console.log) - room2B.on('warning', console.log) + it('can create a room, and they find each other', async () => { + room1A = new PubSubRoom(node1, topicA) + room2A = new PubSubRoom(node2, topicA) + room1B = new PubSubRoom(node1, topicB) + room2B = new PubSubRoom(node2, topicB) const roomNodes = [ [room1A, id2], @@ -88,14 +60,19 @@ describe('concurrent rooms', function () { [room2A, id1] ] - each(roomNodes, (roomNode, cb) => { - const room = roomNode[0] - const waitingFor = roomNode[1] - room.once('peer joined', (id) => { - expect(id).to.equal(waitingFor) - cb() + await Promise.all( + roomNodes.map(async (roomNode) => { + const room = roomNode[0] + const waitingFor = roomNode[1] + + await new Promise((resolve) => { + room.once('peer joined', (peer) => { + expect(peer).to.equal(waitingFor) + resolve() + }) + }) }) - }, done) + ) }) it('has peer', (done) => { @@ -115,7 +92,7 @@ describe('concurrent rooms', function () { throw new Error('double message') } gotMessage = true - expect(message.from).to.equal(id2) + expect(message.from.toString()).to.equal(id2.toString()) expect(message.data.toString()).to.equal('message 1') room1B.removeListener('message', crash) @@ -129,7 +106,7 @@ describe('concurrent rooms', function () { room2B.on('message', crash) room2A.once('message', (message) => { - expect(message.from).to.equal(id1) + expect(message.from.toString()).to.equal(id1.toString()) expect(message.seqno.toString()).to.equal(Buffer.from([0]).toString()) expect(message.topicIDs).to.deep.equal([topicA]) expect(message.topicCIDs).to.deep.equal([topicA]) @@ -142,17 +119,17 @@ describe('concurrent rooms', function () { it('can leave room', (done) => { room1A.once('peer left', (peer) => { - expect(peer).to.equal(id2) + expect(peer.toString()).to.equal(id2.toString()) done() }) room2A.leave() }) - it('after leaving, it does not receive more messages', (done) => { + it('after leaving, it does not receive more messages', async () => { room2A.on('message', Crash('should not receive this')) - room2A.leave() + await room2A.leave() room1A.broadcast('message 3') - setTimeout(done, 3000) + await delay(3000) }) }) diff --git a/test/room.spec.js b/test/room.spec.js index 84496eb..600650e 100644 --- a/test/room.spec.js +++ b/test/room.spec.js @@ -6,87 +6,54 @@ const chai = require('chai') chai.use(require('dirty-chai')) const expect = chai.expect -const IPFS = require('ipfs') -const each = require('async/each') -const clone = require('lodash.clonedeep') - -const Room = require('../') -const createRepo = require('./utils/create-repo-node') +const PubSubRoom = require('../') +const createLibp2p = require('./utils/create-libp2p') const topicBase = 'pubsub-room-test-' + Date.now() + '-' + Math.random() -const ipfsOptions = { - EXPERIMENTAL: { - pubsub: true - }, - config: { - Addresses: { - Swarm: [ - '/dns4/ws-star.discovery.libp2p.io/tcp/443/wss/p2p-websocket-star' - ] - } - } -} - describe('room', function () { this.timeout(30000) - const repos = [] let node1, node2 let id1, id2 - before((done) => { - const repo = createRepo() - repos.push(repo) - const options = Object.assign({}, clone(ipfsOptions), { - repo: repo - }) - node1 = new IPFS(options) - node1.once('ready', () => { - node1.id((err, info) => { - expect(err).to.not.exist() - id1 = info.id - done() - }) - }) + before(async () => { + node1 = await createLibp2p() + id1 = node1.peerInfo.id.toB58String() }) - before((done) => { - const repo = createRepo() - repos.push(repo) - const options = Object.assign({}, clone(ipfsOptions), { - repo: repo - }) - node2 = new IPFS(options) - node2.once('ready', () => { - node2.id((err, info) => { - expect(err).to.not.exist() - id2 = info.id - done() - }) - }) + before(async () => { + node2 = await createLibp2p(node1) + id2 = node2.peerInfo.id.toB58String() }) - after((done) => each(repos, (repo, cb) => { repo.teardown(cb) }, done)) + const rooms = [] ;([1, 2].forEach((n) => { const topic = topicBase + '-' + n - let room1, room2 + + after('after topic ' + n, () => { + return Promise.all([ + rooms[n].a.leave(), + rooms[n].b.leave() + ]) + }) + describe('topic ' + n, () => { it('can create a room, and they find each other', (done) => { - room1 = Room(node1, topic) - room2 = Room(node2, topic) - room1.on('warning', console.log) - room2.on('warning', console.log) + rooms[n] = { + a: new PubSubRoom(node1, topic), + b: new PubSubRoom(node2, topic) + } let left = 2 - room1.once('peer joined', (id) => { - expect(id).to.equal(id2) + rooms[n].a.once('peer joined', (id) => { + expect(id).to.deep.equal(id2) if (--left === 0) { done() } }) - room2.once('peer joined', (id) => { - expect(id).to.equal(id1) + rooms[n].b.once('peer joined', (id) => { + expect(id).to.deep.equal(id1) if (--left === 0) { done() } @@ -94,50 +61,57 @@ describe('room', function () { }) it('has peer', (done) => { - expect(room1.getPeers()).to.deep.equal([id2]) - expect(room2.getPeers()).to.deep.equal([id1]) + expect(rooms[n].a.getPeers()).to.deep.equal([id2]) + expect(rooms[n].b.getPeers()).to.deep.equal([id1]) done() }) it('can broadcast', (done) => { let gotMessage = false - room1.on('message', (message) => { + rooms[n].a.on('message', (message) => { if (gotMessage) { throw new Error('double message:' + message.data.toString()) } gotMessage = true - expect(message.from).to.equal(id2) + expect(message.from).to.deep.equal(id2) expect(message.data.toString()).to.equal('message 1') done() }) - room2.broadcast('message 1') + rooms[n].b.broadcast('message 1') }) it('can send private message', (done) => { let gotMessage = false - room2.on('message', (message) => { + rooms[n].b.on('message', (message) => { if (gotMessage) { throw new Error('double message') } gotMessage = true - expect(message.from).to.equal(id1) + expect(message.from).to.deep.equal(id1) expect(message.seqno.toString()).to.equal(Buffer.from([0]).toString()) expect(message.topicIDs).to.deep.equal([topic]) expect(message.topicCIDs).to.deep.equal([topic]) expect(message.data.toString()).to.equal('message 2') done() }) - room1.sendTo(id2, 'message 2') + rooms[n].a.sendTo(id2, 'message 2') }) it('can leave room', (done) => { - room1.once('peer left', (peer) => { - expect(peer).to.equal(id2) + rooms[n].a.once('peer left', (peer) => { + expect(peer).to.deep.equal(id2) done() }) - room2.leave() + rooms[n].b.leave() }) }) })) + + after(() => { + return Promise.all([ + node1.stop(), + node2.stop() + ]) + }) }) diff --git a/test/same-node.spec.js b/test/same-node.spec.js index 16ef7cb..e51aba2 100644 --- a/test/same-node.spec.js +++ b/test/same-node.spec.js @@ -6,55 +6,33 @@ const chai = require('chai') chai.use(require('dirty-chai')) const expect = chai.expect -const IPFS = require('ipfs') -const clone = require('lodash.clonedeep') - -const Room = require('../') -const createRepo = require('./utils/create-repo-node') +const PubSubRoom = require('../') +const createLibp2p = require('./utils/create-libp2p') const topic = 'pubsub-same-node-test-' + Date.now() + '-' + Math.random() -const ipfsOptions = { - EXPERIMENTAL: { - pubsub: true - }, - config: { - Addresses: { - Swarm: [ - '/dns4/ws-star.discovery.libp2p.io/tcp/443/wss/p2p-websocket-star' - ] - } - } -} - describe('same node', function () { this.timeout(30000) - let repo let node - let rooms = [] + const rooms = [] - before((done) => { - repo = createRepo() - const options = Object.assign({}, clone(ipfsOptions), { - repo: repo - }) - node = new IPFS(options) - node.once('ready', () => { - done() - }) + before(async () => { + node = await createLibp2p() }) before(() => { for (let i = 0; i < 2; i++) { - rooms.push(Room(node, topic)) + rooms.push(new PubSubRoom(node, topic)) } }) - after(() => rooms.forEach((room) => room.leave())) - - after((done) => node.stop(done)) + after(() => { + return Promise.all( + rooms.map(room => room.leave()) + ) + }) - after((done) => repo.teardown(done)) + after(() => node.stop()) it('mirrors broadcast', (done) => { rooms[0].once('message', (message) => { diff --git a/test/utils/clean.js b/test/utils/clean.js deleted file mode 100644 index 13752b5..0000000 --- a/test/utils/clean.js +++ /dev/null @@ -1,15 +0,0 @@ -'use strict' - -const rimraf = require('rimraf') -const fs = require('fs') - -module.exports = (dir) => { - try { - fs.accessSync(dir) - } catch (err) { - // Does not exist so all good - return - } - - rimraf.sync(dir) -} diff --git a/test/utils/create-libp2p.js b/test/utils/create-libp2p.js new file mode 100644 index 0000000..2322164 --- /dev/null +++ b/test/utils/create-libp2p.js @@ -0,0 +1,68 @@ +'use strict' + +const Libp2p = require('libp2p') +const WS = require('libp2p-websockets') +const Multiplex = require('libp2p-mplex') +const SECIO = require('libp2p-secio') +const GossipSub = require('libp2p-gossipsub') +const PeerInfo = require('peer-info') + +const RELAY_MULTIADDR = '/ip4/127.0.0.1/tcp/24642/ws' + +const config = async () => { + return { + peerInfo: await PeerInfo.create(), + dialer: { + maxParallelDials: 150, // 150 total parallel multiaddr dials + maxDialsPerPeer: 4, // Allow 4 multiaddrs to be dialed per peer in parallel + dialTimeout: 10e3 // 10 second dial timeout per peer dial + }, + modules: { + transport: [ + WS + ], + streamMuxer: [ + Multiplex + ], + connEncryption: [ + SECIO + ], + pubsub: GossipSub + }, + config: { + peerDiscovery: { + autoDial: false, + bootstrap: { + enabled: false + } + }, + pubsub: { + enabled: true, + emitSelf: true + } + } + } +} + +module.exports = async (otherNode) => { + const node = new Libp2p(await config()) + + await node.start() + + // connect to relay peer + await node.dial(RELAY_MULTIADDR) + + // both nodes created, get them to dial each other via the relay + if (otherNode) { + const relayId = node.connections.keys().next().value + const otherNodeId = otherNode.peerInfo.id.toB58String() + const nodeId = node.peerInfo.id.toB58String() + + await node.dial(`${RELAY_MULTIADDR}/p2p/${relayId}/p2p-circuit/p2p/${otherNodeId}`) + await otherNode.dial(`${RELAY_MULTIADDR}/p2p/${relayId}/p2p-circuit/p2p/${nodeId}`) + } + + return node +} + +module.exports.config = config diff --git a/test/utils/create-repo-browser.js b/test/utils/create-repo-browser.js deleted file mode 100644 index e72fec1..0000000 --- a/test/utils/create-repo-browser.js +++ /dev/null @@ -1,27 +0,0 @@ -/* global self */ -'use strict' - -const IPFSRepo = require('ipfs-repo') - -const idb = self.indexedDB || - self.mozIndexedDB || - self.webkitIndexedDB || - self.msIndexedDB - -function createTempRepo (repoPath) { - repoPath = repoPath || '/tmp/ipfs-test-' + Math.random().toString().substring(2, 8) - - const repo = new IPFSRepo(repoPath) - - repo.teardown = (done) => { - repo.close(() => { - idb.deleteDatabase(repoPath) - idb.deleteDatabase(repoPath + '/blocks') - done() - }) - } - - return repo -} - -module.exports = createTempRepo diff --git a/test/utils/create-repo-node.js b/test/utils/create-repo-node.js deleted file mode 100644 index 118a0a1..0000000 --- a/test/utils/create-repo-node.js +++ /dev/null @@ -1,34 +0,0 @@ -'use strict' - -const IPFSRepo = require('ipfs-repo') -const clean = require('./clean') -const series = require('async/series') - -function createTempRepo () { - const repoPath = '/tmp/ipfs-test-' + Math.random().toString().substring(2, 8) - let destroyed = false - - const repo = new IPFSRepo(repoPath) - - repo.teardown = (done) => { - if (destroyed) { - return - } - destroyed = true - - series([ - // ignore err, might have been closed already - (cb) => { - repo.close(() => cb()) - }, - (cb) => { - clean(repoPath) - cb() - } - ], done) - } - - return repo -} - -module.exports = createTempRepo diff --git a/test/utils/testCommon.js b/test/utils/testCommon.js deleted file mode 100644 index 98c72dc..0000000 --- a/test/utils/testCommon.js +++ /dev/null @@ -1,79 +0,0 @@ -'use strict' - -const path = require('path') -const fs = !process.browser && require('fs') -const rimraf = !process.browser && require('rimraf') - -let dbidx = 0 - -function location () { - return path.join(__dirname, '_leveldown_test_db_' + dbidx++) -} - -function lastLocation () { - return path.join(__dirname, '_leveldown_test_db_' + dbidx) -} - -function cleanup (callback) { - if (process.browser) { return callback() } - - fs.readdir(__dirname, function (err, list) { - if (err) return callback(err) - - list = list.filter(function (f) { - return (/^_leveldown_test_db_/).test(f) - }) - - if (!list.length) { return callback() } - - let ret = 0 - - list.forEach(function (f) { - rimraf(path.join(__dirname, f), function (err) { - if (err) { - callback(err) - return // early - } - if (++ret === list.length) { - callback() - } - }) - }) - }) -} - -function setUp (t) { - cleanup(function (err) { - t.error(err, 'cleanup returned an error') - t.end() - }) -} - -function tearDown (t) { - setUp(t) // same cleanup! -} - -function collectEntries (iterator, callback) { - const data = [] - const next = function () { - iterator.next(function (err, key, value) { - if (err) return callback(err) - if (!arguments.length) { - callback(err, data) - } else { - data.push({ key: key, value: String(value) }) - setTimeout(next, 0) - } - }) - } - next() -} - -module.exports = { - location: location, - cleanup: cleanup, - lastLocation: lastLocation, - setUp: setUp, - tearDown: tearDown, - collectEntries: collectEntries -}