Skip to content
This repository has been archived by the owner on Feb 12, 2024. It is now read-only.

WIP: feat/pubsub \o/ #610

Merged
merged 10 commits into from
Dec 8, 2016
10 changes: 6 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
"form-data": "^2.1.2",
"fs-pull-blob-store": "^0.4.1",
"gulp": "^3.9.1",
"interface-ipfs-core": "^0.22.0",
"interface-ipfs-core": "git+https://github.com/ipfs/interface-ipfs-core.git#5c7df414a8f627f8adb50a52ef8d2b629381285f",
"left-pad": "^1.1.3",
"lodash": "^4.17.2",
"ncp": "^2.0.0",
Expand All @@ -80,7 +80,7 @@
"hapi": "^16.0.0",
"hapi-set-header": "^1.0.2",
"idb-pull-blob-store": "^0.5.1",
"ipfs-api": "^12.0.0",
"ipfs-api": "git+https://github.com/ipfs/js-ipfs-api.git#01044a1f59fb866e4e08b06aae4e74d968615931",
"ipfs-bitswap": "^0.8.1",
"ipfs-block": "^0.5.0",
"ipfs-block-service": "^0.7.0",
Expand All @@ -91,8 +91,9 @@
"ipld-resolver": "^0.3.0",
"isstream": "^0.1.2",
"joi": "^10.0.1",
"libp2p-ipfs-nodejs": "^0.16.1",
"libp2p-floodsub": "0.3.1",
"libp2p-ipfs-browser": "^0.17.0",
"libp2p-ipfs-nodejs": "^0.16.1",
"lodash.flatmap": "^4.5.0",
"lodash.get": "^4.4.2",
"lodash.has": "^4.5.2",
Expand All @@ -102,6 +103,7 @@
"mafmt": "^2.1.2",
"multiaddr": "^2.1.1",
"multihashes": "^0.3.0",
"ndjson": "1.5.0",
"path-exists": "^3.0.0",
"peer-book": "^0.3.0",
"peer-id": "^0.8.0",
Expand Down Expand Up @@ -149,4 +151,4 @@
"nginnever <ginneversource@gmail.com>",
"npmcdn-to-unpkg-bot <npmcdn-to-unpkg-bot@users.noreply.github.com>"
]
}
}
17 changes: 17 additions & 0 deletions src/cli/commands/pubsub.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
'use strict'

// The command count bump from 56 to 60 depends on:
// ipfs/interface-ipfs-core.git#5c7df414a8f627f8adb50a52ef8d2b629381285f
// ipfs/js-ipfs-api.git#01044a1f59fb866e4e08b06aae4e74d968615931
module.exports = {
command: 'pubsub',

description: 'pubsub commands',

builder (yargs) {
return yargs
.commandDir('pubsub')
},

handler (argv) {}
}
30 changes: 30 additions & 0 deletions src/cli/commands/pubsub/ls.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
'use strict'

const utils = require('../../utils')
const debug = require('debug')
const log = debug('cli:pubsub')
log.error = debug('cli:pubsub:error')

module.exports = {
command: 'ls',

describe: 'Get your list of subscriptions',

builder: {},

handler (argv) {
utils.getIPFS((err, ipfs) => {
if (err) {
throw err
}

ipfs.pubsub.ls((err, subscriptions) => {
if (err) {
throw err
}

console.log(JSON.stringify(subscriptions, null, 2))
})
})
}
}
30 changes: 30 additions & 0 deletions src/cli/commands/pubsub/peers.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
'use strict'

const utils = require('../../utils')
const debug = require('debug')
const log = debug('cli:pubsub')
log.error = debug('cli:pubsub:error')

module.exports = {
command: 'peers <topic>',

describe: 'Get all peers subscribed to a topic',

builder: {},

handler (argv) {
utils.getIPFS((err, ipfs) => {
if (err) {
throw err
}

ipfs.pubsub.peers(argv.topic, (err, peers) => {
if (err) {
throw err
}

console.log(JSON.stringify(peers, null, 2))
})
})
}
}
28 changes: 28 additions & 0 deletions src/cli/commands/pubsub/publish.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
'use strict'

const utils = require('../../utils')
const debug = require('debug')
const log = debug('cli:pubsub')
log.error = debug('cli:pubsub:error')

module.exports = {
command: 'publish <topic> <data>',

describe: 'Publish data to a topic',

builder: {},

handler (argv) {
utils.getIPFS((err, ipfs) => {
if (err) {
throw err
}

ipfs.pubsub.publish(argv.topic, argv.data, (err) => {
if (err) {
throw err
}
})
})
}
}
32 changes: 32 additions & 0 deletions src/cli/commands/pubsub/subscribe.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
'use strict'

const utils = require('../../utils')
const debug = require('debug')
const log = debug('cli:pubsub')
log.error = debug('cli:pubsub:error')

module.exports = {
command: 'subscribe <topic>',

alias: 'sub',

describe: 'Subscribe to a topic',

builder: {},

handler (argv) {
utils.getIPFS((err, ipfs) => {
if (err) {
throw err
}

ipfs.pubsub.subscribe(argv.topic, (err, stream) => {
if (err) {
throw err
}

console.log(stream.toString())
})
})
}
}
6 changes: 6 additions & 0 deletions src/core/components/go-online.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

const series = require('async/series')
const Bitswap = require('ipfs-bitswap')
const FloodSub = require('libp2p-floodsub')

module.exports = function goOnline (self) {
return (cb) => {
Expand All @@ -21,6 +22,11 @@ module.exports = function goOnline (self) {
)
self._bitswap.start()
self._blockService.goOnline(self._bitswap)

//
self._pubsub = new FloodSub(self._libp2pNode)
//

cb()
})
}
Expand Down
131 changes: 131 additions & 0 deletions src/core/components/pubsub.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
'use strict'

const promisify = require('promisify-es6')
const Readable = require('stream').Readable
const _values = require('lodash.values')

const OFFLINE_ERROR = require('../utils').OFFLINE_ERROR

let subscriptions = {}

const addSubscription = (topic, request, stream) => {
subscriptions[topic] = { request: request, stream: stream }
}

const removeSubscription = promisify((topic, callback) => {
if (!subscriptions[topic]) {
return callback(new Error(`Not subscribed to ${topic}`))
}

subscriptions[topic].stream.emit('end')
delete subscriptions[topic]

if (callback) {
callback(null)
}
})

module.exports = function pubsub (self) {
return {
subscribe: promisify((topic, options, callback) => {
if (!self.isOnline()) {
throw OFFLINE_ERROR
}

if (typeof options === 'function') {
callback = options
options = {}
}

if (subscriptions[topic]) {
return callback(`Error: Already subscribed to '${topic}'`)
}

const stream = new Readable({ objectMode: true })

stream._read = () => {}

// There is no explicit unsubscribe; subscriptions have a cancel event
stream.cancel = promisify((cb) => {
self._pubsub.unsubscribe(topic)
removeSubscription(topic, cb)
})

self._pubsub.on(topic, (data) => {
stream.emit('data', {
data: data.toString(),
topicIDs: [topic]
})
})

try {
self._pubsub.subscribe(topic)
} catch (err) {
return callback(err)
}

// Add the request to the active subscriptions and return the stream
addSubscription(topic, null, stream)
callback(null, stream)
}),

publish: promisify((topic, data, callback) => {
if (!self.isOnline()) {
throw OFFLINE_ERROR
}

const buf = Buffer.isBuffer(data) ? data : new Buffer(data)

try {
self._pubsub.publish(topic, buf)
} catch (err) {
return callback(err)
}

callback(null)
}),

ls: promisify((callback) => {
if (!self.isOnline()) {
throw OFFLINE_ERROR
}

let subscriptions = []

try {
subscriptions = self._pubsub.getSubscriptions()
} catch (err) {
return callback(err)
}

callback(null, subscriptions)
}),

peers: promisify((topic, callback) => {
if (!self.isOnline()) {
throw OFFLINE_ERROR
}

if (!subscriptions[topic]) {
return callback(`Error: Not subscribed to '${topic}'`)
}

let peers = []

try {
const peerSet = self._pubsub.getPeerSet()
_values(peerSet).forEach((peer) => {
const idB58Str = peer.peerInfo.id.toB58String()
const index = peer.topics.indexOf(topic)
if (index > -1) {
peers.push(idB58Str)
}
})
} catch (err) {
return callback(err)
}

callback(null, peers)
})
}
}
3 changes: 3 additions & 0 deletions src/core/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const swarm = require('./components/swarm')
const ping = require('./components/ping')
const files = require('./components/files')
const bitswap = require('./components/bitswap')
const pubsub = require('./components/pubsub')

exports = module.exports = IPFS

Expand All @@ -44,6 +45,7 @@ function IPFS (repoInstance) {
this._bitswap = null
this._blockService = new BlockService(this._repo)
this._ipldResolver = new IPLDResolver(this._blockService)
this._pubsub = null

// IPFS Core exposed components

Expand All @@ -67,4 +69,5 @@ function IPFS (repoInstance) {
this.files = files(this)
this.bitswap = bitswap(this)
this.ping = ping(this)
this.pubsub = pubsub(this)
}
1 change: 1 addition & 0 deletions src/http-api/resources/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ exports.block = require('./block')
exports.swarm = require('./swarm')
exports.bitswap = require('./bitswap')
exports.files = require('./files')
exports.pubsub = require('./pubsub')
Loading