diff --git a/package.json b/package.json index 17601fe..602ad2d 100644 --- a/package.json +++ b/package.json @@ -54,6 +54,7 @@ }, "dependencies": { "async": "^2.6.0", + "bs58": "^4.0.1", "debug": "^3.1.0", "length-prefixed-stream": "^1.5.1", "libp2p-crypto": "~0.10.3", diff --git a/src/index.js b/src/index.js index 641560b..265c0e0 100644 --- a/src/index.js +++ b/src/index.js @@ -59,10 +59,45 @@ class FloodSub extends EventEmitter { this._dialPeer = this._dialPeer.bind(this) } + _addPeer (peer) { + const id = peer.info.id.toB58String() + + /* + Always use an existing peer. + + What is happening here is: "If the other peer has already dialed to me, we already have + an establish link between the two, what might be missing is a + Connection specifically between me and that Peer" + */ + let existing = this.peers.get(id) + if (existing) { + log('already existing peer', id) + ++existing._references + } else { + log('new peer', id) + this.peers.set(id, peer) + existing = peer + } + + return existing + } + + _removePeer (peer) { + const id = peer.info.id.toB58String() + + log('remove', id, peer._references) + // Only delete when no one else is referencing this peer. + if (--peer._references === 0) { + log('delete peer', id) + this.peers.delete(id) + } + + return peer + } + _dialPeer (peerInfo, callback) { callback = callback || function noop () {} const idB58Str = peerInfo.id.toB58String() - log('dialing %s', idB58Str) // If already have a PubSub conn, ignore const peer = this.peers.get(idB58Str) @@ -70,6 +105,7 @@ class FloodSub extends EventEmitter { return setImmediate(() => callback()) } + log('dialing %s', idB58Str) this.libp2p.dial(peerInfo, multicodec, (err, conn) => { if (err) { log.err(err) @@ -82,13 +118,9 @@ class FloodSub extends EventEmitter { _onDial (peerInfo, conn, callback) { const idB58Str = peerInfo.id.toB58String() + log('connected', idB58Str) - // If already had a dial to me, just add the conn - if (!this.peers.has(idB58Str)) { - this.peers.set(idB58Str, new Peer(peerInfo)) - } - - const peer = this.peers.get(idB58Str) + const peer = this._addPeer(new Peer(peerInfo)) peer.attachConnection(conn) // Immediately send my own subscriptions to the newly established conn @@ -104,24 +136,20 @@ class FloodSub extends EventEmitter { } const idB58Str = peerInfo.id.toB58String() + const peer = this._addPeer(new Peer(peerInfo)) - if (!this.peers.has(idB58Str)) { - log('new peer', idB58Str) - this.peers.set(idB58Str, new Peer(peerInfo)) - } - - this._processConnection(idB58Str, conn) + this._processConnection(idB58Str, conn, peer) }) } - _processConnection (idB58Str, conn) { + _processConnection (idB58Str, conn, peer) { pull( conn, lp.decode(), pull.map((data) => pb.rpc.RPC.decode(data)), pull.drain( (rpc) => this._onRpc(idB58Str, rpc), - (err) => this._onConnectionEnd(idB58Str, err) + (err) => this._onConnectionEnd(idB58Str, peer, err) ) ) } @@ -131,11 +159,12 @@ class FloodSub extends EventEmitter { return } + log('rpc from', idB58Str) const subs = rpc.subscriptions const msgs = rpc.msgs if (msgs && msgs.length) { - this._processRpcMessages(rpc.msgs) + this._processRpcMessages(utils.normalizeInRpcMessages(rpc.msgs)) } if (subs && subs.length) { @@ -164,13 +193,14 @@ class FloodSub extends EventEmitter { }) } - _onConnectionEnd (idB58Str, err) { + _onConnectionEnd (idB58Str, peer, err) { // socket hang up, means the one side canceled if (err && err.message !== 'socket hang up') { log.err(err) } - this.peers.delete(idB58Str) + log('connection ended', idB58Str, err ? err.message : '') + this._removePeer(peer) } _emitMessages (topics, messages) { @@ -191,7 +221,7 @@ class FloodSub extends EventEmitter { return } - peer.sendMessages(messages) + peer.sendMessages(utils.normalizeOutRpcMessages(messages)) log('publish msgs on topics', topics, peer.info.id.toB58String()) }) @@ -241,11 +271,15 @@ class FloodSub extends EventEmitter { this.libp2p.unhandle(multicodec) this.libp2p.removeListener('peer:connect', this._dialPeer) + log('stopping') asyncEach(this.peers.values(), (peer, cb) => peer.close(cb), (err) => { if (err) { return callback(err) } + + log('stopped') this.peers = new Map() + this.subscriptions = new Set() this.started = false callback() }) @@ -287,7 +321,7 @@ class FloodSub extends EventEmitter { this._emitMessages(topics, msgObjects) // send to all the other peers - this._forwardMessages(topics, messages.map(buildMessage)) + this._forwardMessages(topics, msgObjects) } /** @@ -303,14 +337,18 @@ class FloodSub extends EventEmitter { topics.forEach((topic) => this.subscriptions.add(topic)) - this.peers.forEach((peer) => checkIfReady(peer)) + this.peers.forEach((peer) => sendSubscriptionsOnceReady(peer)) // make sure that FloodSub is already mounted - function checkIfReady (peer) { + function sendSubscriptionsOnceReady (peer) { if (peer && peer.isWritable) { - peer.sendSubscriptions(topics) - } else { - setImmediate(checkIfReady.bind(peer)) + return peer.sendSubscriptions(topics) + } + const onConnection = () => { + peer.removeListener('connection', onConnection) + sendSubscriptionsOnceReady(peer) } + peer.on('connection', onConnection) + peer.once('close', () => peer.removeListener('connection', onConnection)) } } @@ -321,7 +359,11 @@ class FloodSub extends EventEmitter { * @returns {undefined} */ unsubscribe (topics) { - assert(this.started, 'FloodSub is not started') + // Avoid race conditions, by quietly ignoring unsub when shutdown. + if (!this.started) { + return + } + topics = ensureArray(topics) topics.forEach((topic) => this.subscriptions.delete(topic)) diff --git a/src/message/rpc.proto.js b/src/message/rpc.proto.js index 99c306e..50eb950 100644 --- a/src/message/rpc.proto.js +++ b/src/message/rpc.proto.js @@ -10,9 +10,9 @@ message RPC { } message Message { - optional string from = 1; + optional bytes from = 1; optional bytes data = 2; optional bytes seqno = 3; - repeated string topicIDs = 4; // CID of topic descriptor object + repeated string topicIDs = 4; } }` diff --git a/src/peer.js b/src/peer.js index 38314cd..d6edcf6 100644 --- a/src/peer.js +++ b/src/peer.js @@ -4,17 +4,20 @@ const lp = require('pull-length-prefixed') const Pushable = require('pull-pushable') const pull = require('pull-stream') const setImmediate = require('async/setImmediate') +const EventEmitter = require('events') const rpc = require('./message').rpc.RPC /** * The known state of a connected peer. */ -class Peer { +class Peer extends EventEmitter { /** * @param {PeerInfo} info */ constructor (info) { + super() + /** * @type {PeerInfo} */ @@ -31,6 +34,8 @@ class Peer { * @type {Pushable} */ this.stream = null + + this._references = 1 } /** @@ -80,8 +85,15 @@ class Peer { pull( this.stream, lp.encode(), - conn + conn, + pull.onEnd(() => { + this.conn = null + this.stream = null + this.emit('close') + }) ) + + this.emit('connection') } _sendRawSubscriptions (topics, subscribe) { @@ -155,16 +167,18 @@ class Peer { * @returns {undefined} */ close (callback) { - if (!this.conn || !this.stream) { - // no connection to close - } - // end the pushable pull-stream + // Force removal of peer + this._references = 1 + + // End the pushable if (this.stream) { this.stream.end() } + setImmediate(() => { this.conn = null this.stream = null + this.emit('close') callback() }) } diff --git a/src/utils.js b/src/utils.js index 33d7629..d4def69 100644 --- a/src/utils.js +++ b/src/utils.js @@ -1,6 +1,7 @@ 'use strict' const crypto = require('libp2p-crypto') +const bs58 = require('bs58') exports = module.exports @@ -66,3 +67,29 @@ exports.ensureArray = (maybeArray) => { return maybeArray } + +exports.normalizeInRpcMessages = (messages) => { + if (!messages) { + return messages + } + return messages.map((msg) => { + const m = Object.assign({}, msg) + if (Buffer.isBuffer(msg.from)) { + m.from = bs58.encode(msg.from) + } + return m + }) +} + +exports.normalizeOutRpcMessages = (messages) => { + if (!messages) { + return messages + } + return messages.map((msg) => { + const m = Object.assign({}, msg) + if (typeof msg.from === 'string' || msg.from instanceof String) { + m.from = bs58.decode(msg.from) + } + return m + }) +} diff --git a/test/2-nodes.js b/test/2-nodes.js index bc14734..3c4a827 100644 --- a/test/2-nodes.js +++ b/test/2-nodes.js @@ -142,6 +142,30 @@ describe('basics between 2 nodes', () => { _times(10, () => fsB.publish('Z', new Buffer('banana'))) }) + it('Publish 10 msg to a topic:Z in nodeB as array', (done) => { + let counter = 0 + + fsB.once('Z', shouldNotHappen) + + fsA.on('Z', receivedMsg) + + function receivedMsg (msg) { + expect(msg.data.toString()).to.equal('banana') + expect(msg.from).to.be.eql(fsB.libp2p.peerInfo.id.toB58String()) + expect(Buffer.isBuffer(msg.seqno)).to.be.true() + expect(msg.topicIDs).to.be.eql(['Z']) + + if (++counter === 10) { + fsA.removeListener('Z', receivedMsg) + done() + } + } + + let msgs = [] + _times(10, () => msgs.push(new Buffer('banana'))) + fsB.publish('Z', msgs) + }) + it('Unsubscribe from topic:Z in nodeA', (done) => { fsA.unsubscribe('Z') expect(fsA.subscriptions.size).to.equal(0) @@ -291,11 +315,11 @@ describe('basics between 2 nodes', () => { nodeA.dial(nodeB.peerInfo, (err) => { expect(err).to.not.exist() setTimeout(() => { - expect(fsA.peers.size).to.equal(1) - expect(fsB.peers.size).to.equal(1) + expect(first(fsA.peers)._references).to.equal(2) + expect(first(fsB.peers)._references).to.equal(2) fsA.stop(() => setTimeout(() => { - expect(fsB.peers.size).to.equal(0) + expect(first(fsB.peers)._references).to.equal(1) done() }, 250)) }, 1000) diff --git a/test/multiple-nodes.js b/test/multiple-nodes.js index a462dfd..50b210b 100644 --- a/test/multiple-nodes.js +++ b/test/multiple-nodes.js @@ -135,6 +135,30 @@ describe('multiple nodes (more than 2)', () => { } }) + it('publish array on node a', (done) => { + let counter = 0 + + a.ps.on('Z', incMsg) + b.ps.on('Z', incMsg) + c.ps.on('Z', incMsg) + + a.ps.publish('Z', [Buffer.from('hey'), Buffer.from('hey')]) + + function incMsg (msg) { + expect(msg.data.toString()).to.equal('hey') + check() + } + + function check () { + if (++counter === 6) { + a.ps.removeListener('Z', incMsg) + b.ps.removeListener('Z', incMsg) + c.ps.removeListener('Z', incMsg) + done() + } + } + }) + // since the topology is the same, just the publish // gets sent by other peer, we reused the same peers describe('1 level tree', () => { diff --git a/test/utils.spec.js b/test/utils.spec.js index 09b4857..5004dc8 100644 --- a/test/utils.spec.js +++ b/test/utils.spec.js @@ -40,4 +40,32 @@ describe('utils', () => { expect(utils.ensureArray('hello')).to.be.eql(['hello']) expect(utils.ensureArray([1, 2])).to.be.eql([1, 2]) }) + + it('converts an IN msg.from to b58', () => { + let binaryId = Buffer.from('1220e2187eb3e6c4fb3e7ff9ad4658610624a6315e0240fc6f37130eedb661e939cc', 'hex') + let stringId = 'QmdZEWgtaWAxBh93fELFT298La1rsZfhiC2pqwMVwy3jZM' + const m = [ + { from: binaryId }, + { from: stringId } + ] + const expected = [ + { from: stringId }, + { from: stringId } + ] + expect(utils.normalizeInRpcMessages(m)).to.deep.eql(expected) + }) + + it('converts an OUT msg.from to binary', () => { + let binaryId = Buffer.from('1220e2187eb3e6c4fb3e7ff9ad4658610624a6315e0240fc6f37130eedb661e939cc', 'hex') + let stringId = 'QmdZEWgtaWAxBh93fELFT298La1rsZfhiC2pqwMVwy3jZM' + const m = [ + { from: binaryId }, + { from: stringId } + ] + const expected = [ + { from: binaryId }, + { from: binaryId } + ] + expect(utils.normalizeOutRpcMessages(m)).to.deep.eql(expected) + }) })