Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: integrate gossipsub by default #365

Merged
merged 5 commits into from
Jul 31, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ const MPLEX = require('libp2p-mplex')
const SECIO = require('libp2p-secio')
const MulticastDNS = require('libp2p-mdns')
const DHT = require('libp2p-kad-dht')
const GossipSub = require('libp2p-gossipsub')
const defaultsDeep = require('@nodeutils/defaults-deep')
const Protector = require('libp2p-pnet')
const DelegatedPeerRouter = require('libp2p-delegated-peer-routing')
Expand Down Expand Up @@ -154,7 +155,8 @@ class Node extends Libp2p {
peerDiscovery: [
MulticastDNS
],
dht: DHT // DHT enables PeerRouting, ContentRouting and DHT itself components
dht: DHT, // DHT enables PeerRouting, ContentRouting and DHT itself components
pubsub: GossipSub
},

// libp2p config options (typically found on a config.json)
Expand Down Expand Up @@ -187,9 +189,8 @@ class Node extends Libp2p {
timeout: 10e3
}
},
// Enable/Disable Experimental features
EXPERIMENTAL: { // Experimental features ("behind a flag")
pubsub: false
pubsub: {
enabled: true
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
"err-code": "^1.1.2",
"fsm-event": "^2.1.0",
"libp2p-connection-manager": "^0.1.0",
"libp2p-floodsub": "^0.16.1",
"libp2p-ping": "^0.8.5",
"libp2p-switch": "^0.42.12",
"libp2p-websockets": "^0.12.2",
Expand All @@ -74,6 +73,8 @@
"libp2p-circuit": "^0.3.7",
"libp2p-delegated-content-routing": "^0.2.2",
"libp2p-delegated-peer-routing": "^0.2.2",
"libp2p-floodsub": "~0.17.0",
"libp2p-gossipsub": "~0.0.4",
"libp2p-kad-dht": "^0.15.3",
"libp2p-mdns": "^0.12.3",
"libp2p-mplex": "^0.8.4",
Expand All @@ -84,6 +85,7 @@
"libp2p-websocket-star": "~0.10.2",
"libp2p-websocket-star-rendezvous": "~0.3.0",
"lodash.times": "^4.3.2",
"merge-options": "^1.0.1",
"nock": "^10.0.6",
"pull-goodbye": "0.0.2",
"pull-mplex": "^0.1.2",
Expand Down
11 changes: 5 additions & 6 deletions src/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const modulesSchema = s({
connProtector: s.union(['undefined', s.interface({ protect: 'function' })]),
contentRouting: optional(list(['object'])),
dht: optional(s('null|function|object')),
pubsub: optional(s('null|function|object')),
peerDiscovery: optional(list([s('object|function')])),
peerRouting: optional(list(['object'])),
streamMuxer: optional(list([s('object|function')])),
Expand Down Expand Up @@ -59,12 +60,10 @@ const configSchema = s({
timeout: 10e3
}
}),
// Experimental config
EXPERIMENTAL: s({
pubsub: 'boolean'
}, {
// Experimental defaults
pubsub: false
// Pubsub config
pubsub: s('object?', {
// DHT defaults
enabled: false
})
}, {})

Expand Down
14 changes: 7 additions & 7 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,9 @@ class Libp2p extends EventEmitter {
})
}

// enable/disable pubsub
if (this._config.EXPERIMENTAL.pubsub) {
this.pubsub = pubsub(this)
// start pubsub
if (this._modules.pubsub && this._config.pubsub.enabled !== false) {
this.pubsub = pubsub(this, this._modules.pubsub)
}

// Attach remaining APIs
Expand Down Expand Up @@ -403,8 +403,8 @@ class Libp2p extends EventEmitter {
}
},
(cb) => {
if (this._floodSub) {
return this._floodSub.start(cb)
if (this.pubsub) {
return this.pubsub.start(cb)
}
cb()
},
Expand Down Expand Up @@ -442,8 +442,8 @@ class Libp2p extends EventEmitter {
)
},
(cb) => {
if (this._floodSub) {
return this._floodSub.stop(cb)
if (this.pubsub) {
return this.pubsub.stop(cb)
}
cb()
},
Expand Down
51 changes: 27 additions & 24 deletions src/pubsub.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,12 @@

const nextTick = require('async/nextTick')
const { messages, codes } = require('./errors')
const FloodSub = require('libp2p-floodsub')
const promisify = require('promisify-es6')

const errCode = require('err-code')

module.exports = (node) => {
const floodSub = new FloodSub(node)

node._floodSub = floodSub
module.exports = (node, Pubsub) => {
const pubsub = new Pubsub(node, { emitSelf: true })

return {
/**
Expand Down Expand Up @@ -41,16 +38,16 @@ module.exports = (node) => {
options = {}
}

if (!node.isStarted() && !floodSub.started) {
if (!node.isStarted() && !pubsub.started) {
return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED))
}

function subscribe (cb) {
if (floodSub.listenerCount(topic) === 0) {
floodSub.subscribe(topic)
if (pubsub.listenerCount(topic) === 0) {
pubsub.subscribe(topic)
}

floodSub.on(topic, handler)
pubsub.on(topic, handler)
nextTick(cb)
}

Expand Down Expand Up @@ -80,18 +77,18 @@ module.exports = (node) => {
* libp2p.unsubscribe(topic, handler, callback)
*/
unsubscribe: promisify((topic, handler, callback) => {
if (!node.isStarted() && !floodSub.started) {
if (!node.isStarted() && !pubsub.started) {
return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED))
}

if (!handler) {
floodSub.removeAllListeners(topic)
pubsub.removeAllListeners(topic)
} else {
floodSub.removeListener(topic, handler)
pubsub.removeListener(topic, handler)
}

if (floodSub.listenerCount(topic) === 0) {
floodSub.unsubscribe(topic)
if (pubsub.listenerCount(topic) === 0) {
pubsub.unsubscribe(topic)
}

if (typeof callback === 'function') {
Expand All @@ -102,29 +99,31 @@ module.exports = (node) => {
}),

publish: promisify((topic, data, callback) => {
if (!node.isStarted() && !floodSub.started) {
if (!node.isStarted() && !pubsub.started) {
return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED))
}

if (!Buffer.isBuffer(data)) {
return nextTick(callback, errCode(new Error('data must be a Buffer'), 'ERR_DATA_IS_NOT_A_BUFFER'))
try {
data = Buffer.from(data)
} catch (err) {
return nextTick(callback, errCode(new Error('data must be convertible to a Buffer'), 'ERR_DATA_IS_NOT_VALID'))
}
vasco-santos marked this conversation as resolved.
Show resolved Hide resolved

floodSub.publish(topic, data, callback)
pubsub.publish(topic, data, callback)
}),

ls: promisify((callback) => {
if (!node.isStarted() && !floodSub.started) {
if (!node.isStarted() && !pubsub.started) {
return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED))
}

const subscriptions = Array.from(floodSub.subscriptions)
const subscriptions = Array.from(pubsub.subscriptions)

nextTick(() => callback(null, subscriptions))
}),

peers: promisify((topic, callback) => {
if (!node.isStarted() && !floodSub.started) {
if (!node.isStarted() && !pubsub.started) {
return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED))
}

Expand All @@ -133,15 +132,19 @@ module.exports = (node) => {
topic = null
}

const peers = Array.from(floodSub.peers.values())
const peers = Array.from(pubsub.peers.values())
.filter((peer) => topic ? peer.topics.has(topic) : true)
.map((peer) => peer.info.id.toB58String())

nextTick(() => callback(null, peers))
}),

setMaxListeners (n) {
return floodSub.setMaxListeners(n)
}
return pubsub.setMaxListeners(n)
},

start: (cb) => pubsub.start(cb),

stop: (cb) => pubsub.stop(cb)
}
}
12 changes: 6 additions & 6 deletions test/config.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ describe('configuration', () => {
peerDiscovery: {
autoDial: true
},
EXPERIMENTAL: {
pubsub: false
pubsub: {
enabled: false
},
dht: {
kBucketSize: 20,
Expand Down Expand Up @@ -144,8 +144,8 @@ describe('configuration', () => {
enabled: true
}
},
EXPERIMENTAL: {
pubsub: false
pubsub: {
enabled: false
},
dht: {
kBucketSize: 20,
Expand Down Expand Up @@ -269,8 +269,8 @@ describe('configuration', () => {
dht: DHT
},
config: {
EXPERIMENTAL: {
pubsub: false
pubsub: {
enabled: false
},
peerDiscovery: {
autoDial: true
Expand Down
12 changes: 6 additions & 6 deletions test/create.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ describe('libp2p creation', () => {
it('should be able to start and stop successfully', (done) => {
createNode([], {
config: {
EXPERIMENTAL: {
pubsub: true
pubsub: {
enabled: true
},
dht: {
enabled: true
Expand All @@ -32,7 +32,7 @@ describe('libp2p creation', () => {
const sw = node._switch
const cm = node.connectionManager
const dht = node._dht
const pub = node._floodSub
const pub = node.pubsub

sinon.spy(sw, 'start')
sinon.spy(cm, 'start')
Expand Down Expand Up @@ -77,13 +77,13 @@ describe('libp2p creation', () => {
it('should not create disabled modules', (done) => {
createNode([], {
config: {
EXPERIMENTAL: {
pubsub: false
pubsub: {
enabled: false
}
}
}, (err, node) => {
expect(err).to.not.exist()
expect(node._floodSub).to.not.exist()
expect(node._pubsub).to.not.exist()
done()
})
})
Expand Down
Loading