feat(gossipsub): introduce backpressure #4914
Conversation
Great start! Left some comments! :)
protocols/gossipsub/src/behaviour.rs
Outdated
```diff
@@ -332,6 +335,9 @@ pub struct Behaviour<D = IdentityTransform, F = AllowAllSubscriptionFilter> {

     /// Keep track of a set of internal metrics relating to gossipsub.
     metrics: Option<Metrics>,
+
+    /// Connection handler message queue channels.
+    handler_send_queues: HashMap<PeerId, RpcSender>,
```
I think we might have to key this by `ConnectionId`. Or make the value of the hashmap a `Vec` of `RpcSender`.
I see, because `ConnectionHandler`s may send to different `PeerId`s during their existence, right? But then how do we relate the mesh peers, which are identified by `PeerId`s, to `ConnectionId`s?
> because `ConnectionHandler`s may send to different `PeerId`s during its existence right?

No. A `ConnectionHandler` only ever connects the local node to a single remote node (i.e. remote `PeerId`) during its existence. Though note that the local node might have multiple `ConnectionHandler`s to a single remote node, each with a different `ConnectionId`.
I think the problem that Thomas is hinting at is that we might have multiple connections to a single `PeerId`. In other words, there might be multiple `RpcSender`s per `PeerId`.
Thanks for explaining, Max. If I understand correctly, it's also why Thomas mentioned that we'd need to distinguish between `Full` and `Closed` when sending the message to the `Receiver`, so that with the idea of a `Vec` of `RpcSender`s we try all the `ConnectionHandler`s, right?

If that is so: since `async-channel` is MPMC (where each message can be received by only one of all existing consumers), why don't we clone the `RpcReceiver` so that the first `ConnectionHandler` reading the message sends it, and we ensure the channel never gets closed? Presented this idea on dd13fcd
With the latest refactorings, is this field still needed? The `RpcSender` is now part of `PeerConnections`, right?
no no, thanks Max, updated!
protocols/gossipsub/src/types.rs
Outdated
```rust
let (priority_sender, priority_receiver) = async_channel::unbounded();
let (non_priority_sender, non_priority_receiver) = async_channel::bounded(cap / 2);
```
`async-channel` is an MPMC channel but we don't ever `.clone()` the `Receiver`, right? So why not use the `mpsc` channel from `futures`?
The `futures::channel::mpsc` implementation gives you one slot per `Sender` + whatever capacity you initialize the channel with. So by doing `sender.clone().try_send`, I think we are guaranteed to be able to send a message, even if the `Sender` is immediately dropped after. Technically, that makes the channel unbounded, but if we only do it for `Control` messages, that shouldn't matter much because they are so small.

I think that is what @mxinden meant in #4667 (comment).
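A minimal standalone sketch of that guarantee (not PR code; `futures::channel::mpsc` documents its capacity as the buffer size plus the number of live senders):

```rust
use futures::channel::mpsc;

fn main() {
    // Base capacity 0: every `Sender` still owns one guaranteed slot.
    let (tx, mut rx) = mpsc::channel::<u32>(0);

    // A fresh clone always has an unused slot, so `try_send` succeeds
    // even though the base buffer is zero-sized...
    tx.clone().try_send(1).expect("clone brings its own slot");
    // ...and the message stays queued after the clone is dropped, which
    // is why repeating this makes the channel de facto unbounded.
    tx.clone().try_send(2).expect("each clone adds a slot");

    assert_eq!(rx.try_next().unwrap(), Some(1));
    assert_eq!(rx.try_next().unwrap(), Some(2));
}
```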
> async-channel is an MPMC channel but we don't ever .clone() the Receiver, right? So why not use the mpsc channel from futures?

because we need to check if the channel is empty on the connection handler when determining if we need to create the outbound stream:

```rust
if !self.send_queue.is_empty()
    && self.outbound_substream.is_none()
    && !self.outbound_substream_establishing
{
```

handler.rs:231
You can wrap the `Receiver` in a `Peekable` and check if it has an item: https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html#method.peekable
thanks Thomas, I wasn't aware of this! Nor of the property Max mentioned regarding the `Sender` clone. See my question regarding MPMC on #4914 (comment)
protocols/gossipsub/src/behaviour.rs
Outdated
```diff
 let event = RpcOut::Subscribe(topic_hash.clone());
-self.send_message(peer, event);
+self.send_message(peer, event)
+    .expect("Subscribe messages should be always sent");
```
I think it would be useful if we'd split `RpcOut` to avoid this. Something like:

- `ControlRpcOut`
- `Publish`
- `Forward`

And then we can have:

```rust
fn send_control_message(); // Never fails
fn send_publish() -> Result<(), ...>; // Can fail
fn send_forward(); // Never fails but pushes into a different channel
```
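A self-contained sketch of that split (the three method names are Thomas's; the queue types and caps are assumptions, not the PR's implementation):

```rust
use std::collections::VecDeque;

enum ControlRpcOut {
    Subscribe(String),
    Graft(String),
    Prune(String),
}

struct RpcSender {
    control: VecDeque<ControlRpcOut>, // effectively unbounded: never fails
    publish: VecDeque<Vec<u8>>,       // bounded: the only fallible path
    forward: VecDeque<Vec<u8>>,       // separate queue: dropped when full
    publish_cap: usize,
    forward_cap: usize,
}

impl RpcSender {
    fn send_control_message(&mut self, msg: ControlRpcOut) {
        self.control.push_back(msg); // never fails
    }

    fn send_publish(&mut self, msg: Vec<u8>) -> Result<(), Vec<u8>> {
        if self.publish.len() >= self.publish_cap {
            return Err(msg); // backpressure surfaces to the caller
        }
        self.publish.push_back(msg);
        Ok(())
    }

    fn send_forward(&mut self, msg: Vec<u8>) {
        if self.forward.len() < self.forward_cap {
            self.forward.push_back(msg);
        }
        // else: silently dropped, callers never observe an error
    }
}
```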
yeah, thought about it, but `ControlAction` has different types of priorities: IHAVE/IWANT are low and GRAFT/PRUNE are high priority, and all of them live under the `ControlAction` enum as struct variants. We could split them into isolated structures to then use both on `PriorityRpcOut` and `RpcOut`, so that we could do:

```rust
fn prune(&mut self, prune: Prune);
fn graft(&mut self, graft: Graft);
```

but `ControlAction` is pub, so that would be a breaking change
> but `ControlAction` has different types of priorities, IHAVE/IWANT are low and GRAFT/PRUNE are high priority

Why is this distinction relevant?

The goal of this pull request is to prevent unbounded growth of `send_queue`. Neither IHAVE/IWANT nor GRAFT/PRUNE are large. Correct me if I am wrong. Thus differentiating between the two is not relevant for this pull request.
I was following @AgeManning on #4667 (comment), but you are probably right, Max. Updated the code to implement Thomas's suggested methods on `RpcSender`, which allows us to not split `RpcOut` and meanwhile also remove `send_message` and the double iterations on `Behaviour`. PTAL Thomas
I think the distinction is important.

GRAFT/PRUNE are very important in maintaining the overlay mesh network. If we drop these messages we could break the whole network, i.e. nodes could start randomly having broken links, will not know to find new mesh peers, and messages can stop being forwarded through the network.

IHAVE/IWANT are very low priority. In fact, if we are struggling to send/receive messages we probably want to drop these along with forward messages. Although the messages themselves are fairly small (but we can send a large number of message-ids, like a few thousand (configurable)), they both induce a greater message load. IHAVE can induce IWANT requests from other peers, which ends up making us publish more messages (which we don't want if we are struggling). IWANT will request more messages to be sent to us and further consume bandwidth. An IWANT can request a fair amount more messages (up to 500 in Lighthouse).

For these reasons, I think if we are in the unhappy case where we are struggling, we want to drop IHAVE/IWANT and forward messages, but we definitely do not want to drop GRAFT/PRUNE if we can help it.
As I write this: we probably also want to make sure that messages we respond to an IWANT with get sent as a "forward" message and not a "publish" message.

If such a message is sent too late, we will have already broken our promise, as IWANT responses are time-bound anyway. We want to drop them from the send queue if we can't send them in time.
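A sketch of the time-bounding idea (all shapes here are assumed; this is the direction #5595 later takes by introducing a `Delay` for `Forward` and `Publish` messages):

```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};

// Each queued forward / IWANT-response message remembers when it was
// queued, so stale entries are dropped rather than sent after the
// IWANT promise has effectively expired.
struct TimedMessage {
    queued_at: Instant,
    payload: Vec<u8>,
}

fn next_message_to_send(
    queue: &mut VecDeque<TimedMessage>,
    max_delay: Duration,
) -> Option<Vec<u8>> {
    while let Some(msg) = queue.pop_front() {
        if msg.queued_at.elapsed() <= max_delay {
            return Some(msg.payload);
        }
        // Too old: sending it now would not keep the promise anyway.
    }
    None
}
```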
Distinguishing between GRAFT/PRUNE and IHAVE/IWANT makes sense to me. Thank you for expanding on it.

That said, I suggest not enlarging the scope of this pull request. I suggest fixing the unbounded `send_queue` problem in this pull request (#4667). Once merged, I suggest following up with one or more pull requests introducing the various optimizations, e.g. time-based prioritization of forward messages, differentiation of GRAFT/PRUNE and IHAVE/IWANT, ...

I won't block this pull request in case you want to do all of this in one. Though I expect separate pull requests to be significantly easier to implement, review and debug.
protocols/gossipsub/src/behaviour.rs
Outdated
```rust
if sender.try_send(rpc.clone()).is_err() {
    tracing::debug!(peer=%peer_id, "Dropping message as peer is full");
    return Err(());
}
```
We should differentiate between `Closed` and `Full` here, and perhaps return the message back to the caller in `Err`.
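A sketch of that differentiation (the `QueueError` type and function are hypothetical; `is_full` and `into_inner` are the real `futures::channel::mpsc::TrySendError` API):

```rust
use futures::channel::mpsc::{Sender, TrySendError};

// Tells the caller *why* the send failed and hands the message back,
// so `Full` can be treated as backpressure and `Closed` can trigger a
// retry on another connection.
enum QueueError<T> {
    Full(T),
    Closed(T),
}

fn try_send_rpc<T>(sender: &mut Sender<T>, rpc: T) -> Result<(), QueueError<T>> {
    sender.try_send(rpc).map_err(|e: TrySendError<T>| {
        if e.is_full() {
            QueueError::Full(e.into_inner())
        } else {
            // Otherwise the receiver side was dropped (disconnected).
            QueueError::Closed(e.into_inner())
        }
    })
}
```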
but `Closed` is unreachable, no?
Not necessarily. A connection runs in a different task, and on a multi-threaded executor this connection could get dropped, and thus the receiver freed, before we receive the event that the connection is closed (where we'd clean up the state).

It might not matter for this case though, so probably fine :)
protocols/gossipsub/src/behaviour.rs
Outdated
```rust
    tracing::debug!(peer=%peer_id, "Dropping message as peer is full");
    return Err(());
}

if let Some(m) = self.metrics.as_mut() {
    if let RpcOut::Publish(ref message) | RpcOut::Forward(ref message) = rpc {
        // register bytes sent on the internal metrics.
        m.msg_sent(&message.topic, message.raw_protobuf_len());
```
If you compute this before we use `try_send`, you can avoid the `.clone()` on the message.
but if the send fails, then we are recording wrong metrics data, as the message was not actually sent, no?
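Both points can be reconciled: compute what the metrics need before the message is moved, record only after the send succeeds. A standalone sketch (the `Msg` type and `record` callback are stand-ins for the PR's `RpcOut` and `Metrics::msg_sent`):

```rust
use std::collections::VecDeque;

struct Msg {
    topic: String,
    bytes: usize,
}

fn send_with_metrics(
    queue: &mut VecDeque<Msg>,
    cap: usize,
    msg: Msg,
    record: &mut impl FnMut(&str, usize),
) -> Result<(), Msg> {
    // Computed up front, so the whole message never needs cloning...
    let (topic, bytes) = (msg.topic.clone(), msg.bytes);
    if queue.len() >= cap {
        return Err(msg); // dropped: nothing is recorded
    }
    queue.push_back(msg);
    // ...but recorded only once the message is actually queued.
    record(&topic, bytes);
    Ok(())
}
```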
Ugh, this is so painful ...

We should really rewrite these tests to go against the `Swarm` API using `libp2p-swarm-test`. Changing tests AND implementation in one PR makes me a lot less confident in a green CI.

There are a lot of tests to change. Can we perhaps make this a group effort? (cc @mxinden @AgeManning)

It probably makes sense to first rewrite a few in order to create some utility functions for setting up networks. But once those are in place, perhaps we can chip away at them together? There are 85 tests to change ..
I also don't want to blow up the scope of this work too much. It is just that reviewing all the changes to these tests also takes a lot of time, but it is an effort that we aren't going to benefit from in the future.
Yeah agree Thomas, let me look into that
Yeah. I'm happy to help out also. My opinion is to try and get these changes in first (we'll be doing fairly extensive tests on live networks as well as small scale simulations) to give us confidence in the changes.
Then in a future PR we re-write all the tests.
```diff
-self.send_queue.shrink_to_fit();
-self.outbound_substream =
-    Some(OutboundSubstreamState::PendingSend(substream, message));
+if let Poll::Ready(Some(message)) = self.send_queue.poll_next_unpin(cx) {
```
Can we now get rid of `OutboundSubstreamState` and just use an `async move` with a `loop` inside? I guess that would require us to be able to `.clone()` the `Receiver` so we can keep one around in case the stream fails and we want to re-establish it.

Probably not worth doing as part of this to keep the scope small, but it would be good to debate why we are retrying to establish the stream if it failed. Any form of loop that retries something is always a bit sus in my eyes. We could as well rely on an upper layer to re-establish the connection. I don't see why trying 5 times to re-establish the stream is fundamentally different to just disabling ourselves after the first one.
Yeah, agree, we should try it in a subsequent PR and clone it per `ConnectionHandler`; this will allow us to always have an open `Receiver`.
Did not forget about Thomas's remark on https://github.com/libp2p/rust-libp2p/pull/4914/files#r1402807803. I am just trying to close the API so that we know what we need from the updated tests.
protocols/gossipsub/Cargo.toml
Outdated
```diff
@@ -37,12 +37,13 @@
 sha2 = "0.10.8"
 smallvec = "1.11.2"
 tracing = "0.1.37"
 void = "1.0.2"
+async-channel = "1.9.0"
```
Still needed?
no no, thanks Max, updated
protocols/gossipsub/src/behaviour.rs
Outdated
```rust
.connected_peers
    .get_mut(&peer_id)
    .expect("Peerid should exist")
    .clone();
```
Why is this clone needed?
it's not, thanks Max!
```rust
/// Send a `RpcOut::Subscribe` message to the `RpcReceiver`
/// this is high priority. By cloning `futures::channel::mpsc::Sender`
/// we get one extra slot in the channel's capacity.
pub(crate) fn subscribe(&mut self, topic: TopicHash) {
```
Should `subscribe` return a `Result` for the case where all channels to the remote peer are closed?
Shouldn't we assume that situation is unreachable? I.e. there will always be at least one connection id per peer, as we add them and remove them on new and closed connections.

If not, all `PeerConnections` send methods should account for that, right?
See also #4914 (comment).
```rust
        }
    }
}
unreachable!("At least one peer should be available");
```
Why is this unreachable? Say that all channels to the given peer are closed; the `for` loop above will not `return` and thus this `unreachable` is hit, no?
yeah, it's as I asked above: isn't the situation of having all channels to a given peer closed unreachable?

On `handle_established_{inbound,outbound}_connection` we add the `PeerId` and the `ConnectionId` to `connected_peers`, which is then removed on `on_connection_closed`, removing the `ConnectionId`, and the `PeerId` if `remaining_established` is zero.
Say that each connection has an error, e.g. here (rust-libp2p/swarm/src/connection/pool/task.rs, line 237 in c3500bb):

```rust
Err(error) => {
```

Then each connection is going to close its channel.

Say that at "the same" time (concurrently) the user calls `Behaviour::publish`.

In such a case, the `Behaviour` would still track the connections, even though each channel would be dropped.

Does the above make sense?
```rust
Err(err) if err.is_full() => {
    tracing::trace!("Queue is full, dropped Forward message");
    return;
}
// Channel is closed, try another sender.
Err(err) => {
    rpc = err.into_inner();
}
```
Still have to give this more thought, though off the top of my head, trying a different connection in both cases is fine, no?
protocols/gossipsub/src/types.rs
Outdated
```rust
pub(crate) non_priority: Sender<RpcOut>,
}

/// `RpcOut` sender that is priority aware.
```
Suggested change:

```diff
-/// `RpcOut` sender that is priority aware.
+/// `RpcOut` receiver that is priority aware.
```
thanks Max, addressed!
This pull request has merge conflicts. Could you please resolve them @jxs? 🙏
Supersedes #4914 with some changes and improvements, namely:

- introduce a `Delay` for `Forward` and `Publish` messages; messages that take more than the configured delay to be sent are discarded
- introduce scoring and penalize slow peers
- remove the control pool
- report slow peers with the number of failed messages

Pull-Request: #5595.
yup thanks @drHuangMHT!
Follows @mxinden's implementation suggestion on #4667 (comment).
Notes & open questions
- Publish, GRAFT, PRUNE and Subscribe messages are prioritized; Publish messages are capped. Using `sender.clone().try_send` doesn't work for the cap, as the queue is shared by the `Receiver`; I used an `AtomicUsize` for that (see the sketch after this list).
- Used `async-channel` to be able to check `is_empty()` in the `ConnectionHandler`.
- Reused the `InsufficientPeers` `Error` so as to not introduce another variant and make the changes breaking, but this is probably a breaking change in the sense that the internal behaviour is changing, no?
- If this design makes sense, I can then add a test to verify that the queues fill.
Change checklist