From 123c1ad57e29aa0ab07a40636cc4e9599f8baad6 Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Fri, 30 Aug 2019 12:21:25 +0200 Subject: [PATCH] refactor: async BREAKING CHANGE: Switch to using async/await and async iterators. --- .aegir.js | 78 +++++----- README.md | 9 +- package.json | 24 ++-- src/index.js | 122 +++++++--------- src/message/sign.js | 81 ++++++----- src/peer.js | 16 +-- src/utils.js | 2 +- test/pubsub.spec.js | 309 ++++++++++++++++++---------------------- test/sign.spec.js | 97 +++++++------ test/utils.spec.js | 8 +- test/utils/constants.js | 25 ++-- test/utils/index.js | 26 ++-- 12 files changed, 370 insertions(+), 427 deletions(-) diff --git a/.aegir.js b/.aegir.js index 3a7b02d..6934a62 100644 --- a/.aegir.js +++ b/.aegir.js @@ -1,7 +1,6 @@ 'use strict' const pull = require('pull-stream') -const parallel = require('async/parallel') const WebSocketStarRendezvous = require('libp2p-websocket-star-rendezvous') const Node = require('./test/utils/nodejs-bundle.js') @@ -13,54 +12,47 @@ const { let wsRendezvous let node -const before = (done) => { - parallel([ - (cb) => { - WebSocketStarRendezvous.start({ - port: WS_RENDEZVOUS_MULTIADDR.nodeAddress().port, - refreshPeerListIntervalMS: 1000, - strictMultiaddr: false, - cryptoChallenge: true - }, (err, _server) => { - if (err) { - return cb(err) - } - wsRendezvous = _server - cb() - }) - }, - (cb) => { - getPeerRelay((err, peerInfo) => { - if (err) { - return done(err) - } - - node = new Node({ - peerInfo, - config: { - relay: { +const before = async () => { + [wsRendezvous, node] = await Promise.all([ + WebSocketStarRendezvous.start({ + port: WS_RENDEZVOUS_MULTIADDR.nodeAddress().port, + refreshPeerListIntervalMS: 1000, + strictMultiaddr: false, + cryptoChallenge: true + }), + new Promise(async (resolve, reject) => { + const peerInfo = await getPeerRelay() + const n = new Node({ + peerInfo, + config: { + relay: { + enabled: true, + hop: { enabled: true, - hop: { - enabled: true, - active: true - } + active: true } } - }) - - node.handle('/echo/1.0.0', (_, conn) => pull(conn, conn)) - node.start(cb) + } }) - } - ], done) + + n.handle('/echo/1.0.0', (_, conn) => pull(conn, conn)) + await n.start() + + resolve(n) + }) + ]) } -const after = (done) => { - setTimeout(() => - parallel( - [node, wsRendezvous].map((s) => (cb) => s.stop(cb)), - done), - 2000) +const after = () => { + return new Promise((resolve) => { + setTimeout(async () => { + await Promise.all([ + node.stop(), + wsRendezvous.stop() + ]) + resolve() + }, 2000) + }) } module.exports = { diff --git a/README.md b/README.md index 170bd37..047d0e3 100644 --- a/README.md +++ b/README.md @@ -72,14 +72,19 @@ class PubsubImplementation extends Pubsub { Validates the signature of a message. -#### `pubsub.validate(message, callback)` +#### `pubsub.validate(message)` ##### Parameters | Name | Type | Description | |------|------|-------------| | message | `Message` | a pubsub message | -| callback | `function(Error, Boolean)` | calls back with true if the message is valid | + +#### Returns + +| Type | Description | +|------|-------------| +| `Promise` | resolves to true if the message is valid | ## Implementations using this base protocol diff --git a/package.json b/package.json index a4df62a..5b1a5d3 100644 --- a/package.json +++ b/package.json @@ -45,34 +45,34 @@ }, "homepage": "https://github.com/libp2p/js-libp2p-pubsub#readme", "devDependencies": { - "aegir": "^18.2.1", + "aegir": "^20.3.1", "benchmark": "^2.1.4", "chai": "^4.2.0", "chai-spies": "^1.0.0", "dirty-chai": "^2.0.1", - "libp2p": "~0.24.4", + "libp2p": "~0.26.2", "libp2p-secio": "~0.11.1", "libp2p-spdy": "~0.13.3", - "libp2p-tcp": "~0.13.0", + "libp2p-tcp": "~0.13.1", "libp2p-websocket-star": "~0.10.2", - "libp2p-websocket-star-rendezvous": "~0.3.0", - "lodash": "^4.17.11", - "multiaddr": "^6.0.6", - "peer-id": "~0.12.5", + "libp2p-websocket-star-rendezvous": "~0.4.1", + "lodash": "^4.17.15", + "multiaddr": "^6.1.0", + "peer-id": "~0.12.2", "peer-info": "~0.15.1" }, "dependencies": { "async": "^2.6.2", "bs58": "^4.0.1", "debug": "^4.1.1", - "err-code": "^1.1.2", + "err-code": "^2.0.0", "length-prefixed-stream": "^2.0.0", - "libp2p-crypto": "~0.16.1", + "libp2p-crypto": "~0.17.0", "protons": "^1.0.1", - "pull-length-prefixed": "^1.3.1", + "pull-length-prefixed": "^1.3.3", "pull-pushable": "^2.2.0", - "pull-stream": "^3.6.9", - "sinon": "^7.3.2", + "pull-stream": "^3.6.14", + "sinon": "^7.5.0", "time-cache": "~0.3.0" }, "contributors": [ diff --git a/src/index.js b/src/index.js index f5b1300..753f9d1 100644 --- a/src/index.js +++ b/src/index.js @@ -3,7 +3,6 @@ const EventEmitter = require('events') const pull = require('pull-stream/pull') const empty = require('pull-stream/sources/empty') -const asyncEach = require('async/each') const TimeCache = require('time-cache') const debug = require('debug') const errcode = require('err-code') @@ -16,8 +15,6 @@ const { } = require('./message/sign') const utils = require('./utils') -const nextTick = require('async/nextTick') - /** * PubsubBaseProtocol handles the peers and connections logic for pubsub routers */ @@ -136,47 +133,49 @@ class PubsubBaseProtocol extends EventEmitter { * Dial a received peer. * @private * @param {PeerInfo} peerInfo peer info - * @param {function} callback - * @returns {void} + * @returns {Promise} */ - _dialPeer (peerInfo, callback) { - callback = callback || function noop () { } + _dialPeer (peerInfo) { const idB58Str = peerInfo.id.toB58String() // If already have a PubSub conn, ignore const peer = this.peers.get(idB58Str) if (peer && peer.isConnected) { - return nextTick(() => callback()) + return Promise.resolve() } // If already dialing this peer, ignore if (this._dials.has(idB58Str)) { this.log('already dialing %s, ignoring dial attempt', idB58Str) - return nextTick(() => callback()) + return Promise.resolve() } this._dials.add(idB58Str) this.log('dialing %s', idB58Str) - this.libp2p.dialProtocol(peerInfo, this.multicodec, (err, conn) => { - this.log('dial to %s complete', idB58Str) - // If the dial is not in the set, it means that pubsub has been - // stopped - const pubsubStopped = !this._dials.has(idB58Str) - this._dials.delete(idB58Str) + return new Promise((resolve) => { + this.libp2p.dialProtocol(peerInfo, this.multicodec, (err, conn) => { + this.log('dial to %s complete', idB58Str) - if (err) { - this.log.err(err) - return callback() - } + // If the dial is not in the set, it means that pubsub has been + // stopped + const pubsubStopped = !this._dials.has(idB58Str) + this._dials.delete(idB58Str) - // pubsub has been stopped, so we should just bail out - if (pubsubStopped) { - this.log('pubsub was stopped, not processing dial to %s', idB58Str) - return callback() - } + if (err) { + this.log.err(err) + return resolve() + } + + // pubsub has been stopped, so we should just bail out + if (pubsubStopped) { + this.log('pubsub was stopped, not processing dial to %s', idB58Str) + return resolve() + } - this._onDial(peerInfo, conn, callback) + this._onDial(peerInfo, conn) + resolve() + }) }) } @@ -185,16 +184,13 @@ class PubsubBaseProtocol extends EventEmitter { * @private * @param {PeerInfo} peerInfo peer info * @param {Connection} conn connection to the peer - * @param {function} callback */ - _onDial (peerInfo, conn, callback) { + _onDial (peerInfo, conn) { const idB58Str = peerInfo.id.toB58String() this.log('connected', idB58Str) const peer = this._addPeer(new Peer(peerInfo)) peer.attachConnection(conn) - - nextTick(() => callback()) } /** @@ -252,14 +248,14 @@ class PubsubBaseProtocol extends EventEmitter { * Normalizes the message and signs it, if signing is enabled * * @param {Message} message - * @param {function(Error, Message)} callback + * @returns {Message} */ - _buildMessage (message, callback) { + _buildMessage (message) { const msg = utils.normalizeOutRpcMessage(message) if (this.peerId) { - signMessage(this.peerId, msg, callback) + return signMessage(this.peerId, msg) } else { - nextTick(callback, null, msg) + return message } } @@ -269,11 +265,10 @@ class PubsubBaseProtocol extends EventEmitter { * @abstract * @param {Array|string} topics * @param {Array|any} messages - * @param {function(Error)} callback * @returns {undefined} * */ - publish (topics, messages, callback) { + publish (topics, messages) { throw errcode('publish must be implemented by the subclass', 'ERR_NOT_IMPLEMENTED') } @@ -302,14 +297,11 @@ class PubsubBaseProtocol extends EventEmitter { /** * Mounts the pubsub protocol onto the libp2p node and sends our * subscriptions to every peer conneceted - * - * @param {Function} callback - * @returns {undefined} - * + * @returns {Promise} */ - start (callback) { + async start () { if (this.started) { - return nextTick(() => callback(new Error('already started'))) + throw errcode(new Error('already started'), 'ERR_ALREADY_STARTED') } this.log('starting') @@ -321,25 +313,19 @@ class PubsubBaseProtocol extends EventEmitter { // Dial already connected peers const peerInfos = Object.values(this.libp2p.peerBook.getAll()) - asyncEach(peerInfos, (peer, cb) => this._dialPeer(peer, cb), (err) => { - nextTick(() => { - this.log('started') - this.started = true - callback(err) - }) - }) + await Promise.all(peerInfos.map((peer) => this._dialPeer(peer))) + + this.log('started') + this.started = true } /** * Unmounts the pubsub protocol and shuts down every connection - * - * @param {Function} callback - * @returns {undefined} - * + * @returns {void} */ - stop (callback) { + stop () { if (!this.started) { - return nextTick(() => callback(new Error('not started yet'))) + throw errcode(new Error('not started yet'), 'ERR_NOT_STARTED_YET') } this.libp2p.unhandle(this.multicodec) @@ -349,40 +335,30 @@ class PubsubBaseProtocol extends EventEmitter { this._dials = new Set() this.log('stopping') - asyncEach(this.peers.values(), (peer, cb) => peer.close(cb), (err) => { - if (err) { - return callback(err) - } + this.peers.forEach((peer) => peer.close()) - this.log('stopped') - this.peers = new Map() - this.started = false - callback() - }) + this.log('stopped') + this.peers = new Map() + this.started = false } /** * Validates the given message. The signature will be checked for authenticity. * @param {rpc.RPC.Message} message - * @param {function(Error, Boolean)} callback - * @returns {void} + * @returns {Promise} */ - validate (message, callback) { + async validate (message) { // eslint-disable-line require-await // If strict signing is on and we have no signature, abort if (this.strictSigning && !message.signature) { this.log('Signing required and no signature was present, dropping message:', message) - return nextTick(callback, null, false) + return Promise.resolve(false) } // Check the message signature if present if (message.signature) { - verifySignature(message, (err, valid) => { - if (err) return callback(err) - callback(null, valid) - }) + return verifySignature(message) } else { - // The message is valid - nextTick(callback, null, true) + return Promise.resolve(true) } } } diff --git a/src/message/sign.js b/src/message/sign.js index 1419bcc..7e7e70b 100644 --- a/src/message/sign.js +++ b/src/message/sign.js @@ -9,24 +9,25 @@ const SignPrefix = Buffer.from('libp2p-pubsub:') * * @param {PeerId} peerId * @param {Message} message - * @param {function(Error, Message)} callback - * @returns {void} + * @returns {Promise} */ -function signMessage (peerId, message, callback) { +function signMessage (peerId, message) { // Get the message in bytes, and prepend with the pubsub prefix const bytes = Buffer.concat([ SignPrefix, Message.encode(message) ]) - // Sign the bytes with the private key - peerId.privKey.sign(bytes, (err, signature) => { - if (err) return callback(err) + return new Promise((resolve, reject) => { + // Sign the bytes with the private key + peerId.privKey.sign(bytes, (err, signature) => { + if (err) return reject(err) - callback(null, { - ...message, - signature: signature, - key: peerId.pubKey.bytes + resolve({ + ...message, + signature: signature, + key: peerId.pubKey.bytes + }) }) }) } @@ -34,11 +35,11 @@ function signMessage (peerId, message, callback) { /** * Verifies the signature of the given message * @param {rpc.RPC.Message} message - * @param {function(Error, Boolean)} callback + * @returns {Promise} */ -function verifySignature (message, callback) { +async function verifySignature (message) { // Get message sans the signature - let baseMessage = { ...message } + const baseMessage = { ...message } delete baseMessage.signature delete baseMessage.key const bytes = Buffer.concat([ @@ -47,10 +48,16 @@ function verifySignature (message, callback) { ]) // Get the public key - messagePublicKey(message, (err, pubKey) => { - if (err) return callback(err, false) - // Verify the base message - pubKey.verify(bytes, message.signature, callback) + const pubKey = await messagePublicKey(message) + + // Verify the base message + return new Promise((resolve, reject) => { + pubKey.verify(bytes, message.signature, (err, res) => { + if (err) { + return reject(err) + } + resolve(res) + }) }) } @@ -59,28 +66,28 @@ function verifySignature (message, callback) { * If no, valid PublicKey can be retrieved an error will be returned. * * @param {Message} message - * @param {function(Error, PublicKey)} callback - * @returns {void} + * @returns {Promise} */ -function messagePublicKey (message, callback) { - if (message.key) { - PeerId.createFromPubKey(message.key, (err, peerId) => { - if (err) return callback(err, null) - // the key belongs to the sender, return the key - if (peerId.isEqual(message.from)) return callback(null, peerId.pubKey) - // We couldn't validate pubkey is from the originator, error - callback(new Error('Public Key does not match the originator')) - }) - return - } else { - // should be available in the from property of the message (peer id) - const from = PeerId.createFromBytes(message.from) - if (from.pubKey) { - return callback(null, from.pubKey) +function messagePublicKey (message) { + return new Promise((resolve, reject) => { + if (message.key) { + PeerId.createFromPubKey(message.key, (err, peerId) => { + if (err) return reject(err) + // the key belongs to the sender, return the key + if (peerId.isEqual(message.from)) return resolve(peerId.pubKey) + // We couldn't validate pubkey is from the originator, error + return reject(new Error('Public Key does not match the originator')) + }) + } else { + // should be available in the from property of the message (peer id) + const from = PeerId.createFromBytes(message.from) + if (from.pubKey) { + return resolve(from.pubKey) + } else { + reject(new Error('Could not get the public key from the originator id')) + } } - } - - callback(new Error('Could not get the public key from the originator id')) + }) } module.exports = { diff --git a/src/peer.js b/src/peer.js index 7179b4e..c1bb4ab 100644 --- a/src/peer.js +++ b/src/peer.js @@ -3,7 +3,6 @@ const lp = require('pull-length-prefixed') const Pushable = require('pull-pushable') const pull = require('pull-stream') -const setImmediate = require('async/setImmediate') const EventEmitter = require('events') const { RPC } = require('./message') @@ -162,11 +161,9 @@ class Peer extends EventEmitter { /** * Closes the open connection to peer - * - * @param {Function} callback - * @returns {undefined} + * @returns {void} */ - close (callback) { + close () { // Force removal of peer this._references = 1 @@ -175,12 +172,9 @@ class Peer extends EventEmitter { this.stream.end() } - setImmediate(() => { - this.conn = null - this.stream = null - this.emit('close') - callback() - }) + this.conn = null + this.stream = null + this.emit('close') } } diff --git a/src/utils.js b/src/utils.js index a2d6767..d9c6033 100644 --- a/src/utils.js +++ b/src/utils.js @@ -44,7 +44,7 @@ exports.anyMatch = (a, b) => { bHas = (val) => b.has(val) } - for (let val of a) { + for (const val of a) { if (bHas(val)) { return true } diff --git a/test/pubsub.spec.js b/test/pubsub.spec.js index 11f10bc..44a2bd0 100644 --- a/test/pubsub.spec.js +++ b/test/pubsub.spec.js @@ -6,8 +6,6 @@ chai.use(require('dirty-chai')) chai.use(require('chai-spies')) const expect = chai.expect const sinon = require('sinon') -const series = require('async/series') -const parallel = require('async/parallel') const PubsubBaseProtocol = require('../src') const { randomSeqno } = require('../src/utils') @@ -47,57 +45,53 @@ describe('pubsub base protocol', () => { let psA let psB - before((done) => { - series([ - (cb) => createNode(cb), - (cb) => createNode(cb) - ], (err, nodes) => { - if (err) { - return done(err) - } - nodeA = nodes[0] - nodeB = nodes[1] - done() - }) + before(async () => { + [nodeA, nodeB] = await Promise.all([ + createNode(), + createNode() + ]) }) - before('mount the pubsub protocol', (done) => { + before('mount the pubsub protocol', () => { psA = new PubsubImplementation(nodeA) psB = new PubsubImplementation(nodeB) - setTimeout(() => { - expect(psA.peers.size).to.be.eql(0) - expect(psB.peers.size).to.be.eql(0) - done() - }, 50) + return new Promise((resolve) => { + setTimeout(() => { + expect(psA.peers.size).to.be.eql(0) + expect(psB.peers.size).to.be.eql(0) + resolve() + }, 50) + }) }) - before('start both Pubsub', (done) => { - parallel([ - (cb) => psA.start(cb), - (cb) => psB.start(cb) - ], done) + before('start both Pubsub', () => { + return Promise.all([ + psA.start(), + psB.start() + ]) }) - after((done) => { - parallel([ - (cb) => nodeA.stop(cb), - (cb) => nodeB.stop(cb) - ], done) + after(() => { + return Promise.all([ + nodeA.stop(), + nodeB.stop() + ]) }) - it('Dial from nodeA to nodeB', (done) => { - series([ - (cb) => nodeA.dial(nodeB.peerInfo, cb), - (cb) => setTimeout(() => { + it('Dial from nodeA to nodeB', async () => { + await nodeA.dial(nodeB.peerInfo) + + return new Promise((resolve) => { + setTimeout(() => { expect(psA.peers.size).to.equal(1) expect(psB.peers.size).to.equal(1) - cb() + resolve() }, 1000) - ], done) + }) }) - it('_buildMessage normalizes and signs messages', (done) => { + it('_buildMessage normalizes and signs messages', async () => { const message = { from: psA.peerId.id, data: 'hello', @@ -105,17 +99,13 @@ describe('pubsub base protocol', () => { topicIDs: ['test-topic'] } - psA._buildMessage(message, (err, signedMessage) => { - expect(err).to.not.exist() + const signedMessage = await psA._buildMessage(message) + const verified = await psA.validate(signedMessage) - psA.validate(signedMessage, (err, verified) => { - expect(verified).to.eql(true) - done(err) - }) - }) + expect(verified).to.eql(true) }) - it('validate with strict signing off will validate a present signature', (done) => { + it('validate with strict signing off will validate a present signature', async () => { const message = { from: psA.peerId.id, data: 'hello', @@ -125,17 +115,13 @@ describe('pubsub base protocol', () => { sinon.stub(psA, 'strictSigning').value(false) - psA._buildMessage(message, (err, signedMessage) => { - expect(err).to.not.exist() + const signedMessage = await psA._buildMessage(message) + const verified = await psA.validate(signedMessage) - psA.validate(signedMessage, (err, verified) => { - expect(verified).to.eql(true) - done(err) - }) - }) + expect(verified).to.eql(true) }) - it('validate with strict signing requires a signature', (done) => { + it('validate with strict signing requires a signature', async () => { const message = { from: psA.peerId.id, data: 'hello', @@ -143,10 +129,9 @@ describe('pubsub base protocol', () => { topicIDs: ['test-topic'] } - psA.validate(message, (err, verified) => { - expect(verified).to.eql(false) - done(err) - }) + const verified = await psA.validate(message) + + expect(verified).to.eql(false) }) }) @@ -156,45 +141,39 @@ describe('pubsub base protocol', () => { let psA let psB - before((done) => { - series([ - (cb) => createNode(cb), - (cb) => createNode(cb) - ], (cb, nodes) => { - nodeA = nodes[0] - nodeB = nodes[1] - nodeA.dial(nodeB.peerInfo, () => setTimeout(done, 1000)) - }) + before(async () => { + [nodeA, nodeB] = await Promise.all([ + createNode(), + createNode() + ]) + + await nodeA.dial(nodeB.peerInfo) + await new Promise((resolve) => setTimeout(resolve, 1000)) }) - after((done) => { - parallel([ - (cb) => nodeA.stop(cb), - (cb) => nodeB.stop(cb) - ], done) + after(() => { + return Promise.all([ + nodeA.stop(), + nodeB.stop() + ]) }) - it('dial on pubsub on mount', (done) => { + it('dial on pubsub on mount', async () => { psA = new PubsubImplementation(nodeA) psB = new PubsubImplementation(nodeB) - parallel([ - (cb) => psA.start(cb), - (cb) => psB.start(cb) - ], next) + await Promise.all([ + psA.start(), + psB.start() + ]) - function next () { - expect(psA.peers.size).to.equal(1) - expect(psB.peers.size).to.equal(1) - done() - } + expect(psA.peers.size).to.equal(1) + expect(psB.peers.size).to.equal(1) }) - it('stop both pubsubs', (done) => { - parallel([ - (cb) => psA.stop(cb), - (cb) => psB.stop(cb) - ], done) + it('stop both pubsubs', () => { + psA.stop() + psB.stop() }) }) @@ -205,57 +184,50 @@ describe('pubsub base protocol', () => { let psA let psB - before((done) => { - sandbox = chai.spy.sandbox() - - series([ - (cb) => createNode(cb), - (cb) => createNode(cb) - ], (err, nodes) => { - if (err) return done(err) - - nodeA = nodes[0] - nodeB = nodes[1] + before(async () => { + // sandbox = chai.spy.sandbox() + [nodeA, nodeB] = await Promise.all([ + createNode(), + createNode() + ]) + // Put node B in node A's peer book + nodeA.peerBook.put(nodeB.peerInfo) - // Put node B in node A's peer book - nodeA.peerBook.put(nodeB.peerInfo) + psA = new PubsubImplementation(nodeA) + psB = new PubsubImplementation(nodeB) - psA = new PubsubImplementation(nodeA) - psB = new PubsubImplementation(nodeB) + sandbox = chai.spy.sandbox() - psB.start(done) - }) + return psB.start() }) - after((done) => { + after(() => { sandbox.restore() - parallel([ - (cb) => nodeA.stop(cb), - (cb) => nodeB.stop(cb) - ], (ignoreErr) => { - done() - }) + return Promise.all([ + nodeA.stop(), + nodeB.stop() + ]) }) - it('does not dial twice to same peer', (done) => { + it('does not dial twice to same peer', async () => { sandbox.on(psA, ['_onDial']) // When node A starts, it will dial all peers in its peer book, which // is just peer B - psA.start(startComplete) + await psA.start() // Simulate a connection coming in from peer B at the same time. This // causes pubsub to dial peer B nodeA.emit('peer:connect', nodeB.peerInfo) - function startComplete () { + return new Promise((resolve) => { // Check that only one dial was made setTimeout(() => { expect(psA._onDial).to.have.been.called.once() - done() + resolve() }, 1000) - } + }) }) }) @@ -266,40 +238,33 @@ describe('pubsub base protocol', () => { let psA let psB - before((done) => { - sandbox = chai.spy.sandbox() - - series([ - (cb) => createNode(cb), - (cb) => createNode(cb) - ], (err, nodes) => { - if (err) return done(err) - - nodeA = nodes[0] - nodeB = nodes[1] + before(async () => { + // sandbox = chai.spy.sandbox() + [nodeA, nodeB] = await Promise.all([ + createNode(), + createNode() + ]) + // Put node B in node A's peer book + nodeA.peerBook.put(nodeB.peerInfo) - // Put node B in node A's peer book - nodeA.peerBook.put(nodeB.peerInfo) + psA = new PubsubImplementation(nodeA) + psB = new PubsubImplementation(nodeB) - psA = new PubsubImplementation(nodeA) - psB = new PubsubImplementation(nodeB) + sandbox = chai.spy.sandbox() - psB.start(done) - }) + return psB.start() }) - after((done) => { + after(() => { sandbox.restore() - parallel([ - (cb) => nodeA.stop(cb), - (cb) => nodeB.stop(cb) - ], (ignoreErr) => { - done() - }) + return Promise.all([ + nodeA.stop(), + nodeB.stop() + ]) }) - it('can dial again after error', (done) => { + it('can dial again after error', async () => { let firstTime = true const dialProtocol = psA.libp2p.dialProtocol.bind(psA.libp2p) sandbox.on(psA.libp2p, 'dialProtocol', (peerInfo, multicodec, cb) => { @@ -315,19 +280,19 @@ describe('pubsub base protocol', () => { // When node A starts, it will dial all peers in its peer book, which // is just peer B - psA.start(startComplete) + await psA.start() - function startComplete () { - // Simulate a connection coming in from peer B. This causes pubsub - // to dial peer B - nodeA.emit('peer:connect', nodeB.peerInfo) + // Simulate a connection coming in from peer B. This causes pubsub + // to dial peer B + nodeA.emit('peer:connect', nodeB.peerInfo) + return new Promise((resolve) => { // Check that both dials were made setTimeout(() => { expect(psA.libp2p.dialProtocol).to.have.been.called.twice() - done() + resolve() }, 1000) - } + }) }) }) @@ -338,40 +303,34 @@ describe('pubsub base protocol', () => { let psA let psB - before((done) => { - sandbox = chai.spy.sandbox() + before(async () => { + // sandbox = chai.spy.sandbox() + [nodeA, nodeB] = await Promise.all([ + createNode(), + createNode() + ]) - series([ - (cb) => createNode(cb), - (cb) => createNode(cb) - ], (err, nodes) => { - if (err) return done(err) - - nodeA = nodes[0] - nodeB = nodes[1] + psA = new PubsubImplementation(nodeA) + psB = new PubsubImplementation(nodeB) - psA = new PubsubImplementation(nodeA) - psB = new PubsubImplementation(nodeB) + sandbox = chai.spy.sandbox() - parallel([ - (cb) => psA.start(cb), - (cb) => psB.start(cb) - ], done) - }) + return Promise.all([ + psA.start(), + psB.start() + ]) }) - after((done) => { + after(() => { sandbox.restore() - parallel([ - (cb) => nodeA.stop(cb), - (cb) => nodeB.stop(cb) - ], (ignoreErr) => { - done() - }) + return Promise.all([ + nodeA.stop(), + nodeB.stop() + ]) }) - it('does not process dial after stop', (done) => { + it('does not process dial after stop', () => { sandbox.on(psA, ['_onDial']) // Simulate a connection coming in from peer B at the same time. This @@ -379,11 +338,13 @@ describe('pubsub base protocol', () => { nodeA.emit('peer:connect', nodeB.peerInfo) // Stop pubsub before the dial can complete - psA.stop(() => { + psA.stop() + + return new Promise((resolve) => { // Check that the dial was not processed setTimeout(() => { expect(psA._onDial).to.not.have.been.called() - done() + resolve() }, 1000) }) }) diff --git a/test/sign.spec.js b/test/sign.spec.js index 9a65dc5..f9cadf1 100644 --- a/test/sign.spec.js +++ b/test/sign.spec.js @@ -17,16 +17,20 @@ const { randomSeqno } = require('../src/utils') describe('message signing', () => { let peerId - before((done) => { - peerId = PeerId.create({ - bits: 1024 - }, (err, id) => { - peerId = id - done(err) + before(async () => { + peerId = await new Promise((resolve, reject) => { + peerId = PeerId.create({ + bits: 1024 + }, (err, id) => { + if (err) { + reject(err) + } + resolve(id) + }) }) }) - it('should be able to sign and verify a message', (done) => { + it('should be able to sign and verify a message', () => { const message = { from: peerId.id, data: 'hello', @@ -36,63 +40,59 @@ describe('message signing', () => { const bytesToSign = Buffer.concat([SignPrefix, Message.encode(message)]) - peerId.privKey.sign(bytesToSign, (err, expectedSignature) => { - if (err) return done(err) + return new Promise((resolve, reject) => { + peerId.privKey.sign(bytesToSign, async (err, expectedSignature) => { + if (err) return reject(err) - signMessage(peerId, message, (err, signedMessage) => { - if (err) return done(err) + const signedMessage = await signMessage(peerId, message) // Check the signature and public key expect(signedMessage.signature).to.eql(expectedSignature) expect(signedMessage.key).to.eql(peerId.pubKey.bytes) // Verify the signature - verifySignature(signedMessage, (err, verified) => { - expect(err).to.not.exist() - expect(verified).to.eql(true) - done(err) - }) + const verified = await verifySignature(signedMessage) + expect(verified).to.eql(true) + + resolve() }) }) }) - it('should be able to extract the public key from an inlined key', (done) => { - const testSecp256k1 = (peerId) => { - const message = { - from: peerId.id, - data: 'hello', - seqno: randomSeqno(), - topicIDs: ['test-topic'] - } + it('should be able to extract the public key from an inlined key', () => { + return new Promise((resolve, reject) => { + PeerId.create({ keyType: 'secp256k1', bits: 256 }, (err, secPeerId) => { + if (err) return reject(err) - const bytesToSign = Buffer.concat([SignPrefix, Message.encode(message)]) - peerId.privKey.sign(bytesToSign, (err, expectedSignature) => { - if (err) return done(err) + const message = { + from: secPeerId.id, + data: 'hello', + seqno: randomSeqno(), + topicIDs: ['test-topic'] + } - signMessage(peerId, message, (err, signedMessage) => { - if (err) return done(err) + const bytesToSign = Buffer.concat([SignPrefix, Message.encode(message)]) + + secPeerId.privKey.sign(bytesToSign, async (err, expectedSignature) => { + if (err) return reject(err) + + const signedMessage = await signMessage(secPeerId, message) // Check the signature and public key expect(signedMessage.signature).to.eql(expectedSignature) signedMessage.key = undefined // Verify the signature - verifySignature(signedMessage, (err, verified) => { - expect(err).to.not.exist() - expect(verified).to.eql(true) - done(err) - }) + const verified = await verifySignature(signedMessage) + expect(verified).to.eql(true) + + resolve() }) }) - } - - PeerId.create({ keyType: 'secp256k1', bits: 256 }, (err, peerId) => { - expect(err).to.not.exist() - testSecp256k1(peerId) }) }) - it('should be able to extract the public key from the message', (done) => { + it('should be able to extract the public key from the message', () => { const message = { from: peerId.id, data: 'hello', @@ -102,22 +102,21 @@ describe('message signing', () => { const bytesToSign = Buffer.concat([SignPrefix, Message.encode(message)]) - peerId.privKey.sign(bytesToSign, (err, expectedSignature) => { - if (err) return done(err) + return new Promise((resolve, reject) => { + peerId.privKey.sign(bytesToSign, async (err, expectedSignature) => { + if (err) return reject(err) - signMessage(peerId, message, (err, signedMessage) => { - if (err) return done(err) + const signedMessage = await signMessage(peerId, message) // Check the signature and public key expect(signedMessage.signature).to.eql(expectedSignature) expect(signedMessage.key).to.eql(peerId.pubKey.bytes) // Verify the signature - verifySignature(signedMessage, (err, verified) => { - expect(err).to.not.exist() - expect(verified).to.eql(true) - done(err) - }) + const verified = await verifySignature(signedMessage) + expect(verified).to.eql(true) + + resolve() }) }) }) diff --git a/test/utils.spec.js b/test/utils.spec.js index d573f06..22499ea 100644 --- a/test/utils.spec.js +++ b/test/utils.spec.js @@ -49,8 +49,8 @@ describe('utils', () => { }) it('converts an IN msg.from to b58', () => { - let binaryId = Buffer.from('1220e2187eb3e6c4fb3e7ff9ad4658610624a6315e0240fc6f37130eedb661e939cc', 'hex') - let stringId = 'QmdZEWgtaWAxBh93fELFT298La1rsZfhiC2pqwMVwy3jZM' + const binaryId = Buffer.from('1220e2187eb3e6c4fb3e7ff9ad4658610624a6315e0240fc6f37130eedb661e939cc', 'hex') + const stringId = 'QmdZEWgtaWAxBh93fELFT298La1rsZfhiC2pqwMVwy3jZM' const m = [ { from: binaryId }, { from: stringId } @@ -63,8 +63,8 @@ describe('utils', () => { }) it('converts an OUT msg.from to binary', () => { - let binaryId = Buffer.from('1220e2187eb3e6c4fb3e7ff9ad4658610624a6315e0240fc6f37130eedb661e939cc', 'hex') - let stringId = 'QmdZEWgtaWAxBh93fELFT298La1rsZfhiC2pqwMVwy3jZM' + const binaryId = Buffer.from('1220e2187eb3e6c4fb3e7ff9ad4658610624a6315e0240fc6f37130eedb661e939cc', 'hex') + const stringId = 'QmdZEWgtaWAxBh93fELFT298La1rsZfhiC2pqwMVwy3jZM' const m = [ { from: binaryId }, { from: stringId } diff --git a/test/utils/constants.js b/test/utils/constants.js index a836a03..c780b69 100644 --- a/test/utils/constants.js +++ b/test/utils/constants.js @@ -2,7 +2,6 @@ const PeerId = require('peer-id') const PeerInfo = require('peer-info') -const nextTick = require('async/nextTick') const peerJSON = require('../fixtures/test-peer') const multiaddr = require('multiaddr') @@ -18,21 +17,23 @@ let peerRelay = null * is necessary for running a TCP node during browser tests. * @private * @param {function(error, PeerInfo)} callback - * @returns {void} + * @returns {Promise} */ -module.exports.getPeerRelay = (callback) => { - if (peerRelay) return nextTick(callback, null, peerRelay) +module.exports.getPeerRelay = () => { + if (peerRelay) return peerRelay - PeerId.createFromJSON(peerJSON, (err, peerId) => { - if (err) { - return callback(err) - } - peerRelay = new PeerInfo(peerId) + return new Promise((resolve, reject) => { + PeerId.createFromJSON(peerJSON, (err, peerId) => { + if (err) { + return reject(err) + } + peerRelay = new PeerInfo(peerId) - peerRelay.multiaddrs.add('/ip4/127.0.0.1/tcp/9200/ws') - peerRelay.multiaddrs.add('/ip4/127.0.0.1/tcp/9245') + peerRelay.multiaddrs.add('/ip4/127.0.0.1/tcp/9200/ws') + peerRelay.multiaddrs.add('/ip4/127.0.0.1/tcp/9245') - callback(null, peerRelay) + resolve(peerRelay) + }) }) } diff --git a/test/utils/index.js b/test/utils/index.js index e3cc5ef..3baadda 100644 --- a/test/utils/index.js +++ b/test/utils/index.js @@ -6,13 +6,21 @@ const Node = require('./nodejs-bundle') const waterfall = require('async/waterfall') -exports.createNode = (callback) => { - waterfall([ - (cb) => PeerId.create({ bits: 1024 }, cb), - (id, cb) => PeerInfo.create(id, cb), - (peerInfo, cb) => { - cb(null, new Node({ peerInfo })) - }, - (node, cb) => node.start((err) => cb(err, node)) - ], callback) +exports.createNode = () => { + return new Promise((resolve, reject) => { + waterfall([ + (cb) => PeerId.create({ bits: 1024 }, cb), + (id, cb) => PeerInfo.create(id, cb), + (peerInfo, cb) => { + cb(null, new Node({ peerInfo })) + }, + (node, cb) => node.start((err) => cb(err, node)) + ], (err, node) => { + if (err) { + return reject(err) + } + + resolve(node) + }) + }) }