-
Notifications
You must be signed in to change notification settings - Fork 16
Various floodsub issues #51
Conversation
@@ -155,9 +155,6 @@ class Peer { | |||
* @returns {undefined} | |||
*/ | |||
close (callback) { | |||
if (!this.conn || !this.stream) { | |||
// no connection to close | |||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why remove?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because it never invoked the callback, read the rest of the code.
This is still WIP, reviews are fine but DO NOT merge
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I take it back. This code does nothing.
@whyrusleeping IMHO libp2p/go-libp2p-pubsub@25ef943#diff-9b6fbbdd6fa2676b0d87d60d96f4b556 does not help JSON marshalling. Now RPC messages are different than API messages ( Also, doesn't this break old go nodes that are expected a string. But its way to lateto change this. Will just have to cope with. |
@richardschneider bytes and string are the same thing from a protobuf perspective. Old go nodes arent a problem because we check api versioning. In any case, the api json is meant to be human readable (for the most part). The random mess of binary data that we had before was unusable, which is why it got changed. |
@diasdavid when a connection is closed, then the peer is removed from However, if a peer has multiple connections, then we loose all knowledge of it. Basically, Sounds academic, but in the real world (js-go interop testing) I see two connections being made. And when one of the connections is closed, we forgot all about the peer.
|
This implements a refernce counting scheme. See #51 (comment) One test is still failing.
@diasdavid finally its ready for review |
Two Connections are made (note, two connections in the sense of libp2p, which for PubSub, are two muxed streams), one for outgoing and another for incoming (peerA -> peerB and peerB -> peerA)
Agree, it should just remove the information about a Peer when everything is shutdown. The current logic was to just ignore that it has a link with another Peer if we do not intend to send anything else (no outgoing connection), it should not try to use the incoming connection. I'm reviewing this PR, just wanted to clarify this first :) |
log('new peer', id) | ||
this.peers.set(id, peer) | ||
existing = peer | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This if can be inverted for great readability (by this I mean, avoid do negative equals with things like undefined or null)
let existing = this.peers.get(id)
if (!existing) {
log('new peer', id)
this.peers.set(id, peer)
existing = peer
} else {
log('already existing peer', id)
++existing._references
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, positive logic is always better.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
even better
if (existing) {
log('already existing peer', id)
++existing._references
} else {
log('new peer', id)
this.peers.set(id, peer)
existing = peer
}
// If already had a dial to me, just add the conn | ||
if (!this.peers.has(idB58Str)) { | ||
this.peers.set(idB58Str, new Peer(peerInfo)) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is important logic.
What is happening here is: "If the other peer has already dialed to me, we already have an establish link between the two, what might be missing is a Connection specifically between me and that Peer"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was copy and paste from original code. will add the comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The code was moved to _addPeer
test/2-nodes.js
Outdated
@@ -287,7 +287,7 @@ describe('basics between 2 nodes', () => { | |||
], done) | |||
}) | |||
|
|||
it('peer is removed from the state when connection ends', (done) => { | |||
it.skip('peer is removed from the state when connection ends', (done) => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please do not skip tests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can't get it to work. Please tell me what to do. It's the whole multiple connections issue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems that what we need is to track the Outgoing Connection and the Incoming Connection. This test should check that our Outgoing Connection disappears when the connection ends.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@diasdavid Here's an approach
it('peer is removed from the state when connection ends', (done) => {
nodeA.dial(nodeB.peerInfo, (err) => {
expect(err).to.not.exist()
setTimeout(() => {
expect(first(fsA.peers)._references).to.equal(2)
expect(first(fsB.peers)._references).to.equal(2)
fsA.stop(() => setTimeout(() => {
expect(first(fsB.peers)._references).to.equal(1)
done()
}, 250))
}, 1000)
})
})
{ from: binaryId } | ||
] | ||
expect(utils.normalizeOutRpcMessages(m)).to.deep.eql(expected) | ||
}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ta
@diasdavid requested changes made. What do you want to do about the skipped test. |
src/index.js
Outdated
return peer.sendSubscriptions(topics) | ||
} | ||
const onConnection = () => { | ||
peer.removeListened('connection', onConnection) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@pgte Should this be removeListener
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@richardschneider oops, thanks! @diasdavid fixed it for me..
Some of the issues were introduced by me in #49. Basically, the field
topicCIDs
is used in RPC messages andtopicIDs
in API messages.See issue #50
Summary of changes
from