From f0bd71fca257be24ac3d38ee4bf93a4d06f29e75 Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Mon, 29 Apr 2019 15:43:27 +0200 Subject: [PATCH 1/7] test: simplify pubsub tests --- package.json | 1 + test/pubsub.js | 465 +++++++++++-------------------------------- test/utils/daemon.js | 43 ++-- 3 files changed, 145 insertions(+), 364 deletions(-) diff --git a/package.json b/package.json index 7ac53e4b..40bcb832 100644 --- a/package.json +++ b/package.json @@ -59,6 +59,7 @@ "left-pad": "^1.3.0", "libp2p-websocket-star-rendezvous": "~0.3.0", "lodash": "^4.17.11", + "merge-options": "^1.0.1", "mocha": "^5.2.0", "ncp": "^2.0.0", "pretty-bytes": "^5.1.0", diff --git a/test/pubsub.js b/test/pubsub.js index 58049d4a..19f712ed 100644 --- a/test/pubsub.js +++ b/test/pubsub.js @@ -9,45 +9,11 @@ chai.use(dirtyChai) const series = require('async/series') const parallel = require('async/parallel') const retry = require('async/retry') -const auto = require('async/auto') -const DaemonFactory = require('ipfsd-ctl') - -/* - * Wait for a condition to become true. When its true, callback is called. - */ -function waitFor (predicate, callback) { - const ttl = Date.now() + (10 * 1000) - const self = setInterval(() => { - if (predicate()) { - clearInterval(self) - return callback() - } - if (Date.now() > ttl) { - clearInterval(self) - return callback(new Error('waitFor time expired')) - } - }, 500) -} - -const connect = (jsD, goD, callback) => { - parallel([ - (cb) => jsD.api.id(cb), - (cb) => goD.api.id(cb) - ], (error, ids) => { - if (error) { - return callback(error) - } - - const jsLocalAddr = ids[0].addresses.find(a => a.includes('127.0.0.1')) - const goLocalAddr = ids[1].addresses.find(a => a.includes('127.0.0.1')) - - parallel([ - (cb) => jsD.api.swarm.connect(goLocalAddr, cb), - (cb) => goD.api.swarm.connect(jsLocalAddr, cb) - ], callback) - }) -} +const { + spawnInitAndStartGoDaemon, + spawnInitAndStartJsDaemon +} = require('./utils/daemon') const waitForTopicPeer = (topic, peer, daemon, callback) => { retry({ @@ -68,326 +34,137 @@ const waitForTopicPeer = (topic, peer, daemon, callback) => { }, callback) } -const subscribe = (topic, onMessage, localDaemon, remoteDaemon, callback) => { - auto({ - // get the ID of the local daemon - localDaemonId: (cb) => localDaemon.api.id(cb), - - // subscribe to the topic on our local daemon - subscribeLocalDaemon: (cb) => localDaemon.api.pubsub.subscribe(topic, onMessage, cb), - - // wait for the local daemon to appear in the peer list for the remote daemon - waitForRemotePeer: ['localDaemonId', 'subscribeLocalDaemon', (results, cb) => { - waitForTopicPeer(topic, results.localDaemonId, remoteDaemon, cb) - }] - }, (error) => callback(error)) +const timeout = 20e3 +function createJs () { + return spawnInitAndStartJsDaemon({ args: ['--enable-pubsub-experiment'] }) +} +function createGo () { + return spawnInitAndStartGoDaemon({ args: ['--enable-pubsub-experiment'] }) } describe('pubsub', function () { - this.timeout(10 * 1000) + let tests = { + 'publish from Go, subscribe on JS': [createGo, createJs], + 'publish from Go, subscribe on Go': [createGo, createGo], + 'publish from JS, subscribe on Go': [createJs, createGo], + 'publish from JS, subscribe on JS': [createJs, createJs] + } + + Object.keys(tests).forEach((name) => { + describe(name, function () { + let daemon1 + let daemon2 + let id1 + let id2 + + before('spawn nodes', async function () { + this.timeout(timeout) + + return Promise.all(tests[name].map(fn => fn())) + .then(nodes => { + [daemon1, daemon2] = nodes + }) + }) - let jsD - let goD - let jsId - let goId + before('connect', function (done) { + this.timeout(timeout) + + series([ + (cb) => parallel([ + (cb) => daemon1.api.id(cb), + (cb) => daemon2.api.id(cb) + ], (err, ids) => { + expect(err).to.not.exist() + id1 = ids[0] + id2 = ids[1] + cb() + }), + (cb) => daemon1.api.swarm.connect(id2.addresses[0], cb), + (cb) => daemon2.api.swarm.connect(id1.addresses[0], cb), + (cb) => parallel([ + (cb) => daemon1.api.swarm.peers(cb), + (cb) => daemon2.api.swarm.peers(cb) + ], (err, peers) => { + expect(err).to.not.exist() + expect(peers[0].map((p) => p.peer.toB58String())).to.include(id2.id) + expect(peers[1].map((p) => p.peer.toB58String())).to.include(id1.id) + cb() + }) + ], done) + }) - let nodes = [] - before(function (done) { - this.timeout(50 * 1000) + after('stop nodes', function (done) { + this.timeout(timeout) - parallel([ - (cb) => DaemonFactory.create().spawn({ - args: ['--enable-pubsub-experiment'], - initOptions: { bits: 1024 } - }, cb), - (cb) => DaemonFactory.create({ type: 'js' }).spawn({ - args: ['--enable-pubsub-experiment'], - initOptions: { bits: 512 } - }, cb) - ], (err, n) => { - expect(err).to.not.exist() - nodes = n - goD = nodes[0] - jsD = nodes[1] + parallel([daemon1, daemon2].map((node) => (cb) => node.stop(cb)), done) + }) - parallel([ - (cb) => jsD.api.id(cb), - (cb) => goD.api.id(cb) - ], (error, ids) => { - if (error) { - return done(error) + it('should exchange ascii data', function(done) { + const data = Buffer.from('hello world') + const topic = 'pubsub-ascii' + + function checkMessage (msg) { + console.log('Check message', msg) + expect(msg.data.toString()).to.equal(data.toString()) + expect(msg).to.have.property('seqno') + expect(Buffer.isBuffer(msg.seqno)).to.be.eql(true) + expect(msg).to.have.property('topicIDs').eql([topic]) + expect(msg).to.have.property('from', id1.id) + done() } - jsId = ids[0].id - goId = ids[1].id - - done() + series([ + (cb) => daemon2.api.pubsub.subscribe(topic, checkMessage, cb), + (cb) => waitForTopicPeer(topic, id2, daemon1, cb), + (cb) => daemon1.api.pubsub.publish(topic, data, cb) + ], (err) => { + if (err) return done(err) + }) }) - }) - }) - - after(function (done) { - this.timeout(50 * 1000) - parallel(nodes.map((node) => (cb) => node.stop(cb)), done) - }) - - describe('ascii data', () => { - const data = Buffer.from('hello world') - - it('publish from Go, subscribe on Go', (done) => { - const topic = 'pubsub-go-go' - let n = 0 - - function checkMessage (msg) { - ++n - expect(msg.data.toString()).to.equal(data.toString()) - expect(msg).to.have.property('seqno') - expect(Buffer.isBuffer(msg.seqno)).to.be.eql(true) - expect(msg).to.have.property('topicIDs').eql([topic]) - expect(msg).to.have.property('from', goId) - } - - series([ - (cb) => goD.api.pubsub.subscribe(topic, checkMessage, cb), - (cb) => goD.api.pubsub.publish(topic, data, cb), - (cb) => waitFor(() => n === 1, cb) - ], done) - }) - - it('publish from JS, subscribe on JS', (done) => { - const topic = 'pubsub-js-js' - let n = 0 - - function checkMessage (msg) { - ++n - expect(msg.data.toString()).to.equal(data.toString()) - expect(msg).to.have.property('seqno') - expect(Buffer.isBuffer(msg.seqno)).to.be.eql(true) - expect(msg).to.have.property('topicIDs').eql([topic]) - expect(msg).to.have.property('from', jsId) - } - - series([ - (cb) => jsD.api.pubsub.subscribe(topic, checkMessage, cb), - (cb) => jsD.api.pubsub.publish(topic, data, cb), - (cb) => waitFor(() => n === 1, cb) - ], done) - }) - - it('publish from JS, subscribe on Go', (done) => { - const topic = 'pubsub-js-go' - let n = 0 - - function checkMessage (msg) { - ++n - expect(msg.data.toString()).to.equal(data.toString()) - expect(msg).to.have.property('seqno') - expect(Buffer.isBuffer(msg.seqno)).to.be.eql(true) - expect(msg).to.have.property('topicIDs').eql([topic]) - expect(msg).to.have.property('from', jsId) - } - - series([ - (cb) => connect(jsD, goD, cb), - (cb) => subscribe(topic, checkMessage, goD, jsD, cb), - (cb) => jsD.api.pubsub.publish(topic, data, cb), - (cb) => waitFor(() => n === 1, cb) - ], done) - }) - - it('publish from Go, subscribe on JS', (done) => { - const topic = 'pubsub-go-js' - let n = 0 - - function checkMessage (msg) { - ++n - expect(msg.data.toString()).to.equal(data.toString()) - expect(msg).to.have.property('seqno') - expect(Buffer.isBuffer(msg.seqno)).to.be.eql(true) - expect(msg).to.have.property('topicIDs').eql([topic]) - expect(msg).to.have.property('from', goId) - } - - series([ - (cb) => connect(jsD, goD, cb), - (cb) => subscribe(topic, checkMessage, jsD, goD, cb), - (cb) => goD.api.pubsub.publish(topic, data, cb), - (cb) => waitFor(() => n === 1, cb) - ], done) - }) - }) - - describe('non-ascii data', () => { - const data = Buffer.from('你好世界') - it('publish from Go, subscribe on Go', (done) => { - const topic = 'pubsub-non-ascii-go-go' - let n = 0 - - function checkMessage (msg) { - ++n - expect(msg.data.toString()).to.equal(data.toString()) - expect(msg).to.have.property('seqno') - expect(Buffer.isBuffer(msg.seqno)).to.be.eql(true) - expect(msg).to.have.property('topicIDs').eql([topic]) - expect(msg).to.have.property('from', goId) - } - - series([ - (cb) => goD.api.pubsub.subscribe(topic, checkMessage, cb), - (cb) => goD.api.pubsub.publish(topic, data, cb), - (cb) => waitFor(() => n === 1, cb) - ], done) - }) - - it('publish from JS, subscribe on JS', (done) => { - const topic = 'pubsub-non-ascii-js-js' - let n = 0 - - function checkMessage (msg) { - ++n - expect(msg.data.toString()).to.equal(data.toString()) - expect(msg).to.have.property('seqno') - expect(Buffer.isBuffer(msg.seqno)).to.be.eql(true) - expect(msg).to.have.property('topicIDs').eql([topic]) - expect(msg).to.have.property('from', jsId) - } - - series([ - (cb) => jsD.api.pubsub.subscribe(topic, checkMessage, cb), - (cb) => jsD.api.pubsub.publish(topic, data, cb), - (cb) => waitFor(() => n === 1, cb) - ], done) - }) - - it('publish from JS, subscribe on Go', (done) => { - const topic = 'pubsub-non-ascii-js-go' - let n = 0 - - function checkMessage (msg) { - ++n - expect(msg.data.toString()).to.equal(data.toString()) - expect(msg).to.have.property('seqno') - expect(Buffer.isBuffer(msg.seqno)).to.be.eql(true) - expect(msg).to.have.property('topicIDs').eql([topic]) - expect(msg).to.have.property('from', jsId) - } - - series([ - (cb) => connect(jsD, goD, cb), - (cb) => subscribe(topic, checkMessage, goD, jsD, cb), - (cb) => jsD.api.pubsub.publish(topic, data, cb), - (cb) => waitFor(() => n === 1, cb) - ], done) - }) - - it('publish from Go, subscribe on JS', (done) => { - const topic = 'pubsub-non-ascii-go-js' - let n = 0 - - function checkMessage (msg) { - ++n - expect(msg.data.toString()).to.equal(data.toString()) - expect(msg).to.have.property('seqno') - expect(Buffer.isBuffer(msg.seqno)).to.be.eql(true) - expect(msg).to.have.property('topicIDs').eql([topic]) - expect(msg).to.have.property('from', goId) - } - - series([ - (cb) => connect(jsD, goD, cb), - (cb) => subscribe(topic, checkMessage, jsD, goD, cb), - (cb) => goD.api.pubsub.publish(topic, data, cb), - (cb) => waitFor(() => n === 1, cb) - ], done) - }) - }) - - describe('binary data', () => { - const data = Buffer.from('a36161636179656162830103056164a16466666666f400010203040506070809', 'hex') - - it('publish from Go, subscribe on Go', (done) => { - const topic = 'pubsub-binary-go-go' - let n = 0 - - function checkMessage (msg) { - ++n - expect(msg.data.toString('hex')).to.equal(data.toString('hex')) - expect(msg).to.have.property('seqno') - expect(Buffer.isBuffer(msg.seqno)).to.be.eql(true) - expect(msg).to.have.property('topicIDs').eql([topic]) - expect(msg).to.have.property('from', goId) - } - - series([ - (cb) => goD.api.pubsub.subscribe(topic, checkMessage, cb), - (cb) => setTimeout(() => { cb() }, 500), - (cb) => goD.api.pubsub.publish(topic, data, cb), - (cb) => waitFor(() => n === 1, cb) - ], done) - }) - - it('publish from Go, subscribe on JS', (done) => { - const topic = 'pubsub-binary-go-js' - let n = 0 - - function checkMessage (msg) { - ++n - expect(msg.data.toString('hex')).to.equal(data.toString('hex')) - expect(msg).to.have.property('seqno') - expect(Buffer.isBuffer(msg.seqno)).to.be.eql(true) - expect(msg).to.have.property('topicIDs').eql([topic]) - expect(msg).to.have.property('from', goId) - } - - series([ - (cb) => connect(jsD, goD, cb), - (cb) => subscribe(topic, checkMessage, jsD, goD, cb), - (cb) => goD.api.pubsub.publish(topic, data, cb), - (cb) => waitFor(() => n === 1, cb) - ], done) - }) - - it('publish from JS, subscribe on Go', (done) => { - const topic = 'pubsub-binary-js-go' - let n = 0 - - function checkMessage (msg) { - ++n - expect(msg.data.toString('hex')).to.equal(data.toString('hex')) - expect(msg).to.have.property('seqno') - expect(Buffer.isBuffer(msg.seqno)).to.be.eql(true) - expect(msg).to.have.property('topicIDs').eql([topic]) - expect(msg).to.have.property('from', jsId) - } - - series([ - (cb) => connect(jsD, goD, cb), - (cb) => subscribe(topic, checkMessage, goD, jsD, cb), - (cb) => jsD.api.pubsub.publish(topic, data, cb), - (cb) => waitFor(() => n === 1, cb) - ], done) - }) + it('should exchange non ascii data', function(done) { + const data = Buffer.from('你好世界') + const topic = 'pubsub-non-ascii' + + function checkMessage (msg) { + expect(msg.data.toString()).to.equal(data.toString()) + expect(msg).to.have.property('seqno') + expect(Buffer.isBuffer(msg.seqno)).to.be.eql(true) + expect(msg).to.have.property('topicIDs').eql([topic]) + expect(msg).to.have.property('from', id1.id) + done() + } - it('publish from JS, subscribe on JS', (done) => { - const topic = 'pubsub-binary-js-js' - let n = 0 + series([ + (cb) => daemon2.api.pubsub.subscribe(topic, checkMessage, cb), + (cb) => waitForTopicPeer(topic, id2, daemon1, cb), + (cb) => daemon1.api.pubsub.publish(topic, data, cb) + ], (err) => { + if (err) return done(err) + }) + }) - function checkMessage (msg) { - ++n - expect(msg.data.toString('hex')).to.equal(data.toString('hex')) - expect(msg).to.have.property('seqno') - expect(Buffer.isBuffer(msg.seqno)).to.be.eql(true) - expect(msg).to.have.property('topicIDs').eql([topic]) - expect(msg).to.have.property('from', jsId) - } + it('should exchange binary data', function(done) { + const data = Buffer.from('a36161636179656162830103056164a16466666666f400010203040506070809', 'hex') + const topic = 'pubsub-binary' + + function checkMessage (msg) { + expect(msg.data.toString()).to.equal(data.toString()) + expect(msg).to.have.property('seqno') + expect(Buffer.isBuffer(msg.seqno)).to.be.eql(true) + expect(msg).to.have.property('topicIDs').eql([topic]) + expect(msg).to.have.property('from', id1.id) + done() + } - series([ - (cb) => jsD.api.pubsub.subscribe(topic, checkMessage, cb), - (cb) => setTimeout(() => { cb() }, 500), - (cb) => jsD.api.pubsub.publish(topic, data, cb), - (cb) => waitFor(() => n === 1, cb) - ], done) + series([ + (cb) => daemon2.api.pubsub.subscribe(topic, checkMessage, cb), + (cb) => waitForTopicPeer(topic, id2, daemon1, cb), + (cb) => daemon1.api.pubsub.publish(topic, data, cb) + ], (err) => { + if (err) return done(err) + }) + }) }) }) }) diff --git a/test/utils/daemon.js b/test/utils/daemon.js index d3dbf12b..069b7c26 100644 --- a/test/utils/daemon.js +++ b/test/utils/daemon.js @@ -1,28 +1,31 @@ 'use strict' +const mergeOptions = require('merge-options') const DaemonFactory = require('ipfsd-ctl') const goDf = DaemonFactory.create() const jsDf = DaemonFactory.create({ type: 'js' }) -const spawnInitAndStartDaemon = (factory) => { - return new Promise((resolve, reject) => { - factory.spawn({ - initOptions: { - bits: 1024 - }, - config: { - Bootstrap: [], - Discovery: { - MDNS: { - Enabled: false - }, - webRTCStar: { - Enabled: false - } +const spawnInitAndStartDaemon = (factory, options) => { + options = mergeOptions({ + initOptions: { + bits: 1024 + }, + config: { + Bootstrap: [], + Discovery: { + MDNS: { + Enabled: false + }, + webRTCStar: { + Enabled: false } - }, - profile: 'test' - }, (error, instance) => { + } + }, + profile: 'test' + }, options) + + return new Promise((resolve, reject) => { + factory.spawn(options, (error, instance) => { if (error) { return reject(error) } @@ -46,7 +49,7 @@ const stopDaemon = (daemon) => { module.exports = { spawnInitAndStartDaemon, - spawnInitAndStartGoDaemon: () => spawnInitAndStartDaemon(goDf), - spawnInitAndStartJsDaemon: () => spawnInitAndStartDaemon(jsDf), + spawnInitAndStartGoDaemon: (opts) => spawnInitAndStartDaemon(goDf, opts), + spawnInitAndStartJsDaemon: (opts) => spawnInitAndStartDaemon(jsDf, opts), stopDaemon } From aff3e0f6bfa525b40338b3f05e0f4af5a5a42be9 Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Mon, 29 Apr 2019 15:46:38 +0200 Subject: [PATCH 2/7] chore: fix lint --- test/pubsub.js | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/test/pubsub.js b/test/pubsub.js index 19f712ed..b140e3c7 100644 --- a/test/pubsub.js +++ b/test/pubsub.js @@ -1,3 +1,4 @@ +/* eslint max-nested-callbacks: ["error", 6] */ /* eslint-env mocha */ 'use strict' @@ -99,7 +100,7 @@ describe('pubsub', function () { parallel([daemon1, daemon2].map((node) => (cb) => node.stop(cb)), done) }) - it('should exchange ascii data', function(done) { + it('should exchange ascii data', function (done) { const data = Buffer.from('hello world') const topic = 'pubsub-ascii' @@ -122,7 +123,7 @@ describe('pubsub', function () { }) }) - it('should exchange non ascii data', function(done) { + it('should exchange non ascii data', function (done) { const data = Buffer.from('你好世界') const topic = 'pubsub-non-ascii' @@ -144,7 +145,7 @@ describe('pubsub', function () { }) }) - it('should exchange binary data', function(done) { + it('should exchange binary data', function (done) { const data = Buffer.from('a36161636179656162830103056164a16466666666f400010203040506070809', 'hex') const topic = 'pubsub-binary' From 02aa650d8b91def3b2bc03118b7ec1eb41b53ef5 Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Mon, 29 Apr 2019 15:53:40 +0200 Subject: [PATCH 3/7] test: reorder pubsub tests --- test/pubsub.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/pubsub.js b/test/pubsub.js index b140e3c7..2285d743 100644 --- a/test/pubsub.js +++ b/test/pubsub.js @@ -45,10 +45,10 @@ function createGo () { describe('pubsub', function () { let tests = { - 'publish from Go, subscribe on JS': [createGo, createJs], 'publish from Go, subscribe on Go': [createGo, createGo], + 'publish from JS, subscribe on JS': [createJs, createJs], 'publish from JS, subscribe on Go': [createJs, createGo], - 'publish from JS, subscribe on JS': [createJs, createJs] + 'publish from Go, subscribe on JS': [createGo, createJs] } Object.keys(tests).forEach((name) => { From 1cdeb8d4540e9614bfb44ce9cf97b13acc5ef7df Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Mon, 29 Apr 2019 16:06:05 +0200 Subject: [PATCH 4/7] chore: remove console log --- test/pubsub.js | 1 - 1 file changed, 1 deletion(-) diff --git a/test/pubsub.js b/test/pubsub.js index 2285d743..430dbb6d 100644 --- a/test/pubsub.js +++ b/test/pubsub.js @@ -105,7 +105,6 @@ describe('pubsub', function () { const topic = 'pubsub-ascii' function checkMessage (msg) { - console.log('Check message', msg) expect(msg.data.toString()).to.equal(data.toString()) expect(msg).to.have.property('seqno') expect(Buffer.isBuffer(msg.seqno)).to.be.eql(true) From 11ab2fd7e2f130ec59db480b906f2af09a08a9a8 Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Wed, 1 May 2019 14:25:13 +0200 Subject: [PATCH 5/7] refactor: remove unneeded async --- test/pubsub.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/pubsub.js b/test/pubsub.js index 430dbb6d..3e627a37 100644 --- a/test/pubsub.js +++ b/test/pubsub.js @@ -58,7 +58,7 @@ describe('pubsub', function () { let id1 let id2 - before('spawn nodes', async function () { + before('spawn nodes', function () { this.timeout(timeout) return Promise.all(tests[name].map(fn => fn())) From 841708fa8ce3d49e0bde1f18651b9bcff125c6f2 Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Wed, 1 May 2019 14:28:37 +0200 Subject: [PATCH 6/7] fix: test pull-length-prefixed change --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 304e7d8b..f929fb78 100644 --- a/.travis.yml +++ b/.travis.yml @@ -12,7 +12,7 @@ os: - linux - osx -script: npx nyc -s npm run test:node -- --bail +script: npm i pull-length-prefixed@dignifiedquire/pull-length-prefixed#fix/empty-streams && npx nyc -s npm run test:node -- --bail after_success: npx nyc report --reporter=text-lcov > coverage.lcov && npx codecov jobs: From c579fd7f0256ed99bdd371cac2cc3d3744832be0 Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Wed, 1 May 2019 15:23:04 +0200 Subject: [PATCH 7/7] chore: use pull-mplex ipfs branch --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 40bcb832..28c9ae08 100644 --- a/package.json +++ b/package.json @@ -50,7 +50,7 @@ "form-data": "^2.3.3", "go-ipfs-dep": "~0.4.19", "hat": "0.0.3", - "ipfs": "~0.35.0", + "ipfs": "ipfs/js-ipfs#feat/pull-mplex", "ipfs-http-client": "^30.1.1", "ipfs-repo": "~0.26.1", "ipfs-unixfs": "~0.1.16",