Skip to content
This repository has been archived by the owner on Mar 10, 2020. It is now read-only.

fix: reduce the number of concurrent requests in browser #505

Merged
merged 3 commits into from
Aug 27, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
"ipld-dag-pb": "~0.17.3",
"is-ipfs": "~0.6.1",
"is-plain-object": "^3.0.0",
"it-pushable": "^1.2.1",
"libp2p-crypto": "~0.16.0",
"multiaddr": "^6.0.0",
"multibase": "~0.6.0",
Expand All @@ -63,6 +64,7 @@
"pull-stream": "^3.6.11",
"pump": "^3.0.0",
"readable-stream": "^3.1.1",
"streaming-iterables": "^4.1.0",
"through2": "^3.0.0"
},
"devDependencies": {
Expand Down
60 changes: 26 additions & 34 deletions src/pubsub/ls.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
/* eslint-env mocha */
'use strict'

const eachSeries = require('async/eachSeries')
const { getTopic } = require('./utils')
const { getDescribe, getIt, expect } = require('../utils/mocha')
const delay = require('../utils/delay')

module.exports = (createCommon, options) => {
const describe = getDescribe(options)
Expand All @@ -14,6 +14,7 @@ module.exports = (createCommon, options) => {
this.timeout(80 * 1000)

let ipfs
let subscribedTopics = []

before(function (done) {
// CI takes longer to instantiate the daemon, so we need to increase the
Expand All @@ -30,33 +31,32 @@ module.exports = (createCommon, options) => {
})
})

afterEach(async () => {
for (let i = 0; i < subscribedTopics.length; i++) {
await ipfs.pubsub.unsubscribe(subscribedTopics[i])
}
subscribedTopics = []
await delay(100)
})

after((done) => common.teardown(done))

it('should return an empty list when no topics are subscribed', (done) => {
ipfs.pubsub.ls((err, topics) => {
expect(err).to.not.exist()
expect(topics.length).to.equal(0)
done()
})
it('should return an empty list when no topics are subscribed', async () => {
const topics = await ipfs.pubsub.ls()
expect(topics.length).to.equal(0)
})

it('should return a list with 1 subscribed topic', (done) => {
const sub1 = (msg) => {}
it('should return a list with 1 subscribed topic', async () => {
const sub1 = () => {}
const topic = getTopic()
subscribedTopics = [topic]

ipfs.pubsub.subscribe(topic, sub1, (err) => {
expect(err).to.not.exist()

ipfs.pubsub.ls((err, topics) => {
expect(err).to.not.exist()
expect(topics).to.be.eql([topic])

ipfs.pubsub.unsubscribe(topic, sub1, done)
})
})
await ipfs.pubsub.subscribe(topic, sub1)
const topics = await ipfs.pubsub.ls()
expect(topics).to.be.eql([topic])
})

it('should return a list with 3 subscribed topics', (done) => {
it('should return a list with 3 subscribed topics', async () => {
const topics = [{
name: 'one',
handler () {}
Expand All @@ -68,22 +68,14 @@ module.exports = (createCommon, options) => {
handler () {}
}]

eachSeries(topics, (t, cb) => {
ipfs.pubsub.subscribe(t.name, t.handler, cb)
}, (err) => {
expect(err).to.not.exist()
subscribedTopics = topics.map(t => t.name)

ipfs.pubsub.ls((err, list) => {
expect(err).to.not.exist()
for (let i = 0; i < topics.length; i++) {
await ipfs.pubsub.subscribe(topics[i].name, topics[i].handler)
}

expect(list.sort())
.to.eql(topics.map((t) => t.name).sort())

eachSeries(topics, (t, cb) => {
ipfs.pubsub.unsubscribe(t.name, t.handler, cb)
}, done)
})
})
const list = await ipfs.pubsub.ls()
expect(list.sort()).to.eql(topics.map(t => t.name).sort())
})
})
}
121 changes: 51 additions & 70 deletions src/pubsub/peers.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
'use strict'

const parallel = require('async/parallel')
const series = require('async/series')
const { spawnNodesWithId } = require('../utils/spawn')
const { waitForPeers, getTopic } = require('./utils')
const { getDescribe, getIt, expect } = require('../utils/mocha')
const { connect } = require('../utils/swarm')
const delay = require('../utils/delay')

module.exports = (createCommon, options) => {
const describe = getDescribe(options)
Expand All @@ -19,6 +19,7 @@ module.exports = (createCommon, options) => {
let ipfs1
let ipfs2
let ipfs3
let subscribedTopics = []

before(function (done) {
// CI takes longer to instantiate the daemon, so we need to increase the
Expand All @@ -40,6 +41,16 @@ module.exports = (createCommon, options) => {
})
})

afterEach(async () => {
const nodes = [ipfs1, ipfs2, ipfs3]
for (let i = 0; i < subscribedTopics.length; i++) {
const topic = subscribedTopics[i]
await Promise.all(nodes.map(ipfs => ipfs.pubsub.unsubscribe(topic)))
}
subscribedTopics = []
await delay(100)
})

after((done) => common.teardown(done))

before((done) => {
Expand All @@ -52,94 +63,64 @@ module.exports = (createCommon, options) => {
], done)
})

it('should not error when not subscribed to a topic', (done) => {
it('should not error when not subscribed to a topic', async () => {
const topic = getTopic()
ipfs1.pubsub.peers(topic, (err, peers) => {
expect(err).to.not.exist()
// Should be empty() but as mentioned below go-ipfs returns more than it should
// expect(peers).to.be.empty()

done()
})
const peers = await ipfs1.pubsub.peers(topic)
expect(peers).to.exist()
// Should be empty() but as mentioned below go-ipfs returns more than it should
// expect(peers).to.be.empty()
})

it('should not return extra peers', (done) => {
it('should not return extra peers', async () => {
// Currently go-ipfs returns peers that have not been
// subscribed to the topic. Enable when go-ipfs has been fixed
const sub1 = (msg) => {}
const sub2 = (msg) => {}
const sub3 = (msg) => {}
const sub1 = () => {}
const sub2 = () => {}
const sub3 = () => {}

const topic = getTopic()
const topicOther = topic + 'different topic'

series([
(cb) => ipfs1.pubsub.subscribe(topic, sub1, cb),
(cb) => ipfs2.pubsub.subscribe(topicOther, sub2, cb),
(cb) => ipfs3.pubsub.subscribe(topicOther, sub3, cb)
], (err) => {
expect(err).to.not.exist()

ipfs1.pubsub.peers(topic, (err, peers) => {
expect(err).to.not.exist()
expect(peers).to.be.empty()

parallel([
(cb) => ipfs1.pubsub.unsubscribe(topic, sub1, cb),
(cb) => ipfs2.pubsub.unsubscribe(topicOther, sub2, cb),
(cb) => ipfs3.pubsub.unsubscribe(topicOther, sub3, cb)
], done)
})
})
subscribedTopics = [topic, topicOther]

await ipfs1.pubsub.subscribe(topic, sub1)
await ipfs2.pubsub.subscribe(topicOther, sub2)
await ipfs3.pubsub.subscribe(topicOther, sub3)

const peers = await ipfs1.pubsub.peers(topic)
expect(peers).to.be.empty()
})

it('should return peers for a topic - one peer', (done) => {
it('should return peers for a topic - one peer', async () => {
// Currently go-ipfs returns peers that have not been
// subscribed to the topic. Enable when go-ipfs has been fixed
const sub1 = (msg) => {}
const sub2 = (msg) => {}
const sub3 = (msg) => {}
const sub1 = () => {}
const sub2 = () => {}
const sub3 = () => {}
const topic = getTopic()

series([
(cb) => ipfs1.pubsub.subscribe(topic, sub1, cb),
(cb) => ipfs2.pubsub.subscribe(topic, sub2, cb),
(cb) => ipfs3.pubsub.subscribe(topic, sub3, cb),
(cb) => waitForPeers(ipfs1, topic, [ipfs2.peerId.id], 30000, cb)
], (err) => {
expect(err).to.not.exist()

parallel([
(cb) => ipfs1.pubsub.unsubscribe(topic, sub1, cb),
(cb) => ipfs2.pubsub.unsubscribe(topic, sub2, cb),
(cb) => ipfs3.pubsub.unsubscribe(topic, sub3, cb)
], done)
})
subscribedTopics = [topic]

await ipfs1.pubsub.subscribe(topic, sub1)
await ipfs2.pubsub.subscribe(topic, sub2)
await ipfs3.pubsub.subscribe(topic, sub3)

await waitForPeers(ipfs1, topic, [ipfs2.peerId.id], 30000)
})

it('should return peers for a topic - multiple peers', (done) => {
const sub1 = (msg) => {}
const sub2 = (msg) => {}
const sub3 = (msg) => {}
it('should return peers for a topic - multiple peers', async () => {
const sub1 = () => {}
const sub2 = () => {}
const sub3 = () => {}
const topic = getTopic()

series([
(cb) => ipfs1.pubsub.subscribe(topic, sub1, cb),
(cb) => ipfs2.pubsub.subscribe(topic, sub2, cb),
(cb) => ipfs3.pubsub.subscribe(topic, sub3, cb),
(cb) => waitForPeers(ipfs1, topic, [
ipfs2.peerId.id,
ipfs3.peerId.id
], 30000, cb)
], (err) => {
expect(err).to.not.exist()

parallel([
(cb) => ipfs1.pubsub.unsubscribe(topic, sub1, cb),
(cb) => ipfs2.pubsub.unsubscribe(topic, sub2, cb),
(cb) => ipfs3.pubsub.unsubscribe(topic, sub3, cb)
], done)
})
subscribedTopics = [topic]

await ipfs1.pubsub.subscribe(topic, sub1)
await ipfs2.pubsub.subscribe(topic, sub2)
await ipfs3.pubsub.subscribe(topic, sub3)

await waitForPeers(ipfs1, topic, [ipfs2.peerId.id, ipfs3.peerId.id], 30000)
})
})
}
24 changes: 13 additions & 11 deletions src/pubsub/publish.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
/* eslint-env mocha */
'use strict'

const timesSeries = require('async/timesSeries')
const hat = require('hat')
const { getTopic } = require('./utils')
const { getDescribe, getIt, expect } = require('../utils/mocha')
Expand Down Expand Up @@ -33,26 +32,29 @@ module.exports = (createCommon, options) => {

after((done) => common.teardown(done))

it('should error on string messags', (done) => {
it('should error on string messags', async () => {
const topic = getTopic()
ipfs.pubsub.publish(topic, 'hello friend', (err) => {
try {
await ipfs.pubsub.publish(topic, 'hello friend')
} catch (err) {
expect(err).to.exist()
done()
})
return
}
throw new Error('did not error on string message')
})

it('should publish message from buffer', (done) => {
it('should publish message from buffer', () => {
const topic = getTopic()
ipfs.pubsub.publish(topic, Buffer.from(hat()), done)
return ipfs.pubsub.publish(topic, Buffer.from(hat()))
})

it('should publish 10 times within time limit', (done) => {
it('should publish 10 times within time limit', async () => {
const count = 10
const topic = getTopic()

timesSeries(count, (_, cb) => {
ipfs.pubsub.publish(topic, Buffer.from(hat()), cb)
}, done)
for (let i = 0; i < count; i++) {
await ipfs.pubsub.publish(topic, Buffer.from(hat()))
}
})
})
}
Loading