Skip to content
This repository has been archived by the owner on Jun 27, 2023. It is now read-only.

Commit

Permalink
fix: various floodsub issues (#51)
Browse files Browse the repository at this point in the history
* fix: startup

* Revert "fix: Published message field names (#49)"

This reverts commit b8f66cd.

* fix: close should always invoke callback

* fix: avoid race conditions, by quietly ignoring unsub when shutdown

* fix: only delete peer when connections match

* fix: lint errors

* fix: more work on connection shutdown

* fix: RPC msg.from is now binary

* fix: lint errors

* fix: multiple connections to/from same peer

This implements a refernce counting scheme.  See #51 (comment)

One test is still failing.

* fix: topicCIDs => topicIDs

* test: can not get this test to work!

* fix: lint errors

* fix: review changes

* fix: lint errors on a comment, this going too far

* test: get test working

* test: add tests for publishing an array of messages

* subscribe: polling for connectivity replaced by handling peer connection event

* fix typo
  • Loading branch information
richardschneider authored and daviddias committed Nov 22, 2017
1 parent dca9b1e commit 45c9b11
Show file tree
Hide file tree
Showing 8 changed files with 197 additions and 37 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
},
"dependencies": {
"async": "^2.6.0",
"bs58": "^4.0.1",
"debug": "^3.1.0",
"length-prefixed-stream": "^1.5.1",
"libp2p-crypto": "~0.10.3",
Expand Down
94 changes: 68 additions & 26 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,17 +59,53 @@ class FloodSub extends EventEmitter {
this._dialPeer = this._dialPeer.bind(this)
}

_addPeer (peer) {
const id = peer.info.id.toB58String()

/*
Always use an existing peer.
What is happening here is: "If the other peer has already dialed to me, we already have
an establish link between the two, what might be missing is a
Connection specifically between me and that Peer"
*/
let existing = this.peers.get(id)
if (existing) {
log('already existing peer', id)
++existing._references
} else {
log('new peer', id)
this.peers.set(id, peer)
existing = peer
}

return existing
}

_removePeer (peer) {
const id = peer.info.id.toB58String()

log('remove', id, peer._references)
// Only delete when no one else is referencing this peer.
if (--peer._references === 0) {
log('delete peer', id)
this.peers.delete(id)
}

return peer
}

_dialPeer (peerInfo, callback) {
callback = callback || function noop () {}
const idB58Str = peerInfo.id.toB58String()
log('dialing %s', idB58Str)

// If already have a PubSub conn, ignore
const peer = this.peers.get(idB58Str)
if (peer && peer.isConnected) {
return setImmediate(() => callback())
}

log('dialing %s', idB58Str)
this.libp2p.dial(peerInfo, multicodec, (err, conn) => {
if (err) {
log.err(err)
Expand All @@ -82,13 +118,9 @@ class FloodSub extends EventEmitter {

_onDial (peerInfo, conn, callback) {
const idB58Str = peerInfo.id.toB58String()
log('connected', idB58Str)

// If already had a dial to me, just add the conn
if (!this.peers.has(idB58Str)) {
this.peers.set(idB58Str, new Peer(peerInfo))
}

const peer = this.peers.get(idB58Str)
const peer = this._addPeer(new Peer(peerInfo))
peer.attachConnection(conn)

// Immediately send my own subscriptions to the newly established conn
Expand All @@ -104,24 +136,20 @@ class FloodSub extends EventEmitter {
}

const idB58Str = peerInfo.id.toB58String()
const peer = this._addPeer(new Peer(peerInfo))

if (!this.peers.has(idB58Str)) {
log('new peer', idB58Str)
this.peers.set(idB58Str, new Peer(peerInfo))
}

this._processConnection(idB58Str, conn)
this._processConnection(idB58Str, conn, peer)
})
}

_processConnection (idB58Str, conn) {
_processConnection (idB58Str, conn, peer) {
pull(
conn,
lp.decode(),
pull.map((data) => pb.rpc.RPC.decode(data)),
pull.drain(
(rpc) => this._onRpc(idB58Str, rpc),
(err) => this._onConnectionEnd(idB58Str, err)
(err) => this._onConnectionEnd(idB58Str, peer, err)
)
)
}
Expand All @@ -131,11 +159,12 @@ class FloodSub extends EventEmitter {
return
}

log('rpc from', idB58Str)
const subs = rpc.subscriptions
const msgs = rpc.msgs

if (msgs && msgs.length) {
this._processRpcMessages(rpc.msgs)
this._processRpcMessages(utils.normalizeInRpcMessages(rpc.msgs))
}

if (subs && subs.length) {
Expand Down Expand Up @@ -164,13 +193,14 @@ class FloodSub extends EventEmitter {
})
}

_onConnectionEnd (idB58Str, err) {
_onConnectionEnd (idB58Str, peer, err) {
// socket hang up, means the one side canceled
if (err && err.message !== 'socket hang up') {
log.err(err)
}

this.peers.delete(idB58Str)
log('connection ended', idB58Str, err ? err.message : '')
this._removePeer(peer)
}

_emitMessages (topics, messages) {
Expand All @@ -191,7 +221,7 @@ class FloodSub extends EventEmitter {
return
}

peer.sendMessages(messages)
peer.sendMessages(utils.normalizeOutRpcMessages(messages))

log('publish msgs on topics', topics, peer.info.id.toB58String())
})
Expand Down Expand Up @@ -241,11 +271,15 @@ class FloodSub extends EventEmitter {
this.libp2p.unhandle(multicodec)
this.libp2p.removeListener('peer:connect', this._dialPeer)

log('stopping')
asyncEach(this.peers.values(), (peer, cb) => peer.close(cb), (err) => {
if (err) {
return callback(err)
}

log('stopped')
this.peers = new Map()
this.subscriptions = new Set()
this.started = false
callback()
})
Expand Down Expand Up @@ -287,7 +321,7 @@ class FloodSub extends EventEmitter {
this._emitMessages(topics, msgObjects)

// send to all the other peers
this._forwardMessages(topics, messages.map(buildMessage))
this._forwardMessages(topics, msgObjects)
}

/**
Expand All @@ -303,14 +337,18 @@ class FloodSub extends EventEmitter {

topics.forEach((topic) => this.subscriptions.add(topic))

this.peers.forEach((peer) => checkIfReady(peer))
this.peers.forEach((peer) => sendSubscriptionsOnceReady(peer))
// make sure that FloodSub is already mounted
function checkIfReady (peer) {
function sendSubscriptionsOnceReady (peer) {
if (peer && peer.isWritable) {
peer.sendSubscriptions(topics)
} else {
setImmediate(checkIfReady.bind(peer))
return peer.sendSubscriptions(topics)
}
const onConnection = () => {
peer.removeListener('connection', onConnection)
sendSubscriptionsOnceReady(peer)
}
peer.on('connection', onConnection)
peer.once('close', () => peer.removeListener('connection', onConnection))
}
}

Expand All @@ -321,7 +359,11 @@ class FloodSub extends EventEmitter {
* @returns {undefined}
*/
unsubscribe (topics) {
assert(this.started, 'FloodSub is not started')
// Avoid race conditions, by quietly ignoring unsub when shutdown.
if (!this.started) {
return
}

topics = ensureArray(topics)

topics.forEach((topic) => this.subscriptions.delete(topic))
Expand Down
4 changes: 2 additions & 2 deletions src/message/rpc.proto.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ message RPC {
}
message Message {
optional string from = 1;
optional bytes from = 1;
optional bytes data = 2;
optional bytes seqno = 3;
repeated string topicIDs = 4; // CID of topic descriptor object
repeated string topicIDs = 4;
}
}`
26 changes: 20 additions & 6 deletions src/peer.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,20 @@ const lp = require('pull-length-prefixed')
const Pushable = require('pull-pushable')
const pull = require('pull-stream')
const setImmediate = require('async/setImmediate')
const EventEmitter = require('events')

const rpc = require('./message').rpc.RPC

/**
* The known state of a connected peer.
*/
class Peer {
class Peer extends EventEmitter {
/**
* @param {PeerInfo} info
*/
constructor (info) {
super()

/**
* @type {PeerInfo}
*/
Expand All @@ -31,6 +34,8 @@ class Peer {
* @type {Pushable}
*/
this.stream = null

this._references = 1
}

/**
Expand Down Expand Up @@ -80,8 +85,15 @@ class Peer {
pull(
this.stream,
lp.encode(),
conn
conn,
pull.onEnd(() => {
this.conn = null
this.stream = null
this.emit('close')
})
)

this.emit('connection')
}

_sendRawSubscriptions (topics, subscribe) {
Expand Down Expand Up @@ -155,16 +167,18 @@ class Peer {
* @returns {undefined}
*/
close (callback) {
if (!this.conn || !this.stream) {
// no connection to close
}
// end the pushable pull-stream
// Force removal of peer
this._references = 1

// End the pushable
if (this.stream) {
this.stream.end()
}

setImmediate(() => {
this.conn = null
this.stream = null
this.emit('close')
callback()
})
}
Expand Down
27 changes: 27 additions & 0 deletions src/utils.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'use strict'

const crypto = require('libp2p-crypto')
const bs58 = require('bs58')

exports = module.exports

Expand Down Expand Up @@ -66,3 +67,29 @@ exports.ensureArray = (maybeArray) => {

return maybeArray
}

exports.normalizeInRpcMessages = (messages) => {
if (!messages) {
return messages
}
return messages.map((msg) => {
const m = Object.assign({}, msg)
if (Buffer.isBuffer(msg.from)) {
m.from = bs58.encode(msg.from)
}
return m
})
}

exports.normalizeOutRpcMessages = (messages) => {
if (!messages) {
return messages
}
return messages.map((msg) => {
const m = Object.assign({}, msg)
if (typeof msg.from === 'string' || msg.from instanceof String) {
m.from = bs58.decode(msg.from)
}
return m
})
}
30 changes: 27 additions & 3 deletions test/2-nodes.js
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,30 @@ describe('basics between 2 nodes', () => {
_times(10, () => fsB.publish('Z', new Buffer('banana')))
})

it('Publish 10 msg to a topic:Z in nodeB as array', (done) => {
let counter = 0

fsB.once('Z', shouldNotHappen)

fsA.on('Z', receivedMsg)

function receivedMsg (msg) {
expect(msg.data.toString()).to.equal('banana')
expect(msg.from).to.be.eql(fsB.libp2p.peerInfo.id.toB58String())
expect(Buffer.isBuffer(msg.seqno)).to.be.true()
expect(msg.topicIDs).to.be.eql(['Z'])

if (++counter === 10) {
fsA.removeListener('Z', receivedMsg)
done()
}
}

let msgs = []
_times(10, () => msgs.push(new Buffer('banana')))
fsB.publish('Z', msgs)
})

it('Unsubscribe from topic:Z in nodeA', (done) => {
fsA.unsubscribe('Z')
expect(fsA.subscriptions.size).to.equal(0)
Expand Down Expand Up @@ -291,11 +315,11 @@ describe('basics between 2 nodes', () => {
nodeA.dial(nodeB.peerInfo, (err) => {
expect(err).to.not.exist()
setTimeout(() => {
expect(fsA.peers.size).to.equal(1)
expect(fsB.peers.size).to.equal(1)
expect(first(fsA.peers)._references).to.equal(2)
expect(first(fsB.peers)._references).to.equal(2)

fsA.stop(() => setTimeout(() => {
expect(fsB.peers.size).to.equal(0)
expect(first(fsB.peers)._references).to.equal(1)
done()
}, 250))
}, 1000)
Expand Down
Loading

0 comments on commit 45c9b11

Please sign in to comment.