From 9df6594f6def48b2c5871617479725c4092e591b Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Fri, 30 Aug 2019 12:21:25 +0200 Subject: [PATCH 1/6] refactor: async BREAKING CHANGE: Switch to using async/await and async iterators. --- .aegir.js | 78 +++++----- README.md | 9 +- package.json | 24 ++-- src/index.js | 122 +++++++--------- src/message/sign.js | 81 ++++++----- src/peer.js | 16 +-- src/utils.js | 2 +- test/pubsub.spec.js | 309 ++++++++++++++++++---------------------- test/sign.spec.js | 97 +++++++------ test/utils.spec.js | 8 +- test/utils/constants.js | 26 ++-- test/utils/index.js | 26 ++-- 12 files changed, 370 insertions(+), 428 deletions(-) diff --git a/.aegir.js b/.aegir.js index 3a7b02d..6b18a9c 100644 --- a/.aegir.js +++ b/.aegir.js @@ -1,7 +1,6 @@ 'use strict' const pull = require('pull-stream') -const parallel = require('async/parallel') const WebSocketStarRendezvous = require('libp2p-websocket-star-rendezvous') const Node = require('./test/utils/nodejs-bundle.js') @@ -13,54 +12,47 @@ const { let wsRendezvous let node -const before = (done) => { - parallel([ - (cb) => { - WebSocketStarRendezvous.start({ - port: WS_RENDEZVOUS_MULTIADDR.nodeAddress().port, - refreshPeerListIntervalMS: 1000, - strictMultiaddr: false, - cryptoChallenge: true - }, (err, _server) => { - if (err) { - return cb(err) - } - wsRendezvous = _server - cb() - }) - }, - (cb) => { - getPeerRelay((err, peerInfo) => { - if (err) { - return done(err) - } - - node = new Node({ - peerInfo, - config: { - relay: { +const before = async () => { + [wsRendezvous, node] = await Promise.all([ + WebSocketStarRendezvous.start({ + port: WS_RENDEZVOUS_MULTIADDR.nodeAddress().port, + refreshPeerListIntervalMS: 1000, + strictMultiaddr: false, + cryptoChallenge: true + }), + new Promise(async (resolve) => { + const peerInfo = await getPeerRelay() + const n = new Node({ + peerInfo, + config: { + relay: { + enabled: true, + hop: { enabled: true, - hop: { - enabled: true, - active: true - } + active: true } } - }) - - node.handle('/echo/1.0.0', (_, conn) => pull(conn, conn)) - node.start(cb) + } }) - } - ], done) + + n.handle('/echo/1.0.0', (_, conn) => pull(conn, conn)) + await n.start() + + resolve(n) + }) + ]) } -const after = (done) => { - setTimeout(() => - parallel( - [node, wsRendezvous].map((s) => (cb) => s.stop(cb)), - done), - 2000) +const after = () => { + return new Promise((resolve) => { + setTimeout(async () => { + await Promise.all([ + node.stop(), + wsRendezvous.stop() + ]) + resolve() + }, 2000) + }) } module.exports = { diff --git a/README.md b/README.md index 170bd37..047d0e3 100644 --- a/README.md +++ b/README.md @@ -72,14 +72,19 @@ class PubsubImplementation extends Pubsub { Validates the signature of a message. -#### `pubsub.validate(message, callback)` +#### `pubsub.validate(message)` ##### Parameters | Name | Type | Description | |------|------|-------------| | message | `Message` | a pubsub message | -| callback | `function(Error, Boolean)` | calls back with true if the message is valid | + +#### Returns + +| Type | Description | +|------|-------------| +| `Promise` | resolves to true if the message is valid | ## Implementations using this base protocol diff --git a/package.json b/package.json index a4df62a..5b1a5d3 100644 --- a/package.json +++ b/package.json @@ -45,34 +45,34 @@ }, "homepage": "https://github.com/libp2p/js-libp2p-pubsub#readme", "devDependencies": { - "aegir": "^18.2.1", + "aegir": "^20.3.1", "benchmark": "^2.1.4", "chai": "^4.2.0", "chai-spies": "^1.0.0", "dirty-chai": "^2.0.1", - "libp2p": "~0.24.4", + "libp2p": "~0.26.2", "libp2p-secio": "~0.11.1", "libp2p-spdy": "~0.13.3", - "libp2p-tcp": "~0.13.0", + "libp2p-tcp": "~0.13.1", "libp2p-websocket-star": "~0.10.2", - "libp2p-websocket-star-rendezvous": "~0.3.0", - "lodash": "^4.17.11", - "multiaddr": "^6.0.6", - "peer-id": "~0.12.5", + "libp2p-websocket-star-rendezvous": "~0.4.1", + "lodash": "^4.17.15", + "multiaddr": "^6.1.0", + "peer-id": "~0.12.2", "peer-info": "~0.15.1" }, "dependencies": { "async": "^2.6.2", "bs58": "^4.0.1", "debug": "^4.1.1", - "err-code": "^1.1.2", + "err-code": "^2.0.0", "length-prefixed-stream": "^2.0.0", - "libp2p-crypto": "~0.16.1", + "libp2p-crypto": "~0.17.0", "protons": "^1.0.1", - "pull-length-prefixed": "^1.3.1", + "pull-length-prefixed": "^1.3.3", "pull-pushable": "^2.2.0", - "pull-stream": "^3.6.9", - "sinon": "^7.3.2", + "pull-stream": "^3.6.14", + "sinon": "^7.5.0", "time-cache": "~0.3.0" }, "contributors": [ diff --git a/src/index.js b/src/index.js index f5b1300..753f9d1 100644 --- a/src/index.js +++ b/src/index.js @@ -3,7 +3,6 @@ const EventEmitter = require('events') const pull = require('pull-stream/pull') const empty = require('pull-stream/sources/empty') -const asyncEach = require('async/each') const TimeCache = require('time-cache') const debug = require('debug') const errcode = require('err-code') @@ -16,8 +15,6 @@ const { } = require('./message/sign') const utils = require('./utils') -const nextTick = require('async/nextTick') - /** * PubsubBaseProtocol handles the peers and connections logic for pubsub routers */ @@ -136,47 +133,49 @@ class PubsubBaseProtocol extends EventEmitter { * Dial a received peer. * @private * @param {PeerInfo} peerInfo peer info - * @param {function} callback - * @returns {void} + * @returns {Promise} */ - _dialPeer (peerInfo, callback) { - callback = callback || function noop () { } + _dialPeer (peerInfo) { const idB58Str = peerInfo.id.toB58String() // If already have a PubSub conn, ignore const peer = this.peers.get(idB58Str) if (peer && peer.isConnected) { - return nextTick(() => callback()) + return Promise.resolve() } // If already dialing this peer, ignore if (this._dials.has(idB58Str)) { this.log('already dialing %s, ignoring dial attempt', idB58Str) - return nextTick(() => callback()) + return Promise.resolve() } this._dials.add(idB58Str) this.log('dialing %s', idB58Str) - this.libp2p.dialProtocol(peerInfo, this.multicodec, (err, conn) => { - this.log('dial to %s complete', idB58Str) - // If the dial is not in the set, it means that pubsub has been - // stopped - const pubsubStopped = !this._dials.has(idB58Str) - this._dials.delete(idB58Str) + return new Promise((resolve) => { + this.libp2p.dialProtocol(peerInfo, this.multicodec, (err, conn) => { + this.log('dial to %s complete', idB58Str) - if (err) { - this.log.err(err) - return callback() - } + // If the dial is not in the set, it means that pubsub has been + // stopped + const pubsubStopped = !this._dials.has(idB58Str) + this._dials.delete(idB58Str) - // pubsub has been stopped, so we should just bail out - if (pubsubStopped) { - this.log('pubsub was stopped, not processing dial to %s', idB58Str) - return callback() - } + if (err) { + this.log.err(err) + return resolve() + } + + // pubsub has been stopped, so we should just bail out + if (pubsubStopped) { + this.log('pubsub was stopped, not processing dial to %s', idB58Str) + return resolve() + } - this._onDial(peerInfo, conn, callback) + this._onDial(peerInfo, conn) + resolve() + }) }) } @@ -185,16 +184,13 @@ class PubsubBaseProtocol extends EventEmitter { * @private * @param {PeerInfo} peerInfo peer info * @param {Connection} conn connection to the peer - * @param {function} callback */ - _onDial (peerInfo, conn, callback) { + _onDial (peerInfo, conn) { const idB58Str = peerInfo.id.toB58String() this.log('connected', idB58Str) const peer = this._addPeer(new Peer(peerInfo)) peer.attachConnection(conn) - - nextTick(() => callback()) } /** @@ -252,14 +248,14 @@ class PubsubBaseProtocol extends EventEmitter { * Normalizes the message and signs it, if signing is enabled * * @param {Message} message - * @param {function(Error, Message)} callback + * @returns {Message} */ - _buildMessage (message, callback) { + _buildMessage (message) { const msg = utils.normalizeOutRpcMessage(message) if (this.peerId) { - signMessage(this.peerId, msg, callback) + return signMessage(this.peerId, msg) } else { - nextTick(callback, null, msg) + return message } } @@ -269,11 +265,10 @@ class PubsubBaseProtocol extends EventEmitter { * @abstract * @param {Array|string} topics * @param {Array|any} messages - * @param {function(Error)} callback * @returns {undefined} * */ - publish (topics, messages, callback) { + publish (topics, messages) { throw errcode('publish must be implemented by the subclass', 'ERR_NOT_IMPLEMENTED') } @@ -302,14 +297,11 @@ class PubsubBaseProtocol extends EventEmitter { /** * Mounts the pubsub protocol onto the libp2p node and sends our * subscriptions to every peer conneceted - * - * @param {Function} callback - * @returns {undefined} - * + * @returns {Promise} */ - start (callback) { + async start () { if (this.started) { - return nextTick(() => callback(new Error('already started'))) + throw errcode(new Error('already started'), 'ERR_ALREADY_STARTED') } this.log('starting') @@ -321,25 +313,19 @@ class PubsubBaseProtocol extends EventEmitter { // Dial already connected peers const peerInfos = Object.values(this.libp2p.peerBook.getAll()) - asyncEach(peerInfos, (peer, cb) => this._dialPeer(peer, cb), (err) => { - nextTick(() => { - this.log('started') - this.started = true - callback(err) - }) - }) + await Promise.all(peerInfos.map((peer) => this._dialPeer(peer))) + + this.log('started') + this.started = true } /** * Unmounts the pubsub protocol and shuts down every connection - * - * @param {Function} callback - * @returns {undefined} - * + * @returns {void} */ - stop (callback) { + stop () { if (!this.started) { - return nextTick(() => callback(new Error('not started yet'))) + throw errcode(new Error('not started yet'), 'ERR_NOT_STARTED_YET') } this.libp2p.unhandle(this.multicodec) @@ -349,40 +335,30 @@ class PubsubBaseProtocol extends EventEmitter { this._dials = new Set() this.log('stopping') - asyncEach(this.peers.values(), (peer, cb) => peer.close(cb), (err) => { - if (err) { - return callback(err) - } + this.peers.forEach((peer) => peer.close()) - this.log('stopped') - this.peers = new Map() - this.started = false - callback() - }) + this.log('stopped') + this.peers = new Map() + this.started = false } /** * Validates the given message. The signature will be checked for authenticity. * @param {rpc.RPC.Message} message - * @param {function(Error, Boolean)} callback - * @returns {void} + * @returns {Promise} */ - validate (message, callback) { + async validate (message) { // eslint-disable-line require-await // If strict signing is on and we have no signature, abort if (this.strictSigning && !message.signature) { this.log('Signing required and no signature was present, dropping message:', message) - return nextTick(callback, null, false) + return Promise.resolve(false) } // Check the message signature if present if (message.signature) { - verifySignature(message, (err, valid) => { - if (err) return callback(err) - callback(null, valid) - }) + return verifySignature(message) } else { - // The message is valid - nextTick(callback, null, true) + return Promise.resolve(true) } } } diff --git a/src/message/sign.js b/src/message/sign.js index 1419bcc..7e7e70b 100644 --- a/src/message/sign.js +++ b/src/message/sign.js @@ -9,24 +9,25 @@ const SignPrefix = Buffer.from('libp2p-pubsub:') * * @param {PeerId} peerId * @param {Message} message - * @param {function(Error, Message)} callback - * @returns {void} + * @returns {Promise} */ -function signMessage (peerId, message, callback) { +function signMessage (peerId, message) { // Get the message in bytes, and prepend with the pubsub prefix const bytes = Buffer.concat([ SignPrefix, Message.encode(message) ]) - // Sign the bytes with the private key - peerId.privKey.sign(bytes, (err, signature) => { - if (err) return callback(err) + return new Promise((resolve, reject) => { + // Sign the bytes with the private key + peerId.privKey.sign(bytes, (err, signature) => { + if (err) return reject(err) - callback(null, { - ...message, - signature: signature, - key: peerId.pubKey.bytes + resolve({ + ...message, + signature: signature, + key: peerId.pubKey.bytes + }) }) }) } @@ -34,11 +35,11 @@ function signMessage (peerId, message, callback) { /** * Verifies the signature of the given message * @param {rpc.RPC.Message} message - * @param {function(Error, Boolean)} callback + * @returns {Promise} */ -function verifySignature (message, callback) { +async function verifySignature (message) { // Get message sans the signature - let baseMessage = { ...message } + const baseMessage = { ...message } delete baseMessage.signature delete baseMessage.key const bytes = Buffer.concat([ @@ -47,10 +48,16 @@ function verifySignature (message, callback) { ]) // Get the public key - messagePublicKey(message, (err, pubKey) => { - if (err) return callback(err, false) - // Verify the base message - pubKey.verify(bytes, message.signature, callback) + const pubKey = await messagePublicKey(message) + + // Verify the base message + return new Promise((resolve, reject) => { + pubKey.verify(bytes, message.signature, (err, res) => { + if (err) { + return reject(err) + } + resolve(res) + }) }) } @@ -59,28 +66,28 @@ function verifySignature (message, callback) { * If no, valid PublicKey can be retrieved an error will be returned. * * @param {Message} message - * @param {function(Error, PublicKey)} callback - * @returns {void} + * @returns {Promise} */ -function messagePublicKey (message, callback) { - if (message.key) { - PeerId.createFromPubKey(message.key, (err, peerId) => { - if (err) return callback(err, null) - // the key belongs to the sender, return the key - if (peerId.isEqual(message.from)) return callback(null, peerId.pubKey) - // We couldn't validate pubkey is from the originator, error - callback(new Error('Public Key does not match the originator')) - }) - return - } else { - // should be available in the from property of the message (peer id) - const from = PeerId.createFromBytes(message.from) - if (from.pubKey) { - return callback(null, from.pubKey) +function messagePublicKey (message) { + return new Promise((resolve, reject) => { + if (message.key) { + PeerId.createFromPubKey(message.key, (err, peerId) => { + if (err) return reject(err) + // the key belongs to the sender, return the key + if (peerId.isEqual(message.from)) return resolve(peerId.pubKey) + // We couldn't validate pubkey is from the originator, error + return reject(new Error('Public Key does not match the originator')) + }) + } else { + // should be available in the from property of the message (peer id) + const from = PeerId.createFromBytes(message.from) + if (from.pubKey) { + return resolve(from.pubKey) + } else { + reject(new Error('Could not get the public key from the originator id')) + } } - } - - callback(new Error('Could not get the public key from the originator id')) + }) } module.exports = { diff --git a/src/peer.js b/src/peer.js index 7179b4e..c1bb4ab 100644 --- a/src/peer.js +++ b/src/peer.js @@ -3,7 +3,6 @@ const lp = require('pull-length-prefixed') const Pushable = require('pull-pushable') const pull = require('pull-stream') -const setImmediate = require('async/setImmediate') const EventEmitter = require('events') const { RPC } = require('./message') @@ -162,11 +161,9 @@ class Peer extends EventEmitter { /** * Closes the open connection to peer - * - * @param {Function} callback - * @returns {undefined} + * @returns {void} */ - close (callback) { + close () { // Force removal of peer this._references = 1 @@ -175,12 +172,9 @@ class Peer extends EventEmitter { this.stream.end() } - setImmediate(() => { - this.conn = null - this.stream = null - this.emit('close') - callback() - }) + this.conn = null + this.stream = null + this.emit('close') } } diff --git a/src/utils.js b/src/utils.js index a2d6767..d9c6033 100644 --- a/src/utils.js +++ b/src/utils.js @@ -44,7 +44,7 @@ exports.anyMatch = (a, b) => { bHas = (val) => b.has(val) } - for (let val of a) { + for (const val of a) { if (bHas(val)) { return true } diff --git a/test/pubsub.spec.js b/test/pubsub.spec.js index 11f10bc..44a2bd0 100644 --- a/test/pubsub.spec.js +++ b/test/pubsub.spec.js @@ -6,8 +6,6 @@ chai.use(require('dirty-chai')) chai.use(require('chai-spies')) const expect = chai.expect const sinon = require('sinon') -const series = require('async/series') -const parallel = require('async/parallel') const PubsubBaseProtocol = require('../src') const { randomSeqno } = require('../src/utils') @@ -47,57 +45,53 @@ describe('pubsub base protocol', () => { let psA let psB - before((done) => { - series([ - (cb) => createNode(cb), - (cb) => createNode(cb) - ], (err, nodes) => { - if (err) { - return done(err) - } - nodeA = nodes[0] - nodeB = nodes[1] - done() - }) + before(async () => { + [nodeA, nodeB] = await Promise.all([ + createNode(), + createNode() + ]) }) - before('mount the pubsub protocol', (done) => { + before('mount the pubsub protocol', () => { psA = new PubsubImplementation(nodeA) psB = new PubsubImplementation(nodeB) - setTimeout(() => { - expect(psA.peers.size).to.be.eql(0) - expect(psB.peers.size).to.be.eql(0) - done() - }, 50) + return new Promise((resolve) => { + setTimeout(() => { + expect(psA.peers.size).to.be.eql(0) + expect(psB.peers.size).to.be.eql(0) + resolve() + }, 50) + }) }) - before('start both Pubsub', (done) => { - parallel([ - (cb) => psA.start(cb), - (cb) => psB.start(cb) - ], done) + before('start both Pubsub', () => { + return Promise.all([ + psA.start(), + psB.start() + ]) }) - after((done) => { - parallel([ - (cb) => nodeA.stop(cb), - (cb) => nodeB.stop(cb) - ], done) + after(() => { + return Promise.all([ + nodeA.stop(), + nodeB.stop() + ]) }) - it('Dial from nodeA to nodeB', (done) => { - series([ - (cb) => nodeA.dial(nodeB.peerInfo, cb), - (cb) => setTimeout(() => { + it('Dial from nodeA to nodeB', async () => { + await nodeA.dial(nodeB.peerInfo) + + return new Promise((resolve) => { + setTimeout(() => { expect(psA.peers.size).to.equal(1) expect(psB.peers.size).to.equal(1) - cb() + resolve() }, 1000) - ], done) + }) }) - it('_buildMessage normalizes and signs messages', (done) => { + it('_buildMessage normalizes and signs messages', async () => { const message = { from: psA.peerId.id, data: 'hello', @@ -105,17 +99,13 @@ describe('pubsub base protocol', () => { topicIDs: ['test-topic'] } - psA._buildMessage(message, (err, signedMessage) => { - expect(err).to.not.exist() + const signedMessage = await psA._buildMessage(message) + const verified = await psA.validate(signedMessage) - psA.validate(signedMessage, (err, verified) => { - expect(verified).to.eql(true) - done(err) - }) - }) + expect(verified).to.eql(true) }) - it('validate with strict signing off will validate a present signature', (done) => { + it('validate with strict signing off will validate a present signature', async () => { const message = { from: psA.peerId.id, data: 'hello', @@ -125,17 +115,13 @@ describe('pubsub base protocol', () => { sinon.stub(psA, 'strictSigning').value(false) - psA._buildMessage(message, (err, signedMessage) => { - expect(err).to.not.exist() + const signedMessage = await psA._buildMessage(message) + const verified = await psA.validate(signedMessage) - psA.validate(signedMessage, (err, verified) => { - expect(verified).to.eql(true) - done(err) - }) - }) + expect(verified).to.eql(true) }) - it('validate with strict signing requires a signature', (done) => { + it('validate with strict signing requires a signature', async () => { const message = { from: psA.peerId.id, data: 'hello', @@ -143,10 +129,9 @@ describe('pubsub base protocol', () => { topicIDs: ['test-topic'] } - psA.validate(message, (err, verified) => { - expect(verified).to.eql(false) - done(err) - }) + const verified = await psA.validate(message) + + expect(verified).to.eql(false) }) }) @@ -156,45 +141,39 @@ describe('pubsub base protocol', () => { let psA let psB - before((done) => { - series([ - (cb) => createNode(cb), - (cb) => createNode(cb) - ], (cb, nodes) => { - nodeA = nodes[0] - nodeB = nodes[1] - nodeA.dial(nodeB.peerInfo, () => setTimeout(done, 1000)) - }) + before(async () => { + [nodeA, nodeB] = await Promise.all([ + createNode(), + createNode() + ]) + + await nodeA.dial(nodeB.peerInfo) + await new Promise((resolve) => setTimeout(resolve, 1000)) }) - after((done) => { - parallel([ - (cb) => nodeA.stop(cb), - (cb) => nodeB.stop(cb) - ], done) + after(() => { + return Promise.all([ + nodeA.stop(), + nodeB.stop() + ]) }) - it('dial on pubsub on mount', (done) => { + it('dial on pubsub on mount', async () => { psA = new PubsubImplementation(nodeA) psB = new PubsubImplementation(nodeB) - parallel([ - (cb) => psA.start(cb), - (cb) => psB.start(cb) - ], next) + await Promise.all([ + psA.start(), + psB.start() + ]) - function next () { - expect(psA.peers.size).to.equal(1) - expect(psB.peers.size).to.equal(1) - done() - } + expect(psA.peers.size).to.equal(1) + expect(psB.peers.size).to.equal(1) }) - it('stop both pubsubs', (done) => { - parallel([ - (cb) => psA.stop(cb), - (cb) => psB.stop(cb) - ], done) + it('stop both pubsubs', () => { + psA.stop() + psB.stop() }) }) @@ -205,57 +184,50 @@ describe('pubsub base protocol', () => { let psA let psB - before((done) => { - sandbox = chai.spy.sandbox() - - series([ - (cb) => createNode(cb), - (cb) => createNode(cb) - ], (err, nodes) => { - if (err) return done(err) - - nodeA = nodes[0] - nodeB = nodes[1] + before(async () => { + // sandbox = chai.spy.sandbox() + [nodeA, nodeB] = await Promise.all([ + createNode(), + createNode() + ]) + // Put node B in node A's peer book + nodeA.peerBook.put(nodeB.peerInfo) - // Put node B in node A's peer book - nodeA.peerBook.put(nodeB.peerInfo) + psA = new PubsubImplementation(nodeA) + psB = new PubsubImplementation(nodeB) - psA = new PubsubImplementation(nodeA) - psB = new PubsubImplementation(nodeB) + sandbox = chai.spy.sandbox() - psB.start(done) - }) + return psB.start() }) - after((done) => { + after(() => { sandbox.restore() - parallel([ - (cb) => nodeA.stop(cb), - (cb) => nodeB.stop(cb) - ], (ignoreErr) => { - done() - }) + return Promise.all([ + nodeA.stop(), + nodeB.stop() + ]) }) - it('does not dial twice to same peer', (done) => { + it('does not dial twice to same peer', async () => { sandbox.on(psA, ['_onDial']) // When node A starts, it will dial all peers in its peer book, which // is just peer B - psA.start(startComplete) + await psA.start() // Simulate a connection coming in from peer B at the same time. This // causes pubsub to dial peer B nodeA.emit('peer:connect', nodeB.peerInfo) - function startComplete () { + return new Promise((resolve) => { // Check that only one dial was made setTimeout(() => { expect(psA._onDial).to.have.been.called.once() - done() + resolve() }, 1000) - } + }) }) }) @@ -266,40 +238,33 @@ describe('pubsub base protocol', () => { let psA let psB - before((done) => { - sandbox = chai.spy.sandbox() - - series([ - (cb) => createNode(cb), - (cb) => createNode(cb) - ], (err, nodes) => { - if (err) return done(err) - - nodeA = nodes[0] - nodeB = nodes[1] + before(async () => { + // sandbox = chai.spy.sandbox() + [nodeA, nodeB] = await Promise.all([ + createNode(), + createNode() + ]) + // Put node B in node A's peer book + nodeA.peerBook.put(nodeB.peerInfo) - // Put node B in node A's peer book - nodeA.peerBook.put(nodeB.peerInfo) + psA = new PubsubImplementation(nodeA) + psB = new PubsubImplementation(nodeB) - psA = new PubsubImplementation(nodeA) - psB = new PubsubImplementation(nodeB) + sandbox = chai.spy.sandbox() - psB.start(done) - }) + return psB.start() }) - after((done) => { + after(() => { sandbox.restore() - parallel([ - (cb) => nodeA.stop(cb), - (cb) => nodeB.stop(cb) - ], (ignoreErr) => { - done() - }) + return Promise.all([ + nodeA.stop(), + nodeB.stop() + ]) }) - it('can dial again after error', (done) => { + it('can dial again after error', async () => { let firstTime = true const dialProtocol = psA.libp2p.dialProtocol.bind(psA.libp2p) sandbox.on(psA.libp2p, 'dialProtocol', (peerInfo, multicodec, cb) => { @@ -315,19 +280,19 @@ describe('pubsub base protocol', () => { // When node A starts, it will dial all peers in its peer book, which // is just peer B - psA.start(startComplete) + await psA.start() - function startComplete () { - // Simulate a connection coming in from peer B. This causes pubsub - // to dial peer B - nodeA.emit('peer:connect', nodeB.peerInfo) + // Simulate a connection coming in from peer B. This causes pubsub + // to dial peer B + nodeA.emit('peer:connect', nodeB.peerInfo) + return new Promise((resolve) => { // Check that both dials were made setTimeout(() => { expect(psA.libp2p.dialProtocol).to.have.been.called.twice() - done() + resolve() }, 1000) - } + }) }) }) @@ -338,40 +303,34 @@ describe('pubsub base protocol', () => { let psA let psB - before((done) => { - sandbox = chai.spy.sandbox() + before(async () => { + // sandbox = chai.spy.sandbox() + [nodeA, nodeB] = await Promise.all([ + createNode(), + createNode() + ]) - series([ - (cb) => createNode(cb), - (cb) => createNode(cb) - ], (err, nodes) => { - if (err) return done(err) - - nodeA = nodes[0] - nodeB = nodes[1] + psA = new PubsubImplementation(nodeA) + psB = new PubsubImplementation(nodeB) - psA = new PubsubImplementation(nodeA) - psB = new PubsubImplementation(nodeB) + sandbox = chai.spy.sandbox() - parallel([ - (cb) => psA.start(cb), - (cb) => psB.start(cb) - ], done) - }) + return Promise.all([ + psA.start(), + psB.start() + ]) }) - after((done) => { + after(() => { sandbox.restore() - parallel([ - (cb) => nodeA.stop(cb), - (cb) => nodeB.stop(cb) - ], (ignoreErr) => { - done() - }) + return Promise.all([ + nodeA.stop(), + nodeB.stop() + ]) }) - it('does not process dial after stop', (done) => { + it('does not process dial after stop', () => { sandbox.on(psA, ['_onDial']) // Simulate a connection coming in from peer B at the same time. This @@ -379,11 +338,13 @@ describe('pubsub base protocol', () => { nodeA.emit('peer:connect', nodeB.peerInfo) // Stop pubsub before the dial can complete - psA.stop(() => { + psA.stop() + + return new Promise((resolve) => { // Check that the dial was not processed setTimeout(() => { expect(psA._onDial).to.not.have.been.called() - done() + resolve() }, 1000) }) }) diff --git a/test/sign.spec.js b/test/sign.spec.js index 9a65dc5..f9cadf1 100644 --- a/test/sign.spec.js +++ b/test/sign.spec.js @@ -17,16 +17,20 @@ const { randomSeqno } = require('../src/utils') describe('message signing', () => { let peerId - before((done) => { - peerId = PeerId.create({ - bits: 1024 - }, (err, id) => { - peerId = id - done(err) + before(async () => { + peerId = await new Promise((resolve, reject) => { + peerId = PeerId.create({ + bits: 1024 + }, (err, id) => { + if (err) { + reject(err) + } + resolve(id) + }) }) }) - it('should be able to sign and verify a message', (done) => { + it('should be able to sign and verify a message', () => { const message = { from: peerId.id, data: 'hello', @@ -36,63 +40,59 @@ describe('message signing', () => { const bytesToSign = Buffer.concat([SignPrefix, Message.encode(message)]) - peerId.privKey.sign(bytesToSign, (err, expectedSignature) => { - if (err) return done(err) + return new Promise((resolve, reject) => { + peerId.privKey.sign(bytesToSign, async (err, expectedSignature) => { + if (err) return reject(err) - signMessage(peerId, message, (err, signedMessage) => { - if (err) return done(err) + const signedMessage = await signMessage(peerId, message) // Check the signature and public key expect(signedMessage.signature).to.eql(expectedSignature) expect(signedMessage.key).to.eql(peerId.pubKey.bytes) // Verify the signature - verifySignature(signedMessage, (err, verified) => { - expect(err).to.not.exist() - expect(verified).to.eql(true) - done(err) - }) + const verified = await verifySignature(signedMessage) + expect(verified).to.eql(true) + + resolve() }) }) }) - it('should be able to extract the public key from an inlined key', (done) => { - const testSecp256k1 = (peerId) => { - const message = { - from: peerId.id, - data: 'hello', - seqno: randomSeqno(), - topicIDs: ['test-topic'] - } + it('should be able to extract the public key from an inlined key', () => { + return new Promise((resolve, reject) => { + PeerId.create({ keyType: 'secp256k1', bits: 256 }, (err, secPeerId) => { + if (err) return reject(err) - const bytesToSign = Buffer.concat([SignPrefix, Message.encode(message)]) - peerId.privKey.sign(bytesToSign, (err, expectedSignature) => { - if (err) return done(err) + const message = { + from: secPeerId.id, + data: 'hello', + seqno: randomSeqno(), + topicIDs: ['test-topic'] + } - signMessage(peerId, message, (err, signedMessage) => { - if (err) return done(err) + const bytesToSign = Buffer.concat([SignPrefix, Message.encode(message)]) + + secPeerId.privKey.sign(bytesToSign, async (err, expectedSignature) => { + if (err) return reject(err) + + const signedMessage = await signMessage(secPeerId, message) // Check the signature and public key expect(signedMessage.signature).to.eql(expectedSignature) signedMessage.key = undefined // Verify the signature - verifySignature(signedMessage, (err, verified) => { - expect(err).to.not.exist() - expect(verified).to.eql(true) - done(err) - }) + const verified = await verifySignature(signedMessage) + expect(verified).to.eql(true) + + resolve() }) }) - } - - PeerId.create({ keyType: 'secp256k1', bits: 256 }, (err, peerId) => { - expect(err).to.not.exist() - testSecp256k1(peerId) }) }) - it('should be able to extract the public key from the message', (done) => { + it('should be able to extract the public key from the message', () => { const message = { from: peerId.id, data: 'hello', @@ -102,22 +102,21 @@ describe('message signing', () => { const bytesToSign = Buffer.concat([SignPrefix, Message.encode(message)]) - peerId.privKey.sign(bytesToSign, (err, expectedSignature) => { - if (err) return done(err) + return new Promise((resolve, reject) => { + peerId.privKey.sign(bytesToSign, async (err, expectedSignature) => { + if (err) return reject(err) - signMessage(peerId, message, (err, signedMessage) => { - if (err) return done(err) + const signedMessage = await signMessage(peerId, message) // Check the signature and public key expect(signedMessage.signature).to.eql(expectedSignature) expect(signedMessage.key).to.eql(peerId.pubKey.bytes) // Verify the signature - verifySignature(signedMessage, (err, verified) => { - expect(err).to.not.exist() - expect(verified).to.eql(true) - done(err) - }) + const verified = await verifySignature(signedMessage) + expect(verified).to.eql(true) + + resolve() }) }) }) diff --git a/test/utils.spec.js b/test/utils.spec.js index d573f06..22499ea 100644 --- a/test/utils.spec.js +++ b/test/utils.spec.js @@ -49,8 +49,8 @@ describe('utils', () => { }) it('converts an IN msg.from to b58', () => { - let binaryId = Buffer.from('1220e2187eb3e6c4fb3e7ff9ad4658610624a6315e0240fc6f37130eedb661e939cc', 'hex') - let stringId = 'QmdZEWgtaWAxBh93fELFT298La1rsZfhiC2pqwMVwy3jZM' + const binaryId = Buffer.from('1220e2187eb3e6c4fb3e7ff9ad4658610624a6315e0240fc6f37130eedb661e939cc', 'hex') + const stringId = 'QmdZEWgtaWAxBh93fELFT298La1rsZfhiC2pqwMVwy3jZM' const m = [ { from: binaryId }, { from: stringId } @@ -63,8 +63,8 @@ describe('utils', () => { }) it('converts an OUT msg.from to binary', () => { - let binaryId = Buffer.from('1220e2187eb3e6c4fb3e7ff9ad4658610624a6315e0240fc6f37130eedb661e939cc', 'hex') - let stringId = 'QmdZEWgtaWAxBh93fELFT298La1rsZfhiC2pqwMVwy3jZM' + const binaryId = Buffer.from('1220e2187eb3e6c4fb3e7ff9ad4658610624a6315e0240fc6f37130eedb661e939cc', 'hex') + const stringId = 'QmdZEWgtaWAxBh93fELFT298La1rsZfhiC2pqwMVwy3jZM' const m = [ { from: binaryId }, { from: stringId } diff --git a/test/utils/constants.js b/test/utils/constants.js index a836a03..93516b1 100644 --- a/test/utils/constants.js +++ b/test/utils/constants.js @@ -2,7 +2,6 @@ const PeerId = require('peer-id') const PeerInfo = require('peer-info') -const nextTick = require('async/nextTick') const peerJSON = require('../fixtures/test-peer') const multiaddr = require('multiaddr') @@ -17,22 +16,23 @@ let peerRelay = null * so that it can be used by browser nodes during their test suite. This * is necessary for running a TCP node during browser tests. * @private - * @param {function(error, PeerInfo)} callback - * @returns {void} + * @returns {Promise} */ -module.exports.getPeerRelay = (callback) => { - if (peerRelay) return nextTick(callback, null, peerRelay) +module.exports.getPeerRelay = () => { + if (peerRelay) return peerRelay - PeerId.createFromJSON(peerJSON, (err, peerId) => { - if (err) { - return callback(err) - } - peerRelay = new PeerInfo(peerId) + return new Promise((resolve, reject) => { + PeerId.createFromJSON(peerJSON, (err, peerId) => { + if (err) { + return reject(err) + } + peerRelay = new PeerInfo(peerId) - peerRelay.multiaddrs.add('/ip4/127.0.0.1/tcp/9200/ws') - peerRelay.multiaddrs.add('/ip4/127.0.0.1/tcp/9245') + peerRelay.multiaddrs.add('/ip4/127.0.0.1/tcp/9200/ws') + peerRelay.multiaddrs.add('/ip4/127.0.0.1/tcp/9245') - callback(null, peerRelay) + resolve(peerRelay) + }) }) } diff --git a/test/utils/index.js b/test/utils/index.js index e3cc5ef..3baadda 100644 --- a/test/utils/index.js +++ b/test/utils/index.js @@ -6,13 +6,21 @@ const Node = require('./nodejs-bundle') const waterfall = require('async/waterfall') -exports.createNode = (callback) => { - waterfall([ - (cb) => PeerId.create({ bits: 1024 }, cb), - (id, cb) => PeerInfo.create(id, cb), - (peerInfo, cb) => { - cb(null, new Node({ peerInfo })) - }, - (node, cb) => node.start((err) => cb(err, node)) - ], callback) +exports.createNode = () => { + return new Promise((resolve, reject) => { + waterfall([ + (cb) => PeerId.create({ bits: 1024 }, cb), + (id, cb) => PeerInfo.create(id, cb), + (peerInfo, cb) => { + cb(null, new Node({ peerInfo })) + }, + (node, cb) => node.start((err) => cb(err, node)) + ], (err, node) => { + if (err) { + return reject(err) + } + + resolve(node) + }) + }) } From f60da79937cdb008c471ee6a1d5594881733dc5b Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Tue, 15 Oct 2019 12:42:24 +0200 Subject: [PATCH 2/6] feat: use registrar --- .aegir.js | 63 ------ README.md | 131 +++++++++++- package.json | 28 +-- src/index.js | 396 +++++++++++++++-------------------- src/message/sign.js | 64 +++--- src/peer.js | 31 ++- test/fixtures/test-peer.json | 5 - test/instance.spec.js | 72 +++++++ test/pubsub.spec.js | 382 ++++++++++----------------------- test/sign.spec.js | 104 ++++----- test/utils/browser-bundle.js | 31 --- test/utils/constants.js | 40 ---- test/utils/index.js | 76 +++++-- test/utils/nodejs-bundle.js | 26 --- 14 files changed, 622 insertions(+), 827 deletions(-) delete mode 100644 .aegir.js delete mode 100644 test/fixtures/test-peer.json create mode 100644 test/instance.spec.js delete mode 100644 test/utils/browser-bundle.js delete mode 100644 test/utils/constants.js delete mode 100644 test/utils/nodejs-bundle.js diff --git a/.aegir.js b/.aegir.js deleted file mode 100644 index 6b18a9c..0000000 --- a/.aegir.js +++ /dev/null @@ -1,63 +0,0 @@ -'use strict' - -const pull = require('pull-stream') -const WebSocketStarRendezvous = require('libp2p-websocket-star-rendezvous') - -const Node = require('./test/utils/nodejs-bundle.js') -const { - getPeerRelay, - WS_RENDEZVOUS_MULTIADDR -} = require('./test/utils/constants.js') - -let wsRendezvous -let node - -const before = async () => { - [wsRendezvous, node] = await Promise.all([ - WebSocketStarRendezvous.start({ - port: WS_RENDEZVOUS_MULTIADDR.nodeAddress().port, - refreshPeerListIntervalMS: 1000, - strictMultiaddr: false, - cryptoChallenge: true - }), - new Promise(async (resolve) => { - const peerInfo = await getPeerRelay() - const n = new Node({ - peerInfo, - config: { - relay: { - enabled: true, - hop: { - enabled: true, - active: true - } - } - } - }) - - n.handle('/echo/1.0.0', (_, conn) => pull(conn, conn)) - await n.start() - - resolve(n) - }) - ]) -} - -const after = () => { - return new Promise((resolve) => { - setTimeout(async () => { - await Promise.all([ - node.stop(), - wsRendezvous.stop() - ]) - resolve() - }, 2000) - }) -} - -module.exports = { - hooks: { - pre: before, - post: after - } -} diff --git a/README.md b/README.md index 047d0e3..8ac7de9 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,7 @@ js-libp2p-pubsub [![standard-readme compliant](https://img.shields.io/badge/standard--readme-OK-green.svg?style=flat-square)](https://github.com/RichardLitt/standard-readme) [![](https://img.shields.io/badge/pm-waffle-yellow.svg?style=flat-square)](https://waffle.io/libp2p/js-libp2p-pubsub) -> libp2p-pubsub consits on the base protocol for libp2p pubsub implementation. This module is responsible for all the logic regarding peer connections. +> libp2p-pubsub consists on the base protocol for libp2p pubsub implementations. This module is responsible for registering the protocol in libp2p, as well as all the logic regarding pubsub connections with other peers. ## Lead Maintainer @@ -22,6 +22,7 @@ js-libp2p-pubsub - [Install](#install) - [Usage](#usage) +- [API](#api) - [Contribute](#contribute) - [License](#license) @@ -33,23 +34,34 @@ js-libp2p-pubsub ## Usage -A pubsub implementation **MUST** override the `_processConnection`, `publish`, `subscribe` and `unsubscribe` functions. +`libp2p-pubsub` abstracts the implementation protocol registration within `libp2p` and takes care of all the protocol connections. This way, a pubsub implementation can focus on its routing algortithm, instead of also needing to create the setup for it. -Other functions, such as `_addPeer`, `_removePeer`, `_onDial`, `start` and `stop` may be overwritten if the pubsub implementation needs to add custom logic on them. It is important pointing out that `start` and `stop` **must** call `super`. The `start` function is responsible for mounting the pubsub protocol onto the libp2p node and sending its' subscriptions to every peer connected, while the `stop` function is responsible for unmounting the pubsub protocol and shutting down every connection +A pubsub implementation **MUST** override the `_processMessages`, `publish`, `subscribe`, `unsubscribe` and `getTopics` functions. + +Other functions, such as `_onPeerConnected`, `_onPeerDisconnected`, `_addPeer`, `_removePeer`, `start` and `stop` may be overwritten if the pubsub implementation needs to add custom logic on them. It is important pointing out that `start` and `stop` **must** call `super`. The `start` function is responsible for registering the pubsub protocol onto the libp2p node, while the `stop` function is responsible for unregistering the pubsub protocol and shutting down every connection All the remaining functions **MUST NOT** be overwritten. The following example aims to show how to create your pubsub implementation extending this base protocol. The pubsub implementation will handle the subscriptions logic. +TODO: add explanation for registrar! + ```JavaScript const Pubsub = require('libp2p-pubsub') class PubsubImplementation extends Pubsub { - constructor(libp2p) { - super('libp2p:pubsub', '/pubsub-implementation/1.0.0', libp2p) + constructor(peerInfo, registrar, options = {}) { + super({ + debugName: 'libp2p:pubsub', + multicodecs: '/pubsub-implementation/1.0.0', + peerInfo: peerInfo, + registrar: registrar, + signMessages: options.signMessages, + strictSigning: options.strictSigning + }) } - _processConnection(idB58Str, conn, peer) { + _processMessages(idB58Str, conn, peer) { // Required to be implemented by the subclass // Process each message accordingly } @@ -65,9 +77,114 @@ class PubsubImplementation extends Pubsub { unsubscribe() { // Required to be implemented by the subclass } + + getTopics() { + // Required to be implemented by the subclass + } } ``` +## API + +The following specified API should be the base API for a pubsub implementation on top of `libp2p`. + +### Start + +Start the pubsub subsystem. The protocol will be registered to `libp2p`, which will notify about peers being connected and disconnected with the protocol. + +#### `pubsub.start()` + +##### Returns + +| Type | Description | +|------|-------------| +| `Promise` | resolves once pubsub starts | + +### Stop + +Stop the pubsub subsystem. The protocol will be unregistered to `libp2p`, which will remove all listeners for the protocol and the streams with other peers will be closed. + +#### `pubsub.stop()` + +##### Returns + +| Type | Description | +|------|-------------| +| `Promise` | resolves once pubsub stops | + +### Publish + +Publish data messages to pubsub topics. + +#### `pubsub.publish(topics, messages)` + +##### Parameters + +| Name | Type | Description | +|------|------|-------------| +| topics | `Array|string` | set of pubsub topics | +| messages | `Array|any` | set of messages to publish | + +##### Returns + +| Type | Description | +|------|-------------| +| `Promise` | resolves once messages are published to the network | + +### Subscribe + +Subscribe to the given topic(s). + +#### `pubsub.subscribe(topics)` + +##### Parameters + +| Name | Type | Description | +|------|------|-------------| +| topics | `Array|string` | set of pubsub topics | + +### Unsubscribe + +Unsubscribe from the given topic(s). + +#### `pubsub.unsubscribe(topics)` + +##### Parameters + +| Name | Type | Description | +|------|------|-------------| +| topics | `Array|string` | set of pubsub topics | + +### Get Topics + +Get the list of topics which the peer is subscribed to. + +#### `pubsub.getTopics()` + +##### Returns + +| Type | Description | +|------|-------------| +| `Array` | Array of subscribed topics | + +### Get Peers Subscribed to a topic + +Get a list of the peer-ids that are subscribed to one topic. + +#### `pubsub.getPeersSubscribed(topic)` + +##### Parameters + +| Name | Type | Description | +|------|------|-------------| +| topic | `string` | pubsub topic | + +##### Returns + +| Type | Description | +|------|-------------| +| `Array` | Array of base-58 peer id's | + ### Validate Validates the signature of a message. @@ -99,8 +216,6 @@ Feel free to join in. All welcome. Open an [issue](https://github.com/libp2p/js- This repository falls under the IPFS [Code of Conduct](https://github.com/ipfs/community/blob/master/code-of-conduct.md). -[![](https://cdn.rawgit.com/jbenet/contribute-ipfs-gif/master/img/contribute.gif)](https://github.com/ipfs/community/blob/master/contributing.md) - ## License Copyright (c) Protocol Labs, Inc. under the **MIT License**. See [LICENSE file](./LICENSE) for details. diff --git a/package.json b/package.json index 5b1a5d3..78ceb4c 100644 --- a/package.json +++ b/package.json @@ -17,9 +17,6 @@ "coverage": "aegir coverage", "coverage-publish": "aegir coverage --provider coveralls" }, - "browser": { - "test/utils/nodejs-bundle": "./test/utils/browser-bundle.js" - }, "files": [ "src", "dist" @@ -45,35 +42,26 @@ }, "homepage": "https://github.com/libp2p/js-libp2p-pubsub#readme", "devDependencies": { - "aegir": "^20.3.1", + "aegir": "^20.4.1", "benchmark": "^2.1.4", "chai": "^4.2.0", "chai-spies": "^1.0.0", "dirty-chai": "^2.0.1", - "libp2p": "~0.26.2", - "libp2p-secio": "~0.11.1", - "libp2p-spdy": "~0.13.3", - "libp2p-tcp": "~0.13.1", - "libp2p-websocket-star": "~0.10.2", - "libp2p-websocket-star-rendezvous": "~0.4.1", - "lodash": "^4.17.15", + "it-pair": "^1.0.0", "multiaddr": "^6.1.0", - "peer-id": "~0.12.2", - "peer-info": "~0.15.1" + "peer-id": "~0.13.3", + "peer-info": "~0.17.0" }, "dependencies": { - "async": "^2.6.2", "bs58": "^4.0.1", "debug": "^4.1.1", "err-code": "^2.0.0", - "length-prefixed-stream": "^2.0.0", + "it-length-prefixed": "^2.0.0", + "it-pipe": "^1.0.1", + "it-pushable": "^1.3.2", "libp2p-crypto": "~0.17.0", "protons": "^1.0.1", - "pull-length-prefixed": "^1.3.3", - "pull-pushable": "^2.2.0", - "pull-stream": "^3.6.14", - "sinon": "^7.5.0", - "time-cache": "~0.3.0" + "sinon": "^7.5.0" }, "contributors": [ "Cayman ", diff --git a/src/index.js b/src/index.js index 753f9d1..54188ab 100644 --- a/src/index.js +++ b/src/index.js @@ -1,57 +1,63 @@ 'use strict' -const EventEmitter = require('events') -const pull = require('pull-stream/pull') -const empty = require('pull-stream/sources/empty') -const TimeCache = require('time-cache') +const assert = require('assert') const debug = require('debug') +const EventEmitter = require('events') const errcode = require('err-code') -const Peer = require('./peer') +const PeerInfo = require('peer-info') + const message = require('./message') +const Peer = require('./peer') +const utils = require('./utils') const { signMessage, verifySignature } = require('./message/sign') -const utils = require('./utils') /** * PubsubBaseProtocol handles the peers and connections logic for pubsub routers */ class PubsubBaseProtocol extends EventEmitter { /** - * @param {String} debugName - * @param {String} multicodec - * @param {Object} libp2p libp2p implementation - * @param {Object} options - * @param {boolean} options.signMessages if messages should be signed, defaults to true - * @param {boolean} options.strictSigning if message signing should be required, defaults to true - * @constructor + * @param {Object} props + * @param {String} props.debugName log namespace + * @param {Array|string} props.multicodecs protocol identificers to connect + * @param {PeerInfo} props.peerInfo peer's peerInfo + * @param {Object} props.registrar registrar for libp2p protocols + * @param {function} props.registrar.register + * @param {function} props.registrar.unregister + * @param {boolean} [props.signMessages] if messages should be signed, defaults to true + * @param {boolean} [props.strictSigning] if message signing should be required, defaults to true + * @abstract */ - constructor (debugName, multicodec, libp2p, options) { - super() + constructor ({ + debugName, + multicodecs, + peerInfo, + registrar, + signMessages = true, + strictSigning = true + }) { + assert(debugName && typeof debugName === 'string', 'a debugname `string` is required') + assert(multicodecs, 'multicodecs are required') + assert(PeerInfo.isPeerInfo(peerInfo), 'peer info must be an instance of `peer-info`') + + // registrar handling + assert(registrar && typeof registrar === 'object', 'a registrar object is required') // TODO: isRegistrar when it's implemented + assert(typeof registrar.register === 'function', 'a register function must be provided in registrar') + assert(typeof registrar.unregister === 'function', 'a unregister function must be provided in registrar') - options = { - signMessages: true, - strictSigning: true, - ...options - } + super() this.log = debug(debugName) this.log.err = debug(`${debugName}:error`) - this.multicodec = multicodec - this.libp2p = libp2p - this.started = false - if (options.signMessages) { - this.peerId = this.libp2p.peerInfo.id - } + this.multicodecs = utils.ensureArray(multicodecs) + this.peerInfo = peerInfo + this.registrar = registrar - /** - * If message signing should be required for incoming messages - * @type {boolean} - */ - this.strictSigning = options.strictSigning + this.started = false /** * Map of topics to which peers are subscribed to @@ -60,13 +66,6 @@ class PubsubBaseProtocol extends EventEmitter { */ this.topics = new Map() - /** - * Cache of seen messages - * - * @type {TimeCache} - */ - this.seenCache = new TimeCache() - /** * Map of peers. * @@ -74,181 +73,151 @@ class PubsubBaseProtocol extends EventEmitter { */ this.peers = new Map() - // Dials that are currently in progress - this._dials = new Set() - - this._onConnection = this._onConnection.bind(this) - this._dialPeer = this._dialPeer.bind(this) - } - - /** - * Add a new connected peer to the peers map. - * @private - * @param {PeerInfo} peer peer info - * @returns {PeerInfo} - */ - _addPeer (peer) { - const id = peer.info.id.toB58String() - - /* - Always use an existing peer. + // Message signing + if (signMessages) { + this.peerId = this.peerInfo.id + } - What is happening here is: "If the other peer has already dialed to me, we already have - an establish link between the two, what might be missing is a - Connection specifically between me and that Peer" + /** + * If message signing should be required for incoming messages + * @type {boolean} */ - let existing = this.peers.get(id) - if (!existing) { - this.log('new peer', id) - this.peers.set(id, peer) - existing = peer - - peer.once('close', () => this._removePeer(peer)) - } - ++existing._references + this.strictSigning = strictSigning - return existing + this._onPeerConnected = this._onPeerConnected.bind(this) + this._onPeerDisconnected = this._onPeerDisconnected.bind(this) } /** - * Remove a peer from the peers map if it has no references. - * @private - * @param {Peer} peer peer state - * @returns {PeerInfo} + * Register the pubsub protocol onto the libp2p node. + * @returns {Promise} */ - _removePeer (peer) { - const id = peer.info.id.toB58String() - - this.log('remove', id, peer._references) - // Only delete when no one else is referencing this peer. - if (--peer._references === 0) { - this.log('delete peer', id) - this.peers.delete(id) + async start () { + if (this.started) { + return } + this.log('starting') - return peer + // register protocol with multicodec and handlers + await this.registrar.register(this.multicodecs, { + onConnect: this._onPeerConnected, + onDisconnect: this._onPeerDisconnected + }) + + this.log('started') + this.started = true } /** - * Dial a received peer. - * @private - * @param {PeerInfo} peerInfo peer info + * Unregister the pubsub protocol and the streams with other peers will be closed. * @returns {Promise} */ - _dialPeer (peerInfo) { - const idB58Str = peerInfo.id.toB58String() - - // If already have a PubSub conn, ignore - const peer = this.peers.get(idB58Str) - if (peer && peer.isConnected) { - return Promise.resolve() - } - - // If already dialing this peer, ignore - if (this._dials.has(idB58Str)) { - this.log('already dialing %s, ignoring dial attempt', idB58Str) - return Promise.resolve() + async stop () { + if (!this.started) { + return } - this._dials.add(idB58Str) - - this.log('dialing %s', idB58Str) - - return new Promise((resolve) => { - this.libp2p.dialProtocol(peerInfo, this.multicodec, (err, conn) => { - this.log('dial to %s complete', idB58Str) - // If the dial is not in the set, it means that pubsub has been - // stopped - const pubsubStopped = !this._dials.has(idB58Str) - this._dials.delete(idB58Str) + // unregister protocol and handlers + await this.registrar.unregister(this.multicodecs) - if (err) { - this.log.err(err) - return resolve() - } - - // pubsub has been stopped, so we should just bail out - if (pubsubStopped) { - this.log('pubsub was stopped, not processing dial to %s', idB58Str) - return resolve() - } + this.log('stopping') + this.peers.forEach((peer) => peer.close()) - this._onDial(peerInfo, conn) - resolve() - }) - }) + this.peers = new Map() + this.started = false + this.log('stopped') } /** - * Dial a received peer. + * Registrar notifies a connection successfully with pubsub protocol. * @private - * @param {PeerInfo} peerInfo peer info + * @param {PeerInfo} peerInfo remote peer info * @param {Connection} conn connection to the peer */ - _onDial (peerInfo, conn) { + _onPeerConnected (peerInfo, conn) { const idB58Str = peerInfo.id.toB58String() this.log('connected', idB58Str) const peer = this._addPeer(new Peer(peerInfo)) peer.attachConnection(conn) + + this._processMessages(idB58Str, conn, peer) } /** - * On successful connection event. + * Registrar notifies a closing connection with pubsub protocol. * @private - * @param {String} protocol connection protocol - * @param {Connection} conn connection to the peer + * @param {PeerInfo} peerInfo peer info + * @param {Error} err error for connection end */ - _onConnection (protocol, conn) { - conn.getPeerInfo((err, peerInfo) => { - if (err) { - this.log.err('Failed to identify incomming conn', err) - return pull(empty(), conn) - } - - const idB58Str = peerInfo.id.toB58String() - const peer = this._addPeer(new Peer(peerInfo)) + _onPeerDisconnected (peerInfo, err) { + const idB58Str = peerInfo.id.toB58String() + const peer = this.peers.get(idB58Str) - this._processConnection(idB58Str, conn, peer) - }) + this.log('connection ended', idB58Str, err ? err.message : '') + this._removePeer(peer) } /** - * Overriding the implementation of _processConnection should keep the connection and is - * responsible for processing each RPC message received by other peers. - * @abstract - * @param {string} idB58Str peer id string in base58 - * @param {Connection} conn connection + * Add a new connected peer to the peers map. + * @private * @param {PeerInfo} peer peer info - * @returns {undefined} - * + * @returns {PeerInfo} */ - _processConnection (idB58Str, conn, peer) { - throw errcode('_processConnection must be implemented by the subclass', 'ERR_NOT_IMPLEMENTED') + _addPeer (peer) { + const id = peer.info.id.toB58String() + + let existing = this.peers.get(id) + if (!existing) { + this.log('new peer', id) + this.peers.set(id, peer) + existing = peer + + peer.once('close', () => this._removePeer(peer)) + } + + return existing } /** - * On connection end event. + * Remove a peer from the peers map. * @private - * @param {string} idB58Str peer id string in base58 - * @param {PeerInfo} peer peer info - * @param {Error} err error for connection end + * @param {Peer} peer peer state + * @returns {PeerInfo} */ - _onConnectionEnd (idB58Str, peer, err) { - // socket hang up, means the one side canceled - if (err && err.message !== 'socket hang up') { - this.log.err(err) + _removePeer (peer) { + const id = peer.info.id.toB58String() + + this.log('delete peer', id) + this.peers.delete(id) + return peer + } + + /** + * Validates the given message. The signature will be checked for authenticity. + * @param {rpc.RPC.Message} message + * @returns {Promise} + */ + async validate (message) { // eslint-disable-line require-await + // If strict signing is on and we have no signature, abort + if (this.strictSigning && !message.signature) { + this.log('Signing required and no signature was present, dropping message:', message) + return Promise.resolve(false) } - this.log('connection ended', idB58Str, err ? err.message : '') - this._removePeer(peer) + // Check the message signature if present + if (message.signature) { + return verifySignature(message) + } else { + return Promise.resolve(true) + } } /** * Normalizes the message and signs it, if signing is enabled - * + * @private * @param {Message} message - * @returns {Message} + * @returns {Promise} */ _buildMessage (message) { const msg = utils.normalizeOutRpcMessage(message) @@ -259,17 +228,36 @@ class PubsubBaseProtocol extends EventEmitter { } } + /** + * Get a list of the peer-ids that are subscribed to one topic. + * @param {string} topic + * @returns {Array} + */ + getPeersSubscribed (topic) { + if (!this.started) { + throw errcode(new Error('not started yet'), 'ERR_NOT_STARTED_YET') + } + + if (!topic || typeof topic !== 'string') { + throw errcode(new Error('a string topic must be provided'), 'ERR_NOT_VALID_TOPIC') + } + + return Array.from(this.peers.values()) + .filter((peer) => peer.topics.has(topic)) + .map((peer) => peer.info.id.toB58String()) + } + /** * Overriding the implementation of publish should handle the appropriate algorithms for the publish/subscriber implementation. * For example, a Floodsub implementation might simply publish each message to each topic for every peer * @abstract * @param {Array|string} topics * @param {Array|any} messages - * @returns {undefined} + * @returns {Promise} * */ publish (topics, messages) { - throw errcode('publish must be implemented by the subclass', 'ERR_NOT_IMPLEMENTED') + throw errcode(new Error('publish must be implemented by the subclass'), 'ERR_NOT_IMPLEMENTED') } /** @@ -277,10 +265,10 @@ class PubsubBaseProtocol extends EventEmitter { * For example, a Floodsub implementation might simply send a message for every peer showing interest in the topics * @abstract * @param {Array|string} topics - * @returns {undefined} + * @returns {void} */ subscribe (topics) { - throw errcode('subscribe must be implemented by the subclass', 'ERR_NOT_IMPLEMENTED') + throw errcode(new Error('subscribe must be implemented by the subclass'), 'ERR_NOT_IMPLEMENTED') } /** @@ -288,78 +276,34 @@ class PubsubBaseProtocol extends EventEmitter { * For example, a Floodsub implementation might simply send a message for every peer revoking interest in the topics * @abstract * @param {Array|string} topics - * @returns {undefined} + * @returns {void} */ unsubscribe (topics) { - throw errcode('unsubscribe must be implemented by the subclass', 'ERR_NOT_IMPLEMENTED') + throw errcode(new Error('unsubscribe must be implemented by the subclass'), 'ERR_NOT_IMPLEMENTED') } /** - * Mounts the pubsub protocol onto the libp2p node and sends our - * subscriptions to every peer conneceted - * @returns {Promise} + * Overriding the implementation of getTopics should handle the appropriate algorithms for the publish/subscriber implementation. + * Get the list of subscriptions the peer is subscribed to. + * @abstract + * @returns {Array} */ - async start () { - if (this.started) { - throw errcode(new Error('already started'), 'ERR_ALREADY_STARTED') - } - this.log('starting') - - this.libp2p.handle(this.multicodec, this._onConnection) - - // Speed up any new peer that comes in my way - this.libp2p.on('peer:connect', this._dialPeer) - - // Dial already connected peers - const peerInfos = Object.values(this.libp2p.peerBook.getAll()) - - await Promise.all(peerInfos.map((peer) => this._dialPeer(peer))) - - this.log('started') - this.started = true + getTopics () { + throw errcode(new Error('getTopics must be implemented by the subclass'), 'ERR_NOT_IMPLEMENTED') } /** - * Unmounts the pubsub protocol and shuts down every connection + * Overriding the implementation of _processMessages should keep the connection and is + * responsible for processing each RPC message received by other peers. + * @abstract + * @param {string} idB58Str peer id string in base58 + * @param {Connection} conn connection + * @param {PeerInfo} peer peer info * @returns {void} + * */ - stop () { - if (!this.started) { - throw errcode(new Error('not started yet'), 'ERR_NOT_STARTED_YET') - } - - this.libp2p.unhandle(this.multicodec) - this.libp2p.removeListener('peer:connect', this._dialPeer) - - // Prevent any dials that are in flight from being processed - this._dials = new Set() - - this.log('stopping') - this.peers.forEach((peer) => peer.close()) - - this.log('stopped') - this.peers = new Map() - this.started = false - } - - /** - * Validates the given message. The signature will be checked for authenticity. - * @param {rpc.RPC.Message} message - * @returns {Promise} - */ - async validate (message) { // eslint-disable-line require-await - // If strict signing is on and we have no signature, abort - if (this.strictSigning && !message.signature) { - this.log('Signing required and no signature was present, dropping message:', message) - return Promise.resolve(false) - } - - // Check the message signature if present - if (message.signature) { - return verifySignature(message) - } else { - return Promise.resolve(true) - } + _processMessages (idB58Str, conn, peer) { + throw errcode(new Error('_processMessages must be implemented by the subclass'), 'ERR_NOT_IMPLEMENTED') } } diff --git a/src/message/sign.js b/src/message/sign.js index 7e7e70b..1ccf118 100644 --- a/src/message/sign.js +++ b/src/message/sign.js @@ -11,25 +11,20 @@ const SignPrefix = Buffer.from('libp2p-pubsub:') * @param {Message} message * @returns {Promise} */ -function signMessage (peerId, message) { +async function signMessage (peerId, message) { // Get the message in bytes, and prepend with the pubsub prefix const bytes = Buffer.concat([ SignPrefix, Message.encode(message) ]) - return new Promise((resolve, reject) => { - // Sign the bytes with the private key - peerId.privKey.sign(bytes, (err, signature) => { - if (err) return reject(err) + const signature = await peerId.privKey.sign(bytes) - resolve({ - ...message, - signature: signature, - key: peerId.pubKey.bytes - }) - }) - }) + return { + ...message, + signature: signature, + key: peerId.pubKey.bytes + } } /** @@ -50,15 +45,8 @@ async function verifySignature (message) { // Get the public key const pubKey = await messagePublicKey(message) - // Verify the base message - return new Promise((resolve, reject) => { - pubKey.verify(bytes, message.signature, (err, res) => { - if (err) { - return reject(err) - } - resolve(res) - }) - }) + // verify the base message + return pubKey.verify(bytes, message.signature) } /** @@ -68,26 +56,24 @@ async function verifySignature (message) { * @param {Message} message * @returns {Promise} */ -function messagePublicKey (message) { - return new Promise((resolve, reject) => { - if (message.key) { - PeerId.createFromPubKey(message.key, (err, peerId) => { - if (err) return reject(err) - // the key belongs to the sender, return the key - if (peerId.isEqual(message.from)) return resolve(peerId.pubKey) - // We couldn't validate pubkey is from the originator, error - return reject(new Error('Public Key does not match the originator')) - }) +async function messagePublicKey (message) { + if (message.key) { + const peerId = await PeerId.createFromPubKey(message.key) + + // the key belongs to the sender, return the key + if (peerId.isEqual(message.from)) return peerId.pubKey + // We couldn't validate pubkey is from the originator, error + throw new Error('Public Key does not match the originator') + } else { + // should be available in the from property of the message (peer id) + const from = PeerId.createFromBytes(message.from) + + if (from.pubKey) { + return from.pubKey } else { - // should be available in the from property of the message (peer id) - const from = PeerId.createFromBytes(message.from) - if (from.pubKey) { - return resolve(from.pubKey) - } else { - reject(new Error('Could not get the public key from the originator id')) - } + throw new Error('Could not get the public key from the originator id') } - }) + } } module.exports = { diff --git a/src/peer.js b/src/peer.js index c1bb4ab..0939f93 100644 --- a/src/peer.js +++ b/src/peer.js @@ -1,10 +1,11 @@ 'use strict' -const lp = require('pull-length-prefixed') -const Pushable = require('pull-pushable') -const pull = require('pull-stream') const EventEmitter = require('events') +const lp = require('it-length-prefixed') +const pushable = require('it-pushable') +const pipe = require('it-pipe') + const { RPC } = require('./message') /** @@ -33,8 +34,6 @@ class Peer extends EventEmitter { * @type {Pushable} */ this.stream = null - - this._references = 0 } /** @@ -75,21 +74,22 @@ class Peer extends EventEmitter { * Attach the peer to a connection and setup a write stream * * @param {Connection} conn - * @returns {undefined} + * @returns {void} */ attachConnection (conn) { this.conn = conn - this.stream = new Pushable() - - pull( - this.stream, - lp.encode(), - conn, - pull.onEnd(() => { + this.stream = pushable({ + onEnd: () => { this.conn = null this.stream = null this.emit('close') - }) + } + }) + + pipe( + this.stream, + lp.encode(), + conn ) this.emit('connection') @@ -164,9 +164,6 @@ class Peer extends EventEmitter { * @returns {void} */ close () { - // Force removal of peer - this._references = 1 - // End the pushable if (this.stream) { this.stream.end() diff --git a/test/fixtures/test-peer.json b/test/fixtures/test-peer.json deleted file mode 100644 index 105046a..0000000 --- a/test/fixtures/test-peer.json +++ /dev/null @@ -1,5 +0,0 @@ -{ - "id": "Qmex1SSsueWFsUfjdkugJ5zhcnjddAt8TxcnDLUXKD9Sx7", - "privKey": "CAASqAkwggSkAgEAAoIBAQCXzV127CvVHOGMzvsn/U+/32JM58KA6k0FSCCeNFzNowiDS/vV5eezGN5AFoxsF6icWLoaczz7l9RdVD+I/t6PEt9X7XUdrDCtSS8WmAcCgvZWSSf7yAd3jT4GSZDUIgIEeRZsERDt/yVqTLwsZ1G9dMIeh8sbf2zwjTXZIWaRM6o4lq3DYFfzLvJUXlJodxPogU7l7nLkITPUv+yQAMcVHizbNwJvwiETKYeUj73/m/wEPAlnFESexDstxNiIwE/FH8Ao50QPZRO6E6Jb0hhYSI/4CLRdrzDFm/Vzplei3Wr2DokSROaNyeG37VAueyA+pDqn84um+L9uXLwbv5FbAgMBAAECggEAdBUzV/GaQ0nmoQrWvOnUxmFIho7kCjkh1NwnNVPNc+Msa1r7pcI9wJNPwap8j1w4L/cZuYhOJgcg+o2mWFiuULKZ4F9Ro/M89gZ038457g2/2pPu43c/Xoi/2YcAHXg0Gr+OCe2zCIyITBWKAFqyAzL6DubAxrJW2Ezj1LrZ+EZgMyzbh/go/eEGSJaaGkINeAkY144DqDWWWvzyhKhryipsGkZGEkVy9xJgMEI3ipVvuPez2XAvoyyeuinBBLe+Z2vY5G50XXzbIMhIQGLncHf9MwTv6wt1ilyOSLOXK0BoQbB76J3R3is5dSULXXP9r8VocjLBEkmBuf4FXAKzoQKBgQDNNS4F1XE1gxD8LPkL+aB/hi6eVHVPhr+w0I/9ATikcLGeUfBM2Gd6cZRPFtNVrv1p6ZF1D1UyGDknGbDBSQd9wLUgb0fDoo3jKYMGWq6G+VvaP5rzWQeBV8YV2EhSmUk1i6kiYe2ZE8WyrPie7iwpQIY60e2A8Ly0GKZiBZUcHQKBgQC9YDAVsGnEHFVFkTDpvw5HwEzCgTb2A3NgkGY3rTYZ7L6AFjqCYmUwFB8Fmbyc4kdFWNh8wfmq5Qrvl49NtaeukiqWKUUlB8uPdztB1P0IahA2ks0owStZlRifmwfgYyMd4xE17lhaOgQQJZZPxmP0F6mdOvb3YJafNURCdMS51wKBgEvvIM+h0tmFXXSjQ6kNvzlRMtD92ccKysYn9xAdMpOO6/r0wSH+dhQWEVZO0PcE4NsfReb2PIVj90ojtIdhebcr5xpQc1LORQjJJKXmSmzBux6AqNrhl+hhzXfp56FA/Zkly/lgGWaqrV5XqUxOP+Mn8EO1yNgMvRc7g94DyNB1AoGBAKLBuXHalXwDsdHBUB2Eo3xNLGt6bEcRfia+0+sEBdxQGQWylQScFkU09dh1YaIf44sZKa5HdBFJGpYCVxo9hmjFnK5Dt/Z0daHOonIY4INLzLVqg8KECoLKXkhGEIXsDjFQhukn+G1LMVTDSSU055DQiWjlVX4UWD9qo0jOXIkvAoGBAMP50p2X6PsWWZUuuR7i1JOJHRyQZPWdHh9p8SSLnCtEpHYZfJr4INXNmhnSiB/3TUnHix2vVKjosjMTCk/CjfzXV2H41WPOLZ2/Pi3SxCicWIRj4kCcWhkEuIF2jGkg1+jmNiCl/zNMaBOAIP3QbDPtqOWbYlPd2YIzdj6WQ6R4", - "pubKey": "CAASpgIwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCXzV127CvVHOGMzvsn/U+/32JM58KA6k0FSCCeNFzNowiDS/vV5eezGN5AFoxsF6icWLoaczz7l9RdVD+I/t6PEt9X7XUdrDCtSS8WmAcCgvZWSSf7yAd3jT4GSZDUIgIEeRZsERDt/yVqTLwsZ1G9dMIeh8sbf2zwjTXZIWaRM6o4lq3DYFfzLvJUXlJodxPogU7l7nLkITPUv+yQAMcVHizbNwJvwiETKYeUj73/m/wEPAlnFESexDstxNiIwE/FH8Ao50QPZRO6E6Jb0hhYSI/4CLRdrzDFm/Vzplei3Wr2DokSROaNyeG37VAueyA+pDqn84um+L9uXLwbv5FbAgMBAAE=" -} \ No newline at end of file diff --git a/test/instance.spec.js b/test/instance.spec.js new file mode 100644 index 0000000..e57a107 --- /dev/null +++ b/test/instance.spec.js @@ -0,0 +1,72 @@ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +chai.use(require('dirty-chai')) +chai.use(require('chai-spies')) +const expect = chai.expect + +const PubsubBaseProtocol = require('../src') +const { createPeerInfo, mockRegistrar } = require('./utils') + +describe('should validate instance parameters', () => { + let peerInfo + + before(async () => { + peerInfo = await createPeerInfo() + }) + + it('should throw if no debugName is provided', () => { + expect(() => { + new PubsubBaseProtocol() // eslint-disable-line no-new + }).to.throw() + }) + + it('should throw if no multicodec is provided', () => { + expect(() => { + new PubsubBaseProtocol({ // eslint-disable-line no-new + debugName: 'pubsub' + }) + }).to.throw() + }) + + it('should throw if no peerInfo is provided', () => { + expect(() => { + new PubsubBaseProtocol({ // eslint-disable-line no-new + debugName: 'pubsub', + multicodecs: '/pubsub/1.0.0' + }) + }).to.throw() + }) + + it('should throw if an invalid peerInfo is provided', () => { + expect(() => { + new PubsubBaseProtocol({ // eslint-disable-line no-new + debugName: 'pubsub', + multicodecs: '/pubsub/1.0.0', + peerInfo: 'fake-peer-info' + }) + }).to.throw() + }) + + it('should throw if no registrar object is provided', () => { + expect(() => { + new PubsubBaseProtocol({ // eslint-disable-line no-new + debugName: 'pubsub', + multicodecs: '/pubsub/1.0.0', + peerInfo: peerInfo + }) + }).to.throw() + }) + + it('should accept valid parameters', () => { + expect(() => { + new PubsubBaseProtocol({ // eslint-disable-line no-new + debugName: 'pubsub', + multicodecs: '/pubsub/1.0.0', + peerInfo: peerInfo, + registrar: mockRegistrar + }) + }).not.to.throw() + }) +}) diff --git a/test/pubsub.spec.js b/test/pubsub.spec.js index 44a2bd0..1e68d02 100644 --- a/test/pubsub.spec.js +++ b/test/pubsub.spec.js @@ -6,347 +6,197 @@ chai.use(require('dirty-chai')) chai.use(require('chai-spies')) const expect = chai.expect const sinon = require('sinon') +const DuplexPair = require('it-pair/duplex') const PubsubBaseProtocol = require('../src') const { randomSeqno } = require('../src/utils') -const utils = require('./utils') -const createNode = utils.createNode - -class PubsubImplementation extends PubsubBaseProtocol { - constructor (libp2p) { - super('libp2p:pubsub', 'libp2p:pubsub-implementation', libp2p) - } - - publish (topics, messages) { - // ... - } - - subscribe (topics) { - // ... - } - - unsubscribe (topics) { - // ... - } - - _processConnection (idB58Str, conn, peer) { - // ... - } -} +const { createPeerInfo, mockRegistrar, PubsubImplementation } = require('./utils') describe('pubsub base protocol', () => { - afterEach(() => { - sinon.restore() - }) + describe('should start and stop properly', () => { + let pubsub + let sinonMockRegistrar + + beforeEach(async () => { + const peerInfo = await createPeerInfo() + sinonMockRegistrar = { + register: sinon.stub(), + unregister: sinon.stub() + } - describe('fresh nodes', () => { - let nodeA - let nodeB - let psA - let psB + pubsub = new PubsubBaseProtocol({ + debugName: 'pubsub', + multicodecs: '/pubsub/1.0.0', + peerInfo: peerInfo, + registrar: sinonMockRegistrar + }) - before(async () => { - [nodeA, nodeB] = await Promise.all([ - createNode(), - createNode() - ]) + expect(pubsub.peers.size).to.be.eql(0) }) - before('mount the pubsub protocol', () => { - psA = new PubsubImplementation(nodeA) - psB = new PubsubImplementation(nodeB) + afterEach(() => { + sinon.restore() + }) - return new Promise((resolve) => { - setTimeout(() => { - expect(psA.peers.size).to.be.eql(0) - expect(psB.peers.size).to.be.eql(0) - resolve() - }, 50) - }) + it('should be able to start and stop', async () => { + await pubsub.start() + expect(sinonMockRegistrar.register.calledOnce).to.be.true() + + await pubsub.stop() + expect(sinonMockRegistrar.unregister.calledOnce).to.be.true() }) - before('start both Pubsub', () => { - return Promise.all([ - psA.start(), - psB.start() - ]) + it('should not throw to start if already started', async () => { + await pubsub.start() + await pubsub.start() + expect(sinonMockRegistrar.register.calledOnce).to.be.true() + + await pubsub.stop() + expect(sinonMockRegistrar.unregister.calledOnce).to.be.true() }) - after(() => { - return Promise.all([ - nodeA.stop(), - nodeB.stop() - ]) + it('should not throw if stop before start', async () => { + await pubsub.stop() + expect(sinonMockRegistrar.register.calledOnce).to.be.false() + expect(sinonMockRegistrar.unregister.calledOnce).to.be.false() }) + }) - it('Dial from nodeA to nodeB', async () => { - await nodeA.dial(nodeB.peerInfo) + describe('should handle messages creating and signing', () => { + let peerInfo + let pubsub - return new Promise((resolve) => { - setTimeout(() => { - expect(psA.peers.size).to.equal(1) - expect(psB.peers.size).to.equal(1) - resolve() - }, 1000) + before(async () => { + peerInfo = await createPeerInfo() + pubsub = new PubsubBaseProtocol({ + debugName: 'pubsub', + multicodecs: '/pubsub/1.0.0', + peerInfo: peerInfo, + registrar: mockRegistrar }) }) + afterEach(() => { + sinon.restore() + }) + it('_buildMessage normalizes and signs messages', async () => { const message = { - from: psA.peerId.id, + from: peerInfo.id.id, data: 'hello', seqno: randomSeqno(), topicIDs: ['test-topic'] } - const signedMessage = await psA._buildMessage(message) - const verified = await psA.validate(signedMessage) + const signedMessage = await pubsub._buildMessage(message) + const verified = await pubsub.validate(signedMessage) expect(verified).to.eql(true) }) it('validate with strict signing off will validate a present signature', async () => { const message = { - from: psA.peerId.id, + from: peerInfo.id.id, data: 'hello', seqno: randomSeqno(), topicIDs: ['test-topic'] } - sinon.stub(psA, 'strictSigning').value(false) + sinon.stub(pubsub, 'strictSigning').value(false) - const signedMessage = await psA._buildMessage(message) - const verified = await psA.validate(signedMessage) + const signedMessage = await pubsub._buildMessage(message) + const verified = await pubsub.validate(signedMessage) expect(verified).to.eql(true) }) it('validate with strict signing requires a signature', async () => { const message = { - from: psA.peerId.id, + from: peerInfo.id.id, data: 'hello', seqno: randomSeqno(), topicIDs: ['test-topic'] } - const verified = await psA.validate(message) + const verified = await pubsub.validate(message) expect(verified).to.eql(false) }) }) - describe('dial the pubsub protocol on mount', () => { - let nodeA - let nodeB - let psA - let psB - - before(async () => { - [nodeA, nodeB] = await Promise.all([ - createNode(), - createNode() - ]) - - await nodeA.dial(nodeB.peerInfo) - await new Promise((resolve) => setTimeout(resolve, 1000)) - }) - - after(() => { - return Promise.all([ - nodeA.stop(), - nodeB.stop() - ]) - }) - - it('dial on pubsub on mount', async () => { - psA = new PubsubImplementation(nodeA) - psB = new PubsubImplementation(nodeB) - - await Promise.all([ - psA.start(), - psB.start() - ]) - - expect(psA.peers.size).to.equal(1) - expect(psB.peers.size).to.equal(1) - }) - - it('stop both pubsubs', () => { - psA.stop() - psB.stop() - }) - }) - - describe('prevent concurrent dials', () => { - let sandbox - let nodeA - let nodeB - let psA - let psB - - before(async () => { - // sandbox = chai.spy.sandbox() - [nodeA, nodeB] = await Promise.all([ - createNode(), - createNode() - ]) - // Put node B in node A's peer book - nodeA.peerBook.put(nodeB.peerInfo) - - psA = new PubsubImplementation(nodeA) - psB = new PubsubImplementation(nodeB) - - sandbox = chai.spy.sandbox() - - return psB.start() + describe('should be able to register two nodes', () => { + const protocol = '/pubsub/1.0.0' + let pubsubA, pubsubB + let peerInfoA, peerInfoB + const registrarRecordA = {} + const registrarRecordB = {} + + const registrar = (registrarRecord) => ({ + register: (multicodecs, handlers) => { + registrarRecord[multicodecs[0]] = handlers + }, + unregister: (multicodecs) => { + delete registrarRecord[multicodecs[0]] + } }) - after(() => { - sandbox.restore() + // mount pubsub + beforeEach(async () => { + peerInfoA = await createPeerInfo() + peerInfoB = await createPeerInfo() - return Promise.all([ - nodeA.stop(), - nodeB.stop() - ]) + pubsubA = new PubsubImplementation(protocol, peerInfoA, registrar(registrarRecordA)) + pubsubB = new PubsubImplementation(protocol, peerInfoB, registrar(registrarRecordB)) }) - it('does not dial twice to same peer', async () => { - sandbox.on(psA, ['_onDial']) - - // When node A starts, it will dial all peers in its peer book, which - // is just peer B - await psA.start() - - // Simulate a connection coming in from peer B at the same time. This - // causes pubsub to dial peer B - nodeA.emit('peer:connect', nodeB.peerInfo) - - return new Promise((resolve) => { - // Check that only one dial was made - setTimeout(() => { - expect(psA._onDial).to.have.been.called.once() - resolve() - }, 1000) - }) - }) - }) - - describe('allow dials even after error', () => { - let sandbox - let nodeA - let nodeB - let psA - let psB - - before(async () => { - // sandbox = chai.spy.sandbox() - [nodeA, nodeB] = await Promise.all([ - createNode(), - createNode() + // start pubsub + beforeEach(async () => { + await Promise.all([ + pubsubA.start(), + pubsubB.start() ]) - // Put node B in node A's peer book - nodeA.peerBook.put(nodeB.peerInfo) - - psA = new PubsubImplementation(nodeA) - psB = new PubsubImplementation(nodeB) - sandbox = chai.spy.sandbox() - - return psB.start() + expect(Object.keys(registrarRecordA)).to.have.lengthOf(1) + expect(Object.keys(registrarRecordB)).to.have.lengthOf(1) }) - after(() => { - sandbox.restore() + afterEach(() => { + sinon.restore() return Promise.all([ - nodeA.stop(), - nodeB.stop() + pubsubA.stop(), + pubsubB.stop() ]) }) - it('can dial again after error', async () => { - let firstTime = true - const dialProtocol = psA.libp2p.dialProtocol.bind(psA.libp2p) - sandbox.on(psA.libp2p, 'dialProtocol', (peerInfo, multicodec, cb) => { - // Return an error for the first dial - if (firstTime) { - firstTime = false - return cb(new Error('dial error')) - } - - // Subsequent dials proceed as normal - dialProtocol(peerInfo, multicodec, cb) - }) + it('should handle onConnect as expected', () => { + const onConnectA = registrarRecordA[protocol].onConnect + const onConnectB = registrarRecordB[protocol].onConnect - // When node A starts, it will dial all peers in its peer book, which - // is just peer B - await psA.start() + // Notice peers of connection + const [d0, d1] = DuplexPair() + onConnectA(peerInfoB, d0) + onConnectB(peerInfoA, d1) - // Simulate a connection coming in from peer B. This causes pubsub - // to dial peer B - nodeA.emit('peer:connect', nodeB.peerInfo) - - return new Promise((resolve) => { - // Check that both dials were made - setTimeout(() => { - expect(psA.libp2p.dialProtocol).to.have.been.called.twice() - resolve() - }, 1000) - }) - }) - }) - - describe('prevent processing dial after stop', () => { - let sandbox - let nodeA - let nodeB - let psA - let psB - - before(async () => { - // sandbox = chai.spy.sandbox() - [nodeA, nodeB] = await Promise.all([ - createNode(), - createNode() - ]) - - psA = new PubsubImplementation(nodeA) - psB = new PubsubImplementation(nodeB) - - sandbox = chai.spy.sandbox() - - return Promise.all([ - psA.start(), - psB.start() - ]) - }) - - after(() => { - sandbox.restore() - - return Promise.all([ - nodeA.stop(), - nodeB.stop() - ]) + expect(pubsubA.peers.size).to.be.eql(1) + expect(pubsubB.peers.size).to.be.eql(1) }) - it('does not process dial after stop', () => { - sandbox.on(psA, ['_onDial']) + it('should handle onDisconnect as expected', () => { + const onConnectA = registrarRecordA[protocol].onConnect + const onDisconnectA = registrarRecordA[protocol].onDisconnect + const onConnectB = registrarRecordB[protocol].onConnect + const onDisconnectB = registrarRecordB[protocol].onDisconnect - // Simulate a connection coming in from peer B at the same time. This - // causes pubsub to dial peer B - nodeA.emit('peer:connect', nodeB.peerInfo) + // Notice peers of connection + const [d0, d1] = DuplexPair() + onConnectA(peerInfoB, d0) + onConnectB(peerInfoA, d1) + onDisconnectA(peerInfoB) + onDisconnectB(peerInfoA) - // Stop pubsub before the dial can complete - psA.stop() - - return new Promise((resolve) => { - // Check that the dial was not processed - setTimeout(() => { - expect(psA._onDial).to.not.have.been.called() - resolve() - }, 1000) - }) + expect(pubsubA.peers.size).to.be.eql(0) + expect(pubsubB.peers.size).to.be.eql(0) }) }) }) diff --git a/test/sign.spec.js b/test/sign.spec.js index f9cadf1..573aac0 100644 --- a/test/sign.spec.js +++ b/test/sign.spec.js @@ -18,19 +18,12 @@ const { randomSeqno } = require('../src/utils') describe('message signing', () => { let peerId before(async () => { - peerId = await new Promise((resolve, reject) => { - peerId = PeerId.create({ - bits: 1024 - }, (err, id) => { - if (err) { - reject(err) - } - resolve(id) - }) + peerId = await PeerId.create({ + bits: 1024 }) }) - it('should be able to sign and verify a message', () => { + it('should be able to sign and verify a message', async () => { const message = { from: peerId.id, data: 'hello', @@ -39,60 +32,44 @@ describe('message signing', () => { } const bytesToSign = Buffer.concat([SignPrefix, Message.encode(message)]) + const expectedSignature = await peerId.privKey.sign(bytesToSign) - return new Promise((resolve, reject) => { - peerId.privKey.sign(bytesToSign, async (err, expectedSignature) => { - if (err) return reject(err) + const signedMessage = await signMessage(peerId, message) - const signedMessage = await signMessage(peerId, message) + // Check the signature and public key + expect(signedMessage.signature).to.eql(expectedSignature) + expect(signedMessage.key).to.eql(peerId.pubKey.bytes) - // Check the signature and public key - expect(signedMessage.signature).to.eql(expectedSignature) - expect(signedMessage.key).to.eql(peerId.pubKey.bytes) - - // Verify the signature - const verified = await verifySignature(signedMessage) - expect(verified).to.eql(true) - - resolve() - }) - }) + // Verify the signature + const verified = await verifySignature(signedMessage) + expect(verified).to.eql(true) }) - it('should be able to extract the public key from an inlined key', () => { - return new Promise((resolve, reject) => { - PeerId.create({ keyType: 'secp256k1', bits: 256 }, (err, secPeerId) => { - if (err) return reject(err) - - const message = { - from: secPeerId.id, - data: 'hello', - seqno: randomSeqno(), - topicIDs: ['test-topic'] - } + it('should be able to extract the public key from an inlined key', async () => { + const secPeerId = await PeerId.create({ keyType: 'secp256k1', bits: 256 }) - const bytesToSign = Buffer.concat([SignPrefix, Message.encode(message)]) - - secPeerId.privKey.sign(bytesToSign, async (err, expectedSignature) => { - if (err) return reject(err) + const message = { + from: secPeerId.id, + data: 'hello', + seqno: randomSeqno(), + topicIDs: ['test-topic'] + } - const signedMessage = await signMessage(secPeerId, message) + const bytesToSign = Buffer.concat([SignPrefix, Message.encode(message)]) + const expectedSignature = await secPeerId.privKey.sign(bytesToSign) - // Check the signature and public key - expect(signedMessage.signature).to.eql(expectedSignature) - signedMessage.key = undefined + const signedMessage = await signMessage(secPeerId, message) - // Verify the signature - const verified = await verifySignature(signedMessage) - expect(verified).to.eql(true) + // Check the signature and public key + expect(signedMessage.signature).to.eql(expectedSignature) + signedMessage.key = undefined - resolve() - }) - }) - }) + // Verify the signature + const verified = await verifySignature(signedMessage) + expect(verified).to.eql(true) }) - it('should be able to extract the public key from the message', () => { + it('should be able to extract the public key from the message', async () => { const message = { from: peerId.id, data: 'hello', @@ -101,23 +78,16 @@ describe('message signing', () => { } const bytesToSign = Buffer.concat([SignPrefix, Message.encode(message)]) + const expectedSignature = await peerId.privKey.sign(bytesToSign) - return new Promise((resolve, reject) => { - peerId.privKey.sign(bytesToSign, async (err, expectedSignature) => { - if (err) return reject(err) + const signedMessage = await signMessage(peerId, message) - const signedMessage = await signMessage(peerId, message) + // Check the signature and public key + expect(signedMessage.signature).to.eql(expectedSignature) + expect(signedMessage.key).to.eql(peerId.pubKey.bytes) - // Check the signature and public key - expect(signedMessage.signature).to.eql(expectedSignature) - expect(signedMessage.key).to.eql(peerId.pubKey.bytes) - - // Verify the signature - const verified = await verifySignature(signedMessage) - expect(verified).to.eql(true) - - resolve() - }) - }) + // Verify the signature + const verified = await verifySignature(signedMessage) + expect(verified).to.eql(true) }) }) diff --git a/test/utils/browser-bundle.js b/test/utils/browser-bundle.js deleted file mode 100644 index 117e0a0..0000000 --- a/test/utils/browser-bundle.js +++ /dev/null @@ -1,31 +0,0 @@ -'use strict' - -const WebSocketStar = require('libp2p-websocket-star') -const spdy = require('libp2p-spdy') -const secio = require('libp2p-secio') -const libp2p = require('libp2p') - -const { WS_STAR_MULTIADDR } = require('./constants') - -class Node extends libp2p { - constructor ({ peerInfo, peerBook }) { - const starOpts = { id: peerInfo.id } - const wsStar = new WebSocketStar(starOpts) - - peerInfo.multiaddrs.add(WS_STAR_MULTIADDR) - - const modules = { - transport: [wsStar], - streamMuxer: [spdy], - connEncryption: [secio] - } - - super({ - modules, - peerInfo, - peerBook - }) - } -} - -module.exports = Node diff --git a/test/utils/constants.js b/test/utils/constants.js deleted file mode 100644 index 93516b1..0000000 --- a/test/utils/constants.js +++ /dev/null @@ -1,40 +0,0 @@ -'use strict' - -const PeerId = require('peer-id') -const PeerInfo = require('peer-info') -const peerJSON = require('../fixtures/test-peer') -const multiaddr = require('multiaddr') - -let peerRelay = null - -/** - * Creates a `PeerInfo` that can be used across testing. Once the - * relay `PeerInfo` has been requested, it will be reused for each - * additional request. - * - * This is currently being used to create a relay on test bootstrapping - * so that it can be used by browser nodes during their test suite. This - * is necessary for running a TCP node during browser tests. - * @private - * @returns {Promise} - */ -module.exports.getPeerRelay = () => { - if (peerRelay) return peerRelay - - return new Promise((resolve, reject) => { - PeerId.createFromJSON(peerJSON, (err, peerId) => { - if (err) { - return reject(err) - } - peerRelay = new PeerInfo(peerId) - - peerRelay.multiaddrs.add('/ip4/127.0.0.1/tcp/9200/ws') - peerRelay.multiaddrs.add('/ip4/127.0.0.1/tcp/9245') - - resolve(peerRelay) - }) - }) -} - -module.exports.WS_STAR_MULTIADDR = multiaddr('/ip4/127.0.0.1/tcp/14444/ws/p2p-websocket-star/') -module.exports.WS_RENDEZVOUS_MULTIADDR = multiaddr('/ip4/127.0.0.1/tcp/14444/wss') diff --git a/test/utils/index.js b/test/utils/index.js index 3baadda..90cc9f0 100644 --- a/test/utils/index.js +++ b/test/utils/index.js @@ -1,26 +1,64 @@ 'use strict' +const lp = require('it-length-prefixed') +const pipe = require('it-pipe') + const PeerId = require('peer-id') const PeerInfo = require('peer-info') -const Node = require('./nodejs-bundle') - -const waterfall = require('async/waterfall') - -exports.createNode = () => { - return new Promise((resolve, reject) => { - waterfall([ - (cb) => PeerId.create({ bits: 1024 }, cb), - (id, cb) => PeerInfo.create(id, cb), - (peerInfo, cb) => { - cb(null, new Node({ peerInfo })) - }, - (node, cb) => node.start((err) => cb(err, node)) - ], (err, node) => { - if (err) { - return reject(err) - } - resolve(node) +const PubsubBaseProtocol = require('../../src') +const { message } = require('../../src') + +exports.createPeerInfo = async () => { + const peerId = await PeerId.create({ bits: 1024 }) + + return PeerInfo.create(peerId) +} + +class PubsubImplementation extends PubsubBaseProtocol { + constructor (protocol, peerInfo, registrar) { + super({ + debugName: 'libp2p:pubsub', + multicodecs: protocol, + peerInfo: peerInfo, + registrar: registrar }) - }) + } + + publish (topics, messages) { + // ... + } + + subscribe (topics) { + // ... + } + + unsubscribe (topics) { + // ... + } + + _processMessages (idB58Str, conn, peer) { + pipe( + conn, + lp.decode(), + async function collect (source) { + for await (const val of source) { + const rpc = message.rpc.RPC.decode(val) + + return rpc + } + } + ) + } +} + +exports.PubsubImplementation = PubsubImplementation + +exports.mockRegistrar = { + register: (multicodec, handlers) => { + + }, + unregister: (multicodec) => { + + } } diff --git a/test/utils/nodejs-bundle.js b/test/utils/nodejs-bundle.js deleted file mode 100644 index ef572d1..0000000 --- a/test/utils/nodejs-bundle.js +++ /dev/null @@ -1,26 +0,0 @@ -'use strict' - -const TCP = require('libp2p-tcp') -const spdy = require('libp2p-spdy') -const secio = require('libp2p-secio') -const libp2p = require('libp2p') - -class Node extends libp2p { - constructor ({ peerInfo, peerBook }) { - const modules = { - transport: [TCP], - streamMuxer: [spdy], - connEncryption: [secio] - } - - peerInfo.multiaddrs.add('/ip4/127.0.0.1/tcp/0') - - super({ - modules, - peerInfo, - peerBook - }) - } -} - -module.exports = Node From 5b4f41b35be13e393d9c477432f059f83942a46e Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Mon, 28 Oct 2019 09:55:24 +0100 Subject: [PATCH 3/6] chore: apply suggestions from code review Co-Authored-By: Jacob Heun --- README.md | 8 +++--- src/index.js | 59 ++++++++++++++++++++++++++++++++++++--------- src/peer.js | 5 ++++ test/pubsub.spec.js | 57 +++++++++++++++++++++++++------------------ test/utils.spec.js | 2 +- test/utils/index.js | 43 ++++++++++++++++++++++++++++++--- 6 files changed, 131 insertions(+), 43 deletions(-) diff --git a/README.md b/README.md index 8ac7de9..d017240 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,7 @@ js-libp2p-pubsub [![standard-readme compliant](https://img.shields.io/badge/standard--readme-OK-green.svg?style=flat-square)](https://github.com/RichardLitt/standard-readme) [![](https://img.shields.io/badge/pm-waffle-yellow.svg?style=flat-square)](https://waffle.io/libp2p/js-libp2p-pubsub) -> libp2p-pubsub consists on the base protocol for libp2p pubsub implementations. This module is responsible for registering the protocol in libp2p, as well as all the logic regarding pubsub connections with other peers. +> libp2p-pubsub is the base protocol for libp2p pubsub implementations. This module is responsible for registering the protocol with libp2p, as well as managing the logic regarding pubsub connections with other peers. ## Lead Maintainer @@ -34,11 +34,11 @@ js-libp2p-pubsub ## Usage -`libp2p-pubsub` abstracts the implementation protocol registration within `libp2p` and takes care of all the protocol connections. This way, a pubsub implementation can focus on its routing algortithm, instead of also needing to create the setup for it. +`libp2p-pubsub` abstracts the implementation protocol registration within `libp2p` and takes care of all the protocol connections. This way, a pubsub implementation can focus on its routing algorithm, instead of also needing to create the setup for it. A pubsub implementation **MUST** override the `_processMessages`, `publish`, `subscribe`, `unsubscribe` and `getTopics` functions. -Other functions, such as `_onPeerConnected`, `_onPeerDisconnected`, `_addPeer`, `_removePeer`, `start` and `stop` may be overwritten if the pubsub implementation needs to add custom logic on them. It is important pointing out that `start` and `stop` **must** call `super`. The `start` function is responsible for registering the pubsub protocol onto the libp2p node, while the `stop` function is responsible for unregistering the pubsub protocol and shutting down every connection +Other functions, such as `_onPeerConnected`, `_onPeerDisconnected`, `_addPeer`, `_removePeer`, `start` and `stop` may be overwritten if the pubsub implementation needs to customize their logic. Implementations overriding `start` and `stop` **MUST** call `super`. The `start` function is responsible for registering the pubsub protocol with libp2p, while the `stop` function is responsible for unregistering the pubsub protocol and closing pubsub connections. All the remaining functions **MUST NOT** be overwritten. @@ -183,7 +183,7 @@ Get a list of the peer-ids that are subscribed to one topic. | Type | Description | |------|-------------| -| `Array` | Array of base-58 peer id's | +| `Array` | Array of base-58 PeerId's | ### Validate diff --git a/src/index.js b/src/index.js index 54188ab..e73ed2c 100644 --- a/src/index.js +++ b/src/index.js @@ -25,6 +25,7 @@ class PubsubBaseProtocol extends EventEmitter { * @param {Array|string} props.multicodecs protocol identificers to connect * @param {PeerInfo} props.peerInfo peer's peerInfo * @param {Object} props.registrar registrar for libp2p protocols + * @param {function} props.registrar.handle * @param {function} props.registrar.register * @param {function} props.registrar.unregister * @param {boolean} [props.signMessages] if messages should be signed, defaults to true @@ -44,7 +45,8 @@ class PubsubBaseProtocol extends EventEmitter { assert(PeerInfo.isPeerInfo(peerInfo), 'peer info must be an instance of `peer-info`') // registrar handling - assert(registrar && typeof registrar === 'object', 'a registrar object is required') // TODO: isRegistrar when it's implemented + assert(registrar && typeof registrar === 'object', 'a registrar object is required') + assert(typeof registrar.handle === 'function', 'a handle function must be provided in registrar') assert(typeof registrar.register === 'function', 'a register function must be provided in registrar') assert(typeof registrar.unregister === 'function', 'a unregister function must be provided in registrar') @@ -84,13 +86,15 @@ class PubsubBaseProtocol extends EventEmitter { */ this.strictSigning = strictSigning + this._registrarId = undefined + this._onIncomingStream = this._onIncomingStream.bind(this) this._onPeerConnected = this._onPeerConnected.bind(this) this._onPeerDisconnected = this._onPeerDisconnected.bind(this) } /** * Register the pubsub protocol onto the libp2p node. - * @returns {Promise} + * @returns {Promise} */ async start () { if (this.started) { @@ -98,8 +102,11 @@ class PubsubBaseProtocol extends EventEmitter { } this.log('starting') + // Incoming streams + this.registrar.handle(this.multicodecs, this._onIncomingStream) + // register protocol with multicodec and handlers - await this.registrar.register(this.multicodecs, { + this._registrarId = await this.registrar.register(this.multicodecs, { onConnect: this._onPeerConnected, onDisconnect: this._onPeerDisconnected }) @@ -118,7 +125,7 @@ class PubsubBaseProtocol extends EventEmitter { } // unregister protocol and handlers - await this.registrar.unregister(this.multicodecs) + await this.registrar.unregister(this._registrarId) this.log('stopping') this.peers.forEach((peer) => peer.close()) @@ -128,20 +135,41 @@ class PubsubBaseProtocol extends EventEmitter { this.log('stopped') } + /** + * On an incoming stream event. + * @private + * @param {Object} props + * @param {string} props.protocol + * @param {DuplexStream} props.strean + * @param {PeerId} props.remotePeer remote peer-id + */ + async _onIncomingStream ({ protocol, stream, remotePeer }) { + const peerInfo = await PeerInfo.create(remotePeer) + peerInfo.protocols.add(protocol) + + const idB58Str = peerInfo.id.toB58String() + + const peer = this._addPeer(new Peer(peerInfo)) + + peer.attachConnection(stream) + this._processMessages(idB58Str, stream, peer) + } + /** * Registrar notifies a connection successfully with pubsub protocol. * @private * @param {PeerInfo} peerInfo remote peer info * @param {Connection} conn connection to the peer */ - _onPeerConnected (peerInfo, conn) { + async _onPeerConnected (peerInfo, conn) { const idB58Str = peerInfo.id.toB58String() this.log('connected', idB58Str) const peer = this._addPeer(new Peer(peerInfo)) - peer.attachConnection(conn) + const { stream } = await conn.newStream(this.multicodecs) - this._processMessages(idB58Str, conn, peer) + peer.attachConnection(stream) + this._processMessages(idB58Str, stream, peer) } /** @@ -166,8 +194,8 @@ class PubsubBaseProtocol extends EventEmitter { */ _addPeer (peer) { const id = peer.info.id.toB58String() - let existing = this.peers.get(id) + if (!existing) { this.log('new peer', id) this.peers.set(id, peer) @@ -175,6 +203,7 @@ class PubsubBaseProtocol extends EventEmitter { peer.once('close', () => this._removePeer(peer)) } + ++existing._references return existing } @@ -188,8 +217,14 @@ class PubsubBaseProtocol extends EventEmitter { _removePeer (peer) { const id = peer.info.id.toB58String() - this.log('delete peer', id) - this.peers.delete(id) + this.log('remove', id, peer._references) + + // Only delete when no one else is referencing this peer. + if (--peer._references === 0) { + this.log('delete peer', id) + this.peers.delete(id) + } + return peer } @@ -202,14 +237,14 @@ class PubsubBaseProtocol extends EventEmitter { // If strict signing is on and we have no signature, abort if (this.strictSigning && !message.signature) { this.log('Signing required and no signature was present, dropping message:', message) - return Promise.resolve(false) + return false } // Check the message signature if present if (message.signature) { return verifySignature(message) } else { - return Promise.resolve(true) + return true } } diff --git a/src/peer.js b/src/peer.js index 0939f93..efaa067 100644 --- a/src/peer.js +++ b/src/peer.js @@ -34,6 +34,8 @@ class Peer extends EventEmitter { * @type {Pushable} */ this.stream = null + + this._references = 0 } /** @@ -164,6 +166,9 @@ class Peer extends EventEmitter { * @returns {void} */ close () { + // Force removal of peer + this._references = 1 + // End the pushable if (this.stream) { this.stream.end() diff --git a/test/pubsub.spec.js b/test/pubsub.spec.js index 1e68d02..00d12a6 100644 --- a/test/pubsub.spec.js +++ b/test/pubsub.spec.js @@ -6,11 +6,16 @@ chai.use(require('dirty-chai')) chai.use(require('chai-spies')) const expect = chai.expect const sinon = require('sinon') -const DuplexPair = require('it-pair/duplex') const PubsubBaseProtocol = require('../src') const { randomSeqno } = require('../src/utils') -const { createPeerInfo, mockRegistrar, PubsubImplementation } = require('./utils') +const { + createPeerInfo, + createMockRegistrar, + mockRegistrar, + PubsubImplementation, + ConnectionPair +} = require('./utils') describe('pubsub base protocol', () => { describe('should start and stop properly', () => { @@ -20,6 +25,7 @@ describe('pubsub base protocol', () => { beforeEach(async () => { const peerInfo = await createPeerInfo() sinonMockRegistrar = { + handle: sinon.stub(), register: sinon.stub(), unregister: sinon.stub() } @@ -40,6 +46,7 @@ describe('pubsub base protocol', () => { it('should be able to start and stop', async () => { await pubsub.start() + expect(sinonMockRegistrar.handle.calledOnce).to.be.true() expect(sinonMockRegistrar.register.calledOnce).to.be.true() await pubsub.stop() @@ -49,6 +56,7 @@ describe('pubsub base protocol', () => { it('should not throw to start if already started', async () => { await pubsub.start() await pubsub.start() + expect(sinonMockRegistrar.handle.calledOnce).to.be.true() expect(sinonMockRegistrar.register.calledOnce).to.be.true() await pubsub.stop() @@ -131,22 +139,13 @@ describe('pubsub base protocol', () => { const registrarRecordA = {} const registrarRecordB = {} - const registrar = (registrarRecord) => ({ - register: (multicodecs, handlers) => { - registrarRecord[multicodecs[0]] = handlers - }, - unregister: (multicodecs) => { - delete registrarRecord[multicodecs[0]] - } - }) - // mount pubsub beforeEach(async () => { peerInfoA = await createPeerInfo() peerInfoB = await createPeerInfo() - pubsubA = new PubsubImplementation(protocol, peerInfoA, registrar(registrarRecordA)) - pubsubB = new PubsubImplementation(protocol, peerInfoB, registrar(registrarRecordB)) + pubsubA = new PubsubImplementation(protocol, peerInfoA, createMockRegistrar(registrarRecordA)) + pubsubB = new PubsubImplementation(protocol, peerInfoB, createMockRegistrar(registrarRecordB)) }) // start pubsub @@ -169,29 +168,41 @@ describe('pubsub base protocol', () => { ]) }) - it('should handle onConnect as expected', () => { + it('should handle onConnect as expected', async () => { const onConnectA = registrarRecordA[protocol].onConnect - const onConnectB = registrarRecordB[protocol].onConnect + const handlerB = registrarRecordB[protocol].handler // Notice peers of connection - const [d0, d1] = DuplexPair() - onConnectA(peerInfoB, d0) - onConnectB(peerInfoA, d1) + const [c0, c1] = ConnectionPair() + + await onConnectA(peerInfoB, c0) + await handlerB({ + protocol, + stream: c1.stream, + remotePeer: peerInfoA.id + }) expect(pubsubA.peers.size).to.be.eql(1) expect(pubsubB.peers.size).to.be.eql(1) }) - it('should handle onDisconnect as expected', () => { + it('should handle onDisconnect as expected', async () => { const onConnectA = registrarRecordA[protocol].onConnect const onDisconnectA = registrarRecordA[protocol].onDisconnect - const onConnectB = registrarRecordB[protocol].onConnect + const handlerB = registrarRecordB[protocol].handler const onDisconnectB = registrarRecordB[protocol].onDisconnect // Notice peers of connection - const [d0, d1] = DuplexPair() - onConnectA(peerInfoB, d0) - onConnectB(peerInfoA, d1) + const [c0, c1] = ConnectionPair() + + await onConnectA(peerInfoB, c0) + await handlerB({ + protocol, + stream: c1.stream, + remotePeer: peerInfoA.id + }) + + // Notice peers of disconnect onDisconnectA(peerInfoB) onDisconnectB(peerInfoA) diff --git a/test/utils.spec.js b/test/utils.spec.js index 22499ea..d1e00b0 100644 --- a/test/utils.spec.js +++ b/test/utils.spec.js @@ -1,7 +1,7 @@ /* eslint-env mocha */ 'use strict' -const expect = require('chai').expect +const { expect } = require('chai') const utils = require('../src/utils') diff --git a/test/utils/index.js b/test/utils/index.js index 90cc9f0..3d8a75b 100644 --- a/test/utils/index.js +++ b/test/utils/index.js @@ -2,6 +2,7 @@ const lp = require('it-length-prefixed') const pipe = require('it-pipe') +const DuplexPair = require('it-pair/duplex') const PeerId = require('peer-id') const PeerInfo = require('peer-info') @@ -41,7 +42,7 @@ class PubsubImplementation extends PubsubBaseProtocol { pipe( conn, lp.decode(), - async function collect (source) { + async function (source) { for await (const val of source) { const rpc = message.rpc.RPC.decode(val) @@ -55,10 +56,46 @@ class PubsubImplementation extends PubsubBaseProtocol { exports.PubsubImplementation = PubsubImplementation exports.mockRegistrar = { - register: (multicodec, handlers) => { + handle: () => {}, + register: () => {}, + unregister: () => {} +} + +exports.createMockRegistrar = (registrarRecord) => ({ + handle: (multicodecs, handler) => { + const rec = registrarRecord[multicodecs[0]] || {} + registrarRecord[multicodecs[0]] = { + ...rec, + handler + } }, - unregister: (multicodec) => { + register: (multicodecs, handlers) => { + const rec = registrarRecord[multicodecs[0]] || {} + + registrarRecord[multicodecs[0]] = { + ...rec, + ...handlers + } + return multicodecs[0] + }, + unregister: (id) => { + delete registrarRecord[id] } +}) + +exports.ConnectionPair = () => { + const [d0, d1] = DuplexPair() + + return [ + { + stream: d0, + newStream: () => Promise.resolve({ stream: d0 }) + }, + { + stream: d1, + newStream: () => Promise.resolve({ stream: d1 }) + } + ] } From 56ac7664a58465f5bab1b0442e162680eff0d0a9 Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Tue, 5 Nov 2019 15:54:46 +0100 Subject: [PATCH 4/6] chore: address review --- README.md | 8 ++++---- src/index.js | 9 ++++++--- test/pubsub.spec.js | 6 +++--- test/utils/index.js | 2 +- 4 files changed, 14 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index d017240..bddaa97 100644 --- a/README.md +++ b/README.md @@ -50,7 +50,7 @@ TODO: add explanation for registrar! const Pubsub = require('libp2p-pubsub') class PubsubImplementation extends Pubsub { - constructor(peerInfo, registrar, options = {}) { + constructor({ peerInfo, registrar, ...options }) super({ debugName: 'libp2p:pubsub', multicodecs: '/pubsub-implementation/1.0.0', @@ -90,7 +90,7 @@ The following specified API should be the base API for a pubsub implementation o ### Start -Start the pubsub subsystem. The protocol will be registered to `libp2p`, which will notify about peers being connected and disconnected with the protocol. +Starts the pubsub subsystem. The protocol will be registered to `libp2p`, which will result in pubsub being notified when peers who support the protocol connect/disconnect to `libp2p`. #### `pubsub.start()` @@ -102,7 +102,7 @@ Start the pubsub subsystem. The protocol will be registered to `libp2p`, which w ### Stop -Stop the pubsub subsystem. The protocol will be unregistered to `libp2p`, which will remove all listeners for the protocol and the streams with other peers will be closed. +Stops the pubsub subsystem. The protocol will be unregistered from `libp2p`, which will remove all listeners for the protocol and the established connections will be closed. #### `pubsub.stop()` @@ -169,7 +169,7 @@ Get the list of topics which the peer is subscribed to. ### Get Peers Subscribed to a topic -Get a list of the peer-ids that are subscribed to one topic. +Get a list of the [PeerId](https://github.com/libp2p/js-peer-id) strings that are subscribed to one topic. #### `pubsub.getPeersSubscribed(topic)` diff --git a/src/index.js b/src/index.js index e73ed2c..789b794 100644 --- a/src/index.js +++ b/src/index.js @@ -106,9 +106,12 @@ class PubsubBaseProtocol extends EventEmitter { this.registrar.handle(this.multicodecs, this._onIncomingStream) // register protocol with multicodec and handlers - this._registrarId = await this.registrar.register(this.multicodecs, { - onConnect: this._onPeerConnected, - onDisconnect: this._onPeerDisconnected + this._registrarId = await this.registrar.register({ + multicodecs: this.multicodecs, + handlers: { + onConnect: this._onPeerConnected, + onDisconnect: this._onPeerDisconnected + } }) this.log('started') diff --git a/test/pubsub.spec.js b/test/pubsub.spec.js index 00d12a6..b8477dc 100644 --- a/test/pubsub.spec.js +++ b/test/pubsub.spec.js @@ -53,7 +53,7 @@ describe('pubsub base protocol', () => { expect(sinonMockRegistrar.unregister.calledOnce).to.be.true() }) - it('should not throw to start if already started', async () => { + it('starting should not throw if already started', async () => { await pubsub.start() await pubsub.start() expect(sinonMockRegistrar.handle.calledOnce).to.be.true() @@ -63,14 +63,14 @@ describe('pubsub base protocol', () => { expect(sinonMockRegistrar.unregister.calledOnce).to.be.true() }) - it('should not throw if stop before start', async () => { + it('stopping should not throw if not started', async () => { await pubsub.stop() expect(sinonMockRegistrar.register.calledOnce).to.be.false() expect(sinonMockRegistrar.unregister.calledOnce).to.be.false() }) }) - describe('should handle messages creating and signing', () => { + describe('should handle message creation and signing', () => { let peerInfo let pubsub diff --git a/test/utils/index.js b/test/utils/index.js index 3d8a75b..3eafa05 100644 --- a/test/utils/index.js +++ b/test/utils/index.js @@ -70,7 +70,7 @@ exports.createMockRegistrar = (registrarRecord) => ({ handler } }, - register: (multicodecs, handlers) => { + register: ({ multicodecs, handlers }) => { const rec = registrarRecord[multicodecs[0]] || {} registrarRecord[multicodecs[0]] = { From 4f94cd2e5795b4e1b8fa0b7b3a83e192e66000ed Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Thu, 7 Nov 2019 10:06:47 +0100 Subject: [PATCH 5/6] chore: add topology interface --- package.json | 1 + src/index.js | 6 ++++-- test/utils/index.js | 5 +++-- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/package.json b/package.json index 78ceb4c..32fdccc 100644 --- a/package.json +++ b/package.json @@ -60,6 +60,7 @@ "it-pipe": "^1.0.1", "it-pushable": "^1.3.2", "libp2p-crypto": "~0.17.0", + "libp2p-interfaces": "~0.1.4", "protons": "^1.0.1", "sinon": "^7.5.0" }, diff --git a/src/index.js b/src/index.js index 789b794..27b386a 100644 --- a/src/index.js +++ b/src/index.js @@ -6,6 +6,7 @@ const EventEmitter = require('events') const errcode = require('err-code') const PeerInfo = require('peer-info') +const MulticodecTopology = require('libp2p-interfaces/src/topology/multicodec-topology') const message = require('./message') const Peer = require('./peer') @@ -105,14 +106,15 @@ class PubsubBaseProtocol extends EventEmitter { // Incoming streams this.registrar.handle(this.multicodecs, this._onIncomingStream) - // register protocol with multicodec and handlers - this._registrarId = await this.registrar.register({ + // register protocol with topology + const topology = new MulticodecTopology({ multicodecs: this.multicodecs, handlers: { onConnect: this._onPeerConnected, onDisconnect: this._onPeerDisconnected } }) + this._registrarId = await this.registrar.register(topology) this.log('started') this.started = true diff --git a/test/utils/index.js b/test/utils/index.js index 3eafa05..cc41275 100644 --- a/test/utils/index.js +++ b/test/utils/index.js @@ -70,12 +70,13 @@ exports.createMockRegistrar = (registrarRecord) => ({ handler } }, - register: ({ multicodecs, handlers }) => { + register: ({ multicodecs, _onConnect, _onDisconnect }) => { const rec = registrarRecord[multicodecs[0]] || {} registrarRecord[multicodecs[0]] = { ...rec, - ...handlers + onConnect: _onConnect, + onDisconnect: _onDisconnect } return multicodecs[0] From f3cd195d1a2074311d8b20ae487caf86fd1c0778 Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Thu, 14 Nov 2019 14:29:23 +0100 Subject: [PATCH 6/6] chore: tests for getPeersSubscribed --- test/pubsub.spec.js | 67 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) diff --git a/test/pubsub.spec.js b/test/pubsub.spec.js index b8477dc..3a86884 100644 --- a/test/pubsub.spec.js +++ b/test/pubsub.spec.js @@ -8,6 +8,7 @@ const expect = chai.expect const sinon = require('sinon') const PubsubBaseProtocol = require('../src') +const Peer = require('../src/peer') const { randomSeqno } = require('../src/utils') const { createPeerInfo, @@ -210,4 +211,70 @@ describe('pubsub base protocol', () => { expect(pubsubB.peers.size).to.be.eql(0) }) }) + + describe('getPeersSubscribed', () => { + let peerInfo + let pubsub + + beforeEach(async () => { + peerInfo = await createPeerInfo() + pubsub = new PubsubBaseProtocol({ + debugName: 'pubsub', + multicodecs: '/pubsub/1.0.0', + peerInfo: peerInfo, + registrar: mockRegistrar + }) + }) + + afterEach(() => pubsub.stop()) + + it('should fail if pubsub is not started', () => { + const topic = 'topic-test' + + try { + pubsub.getPeersSubscribed(topic) + } catch (err) { + expect(err).to.exist() + expect(err.code).to.eql('ERR_NOT_STARTED_YET') + return + } + throw new Error('should fail if pubsub is not started') + }) + + it('should fail if no topic is provided', async () => { + // start pubsub + await pubsub.start() + + try { + pubsub.getPeersSubscribed() + } catch (err) { + expect(err).to.exist() + expect(err.code).to.eql('ERR_NOT_VALID_TOPIC') + return + } + throw new Error('should fail if no topic is provided') + }) + + it('should get peer subscribed to one topic', async () => { + const topic = 'topic-test' + + // start pubsub + await pubsub.start() + + let peersSubscribed = pubsub.getPeersSubscribed(topic) + expect(peersSubscribed).to.be.empty() + + // Set mock peer subscribed + const peer = new Peer(peerInfo) + const id = peer.info.id.toB58String() + + peer.topics.add(topic) + pubsub.peers.set(id, peer) + + peersSubscribed = pubsub.getPeersSubscribed(topic) + + expect(peersSubscribed).to.not.be.empty() + expect(peersSubscribed[0]).to.eql(id) + }) + }) })