Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(gossipsub): gracefully disable handler on stream errors #3625

Merged
merged 42 commits into from
Apr 14, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
1264345
Gossipsub: remove `ConnectionHandlerEvent::Close`
Mar 16, 2023
3c2fbce
Move error handling closer to the source.
Mar 18, 2023
b8fed53
Address PR comments.
Mar 21, 2023
f4cfbc3
Address PR comments.
Mar 21, 2023
e7e96ed
Address PR comments.
Mar 21, 2023
e37ba58
Use `void` instead of panic
thomaseizinger Mar 21, 2023
b6be9ce
Check created streams counter also for failed upgrades
thomaseizinger Mar 21, 2023
1e06367
Merge branch 'libp2p:master' into deprecate/gossipsub-close-event
vnermolaev Mar 22, 2023
415f648
Update changelog.
Mar 22, 2023
f87949d
Fix typo in changelog
thomaseizinger Mar 22, 2023
9e12f9d
Remove outdated comments
thomaseizinger Mar 22, 2023
ee6cb02
Extract utility functions for classifying `ConnectionEvent`
thomaseizinger Mar 22, 2023
3443a69
Set `outbound_substream_establishing` in a single place
thomaseizinger Mar 22, 2023
fef9751
Flatten match
thomaseizinger Mar 22, 2023
7dec223
Avoid being stuck in "Poisoned" state for outbound streams
thomaseizinger Mar 23, 2023
3163213
Don't handle error that is never constructed
thomaseizinger Mar 23, 2023
e28af53
Deprecate `HandlerError` entirely
thomaseizinger Mar 23, 2023
0507493
Track # of outbound streams requested, not successfully established
mxinden Mar 29, 2023
b572895
Re-enqueue message when outbound stream fails
mxinden Mar 29, 2023
12e9b53
Use early return instead of if-else
thomaseizinger Mar 29, 2023
fd4958d
Only send messages in `poll`
thomaseizinger Mar 29, 2023
44dce05
Fix use of `DialUpgradeError` in `is_inbound` check
thomaseizinger Mar 30, 2023
6a5f1d0
Merge branch 'master' into deprecate/gossipsub-close-event
thomaseizinger Apr 3, 2023
3432ac0
Move changelog entry
thomaseizinger Apr 3, 2023
db59d23
Bump version
thomaseizinger Apr 3, 2023
b94ec28
Change log level to warn for bad events
thomaseizinger Apr 3, 2023
c5e3c41
Don't end log messages with periods
thomaseizinger Apr 3, 2023
c02a3a3
Use exhaustive match
thomaseizinger Apr 4, 2023
e94c2c7
Make error message consistently `debug` and use same wording
thomaseizinger Apr 4, 2023
a7ed378
Merge branch 'master' into deprecate/gossipsub-close-event
thomaseizinger Apr 4, 2023
798ef5c
Update protocols/gossipsub/src/handler.rs
mxinden Apr 5, 2023
bbdf8f5
chore: bump libp2p-swarm to v0.42.2
mxinden Apr 5, 2023
af21589
Update Cargo.lock
mxinden Apr 5, 2023
f999f3e
Refactor keep alive mechanism
mxinden Apr 6, 2023
397afa2
Do minor clean up
mxinden Apr 6, 2023
9f44adc
Merge branch 'master' of https://github.com/libp2p/rust-libp2p into d…
mxinden Apr 6, 2023
b01e86f
Add debug for dropped message
mxinden Apr 6, 2023
552cb08
Track MAX_SUBSTREAM_ATTEMPTS in on_connection_event
mxinden Apr 6, 2023
b42e71e
Update swarm/CHANGELOG.md
mxinden Apr 6, 2023
a958b60
Merge branch 'master' into deprecate/gossipsub-close-event
mxinden Apr 11, 2023
d673ed2
Merge branch 'master' into deprecate/gossipsub-close-event
mxinden Apr 12, 2023
7cb4e41
Merge branch 'master' into deprecate/gossipsub-close-event
mxinden Apr 14, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion protocols/gossipsub/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@

- Signed messages now use sequential integers in the sequence number field.
See [PR 3551].
- Gracefully disable handler on stream errors. Deprecate a few variants of `HandlerError`.
See [PR 3625].
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved

[PR 3551]: https://github.com/libp2p/rust-libp2p/pull/3551
[PR 3551]: https://github.com/libp2p/rust-libp2p/pull/3551git pull
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
[PR 3625]: https://github.com/libp2p/rust-libp2p/pull/3325

# 0.44.1

Expand Down
1 change: 1 addition & 0 deletions protocols/gossipsub/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ serde = { version = "1", optional = true, features = ["derive"] }
thiserror = "1.0"
wasm-timer = "0.2.5"
instant = "0.1.11"
void = "1.0.2"
# Metrics dependencies
prometheus-client = "0.19.0"

Expand Down
4 changes: 4 additions & 0 deletions protocols/gossipsub/src/error_priv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,18 @@ impl From<SigningError> for PublishError {
/// Errors that can occur in the protocols handler.
#[derive(Debug, Error)]
pub enum HandlerError {
#[deprecated(note = "This error will no longer be emitted")]
#[error("The maximum number of inbound substreams created has been exceeded.")]
MaxInboundSubstreams,
#[deprecated(note = "This error will no longer be emitted")]
#[error("The maximum number of outbound substreams created has been exceeded.")]
MaxOutboundSubstreams,
#[error("The message exceeds the maximum transmission size.")]
MaxTransmissionSize,
#[deprecated(note = "This error will no longer be emitted")]
#[error("Protocol negotiation timeout.")]
NegotiationTimeout,
#[deprecated(note = "This error will no longer be emitted")]
#[error("Protocol negotiation failed.")]
NegotiationProtocolError(ProtocolError),
#[error("Failed to encode or decode")]
Expand Down
159 changes: 80 additions & 79 deletions protocols/gossipsub/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,8 @@ use libp2p_swarm::handler::{
SubstreamProtocol,
};
use libp2p_swarm::NegotiatedSubstream;
use log::{error, trace, warn};
use smallvec::SmallVec;
use std::{
collections::VecDeque,
io,
pin::Pin,
task::{Context, Poll},
time::Duration,
Expand Down Expand Up @@ -124,9 +121,6 @@ pub struct Handler {
/// The amount of time we allow idle connections before disconnecting.
idle_timeout: Duration,

/// Collection of errors from attempting an upgrade.
upgrade_errors: VecDeque<ConnectionHandlerUpgrErr<HandlerError>>,

/// Flag determining whether to maintain the connection to the peer.
keep_alive: KeepAlive,

Expand Down Expand Up @@ -174,7 +168,6 @@ impl Handler {
peer_kind_sent: false,
protocol_unsupported: false,
idle_timeout,
upgrade_errors: VecDeque::new(),
keep_alive: KeepAlive::Until(Instant::now() + Duration::from_secs(INITIAL_KEEP_ALIVE)),
in_mesh: false,
}
Expand Down Expand Up @@ -202,7 +195,7 @@ impl Handler {
}

// new inbound substream. Replace the current one, if it exists.
trace!("New inbound substream request");
log::trace!("New inbound substream request");
self.inbound_substream = Some(InboundSubstreamState::WaitingInput(substream));
}

Expand Down Expand Up @@ -234,7 +227,7 @@ impl Handler {
// Should never establish a new outbound substream if one already exists.
// If this happens, an outbound message is not sent.
if self.outbound_substream.is_some() {
warn!("Established an outbound substream with one already available");
log::warn!("Established an outbound substream with one already available");
// Add the message back to the send queue
self.send_queue.push(message);
} else {
Expand Down Expand Up @@ -289,44 +282,15 @@ impl ConnectionHandler for Handler {
Self::Error,
>,
> {
// Handle any upgrade errors
if let Some(error) = self.upgrade_errors.pop_front() {
let reported_error = match error {
// Timeout errors get mapped to NegotiationTimeout and we close the connection.
ConnectionHandlerUpgrErr::Timeout | ConnectionHandlerUpgrErr::Timer => {
Some(HandlerError::NegotiationTimeout)
}
// There was an error post negotiation, close the connection.
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)) => Some(e),
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(negotiation_error)) => {
match negotiation_error {
NegotiationError::Failed => {
// The protocol is not supported
self.protocol_unsupported = true;
if !self.peer_kind_sent {
self.peer_kind_sent = true;
// clear all substreams so the keep alive returns false
self.inbound_substream = None;
self.outbound_substream = None;
self.keep_alive = KeepAlive::No;
return Poll::Ready(ConnectionHandlerEvent::Custom(
HandlerEvent::PeerKind(PeerKind::NotSupported),
));
} else {
None
}
}
NegotiationError::ProtocolError(e) => {
Some(HandlerError::NegotiationProtocolError(e))
}
}
}
};

// If there was a fatal error, close the connection.
if let Some(error) = reported_error {
return Poll::Ready(ConnectionHandlerEvent::Close(error));
}
if self.protocol_unsupported && !self.peer_kind_sent {
self.peer_kind_sent = true;
// clear all substreams so the keep alive returns false
self.inbound_substream = None;
self.outbound_substream = None;
self.keep_alive = KeepAlive::No;
return Poll::Ready(ConnectionHandlerEvent::Custom(HandlerEvent::PeerKind(
PeerKind::NotSupported,
)));
}

if !self.peer_kind_sent {
Expand All @@ -338,23 +302,14 @@ impl ConnectionHandler for Handler {
}
}

if self.inbound_substreams_created > MAX_SUBSTREAM_CREATION {
// Too many inbound substreams have been created, end the connection.
return Poll::Ready(ConnectionHandlerEvent::Close(
HandlerError::MaxInboundSubstreams,
));
}
// Invariant: `self.inbound_substreams_created < MAX_SUBSTREAM_CREATION`.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about adding an assert! or debug_assert! (preference for the former) here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Taking a closer look, I don't think this invariant actually holds.

In on_connection_event we drop new inbound requests on ==:

        if event.is_inbound() && self.inbound_substreams_created == MAX_SUBSTREAM_CREATION {
            // Too many inbound substreams have been created, disable the handler.
            self.keep_alive = KeepAlive::No;
            log::warn!("The maximum number of inbound substreams created has been exceeded");
            return;
        }

Thus self.inbound_substreams may never be > but it could be == and thus the invariant of < does not hold.

@thomaseizinger am I missing something?


// determine if we need to create the stream
if !self.send_queue.is_empty()
&& self.outbound_substream.is_none()
&& !self.outbound_substream_establishing
{
if self.outbound_substreams_created >= MAX_SUBSTREAM_CREATION {
return Poll::Ready(ConnectionHandlerEvent::Close(
HandlerError::MaxOutboundSubstreams,
));
}
// Invariant: `self.outbound_substreams_created < MAX_SUBSTREAM_CREATION`.
let message = self.send_queue.remove(0);
self.send_queue.shrink_to_fit();
self.outbound_substream_establishing = true;
Expand Down Expand Up @@ -383,12 +338,12 @@ impl ConnectionHandler for Handler {
Poll::Ready(Some(Err(error))) => {
match error {
HandlerError::MaxTransmissionSize => {
warn!("Message exceeded the maximum transmission size");
log::warn!("Message exceeded the maximum transmission size");
self.inbound_substream =
Some(InboundSubstreamState::WaitingInput(substream));
}
_ => {
warn!("Inbound stream error: {}", error);
log::warn!("Inbound stream error: {}", error);
// More serious errors, close this side of the stream. If the
// peer is still around, they will re-establish their
// connection
Expand All @@ -399,7 +354,7 @@ impl ConnectionHandler for Handler {
}
// peer closed the stream
Poll::Ready(None) => {
warn!("Peer closed their outbound stream");
log::warn!("Peer closed their outbound stream");
self.inbound_substream =
Some(InboundSubstreamState::Closing(substream));
}
Expand All @@ -417,7 +372,7 @@ impl ConnectionHandler for Handler {
// Don't close the connection but just drop the inbound substream.
// In case the remote has more to send, they will open up a new
// substream.
warn!("Inbound substream error while closing: {:?}", e);
log::warn!("Inbound substream error while closing: {e}");
}
self.inbound_substream = None;
if self.outbound_substream.is_none() {
Expand Down Expand Up @@ -470,19 +425,22 @@ impl ConnectionHandler for Handler {
Some(OutboundSubstreamState::PendingFlush(substream))
}
Err(HandlerError::MaxTransmissionSize) => {
error!("Message exceeded the maximum transmission size and was not sent.");
log::error!("Message exceeded the maximum transmission size and was not sent.");
self.outbound_substream =
Some(OutboundSubstreamState::WaitingOutput(substream));
}
Err(e) => {
error!("Error sending message: {}", e);
return Poll::Ready(ConnectionHandlerEvent::Close(e));
log::debug!(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logs for errors on outbound substreams have all been downgraded to debug while the inbound ones are kept as warn. I would expect the pub part of gossipsub to be as important (if not more) as the sub part to keep these all as warn at least

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To me, a warn always means "Should I wake an ops person at 3am because of this"? Connections can die at any time because somebody e.g. closes their laptop. That is not a reason to wake an ops person IMO, hence I am gonna downgrade the inbound streams to debug instead upgrading the outbound ones to warn.

"Outbound substream error while sending output: {e}"
);
self.outbound_substream = None;
break;
}
}
}
Poll::Ready(Err(e)) => {
error!("Outbound substream error while sending output: {:?}", e);
return Poll::Ready(ConnectionHandlerEvent::Close(e));
log::debug!("Outbound substream error while sending output: {e}");
break;
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
}
Poll::Pending => {
self.keep_alive = KeepAlive::Yes;
Expand All @@ -504,7 +462,8 @@ impl ConnectionHandler for Handler {
Some(OutboundSubstreamState::WaitingOutput(substream))
}
Poll::Ready(Err(e)) => {
return Poll::Ready(ConnectionHandlerEvent::Close(e))
log::debug!("Outbound substream error while flushing output: {e}");
break;
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
}
Poll::Pending => {
self.keep_alive = KeepAlive::Yes;
Expand All @@ -525,14 +484,8 @@ impl ConnectionHandler for Handler {
break;
}
Poll::Ready(Err(e)) => {
warn!("Outbound substream error while closing: {:?}", e);
return Poll::Ready(ConnectionHandlerEvent::Close(
io::Error::new(
io::ErrorKind::BrokenPipe,
"Failed to close outbound substream",
)
.into(),
));
log::debug!("Outbound substream error while closing: {e}");
break;
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
}
Poll::Pending => {
self.keep_alive = KeepAlive::No;
Expand Down Expand Up @@ -564,17 +517,65 @@ impl ConnectionHandler for Handler {
Self::OutboundOpenInfo,
>,
) {
// Note: This will get simpler with https://github.com/libp2p/rust-libp2p/pull/3605.
if matches!(
event,
ConnectionEvent::FullyNegotiatedInbound(_)
| ConnectionEvent::DialUpgradeError(DialUpgradeError {
error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(_)), // Only `Select` is relevant, the others may be for other handlers too.
..
})
) && self.inbound_substreams_created == MAX_SUBSTREAM_CREATION
{
// Too many inbound substreams have been created, disable the handler.
self.keep_alive = KeepAlive::No;
log::info!("The maximum number of inbound substreams created has been exceeded.");
Copy link
Contributor

@AgeManning AgeManning Mar 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this should be a warn, error or a crit. In principle this should never happen and we don't want the connection to just end silently

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The thinking was that info! is typically used for a state change (we are disabling the handler). I don't want to wake an ops person at 3am because we are emitting warn or error here :)

Happy to be convinced otherwise.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I'm not all that concerned. I;d be curious to know when to use a warn then. I'd typically use a warn if something has not behaved as it should and is worth notifying a user.
In this case, we pretty much should never see this log, unless something is pretty broken. I find it handy to grep warn/error etc to find things that are broken. This fits into that classification imo.

But if other parts of libp2p are not doing it, happy to leave as an info.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would we never see this log? We would see this if we connect to a node that doesn't support the gossipsub protocol right? Nothing is inherently broken if that happens so I am not sure a warn is appropriate.

Copy link
Member

@mxinden mxinden Mar 29, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed that this should rather be a warn instead of a info.

Why would we never see this log?

By default env-logger does not log on info level. I would argue that users should see this by default, thus it should be a level higher than info which is warn.

In this case, we pretty much should never see this log, unless something is pretty broken. I find it handy to grep warn/error etc to find things that are broken. This fits into that classification imo.

Agreed.

We would see this if we connect to a node that doesn't support the gossipsub protocol right?

As far as I can tell a UpgradeError::Select(NegotiationError::Failed) would lead to self.protocol_unsupported = true and thus a direct disabling (i.e. self.keep_alive = KeepAlive::No) in the next ConnectionHandler::poll invocation. Thus we would never request another outbound stream and thus the log line above is never printed. Am I missing something @thomaseizinger?

Just to avoid confusion, I am in favor of the above behavior. I don't think there is value in directly requesting another stream in case the remote signaled that it does not support the protocol on the previous stream. Objections?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense!

return;
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change

if matches!(
event,
ConnectionEvent::FullyNegotiatedOutbound(_) | ConnectionEvent::DialUpgradeError(_)
) && self.outbound_substreams_created == MAX_SUBSTREAM_CREATION
{
// Too many outbound substreams have been created, disable the handler.
self.keep_alive = KeepAlive::No;
log::info!("The maximum number of outbound substreams created has been exceeded.");
AgeManning marked this conversation as resolved.
Show resolved Hide resolved
return;
}

match event {
ConnectionEvent::FullyNegotiatedInbound(fully_negotiated_inbound) => {
self.on_fully_negotiated_inbound(fully_negotiated_inbound)
}
ConnectionEvent::FullyNegotiatedOutbound(fully_negotiated_outbound) => {
self.on_fully_negotiated_outbound(fully_negotiated_outbound)
}
ConnectionEvent::DialUpgradeError(DialUpgradeError { error: e, .. }) => {
ConnectionEvent::DialUpgradeError(DialUpgradeError { error, .. }) => {
self.outbound_substream_establishing = false;
warn!("Dial upgrade error {:?}", e);
self.upgrade_errors.push_back(e);

match error {
// Timeout errors get mapped to NegotiationTimeout and we close the connection.
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
ConnectionHandlerUpgrErr::Timeout | ConnectionHandlerUpgrErr::Timer => {
log::debug!("Dial upgrade error: Protocol negotiation timeout.");
}
// There was an error post negotiation, close the connection.
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)) => {
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
void::unreachable(e)
}
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(
NegotiationError::Failed,
)) => {
// The protocol is not supported
self.protocol_unsupported = true;
log::debug!(
"The remote peer does not support gossipsub on this connection"
);
}
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(
NegotiationError::ProtocolError(e),
)) => log::debug!("Protocol negotiation failed: {e}"),
}
}
ConnectionEvent::AddressChange(_) | ConnectionEvent::ListenUpgradeError(_) => {}
}
Expand Down
5 changes: 3 additions & 2 deletions protocols/gossipsub/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use log::{debug, warn};
use quick_protobuf::Writer;
use std::pin::Pin;
use unsigned_varint::codec;
use void::Void;

pub(crate) const SIGNING_PREFIX: &[u8] = b"libp2p-pubsub:";

Expand Down Expand Up @@ -147,7 +148,7 @@ where
TSocket: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
type Output = (Framed<TSocket, GossipsubCodec>, PeerKind);
type Error = HandlerError;
type Error = Void;
type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;

fn upgrade_inbound(self, socket: TSocket, protocol_id: Self::Info) -> Self::Future {
Expand All @@ -168,7 +169,7 @@ where
TSocket: AsyncWrite + AsyncRead + Unpin + Send + 'static,
{
type Output = (Framed<TSocket, GossipsubCodec>, PeerKind);
type Error = HandlerError;
type Error = Void;
type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;

fn upgrade_outbound(self, socket: TSocket, protocol_id: Self::Info) -> Self::Future {
Expand Down