diff --git a/package.json b/package.json index 65c539a7..fe60f482 100644 --- a/package.json +++ b/package.json @@ -53,6 +53,7 @@ "ipld-dag-pb": "~0.17.3", "is-ipfs": "~0.6.1", "is-plain-object": "^3.0.0", + "it-pushable": "^1.2.1", "libp2p-crypto": "~0.16.0", "multiaddr": "^6.0.0", "multibase": "~0.6.0", @@ -63,6 +64,7 @@ "pull-stream": "^3.6.11", "pump": "^3.0.0", "readable-stream": "^3.1.1", + "streaming-iterables": "^4.1.0", "through2": "^3.0.0" }, "devDependencies": { diff --git a/src/pubsub/ls.js b/src/pubsub/ls.js index 670d1a4a..639aed5e 100644 --- a/src/pubsub/ls.js +++ b/src/pubsub/ls.js @@ -1,9 +1,9 @@ /* eslint-env mocha */ 'use strict' -const eachSeries = require('async/eachSeries') const { getTopic } = require('./utils') const { getDescribe, getIt, expect } = require('../utils/mocha') +const delay = require('../utils/delay') module.exports = (createCommon, options) => { const describe = getDescribe(options) @@ -14,6 +14,7 @@ module.exports = (createCommon, options) => { this.timeout(80 * 1000) let ipfs + let subscribedTopics = [] before(function (done) { // CI takes longer to instantiate the daemon, so we need to increase the @@ -30,33 +31,32 @@ module.exports = (createCommon, options) => { }) }) + afterEach(async () => { + for (let i = 0; i < subscribedTopics.length; i++) { + await ipfs.pubsub.unsubscribe(subscribedTopics[i]) + } + subscribedTopics = [] + await delay(100) + }) + after((done) => common.teardown(done)) - it('should return an empty list when no topics are subscribed', (done) => { - ipfs.pubsub.ls((err, topics) => { - expect(err).to.not.exist() - expect(topics.length).to.equal(0) - done() - }) + it('should return an empty list when no topics are subscribed', async () => { + const topics = await ipfs.pubsub.ls() + expect(topics.length).to.equal(0) }) - it('should return a list with 1 subscribed topic', (done) => { - const sub1 = (msg) => {} + it('should return a list with 1 subscribed topic', async () => { + const sub1 = () => {} const topic = getTopic() + subscribedTopics = [topic] - ipfs.pubsub.subscribe(topic, sub1, (err) => { - expect(err).to.not.exist() - - ipfs.pubsub.ls((err, topics) => { - expect(err).to.not.exist() - expect(topics).to.be.eql([topic]) - - ipfs.pubsub.unsubscribe(topic, sub1, done) - }) - }) + await ipfs.pubsub.subscribe(topic, sub1) + const topics = await ipfs.pubsub.ls() + expect(topics).to.be.eql([topic]) }) - it('should return a list with 3 subscribed topics', (done) => { + it('should return a list with 3 subscribed topics', async () => { const topics = [{ name: 'one', handler () {} @@ -68,22 +68,14 @@ module.exports = (createCommon, options) => { handler () {} }] - eachSeries(topics, (t, cb) => { - ipfs.pubsub.subscribe(t.name, t.handler, cb) - }, (err) => { - expect(err).to.not.exist() + subscribedTopics = topics.map(t => t.name) - ipfs.pubsub.ls((err, list) => { - expect(err).to.not.exist() + for (let i = 0; i < topics.length; i++) { + await ipfs.pubsub.subscribe(topics[i].name, topics[i].handler) + } - expect(list.sort()) - .to.eql(topics.map((t) => t.name).sort()) - - eachSeries(topics, (t, cb) => { - ipfs.pubsub.unsubscribe(t.name, t.handler, cb) - }, done) - }) - }) + const list = await ipfs.pubsub.ls() + expect(list.sort()).to.eql(topics.map(t => t.name).sort()) }) }) } diff --git a/src/pubsub/peers.js b/src/pubsub/peers.js index 7f5684d8..44813023 100644 --- a/src/pubsub/peers.js +++ b/src/pubsub/peers.js @@ -2,11 +2,11 @@ 'use strict' const parallel = require('async/parallel') -const series = require('async/series') const { spawnNodesWithId } = require('../utils/spawn') const { waitForPeers, getTopic } = require('./utils') const { getDescribe, getIt, expect } = require('../utils/mocha') const { connect } = require('../utils/swarm') +const delay = require('../utils/delay') module.exports = (createCommon, options) => { const describe = getDescribe(options) @@ -19,6 +19,7 @@ module.exports = (createCommon, options) => { let ipfs1 let ipfs2 let ipfs3 + let subscribedTopics = [] before(function (done) { // CI takes longer to instantiate the daemon, so we need to increase the @@ -40,6 +41,16 @@ module.exports = (createCommon, options) => { }) }) + afterEach(async () => { + const nodes = [ipfs1, ipfs2, ipfs3] + for (let i = 0; i < subscribedTopics.length; i++) { + const topic = subscribedTopics[i] + await Promise.all(nodes.map(ipfs => ipfs.pubsub.unsubscribe(topic))) + } + subscribedTopics = [] + await delay(100) + }) + after((done) => common.teardown(done)) before((done) => { @@ -52,94 +63,64 @@ module.exports = (createCommon, options) => { ], done) }) - it('should not error when not subscribed to a topic', (done) => { + it('should not error when not subscribed to a topic', async () => { const topic = getTopic() - ipfs1.pubsub.peers(topic, (err, peers) => { - expect(err).to.not.exist() - // Should be empty() but as mentioned below go-ipfs returns more than it should - // expect(peers).to.be.empty() - - done() - }) + const peers = await ipfs1.pubsub.peers(topic) + expect(peers).to.exist() + // Should be empty() but as mentioned below go-ipfs returns more than it should + // expect(peers).to.be.empty() }) - it('should not return extra peers', (done) => { + it('should not return extra peers', async () => { // Currently go-ipfs returns peers that have not been // subscribed to the topic. Enable when go-ipfs has been fixed - const sub1 = (msg) => {} - const sub2 = (msg) => {} - const sub3 = (msg) => {} + const sub1 = () => {} + const sub2 = () => {} + const sub3 = () => {} const topic = getTopic() const topicOther = topic + 'different topic' - series([ - (cb) => ipfs1.pubsub.subscribe(topic, sub1, cb), - (cb) => ipfs2.pubsub.subscribe(topicOther, sub2, cb), - (cb) => ipfs3.pubsub.subscribe(topicOther, sub3, cb) - ], (err) => { - expect(err).to.not.exist() - - ipfs1.pubsub.peers(topic, (err, peers) => { - expect(err).to.not.exist() - expect(peers).to.be.empty() - - parallel([ - (cb) => ipfs1.pubsub.unsubscribe(topic, sub1, cb), - (cb) => ipfs2.pubsub.unsubscribe(topicOther, sub2, cb), - (cb) => ipfs3.pubsub.unsubscribe(topicOther, sub3, cb) - ], done) - }) - }) + subscribedTopics = [topic, topicOther] + + await ipfs1.pubsub.subscribe(topic, sub1) + await ipfs2.pubsub.subscribe(topicOther, sub2) + await ipfs3.pubsub.subscribe(topicOther, sub3) + + const peers = await ipfs1.pubsub.peers(topic) + expect(peers).to.be.empty() }) - it('should return peers for a topic - one peer', (done) => { + it('should return peers for a topic - one peer', async () => { // Currently go-ipfs returns peers that have not been // subscribed to the topic. Enable when go-ipfs has been fixed - const sub1 = (msg) => {} - const sub2 = (msg) => {} - const sub3 = (msg) => {} + const sub1 = () => {} + const sub2 = () => {} + const sub3 = () => {} const topic = getTopic() - series([ - (cb) => ipfs1.pubsub.subscribe(topic, sub1, cb), - (cb) => ipfs2.pubsub.subscribe(topic, sub2, cb), - (cb) => ipfs3.pubsub.subscribe(topic, sub3, cb), - (cb) => waitForPeers(ipfs1, topic, [ipfs2.peerId.id], 30000, cb) - ], (err) => { - expect(err).to.not.exist() - - parallel([ - (cb) => ipfs1.pubsub.unsubscribe(topic, sub1, cb), - (cb) => ipfs2.pubsub.unsubscribe(topic, sub2, cb), - (cb) => ipfs3.pubsub.unsubscribe(topic, sub3, cb) - ], done) - }) + subscribedTopics = [topic] + + await ipfs1.pubsub.subscribe(topic, sub1) + await ipfs2.pubsub.subscribe(topic, sub2) + await ipfs3.pubsub.subscribe(topic, sub3) + + await waitForPeers(ipfs1, topic, [ipfs2.peerId.id], 30000) }) - it('should return peers for a topic - multiple peers', (done) => { - const sub1 = (msg) => {} - const sub2 = (msg) => {} - const sub3 = (msg) => {} + it('should return peers for a topic - multiple peers', async () => { + const sub1 = () => {} + const sub2 = () => {} + const sub3 = () => {} const topic = getTopic() - series([ - (cb) => ipfs1.pubsub.subscribe(topic, sub1, cb), - (cb) => ipfs2.pubsub.subscribe(topic, sub2, cb), - (cb) => ipfs3.pubsub.subscribe(topic, sub3, cb), - (cb) => waitForPeers(ipfs1, topic, [ - ipfs2.peerId.id, - ipfs3.peerId.id - ], 30000, cb) - ], (err) => { - expect(err).to.not.exist() - - parallel([ - (cb) => ipfs1.pubsub.unsubscribe(topic, sub1, cb), - (cb) => ipfs2.pubsub.unsubscribe(topic, sub2, cb), - (cb) => ipfs3.pubsub.unsubscribe(topic, sub3, cb) - ], done) - }) + subscribedTopics = [topic] + + await ipfs1.pubsub.subscribe(topic, sub1) + await ipfs2.pubsub.subscribe(topic, sub2) + await ipfs3.pubsub.subscribe(topic, sub3) + + await waitForPeers(ipfs1, topic, [ipfs2.peerId.id, ipfs3.peerId.id], 30000) }) }) } diff --git a/src/pubsub/publish.js b/src/pubsub/publish.js index 79620005..ef219129 100644 --- a/src/pubsub/publish.js +++ b/src/pubsub/publish.js @@ -1,7 +1,6 @@ /* eslint-env mocha */ 'use strict' -const timesSeries = require('async/timesSeries') const hat = require('hat') const { getTopic } = require('./utils') const { getDescribe, getIt, expect } = require('../utils/mocha') @@ -33,26 +32,29 @@ module.exports = (createCommon, options) => { after((done) => common.teardown(done)) - it('should error on string messags', (done) => { + it('should error on string messags', async () => { const topic = getTopic() - ipfs.pubsub.publish(topic, 'hello friend', (err) => { + try { + await ipfs.pubsub.publish(topic, 'hello friend') + } catch (err) { expect(err).to.exist() - done() - }) + return + } + throw new Error('did not error on string message') }) - it('should publish message from buffer', (done) => { + it('should publish message from buffer', () => { const topic = getTopic() - ipfs.pubsub.publish(topic, Buffer.from(hat()), done) + return ipfs.pubsub.publish(topic, Buffer.from(hat())) }) - it('should publish 10 times within time limit', (done) => { + it('should publish 10 times within time limit', async () => { const count = 10 const topic = getTopic() - timesSeries(count, (_, cb) => { - ipfs.pubsub.publish(topic, Buffer.from(hat()), cb) - }, done) + for (let i = 0; i < count; i++) { + await ipfs.pubsub.publish(topic, Buffer.from(hat())) + } }) }) } diff --git a/src/pubsub/subscribe.js b/src/pubsub/subscribe.js index 64436f60..e04bb0a5 100644 --- a/src/pubsub/subscribe.js +++ b/src/pubsub/subscribe.js @@ -2,13 +2,13 @@ /* eslint max-nested-callbacks: ["error", 6] */ 'use strict' -const series = require('async/series') -const parallel = require('async/parallel') -const timesSeries = require('async/timesSeries') +const pushable = require('it-pushable') +const { collect } = require('streaming-iterables') const { spawnNodesWithId } = require('../utils/spawn') -const { waitForPeers, makeCheck, getTopic } = require('./utils') +const { waitForPeers, getTopic } = require('./utils') const { getDescribe, getIt, expect } = require('../utils/mocha') const { connect } = require('../utils/swarm') +const delay = require('../utils/delay') module.exports = (createCommon, options) => { const describe = getDescribe(options) @@ -20,6 +20,8 @@ module.exports = (createCommon, options) => { let ipfs1 let ipfs2 + let topic + let subscribedTopics = [] before(function (done) { // CI takes longer to instantiate the daemon, so we need to increase the @@ -40,171 +42,115 @@ module.exports = (createCommon, options) => { }) }) - after((done) => common.teardown(done)) - - describe('single node', () => { - it('should subscribe to one topic', (done) => { - const check = makeCheck(2, done) - const topic = getTopic() + beforeEach(() => { + topic = getTopic() + subscribedTopics = [topic] + }) - const handler = (msg) => { - expect(msg.data.toString()).to.equal('hi') - expect(msg).to.have.property('seqno') - expect(Buffer.isBuffer(msg.seqno)).to.eql(true) - expect(msg.topicIDs[0]).to.eq(topic) - expect(msg).to.have.property('from', ipfs1.peerId.id) + afterEach(async () => { + const nodes = [ipfs1, ipfs2] + for (let i = 0; i < subscribedTopics.length; i++) { + const topic = subscribedTopics[i] + await Promise.all(nodes.map(ipfs => ipfs.pubsub.unsubscribe(topic))) + } + subscribedTopics = [] + await delay(100) + }) - ipfs1.pubsub.unsubscribe(topic, handler, (err) => { - expect(err).to.not.exist() + after((done) => common.teardown(done)) - ipfs1.pubsub.ls((err, topics) => { - expect(err).to.not.exist() - expect(topics).to.be.empty() - check() - }) - }) - } + describe('single node', () => { + it('should subscribe to one topic', async () => { + const msgStream = pushable() - ipfs1.pubsub.subscribe(topic, handler, (err) => { - expect(err).to.not.exist() - ipfs1.pubsub.publish(topic, Buffer.from('hi'), check) + await ipfs1.pubsub.subscribe(topic, msg => { + msgStream.push(msg) + msgStream.end() }) - }) - it('should subscribe to one topic (promised)', (done) => { - const check = makeCheck(2, done) - const topic = getTopic() + await ipfs1.pubsub.publish(topic, Buffer.from('hi')) - const handler = (msg) => { + for await (const msg of msgStream) { expect(msg.data.toString()).to.equal('hi') expect(msg).to.have.property('seqno') expect(Buffer.isBuffer(msg.seqno)).to.eql(true) expect(msg.topicIDs[0]).to.eq(topic) expect(msg).to.have.property('from', ipfs1.peerId.id) - - ipfs1.pubsub.unsubscribe(topic, handler, (err) => { - expect(err).to.not.exist() - - ipfs1.pubsub.ls((err, topics) => { - expect(err).to.not.exist() - expect(topics).to.be.empty() - check() - }) - }) + break } - - ipfs1.pubsub - .subscribe(topic, handler) - .then(() => ipfs1.pubsub.publish(topic, Buffer.from('hi'), check)) - .catch((err) => expect(err).to.not.exist()) }) - it('should subscribe to one topic with options', (done) => { - const check = makeCheck(2, done) - const topic = getTopic() + it('should subscribe to one topic with options', async () => { + const msgStream = pushable() - const handler = (msg) => { + await ipfs1.pubsub.subscribe(topic, msg => { + msgStream.push(msg) + msgStream.end() + }, {}) + + await ipfs1.pubsub.publish(topic, Buffer.from('hi')) + + for await (const msg of msgStream) { expect(msg.data.toString()).to.equal('hi') expect(msg).to.have.property('seqno') expect(Buffer.isBuffer(msg.seqno)).to.eql(true) expect(msg.topicIDs[0]).to.eq(topic) expect(msg).to.have.property('from', ipfs1.peerId.id) + } + }) - ipfs1.pubsub.unsubscribe(topic, handler, (err) => { - expect(err).to.not.exist() + it('should subscribe to topic multiple times with different handlers', async () => { + const msgStream1 = pushable() + const msgStream2 = pushable() - ipfs1.pubsub.ls((err, topics) => { - expect(err).to.not.exist() - expect(topics).to.be.empty() - check() - }) - }) + const handler1 = msg => { + msgStream1.push(msg) + msgStream1.end() + } + const handler2 = msg => { + msgStream2.push(msg) + msgStream2.end() } - ipfs1.pubsub.subscribe(topic, handler, {}, (err) => { - expect(err).to.not.exist() - ipfs1.pubsub.publish(topic, Buffer.from('hi'), check) - }) - }) - - it('should subscribe to one topic with options (promised)', (done) => { - const check = makeCheck(2, done) - const topic = getTopic() + await Promise.all([ + ipfs1.pubsub.subscribe(topic, handler1), + ipfs1.pubsub.subscribe(topic, handler2) + ]) - const handler = (msg) => { - expect(msg.data.toString()).to.equal('hi') - expect(msg).to.have.property('seqno') - expect(Buffer.isBuffer(msg.seqno)).to.eql(true) - expect(msg.topicIDs[0]).to.eq(topic) - expect(msg).to.have.property('from', ipfs1.peerId.id) + await ipfs1.pubsub.publish(topic, Buffer.from('hello')) - ipfs1.pubsub.unsubscribe(topic, handler, (err) => { - expect(err).to.not.exist() + const [handler1Msg] = await collect(msgStream1) + expect(handler1Msg.data.toString()).to.eql('hello') - ipfs1.pubsub.ls((err, topics) => { - expect(err).to.not.exist() - expect(topics).to.be.empty() - check() - }) - }) - } + const [handler2Msg] = await collect(msgStream2) + expect(handler2Msg.data.toString()).to.eql('hello') - ipfs1.pubsub - .subscribe(topic, handler, {}) - .then(() => ipfs1.pubsub.publish(topic, Buffer.from('hi'), check)) - .catch((err) => expect(err).to.not.exist()) - }) + await ipfs1.pubsub.unsubscribe(topic, handler1) + await delay(100) - it('should subscribe to topic multiple times with different handlers', (done) => { - const topic = getTopic() - - const check = makeCheck(3, done) - const handler1 = (msg) => { - expect(msg.data.toString()).to.eql('hello') - - series([ - (cb) => ipfs1.pubsub.unsubscribe(topic, handler1, cb), - (cb) => ipfs1.pubsub.ls(cb), - (cb) => ipfs1.pubsub.unsubscribe(topic, handler2, cb), - (cb) => ipfs1.pubsub.ls(cb) - ], (err, res) => { - expect(err).to.not.exist() - - // Still subscribed as there is one listener left - expect(res[1]).to.eql([topic]) - // Now all listeners are gone no subscription anymore - expect(res[3]).to.eql([]) - check() - }) - } + // Still subscribed as there is one listener left + expect(await ipfs1.pubsub.ls()).to.eql([topic]) - const handler2 = (msg) => { - expect(msg.data.toString()).to.eql('hello') - check() - } + await ipfs1.pubsub.unsubscribe(topic, handler2) + await delay(100) - series([ - (cb) => ipfs1.pubsub.subscribe(topic, handler1, cb), - (cb) => ipfs1.pubsub.subscribe(topic, handler2, cb) - ], (err) => { - expect(err).to.not.exist() - ipfs1.pubsub.publish(topic, Buffer.from('hello'), check) - }) + // Now all listeners are gone no subscription anymore + expect(await ipfs1.pubsub.ls()).to.eql([]) }) - it('should allow discover option to be passed', (done) => { - const check = makeCheck(2, done) - const topic = getTopic() + it('should allow discover option to be passed', async () => { + const msgStream = pushable() + + await ipfs1.pubsub.subscribe(topic, msg => { + msgStream.push(msg) + msgStream.end() + }, { discover: true }) - const handler = (msg) => { + await ipfs1.pubsub.publish(topic, Buffer.from('hi')) + + for await (const msg of msgStream) { expect(msg.data.toString()).to.eql('hi') - ipfs1.pubsub.unsubscribe(topic, handler, check) } - - ipfs1.pubsub.subscribe(topic, handler, { discover: true }, (err) => { - expect(err).to.not.exist() - ipfs1.pubsub.publish(topic, Buffer.from('hi'), check) - }) }) }) @@ -222,171 +168,151 @@ module.exports = (createCommon, options) => { connect(ipfs1, ipfs2Addr, done) }) - let topic - let sub1 - let sub2 + it('should receive messages from a different node', async () => { + const expectedString = 'hello from the other side' - beforeEach(() => { - topic = getTopic() - }) + const msgStream1 = pushable() + const msgStream2 = pushable() - afterEach((done) => { - parallel([ - (cb) => ipfs1.pubsub.unsubscribe(topic, sub1, cb), - (cb) => ipfs2.pubsub.unsubscribe(topic, sub2, cb) - ], done) - }) + const sub1 = msg => { + msgStream1.push(msg) + msgStream1.end() + } + const sub2 = msg => { + msgStream2.push(msg) + msgStream2.end() + } - it('should receive messages from a different node', (done) => { - const check = makeCheck(3, done) - const expectedString = 'hello from the other side' + await Promise.all([ + ipfs1.pubsub.subscribe(topic, sub1), + ipfs2.pubsub.subscribe(topic, sub2) + ]) - sub1 = (msg) => { - expect(msg.data.toString()).to.be.eql(expectedString) - expect(msg.from).to.eql(ipfs2.peerId.id) - check() - } + await waitForPeers(ipfs2, topic, [ipfs1.peerId.id], 30000) - sub2 = (msg) => { - expect(msg.data.toString()).to.be.eql(expectedString) - expect(msg.from).to.eql(ipfs2.peerId.id) - check() - } + await ipfs2.pubsub.publish(topic, Buffer.from(expectedString)) - series([ - (cb) => ipfs1.pubsub.subscribe(topic, sub1, cb), - (cb) => ipfs2.pubsub.subscribe(topic, sub2, cb), - (cb) => waitForPeers(ipfs2, topic, [ipfs1.peerId.id], 30000, cb) - ], (err) => { - expect(err).to.not.exist() + const [sub1Msg] = await collect(msgStream1) + expect(sub1Msg.data.toString()).to.be.eql(expectedString) + expect(sub1Msg.from).to.eql(ipfs2.peerId.id) - ipfs2.pubsub.publish(topic, Buffer.from(expectedString), check) - }) + const [sub2Msg] = await collect(msgStream2) + expect(sub2Msg.data.toString()).to.be.eql(expectedString) + expect(sub2Msg.from).to.eql(ipfs2.peerId.id) }) - it('should round trip a non-utf8 binary buffer', (done) => { - const check = makeCheck(3, done) + it('should round trip a non-utf8 binary buffer', async () => { const expectedHex = 'a36161636179656162830103056164a16466666666f4' const buffer = Buffer.from(expectedHex, 'hex') - sub1 = (msg) => { - try { - expect(msg.data.toString('hex')).to.be.eql(expectedHex) - expect(msg.from).to.eql(ipfs2.peerId.id) - check() - } catch (err) { - check(err) - } - } + const msgStream1 = pushable() + const msgStream2 = pushable() - sub2 = (msg) => { - try { - expect(msg.data.toString('hex')).to.eql(expectedHex) - expect(msg.from).to.eql(ipfs2.peerId.id) - check() - } catch (err) { - check(err) - } + const sub1 = msg => { + msgStream1.push(msg) + msgStream1.end() + } + const sub2 = msg => { + msgStream2.push(msg) + msgStream2.end() } - series([ - (cb) => ipfs1.pubsub.subscribe(topic, sub1, cb), - (cb) => ipfs2.pubsub.subscribe(topic, sub2, cb), - (cb) => waitForPeers(ipfs2, topic, [ipfs1.peerId.id], 30000, cb) - ], (err) => { - expect(err).to.not.exist() + await Promise.all([ + ipfs1.pubsub.subscribe(topic, sub1), + ipfs2.pubsub.subscribe(topic, sub2) + ]) - ipfs2.pubsub.publish(topic, buffer, check) - }) + await waitForPeers(ipfs2, topic, [ipfs1.peerId.id], 30000) + + await ipfs2.pubsub.publish(topic, buffer) + + const [sub1Msg] = await collect(msgStream1) + expect(sub1Msg.data.toString('hex')).to.be.eql(expectedHex) + expect(sub1Msg.from).to.eql(ipfs2.peerId.id) + + const [sub2Msg] = await collect(msgStream2) + expect(sub2Msg.data.toString('hex')).to.be.eql(expectedHex) + expect(sub2Msg.from).to.eql(ipfs2.peerId.id) }) - it('should receive multiple messages', (done) => { - const inbox1 = [] - const inbox2 = [] + it('should receive multiple messages', async () => { const outbox = ['hello', 'world', 'this', 'is', 'pubsub'] - const check = makeCheck(outbox.length * 3, (err) => { - expect(inbox1.sort()).to.eql(outbox.sort()) - expect(inbox2.sort()).to.eql(outbox.sort()) - - done(err) - }) + const msgStream1 = pushable() + const msgStream2 = pushable() - sub1 = (msg) => { - inbox1.push(msg.data.toString()) - expect(msg.from).to.eql(ipfs2.peerId.id) - check() + const sub1 = msg => { + msgStream1.push(msg) + sub1.called++ + if (sub1.called === outbox.length) msgStream1.end() } + sub1.called = 0 - sub2 = (msg) => { - inbox2.push(msg.data.toString()) - expect(msg.from).to.be.eql(ipfs2.peerId.id) - check() + const sub2 = msg => { + msgStream2.push(msg) + sub2.called++ + if (sub2.called === outbox.length) msgStream2.end() } + sub2.called = 0 - series([ - (cb) => ipfs1.pubsub.subscribe(topic, sub1, cb), - (cb) => ipfs2.pubsub.subscribe(topic, sub2, cb), - (cb) => waitForPeers(ipfs2, topic, [ipfs1.peerId.id], 30000, cb) - ], (err) => { - expect(err).to.not.exist() + await Promise.all([ + ipfs1.pubsub.subscribe(topic, sub1), + ipfs2.pubsub.subscribe(topic, sub2) + ]) - outbox.forEach((msg) => { - ipfs2.pubsub.publish(topic, Buffer.from(msg), check) - }) - }) + await waitForPeers(ipfs2, topic, [ipfs1.peerId.id], 30000) + + outbox.forEach(msg => ipfs2.pubsub.publish(topic, Buffer.from(msg))) + + const sub1Msgs = await collect(msgStream1) + sub1Msgs.forEach(msg => expect(msg.from).to.eql(ipfs2.peerId.id)) + const inbox1 = sub1Msgs.map(msg => msg.data.toString()) + expect(inbox1.sort()).to.eql(outbox.sort()) + + const sub2Msgs = await collect(msgStream2) + sub2Msgs.forEach(msg => expect(msg.from).to.eql(ipfs2.peerId.id)) + const inbox2 = sub2Msgs.map(msg => msg.data.toString()) + expect(inbox2.sort()).to.eql(outbox.sort()) }) - it('send/receive 100 messages', function (done) { + it('should send/receive 100 messages', async function () { this.timeout(2 * 60 * 1000) const msgBase = 'msg - ' const count = 100 - let receivedCount = 0 - let startTime - let counter = 0 + const msgStream = pushable() - sub1 = (msg) => { - // go-ipfs can't send messages in order when there are - // only two nodes in the same machine ¯\_(ツ)_/¯ - // https://github.com/ipfs/js-ipfs-api/pull/493#issuecomment-289499943 - // const expectedMsg = msgBase + receivedCount - // const receivedMsg = msg.data.toString() - // expect(receivedMsg).to.eql(expectedMsg) + const sub = msg => { + msgStream.push(msg) + sub.called++ + if (sub.called === count) msgStream.end() + } + sub.called = 0 - receivedCount++ + await Promise.all([ + ipfs1.pubsub.subscribe(topic, sub), + ipfs2.pubsub.subscribe(topic, () => {}) + ]) - if (receivedCount >= count) { - const duration = new Date().getTime() - startTime - const opsPerSec = Math.floor(count / (duration / 1000)) + await waitForPeers(ipfs1, topic, [ipfs2.peerId.id], 30000) - // eslint-disable-next-line - console.log(`Send/Receive 100 messages took: ${duration} ms, ${opsPerSec} ops / s`) + const startTime = new Date().getTime() - check() - } + for (let i = 0; i < count; i++) { + const msgData = Buffer.from(msgBase + i) + await ipfs2.pubsub.publish(topic, msgData) } - sub2 = (msg) => {} + const msgs = await collect(msgStream) + const duration = new Date().getTime() - startTime + const opsPerSec = Math.floor(count / (duration / 1000)) - function check () { - if (++counter === 2) { - done() - } - } + // eslint-disable-next-line + console.log(`Send/Receive 100 messages took: ${duration} ms, ${opsPerSec} ops / s`) - series([ - (cb) => ipfs1.pubsub.subscribe(topic, sub1, cb), - (cb) => ipfs2.pubsub.subscribe(topic, sub2, cb), - (cb) => waitForPeers(ipfs1, topic, [ipfs2.peerId.id], 30000, cb) - ], (err) => { - expect(err).to.not.exist() - startTime = new Date().getTime() - - timesSeries(count, (sendCount, cb) => { - const msgData = Buffer.from(msgBase + sendCount) - ipfs2.pubsub.publish(topic, msgData, cb) - }, check) + msgs.forEach(msg => { + expect(msg.from).to.eql(ipfs2.peerId.id) + expect(msg.data.toString().startsWith(msgBase)).to.be.true() }) }) }) diff --git a/src/pubsub/unsubscribe.js b/src/pubsub/unsubscribe.js index a5325cef..6433df2b 100644 --- a/src/pubsub/unsubscribe.js +++ b/src/pubsub/unsubscribe.js @@ -1,10 +1,10 @@ /* eslint-env mocha */ 'use strict' -const eachSeries = require('async/eachSeries') -const timesSeries = require('async/timesSeries') +const { isBrowser, isWebWorker } = require('ipfs-utils/src/env') const { getTopic } = require('./utils') const { getDescribe, getIt, expect } = require('../utils/mocha') +const delay = require('../utils/delay') module.exports = (createCommon, options) => { const describe = getDescribe(options) @@ -33,38 +33,34 @@ module.exports = (createCommon, options) => { after((done) => common.teardown(done)) - it('should subscribe and unsubscribe 10 times', (done) => { - const count = 10 + // Browser/worker has max ~5 open HTTP requests to the same origin + const count = isBrowser || isWebWorker ? 5 : 10 + + it(`should subscribe and unsubscribe ${count} times`, async () => { const someTopic = getTopic() + const handlers = Array.from(Array(count), () => msg => {}) - timesSeries(count, (_, cb) => { - const handler = (msg) => {} - ipfs.pubsub.subscribe(someTopic, handler, (err) => cb(err, handler)) - }, (err, handlers) => { - expect(err).to.not.exist() - eachSeries( - handlers, - (handler, cb) => ipfs.pubsub.unsubscribe(someTopic, handler, cb), - (err) => { - expect(err).to.not.exist() - // Assert unsubscribe worked - ipfs.pubsub.ls((err, topics) => { - expect(err).to.not.exist() - expect(topics).to.eql([]) - done() - }) - } - ) - }) + for (let i = 0; i < count; i++) { + await ipfs.pubsub.subscribe(someTopic, handlers[i]) + } + + for (let i = 0; i < count; i++) { + await ipfs.pubsub.unsubscribe(someTopic, handlers[i]) + } + + await delay(100) + const topics = await ipfs.pubsub.ls() + expect(topics).to.eql([]) }) - it('should subscribe 10 handlers and unsunscribe once with no reference to the handlers', async () => { - const count = 10 + it(`should subscribe ${count} handlers and unsunscribe once with no reference to the handlers`, async () => { const someTopic = getTopic() for (let i = 0; i < count; i++) { await ipfs.pubsub.subscribe(someTopic, (msg) => {}) } await ipfs.pubsub.unsubscribe(someTopic) + + await delay(100) const topics = await ipfs.pubsub.ls() expect(topics).to.eql([]) }) diff --git a/src/pubsub/utils.js b/src/pubsub/utils.js index 2f8efde2..f6721c8c 100644 --- a/src/pubsub/utils.js +++ b/src/pubsub/utils.js @@ -1,50 +1,27 @@ 'use strict' const hat = require('hat') +const delay = require('../utils/delay') -function waitForPeers (ipfs, topic, peersToWait, waitForMs, callback) { +async function waitForPeers (ipfs, topic, peersToWait, waitForMs) { const start = Date.now() - const checkPeers = () => { - ipfs.pubsub.peers(topic, (err, peers) => { - if (err) { - return callback(err) - } + while (true) { + const peers = await ipfs.pubsub.peers(topic) + const everyPeerFound = peersToWait.every(p => peers.includes(p)) - const missingPeers = peersToWait - .map((e) => peers.indexOf(e) !== -1) - .filter((e) => !e) - - if (missingPeers.length === 0) { - return callback() - } - - if (Date.now() > start + waitForMs) { - return callback(new Error('Timed out waiting for peers')) - } - - setTimeout(checkPeers, 10) - }) - } - - checkPeers() -} - -exports.waitForPeers = waitForPeers - -function makeCheck (n, done) { - let i = 0 - return (err) => { - if (err) { - return done(err) + if (everyPeerFound) { + return } - if (++i === n) { - done() + if (Date.now() > start + waitForMs) { + throw new Error(`Timed out waiting for peers to be subscribed to "${topic}"`) } + + await delay(10) } } -exports.makeCheck = makeCheck +exports.waitForPeers = waitForPeers exports.getTopic = () => 'pubsub-tests-' + hat()