diff --git a/.travis.yml b/.travis.yml index 304e7d8b..f929fb78 100644 --- a/.travis.yml +++ b/.travis.yml @@ -12,7 +12,7 @@ os: - linux - osx -script: npx nyc -s npm run test:node -- --bail +script: npm i pull-length-prefixed@dignifiedquire/pull-length-prefixed#fix/empty-streams && npx nyc -s npm run test:node -- --bail after_success: npx nyc report --reporter=text-lcov > coverage.lcov && npx codecov jobs: diff --git a/package.json b/package.json index 7ac53e4b..28c9ae08 100644 --- a/package.json +++ b/package.json @@ -50,7 +50,7 @@ "form-data": "^2.3.3", "go-ipfs-dep": "~0.4.19", "hat": "0.0.3", - "ipfs": "~0.35.0", + "ipfs": "ipfs/js-ipfs#feat/pull-mplex", "ipfs-http-client": "^30.1.1", "ipfs-repo": "~0.26.1", "ipfs-unixfs": "~0.1.16", @@ -59,6 +59,7 @@ "left-pad": "^1.3.0", "libp2p-websocket-star-rendezvous": "~0.3.0", "lodash": "^4.17.11", + "merge-options": "^1.0.1", "mocha": "^5.2.0", "ncp": "^2.0.0", "pretty-bytes": "^5.1.0", diff --git a/test/pubsub.js b/test/pubsub.js index 58049d4a..3e627a37 100644 --- a/test/pubsub.js +++ b/test/pubsub.js @@ -1,3 +1,4 @@ +/* eslint max-nested-callbacks: ["error", 6] */ /* eslint-env mocha */ 'use strict' @@ -9,45 +10,11 @@ chai.use(dirtyChai) const series = require('async/series') const parallel = require('async/parallel') const retry = require('async/retry') -const auto = require('async/auto') -const DaemonFactory = require('ipfsd-ctl') - -/* - * Wait for a condition to become true. When its true, callback is called. - */ -function waitFor (predicate, callback) { - const ttl = Date.now() + (10 * 1000) - const self = setInterval(() => { - if (predicate()) { - clearInterval(self) - return callback() - } - if (Date.now() > ttl) { - clearInterval(self) - return callback(new Error('waitFor time expired')) - } - }, 500) -} - -const connect = (jsD, goD, callback) => { - parallel([ - (cb) => jsD.api.id(cb), - (cb) => goD.api.id(cb) - ], (error, ids) => { - if (error) { - return callback(error) - } - - const jsLocalAddr = ids[0].addresses.find(a => a.includes('127.0.0.1')) - const goLocalAddr = ids[1].addresses.find(a => a.includes('127.0.0.1')) - - parallel([ - (cb) => jsD.api.swarm.connect(goLocalAddr, cb), - (cb) => goD.api.swarm.connect(jsLocalAddr, cb) - ], callback) - }) -} +const { + spawnInitAndStartGoDaemon, + spawnInitAndStartJsDaemon +} = require('./utils/daemon') const waitForTopicPeer = (topic, peer, daemon, callback) => { retry({ @@ -68,326 +35,136 @@ const waitForTopicPeer = (topic, peer, daemon, callback) => { }, callback) } -const subscribe = (topic, onMessage, localDaemon, remoteDaemon, callback) => { - auto({ - // get the ID of the local daemon - localDaemonId: (cb) => localDaemon.api.id(cb), - - // subscribe to the topic on our local daemon - subscribeLocalDaemon: (cb) => localDaemon.api.pubsub.subscribe(topic, onMessage, cb), - - // wait for the local daemon to appear in the peer list for the remote daemon - waitForRemotePeer: ['localDaemonId', 'subscribeLocalDaemon', (results, cb) => { - waitForTopicPeer(topic, results.localDaemonId, remoteDaemon, cb) - }] - }, (error) => callback(error)) +const timeout = 20e3 +function createJs () { + return spawnInitAndStartJsDaemon({ args: ['--enable-pubsub-experiment'] }) +} +function createGo () { + return spawnInitAndStartGoDaemon({ args: ['--enable-pubsub-experiment'] }) } describe('pubsub', function () { - this.timeout(10 * 1000) + let tests = { + 'publish from Go, subscribe on Go': [createGo, createGo], + 'publish from JS, subscribe on JS': [createJs, createJs], + 'publish from JS, subscribe on Go': [createJs, createGo], + 'publish from Go, subscribe on JS': [createGo, createJs] + } + + Object.keys(tests).forEach((name) => { + describe(name, function () { + let daemon1 + let daemon2 + let id1 + let id2 + + before('spawn nodes', function () { + this.timeout(timeout) + + return Promise.all(tests[name].map(fn => fn())) + .then(nodes => { + [daemon1, daemon2] = nodes + }) + }) - let jsD - let goD - let jsId - let goId + before('connect', function (done) { + this.timeout(timeout) + + series([ + (cb) => parallel([ + (cb) => daemon1.api.id(cb), + (cb) => daemon2.api.id(cb) + ], (err, ids) => { + expect(err).to.not.exist() + id1 = ids[0] + id2 = ids[1] + cb() + }), + (cb) => daemon1.api.swarm.connect(id2.addresses[0], cb), + (cb) => daemon2.api.swarm.connect(id1.addresses[0], cb), + (cb) => parallel([ + (cb) => daemon1.api.swarm.peers(cb), + (cb) => daemon2.api.swarm.peers(cb) + ], (err, peers) => { + expect(err).to.not.exist() + expect(peers[0].map((p) => p.peer.toB58String())).to.include(id2.id) + expect(peers[1].map((p) => p.peer.toB58String())).to.include(id1.id) + cb() + }) + ], done) + }) - let nodes = [] - before(function (done) { - this.timeout(50 * 1000) + after('stop nodes', function (done) { + this.timeout(timeout) - parallel([ - (cb) => DaemonFactory.create().spawn({ - args: ['--enable-pubsub-experiment'], - initOptions: { bits: 1024 } - }, cb), - (cb) => DaemonFactory.create({ type: 'js' }).spawn({ - args: ['--enable-pubsub-experiment'], - initOptions: { bits: 512 } - }, cb) - ], (err, n) => { - expect(err).to.not.exist() - nodes = n - goD = nodes[0] - jsD = nodes[1] + parallel([daemon1, daemon2].map((node) => (cb) => node.stop(cb)), done) + }) - parallel([ - (cb) => jsD.api.id(cb), - (cb) => goD.api.id(cb) - ], (error, ids) => { - if (error) { - return done(error) + it('should exchange ascii data', function (done) { + const data = Buffer.from('hello world') + const topic = 'pubsub-ascii' + + function checkMessage (msg) { + expect(msg.data.toString()).to.equal(data.toString()) + expect(msg).to.have.property('seqno') + expect(Buffer.isBuffer(msg.seqno)).to.be.eql(true) + expect(msg).to.have.property('topicIDs').eql([topic]) + expect(msg).to.have.property('from', id1.id) + done() } - jsId = ids[0].id - goId = ids[1].id - - done() + series([ + (cb) => daemon2.api.pubsub.subscribe(topic, checkMessage, cb), + (cb) => waitForTopicPeer(topic, id2, daemon1, cb), + (cb) => daemon1.api.pubsub.publish(topic, data, cb) + ], (err) => { + if (err) return done(err) + }) }) - }) - }) - - after(function (done) { - this.timeout(50 * 1000) - parallel(nodes.map((node) => (cb) => node.stop(cb)), done) - }) - - describe('ascii data', () => { - const data = Buffer.from('hello world') - - it('publish from Go, subscribe on Go', (done) => { - const topic = 'pubsub-go-go' - let n = 0 - - function checkMessage (msg) { - ++n - expect(msg.data.toString()).to.equal(data.toString()) - expect(msg).to.have.property('seqno') - expect(Buffer.isBuffer(msg.seqno)).to.be.eql(true) - expect(msg).to.have.property('topicIDs').eql([topic]) - expect(msg).to.have.property('from', goId) - } - - series([ - (cb) => goD.api.pubsub.subscribe(topic, checkMessage, cb), - (cb) => goD.api.pubsub.publish(topic, data, cb), - (cb) => waitFor(() => n === 1, cb) - ], done) - }) - - it('publish from JS, subscribe on JS', (done) => { - const topic = 'pubsub-js-js' - let n = 0 - - function checkMessage (msg) { - ++n - expect(msg.data.toString()).to.equal(data.toString()) - expect(msg).to.have.property('seqno') - expect(Buffer.isBuffer(msg.seqno)).to.be.eql(true) - expect(msg).to.have.property('topicIDs').eql([topic]) - expect(msg).to.have.property('from', jsId) - } - - series([ - (cb) => jsD.api.pubsub.subscribe(topic, checkMessage, cb), - (cb) => jsD.api.pubsub.publish(topic, data, cb), - (cb) => waitFor(() => n === 1, cb) - ], done) - }) - - it('publish from JS, subscribe on Go', (done) => { - const topic = 'pubsub-js-go' - let n = 0 - - function checkMessage (msg) { - ++n - expect(msg.data.toString()).to.equal(data.toString()) - expect(msg).to.have.property('seqno') - expect(Buffer.isBuffer(msg.seqno)).to.be.eql(true) - expect(msg).to.have.property('topicIDs').eql([topic]) - expect(msg).to.have.property('from', jsId) - } - - series([ - (cb) => connect(jsD, goD, cb), - (cb) => subscribe(topic, checkMessage, goD, jsD, cb), - (cb) => jsD.api.pubsub.publish(topic, data, cb), - (cb) => waitFor(() => n === 1, cb) - ], done) - }) - - it('publish from Go, subscribe on JS', (done) => { - const topic = 'pubsub-go-js' - let n = 0 - - function checkMessage (msg) { - ++n - expect(msg.data.toString()).to.equal(data.toString()) - expect(msg).to.have.property('seqno') - expect(Buffer.isBuffer(msg.seqno)).to.be.eql(true) - expect(msg).to.have.property('topicIDs').eql([topic]) - expect(msg).to.have.property('from', goId) - } - - series([ - (cb) => connect(jsD, goD, cb), - (cb) => subscribe(topic, checkMessage, jsD, goD, cb), - (cb) => goD.api.pubsub.publish(topic, data, cb), - (cb) => waitFor(() => n === 1, cb) - ], done) - }) - }) - - describe('non-ascii data', () => { - const data = Buffer.from('你好世界') - it('publish from Go, subscribe on Go', (done) => { - const topic = 'pubsub-non-ascii-go-go' - let n = 0 - - function checkMessage (msg) { - ++n - expect(msg.data.toString()).to.equal(data.toString()) - expect(msg).to.have.property('seqno') - expect(Buffer.isBuffer(msg.seqno)).to.be.eql(true) - expect(msg).to.have.property('topicIDs').eql([topic]) - expect(msg).to.have.property('from', goId) - } - - series([ - (cb) => goD.api.pubsub.subscribe(topic, checkMessage, cb), - (cb) => goD.api.pubsub.publish(topic, data, cb), - (cb) => waitFor(() => n === 1, cb) - ], done) - }) - - it('publish from JS, subscribe on JS', (done) => { - const topic = 'pubsub-non-ascii-js-js' - let n = 0 - - function checkMessage (msg) { - ++n - expect(msg.data.toString()).to.equal(data.toString()) - expect(msg).to.have.property('seqno') - expect(Buffer.isBuffer(msg.seqno)).to.be.eql(true) - expect(msg).to.have.property('topicIDs').eql([topic]) - expect(msg).to.have.property('from', jsId) - } - - series([ - (cb) => jsD.api.pubsub.subscribe(topic, checkMessage, cb), - (cb) => jsD.api.pubsub.publish(topic, data, cb), - (cb) => waitFor(() => n === 1, cb) - ], done) - }) - - it('publish from JS, subscribe on Go', (done) => { - const topic = 'pubsub-non-ascii-js-go' - let n = 0 - - function checkMessage (msg) { - ++n - expect(msg.data.toString()).to.equal(data.toString()) - expect(msg).to.have.property('seqno') - expect(Buffer.isBuffer(msg.seqno)).to.be.eql(true) - expect(msg).to.have.property('topicIDs').eql([topic]) - expect(msg).to.have.property('from', jsId) - } - - series([ - (cb) => connect(jsD, goD, cb), - (cb) => subscribe(topic, checkMessage, goD, jsD, cb), - (cb) => jsD.api.pubsub.publish(topic, data, cb), - (cb) => waitFor(() => n === 1, cb) - ], done) - }) - - it('publish from Go, subscribe on JS', (done) => { - const topic = 'pubsub-non-ascii-go-js' - let n = 0 - - function checkMessage (msg) { - ++n - expect(msg.data.toString()).to.equal(data.toString()) - expect(msg).to.have.property('seqno') - expect(Buffer.isBuffer(msg.seqno)).to.be.eql(true) - expect(msg).to.have.property('topicIDs').eql([topic]) - expect(msg).to.have.property('from', goId) - } - - series([ - (cb) => connect(jsD, goD, cb), - (cb) => subscribe(topic, checkMessage, jsD, goD, cb), - (cb) => goD.api.pubsub.publish(topic, data, cb), - (cb) => waitFor(() => n === 1, cb) - ], done) - }) - }) - - describe('binary data', () => { - const data = Buffer.from('a36161636179656162830103056164a16466666666f400010203040506070809', 'hex') - - it('publish from Go, subscribe on Go', (done) => { - const topic = 'pubsub-binary-go-go' - let n = 0 - - function checkMessage (msg) { - ++n - expect(msg.data.toString('hex')).to.equal(data.toString('hex')) - expect(msg).to.have.property('seqno') - expect(Buffer.isBuffer(msg.seqno)).to.be.eql(true) - expect(msg).to.have.property('topicIDs').eql([topic]) - expect(msg).to.have.property('from', goId) - } - - series([ - (cb) => goD.api.pubsub.subscribe(topic, checkMessage, cb), - (cb) => setTimeout(() => { cb() }, 500), - (cb) => goD.api.pubsub.publish(topic, data, cb), - (cb) => waitFor(() => n === 1, cb) - ], done) - }) - - it('publish from Go, subscribe on JS', (done) => { - const topic = 'pubsub-binary-go-js' - let n = 0 - - function checkMessage (msg) { - ++n - expect(msg.data.toString('hex')).to.equal(data.toString('hex')) - expect(msg).to.have.property('seqno') - expect(Buffer.isBuffer(msg.seqno)).to.be.eql(true) - expect(msg).to.have.property('topicIDs').eql([topic]) - expect(msg).to.have.property('from', goId) - } - - series([ - (cb) => connect(jsD, goD, cb), - (cb) => subscribe(topic, checkMessage, jsD, goD, cb), - (cb) => goD.api.pubsub.publish(topic, data, cb), - (cb) => waitFor(() => n === 1, cb) - ], done) - }) - - it('publish from JS, subscribe on Go', (done) => { - const topic = 'pubsub-binary-js-go' - let n = 0 - - function checkMessage (msg) { - ++n - expect(msg.data.toString('hex')).to.equal(data.toString('hex')) - expect(msg).to.have.property('seqno') - expect(Buffer.isBuffer(msg.seqno)).to.be.eql(true) - expect(msg).to.have.property('topicIDs').eql([topic]) - expect(msg).to.have.property('from', jsId) - } - - series([ - (cb) => connect(jsD, goD, cb), - (cb) => subscribe(topic, checkMessage, goD, jsD, cb), - (cb) => jsD.api.pubsub.publish(topic, data, cb), - (cb) => waitFor(() => n === 1, cb) - ], done) - }) + it('should exchange non ascii data', function (done) { + const data = Buffer.from('你好世界') + const topic = 'pubsub-non-ascii' + + function checkMessage (msg) { + expect(msg.data.toString()).to.equal(data.toString()) + expect(msg).to.have.property('seqno') + expect(Buffer.isBuffer(msg.seqno)).to.be.eql(true) + expect(msg).to.have.property('topicIDs').eql([topic]) + expect(msg).to.have.property('from', id1.id) + done() + } - it('publish from JS, subscribe on JS', (done) => { - const topic = 'pubsub-binary-js-js' - let n = 0 + series([ + (cb) => daemon2.api.pubsub.subscribe(topic, checkMessage, cb), + (cb) => waitForTopicPeer(topic, id2, daemon1, cb), + (cb) => daemon1.api.pubsub.publish(topic, data, cb) + ], (err) => { + if (err) return done(err) + }) + }) - function checkMessage (msg) { - ++n - expect(msg.data.toString('hex')).to.equal(data.toString('hex')) - expect(msg).to.have.property('seqno') - expect(Buffer.isBuffer(msg.seqno)).to.be.eql(true) - expect(msg).to.have.property('topicIDs').eql([topic]) - expect(msg).to.have.property('from', jsId) - } + it('should exchange binary data', function (done) { + const data = Buffer.from('a36161636179656162830103056164a16466666666f400010203040506070809', 'hex') + const topic = 'pubsub-binary' + + function checkMessage (msg) { + expect(msg.data.toString()).to.equal(data.toString()) + expect(msg).to.have.property('seqno') + expect(Buffer.isBuffer(msg.seqno)).to.be.eql(true) + expect(msg).to.have.property('topicIDs').eql([topic]) + expect(msg).to.have.property('from', id1.id) + done() + } - series([ - (cb) => jsD.api.pubsub.subscribe(topic, checkMessage, cb), - (cb) => setTimeout(() => { cb() }, 500), - (cb) => jsD.api.pubsub.publish(topic, data, cb), - (cb) => waitFor(() => n === 1, cb) - ], done) + series([ + (cb) => daemon2.api.pubsub.subscribe(topic, checkMessage, cb), + (cb) => waitForTopicPeer(topic, id2, daemon1, cb), + (cb) => daemon1.api.pubsub.publish(topic, data, cb) + ], (err) => { + if (err) return done(err) + }) + }) }) }) }) diff --git a/test/utils/daemon.js b/test/utils/daemon.js index d3dbf12b..069b7c26 100644 --- a/test/utils/daemon.js +++ b/test/utils/daemon.js @@ -1,28 +1,31 @@ 'use strict' +const mergeOptions = require('merge-options') const DaemonFactory = require('ipfsd-ctl') const goDf = DaemonFactory.create() const jsDf = DaemonFactory.create({ type: 'js' }) -const spawnInitAndStartDaemon = (factory) => { - return new Promise((resolve, reject) => { - factory.spawn({ - initOptions: { - bits: 1024 - }, - config: { - Bootstrap: [], - Discovery: { - MDNS: { - Enabled: false - }, - webRTCStar: { - Enabled: false - } +const spawnInitAndStartDaemon = (factory, options) => { + options = mergeOptions({ + initOptions: { + bits: 1024 + }, + config: { + Bootstrap: [], + Discovery: { + MDNS: { + Enabled: false + }, + webRTCStar: { + Enabled: false } - }, - profile: 'test' - }, (error, instance) => { + } + }, + profile: 'test' + }, options) + + return new Promise((resolve, reject) => { + factory.spawn(options, (error, instance) => { if (error) { return reject(error) } @@ -46,7 +49,7 @@ const stopDaemon = (daemon) => { module.exports = { spawnInitAndStartDaemon, - spawnInitAndStartGoDaemon: () => spawnInitAndStartDaemon(goDf), - spawnInitAndStartJsDaemon: () => spawnInitAndStartDaemon(jsDf), + spawnInitAndStartGoDaemon: (opts) => spawnInitAndStartDaemon(goDf, opts), + spawnInitAndStartJsDaemon: (opts) => spawnInitAndStartDaemon(jsDf, opts), stopDaemon }