diff --git a/src/core/components/pubsub.js b/src/core/components/pubsub.js new file mode 100644 index 0000000000..a3a4a1a3d1 --- /dev/null +++ b/src/core/components/pubsub.js @@ -0,0 +1,64 @@ +'use strict' + +const promisify = require('promisify-es6') +const Stream = require('stream') + +const FloodSub = require('libp2p-floodsub') + +const OFFLINE_ERROR = require('../utils').OFFLINE_ERROR + +module.exports = function pubsub (self) { + let fsub + + return { + start: promisify((libp2pNode, callback) => { + fsub = new FloodSub(libp2pNode) + callback(null) + }), + + sub: promisify((topic, options, callback) => { + if (typeof options === 'function') { + callback = options + options = {} + } + + if (!self.isOnline()) { + return callback(OFFLINE_ERROR) + } + + var rs = new Stream() + rs.readable = true + rs._read = () => {} + rs.cancel = () => { + fsub.unsubscribe(topic) + } + + fsub.on(topic, (data) => { + console.log('PUBSUB DATA:', data.toString()) + rs.emit('data', { + data: data.toString(), + topicIDs: [topic] + // these fields are currently missing from message + // (but are present in messages from go-ipfs pubsub) + // from: bs58.encode(message.from), + // seqno: Base64.decode(message.seqno) + }) + }) + + fsub.subscribe(topic) + callback(null, rs) + }), + + pub: promisify((topic, data, callback) => { + if (!self.isOnline()) { + return callback(OFFLINE_ERROR) + } + + const buf = Buffer.isBuffer(data) ? data : new Buffer(data) + + fsub.publish(topic, buf) + callback(null) + }) + + } +} diff --git a/src/core/index.js b/src/core/index.js index a58836c10c..9df64cb3f0 100644 --- a/src/core/index.js +++ b/src/core/index.js @@ -23,6 +23,7 @@ const swarm = require('./components/swarm') const ping = require('./components/ping') const files = require('./components/files') const bitswap = require('./components/bitswap') +const pubsub = require('./components/pubsub') exports = module.exports = IPFS @@ -67,4 +68,5 @@ function IPFS (repoInstance) { this.files = files(this) this.bitswap = bitswap(this) this.ping = ping(this) + this.pubsub = pubsub(this) }