Skip to content

Commit

Permalink
feat: add pubsub to libp2p
Browse files Browse the repository at this point in the history
  • Loading branch information
daviddias committed Feb 16, 2018
1 parent beeb36c commit 0c543b7
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 3 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
"homepage": "https://github.com/libp2p/js-libp2p",
"dependencies": {
"async": "^2.6.0",
"libp2p-floodsub": "^0.14.1",
"libp2p-ping": "~0.6.1",
"libp2p-switch": "~0.36.1",
"mafmt": "^4.0.0",
Expand Down
3 changes: 3 additions & 0 deletions src/error-messages.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
'use strict'

exports.NOT_STARTED_YET = 'The libp2p node is not started yet'
25 changes: 22 additions & 3 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const Ping = require('libp2p-ping')
const peerRouting = require('./peer-routing')
const contentRouting = require('./content-routing')
const dht = require('./dht')
const pubsub = require('./pubsub')
const getPeerInfo = require('./get-peer-info')

exports = module.exports
Expand Down Expand Up @@ -89,6 +90,7 @@ class Node extends EventEmitter {
this.peerRouting = peerRouting(this)
this.contentRouting = contentRouting(this)
this.dht = dht(this)
this.pubsub = pubsub(this)

this._getPeerInfo = getPeerInfo(this)

Expand Down Expand Up @@ -149,17 +151,29 @@ class Node extends EventEmitter {
cb()
},
(cb) => {
// TODO: chicken-and-egg problem:
// TODO: chicken-and-egg problem #1:
// have to set started here because DHT requires libp2p is already started
this._isStarted = true
if (this._dht) {
return this._dht.start(cb)
this._dht.start(cb)
} else {
cb()
}
cb()
},
(cb) => {
// TODO: chicken-and-egg problem #2:
// have to set started here because FloodSub requires libp2p is already started
if (this._options !== false) {
this._floodSub.start(cb)
} else {
cb()
}
},

(cb) => {
// detect which multiaddrs we don't have a transport for and remove them
const multiaddrs = this.peerInfo.multiaddrs.toArray()

transports.forEach((transport) => {
multiaddrs.forEach((multiaddr) => {
if (!multiaddr.toString().match(/\/p2p-circuit($|\/)/) &&
Expand Down Expand Up @@ -188,6 +202,11 @@ class Node extends EventEmitter {
}

series([
(cb) => {
if (this._floodSub.started) {
this._floodSub.stop(cb)
}
},
(cb) => {
if (this._dht) {
return this._dht.stop(cb)
Expand Down
89 changes: 89 additions & 0 deletions src/pubsub.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
'use strict'

const setImmediate = require('async/setImmediate')
const NOT_STARTED_YET = require('./error-messages').NOT_STARTED_YET
const FloodSub = require('libp2p-floodsub')

module.exports = (node) => {
const floodSub = new FloodSub(node)

node._floodSub = floodSub

return {
subscribe: (topic, options, handler, callback) => {
if (!node.isStarted()) {
return setImmediate(() => callback(new Error(NOT_STARTED_YET)))
}

if (typeof options === 'function') {
callback = handler
handler = options
options = {}
}

function subscribe (cb) {
if (floodSub.listenerCount(topic) === 0) {
floodSub.subscribe(topic)
}

floodSub.pubsub.on(topic, handler)
setImmediate(cb)
}

subscribe(callback)
},

unsubscribe: (topic, handler) => {
floodSub.removeListener(topic, handler)

if (floodSub.listenerCount(topic) === 0) {
floodSub.unsubscribe(topic)
}
},

publish: (topic, data, callback) => {
if (!node.isStarted()) {
return setImmediate(() => callback(new Error(NOT_STARTED_YET)))
}

if (!Buffer.isBuffer(data)) {
return setImmediate(() => callback(new Error('data must be a Buffer')))
}

floodSub.publish(topic, data)

setImmediate(() => callback())
},

ls: (callback) => {
if (!node.isStarted()) {
return setImmediate(() => callback(new Error(NOT_STARTED_YET)))
}

const subscriptions = Array.from(floodSub.subscriptions)

setImmediate(() => callback(null, subscriptions))
},

peers: (topic, callback) => {
if (!node.isStarted()) {
return setImmediate(() => callback(new Error(NOT_STARTED_YET)))
}

if (typeof topic === 'function') {
callback = topic
topic = null
}

const peers = Array.from(floodSub.peers.values())
.filter((peer) => topic ? peer.topics.has(topic) : true)
.map((peer) => peer.info.id.toB58String())

setImmediate(() => callback(null, peers))
},

setMaxListeners (n) {
return floodSub.setMaxListeners(n)
}
}
}
1 change: 1 addition & 0 deletions test/node.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ require('./base')
require('./transports.node')
require('./stream-muxing.node')
require('./peer-discovery.node')
require('./pubsub.node')
require('./peer-routing.node')
require('./content-routing.node')
require('./circuit-relay.node')
Expand Down
52 changes: 52 additions & 0 deletions test/pubsub.node.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/* eslint-env mocha */
/* eslint max-nested-callbacks: ["error", 8] */

'use strict'

const chai = require('chai')
chai.use(require('dirty-chai'))
const expect = chai.expect
const parallel = require('async/parallel')
const _times = require('lodash.times')
const utils = require('./utils/node')
const createNode = utils.createNode

describe('.pubsub', () => {
let nodeA
let nodeB

before(function (done) {
this.timeout(5 * 1000)

const tasks = _times(2, () => (cb) => {
createNode('/ip4/0.0.0.0/tcp/0', {
mdns: false,
dht: true
}, (err, node) => {
expect(err).to.not.exist()
node.start((err) => cb(err, node))
})
})

parallel(tasks, (err, nodes) => {
expect(err).to.not.exist()
nodeA = nodes[0]
nodeB = nodes[1]

nodeA.dial(nodeB.peerInfo, done)
})
})

after((done) => {
parallel([
(cb) => nodeA.stop(cb),
(cb) => nodeB.stop(cb)
], done)
})

describe('.pubsub on (default)', () => {
})

describe('.pubsub off', () => {
})
})

0 comments on commit 0c543b7

Please sign in to comment.