From 126434557244f17e3b1d884736d8cbc73807fe92 Mon Sep 17 00:00:00 2001 From: Victor Ermolaev Date: Thu, 16 Mar 2023 10:14:08 +0100 Subject: [PATCH 01/35] Gossipsub: remove `ConnectionHandlerEvent::Close` --- protocols/gossipsub/src/handler.rs | 55 +++++++++++++----------------- 1 file changed, 24 insertions(+), 31 deletions(-) diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index 2c921286553..1b1e0030bd0 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -37,7 +37,6 @@ use log::{error, trace, warn}; use smallvec::SmallVec; use std::{ collections::VecDeque, - io, pin::Pin, task::{Context, Poll}, time::Duration, @@ -291,13 +290,17 @@ impl ConnectionHandler for Handler { > { // Handle any upgrade errors if let Some(error) = self.upgrade_errors.pop_front() { - let reported_error = match error { + match error { // Timeout errors get mapped to NegotiationTimeout and we close the connection. ConnectionHandlerUpgrErr::Timeout | ConnectionHandlerUpgrErr::Timer => { - Some(HandlerError::NegotiationTimeout) + self.keep_alive = KeepAlive::No; + log::info!("Gossipsub error: {}", HandlerError::NegotiationTimeout); } // There was an error post negotiation, close the connection. - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)) => Some(e), + ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)) => { + self.keep_alive = KeepAlive::No; + log::info!("Gossipsub error: {e}"); + } ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(negotiation_error)) => { match negotiation_error { NegotiationError::Failed => { @@ -312,20 +315,17 @@ impl ConnectionHandler for Handler { return Poll::Ready(ConnectionHandlerEvent::Custom( HandlerEvent::PeerKind(PeerKind::NotSupported), )); - } else { - None } } NegotiationError::ProtocolError(e) => { - Some(HandlerError::NegotiationProtocolError(e)) + self.keep_alive = KeepAlive::No; + log::info!( + "Gossipsub error: {}", + HandlerError::NegotiationProtocolError(e) + ); } } } - }; - - // If there was a fatal error, close the connection. - if let Some(error) = reported_error { - return Poll::Ready(ConnectionHandlerEvent::Close(error)); } } @@ -340,9 +340,8 @@ impl ConnectionHandler for Handler { if self.inbound_substreams_created > MAX_SUBSTREAM_CREATION { // Too many inbound substreams have been created, end the connection. - return Poll::Ready(ConnectionHandlerEvent::Close( - HandlerError::MaxInboundSubstreams, - )); + self.keep_alive = KeepAlive::No; + log::info!("Gossipsub error: {}", HandlerError::MaxInboundSubstreams); } // determine if we need to create the stream @@ -351,9 +350,8 @@ impl ConnectionHandler for Handler { && !self.outbound_substream_establishing { if self.outbound_substreams_created >= MAX_SUBSTREAM_CREATION { - return Poll::Ready(ConnectionHandlerEvent::Close( - HandlerError::MaxOutboundSubstreams, - )); + self.keep_alive = KeepAlive::No; + log::info!("Gossipsub error: {}", HandlerError::MaxInboundSubstreams); } let message = self.send_queue.remove(0); self.send_queue.shrink_to_fit(); @@ -475,14 +473,14 @@ impl ConnectionHandler for Handler { Some(OutboundSubstreamState::WaitingOutput(substream)); } Err(e) => { - error!("Error sending message: {}", e); - return Poll::Ready(ConnectionHandlerEvent::Close(e)); + error!("Error sending message: {e}"); + break; } } } Poll::Ready(Err(e)) => { - error!("Outbound substream error while sending output: {:?}", e); - return Poll::Ready(ConnectionHandlerEvent::Close(e)); + error!("Outbound substream error while sending output: {e:?}"); + break; } Poll::Pending => { self.keep_alive = KeepAlive::Yes; @@ -504,7 +502,8 @@ impl ConnectionHandler for Handler { Some(OutboundSubstreamState::WaitingOutput(substream)) } Poll::Ready(Err(e)) => { - return Poll::Ready(ConnectionHandlerEvent::Close(e)) + log::info!("NegotaitedSubstream error: {e}"); + break; } Poll::Pending => { self.keep_alive = KeepAlive::Yes; @@ -525,14 +524,8 @@ impl ConnectionHandler for Handler { break; } Poll::Ready(Err(e)) => { - warn!("Outbound substream error while closing: {:?}", e); - return Poll::Ready(ConnectionHandlerEvent::Close( - io::Error::new( - io::ErrorKind::BrokenPipe, - "Failed to close outbound substream", - ) - .into(), - )); + warn!("Outbound substream error while closing: {e:?}"); + break; } Poll::Pending => { self.keep_alive = KeepAlive::No; From 3c2fbcec1e1a2282f020e3796059d17b67693245 Mon Sep 17 00:00:00 2001 From: Victor Ermolaev Date: Sat, 18 Mar 2023 17:10:04 +0100 Subject: [PATCH 02/35] Move error handling closer to the source. --- protocols/gossipsub/src/handler.rs | 86 ++++++++++++++---------------- 1 file changed, 39 insertions(+), 47 deletions(-) diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index 1b1e0030bd0..c23f3d9ae7d 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -36,7 +36,6 @@ use libp2p_swarm::NegotiatedSubstream; use log::{error, trace, warn}; use smallvec::SmallVec; use std::{ - collections::VecDeque, pin::Pin, task::{Context, Poll}, time::Duration, @@ -123,9 +122,6 @@ pub struct Handler { /// The amount of time we allow idle connections before disconnecting. idle_timeout: Duration, - /// Collection of errors from attempting an upgrade. - upgrade_errors: VecDeque>, - /// Flag determining whether to maintain the connection to the peer. keep_alive: KeepAlive, @@ -173,7 +169,6 @@ impl Handler { peer_kind_sent: false, protocol_unsupported: false, idle_timeout, - upgrade_errors: VecDeque::new(), keep_alive: KeepAlive::Until(Instant::now() + Duration::from_secs(INITIAL_KEEP_ALIVE)), in_mesh: false, } @@ -288,45 +283,15 @@ impl ConnectionHandler for Handler { Self::Error, >, > { - // Handle any upgrade errors - if let Some(error) = self.upgrade_errors.pop_front() { - match error { - // Timeout errors get mapped to NegotiationTimeout and we close the connection. - ConnectionHandlerUpgrErr::Timeout | ConnectionHandlerUpgrErr::Timer => { - self.keep_alive = KeepAlive::No; - log::info!("Gossipsub error: {}", HandlerError::NegotiationTimeout); - } - // There was an error post negotiation, close the connection. - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)) => { - self.keep_alive = KeepAlive::No; - log::info!("Gossipsub error: {e}"); - } - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(negotiation_error)) => { - match negotiation_error { - NegotiationError::Failed => { - // The protocol is not supported - self.protocol_unsupported = true; - if !self.peer_kind_sent { - self.peer_kind_sent = true; - // clear all substreams so the keep alive returns false - self.inbound_substream = None; - self.outbound_substream = None; - self.keep_alive = KeepAlive::No; - return Poll::Ready(ConnectionHandlerEvent::Custom( - HandlerEvent::PeerKind(PeerKind::NotSupported), - )); - } - } - NegotiationError::ProtocolError(e) => { - self.keep_alive = KeepAlive::No; - log::info!( - "Gossipsub error: {}", - HandlerError::NegotiationProtocolError(e) - ); - } - } - } - } + if self.protocol_unsupported && !self.peer_kind_sent { + self.peer_kind_sent = true; + // clear all substreams so the keep alive returns false + self.inbound_substream = None; + self.outbound_substream = None; + self.keep_alive = KeepAlive::No; + return Poll::Ready(ConnectionHandlerEvent::Custom(HandlerEvent::PeerKind( + PeerKind::NotSupported, + ))); } if !self.peer_kind_sent { @@ -564,10 +529,37 @@ impl ConnectionHandler for Handler { ConnectionEvent::FullyNegotiatedOutbound(fully_negotiated_outbound) => { self.on_fully_negotiated_outbound(fully_negotiated_outbound) } - ConnectionEvent::DialUpgradeError(DialUpgradeError { error: e, .. }) => { + ConnectionEvent::DialUpgradeError(DialUpgradeError { error, .. }) => { self.outbound_substream_establishing = false; - warn!("Dial upgrade error {:?}", e); - self.upgrade_errors.push_back(e); + + match error { + // Timeout errors get mapped to NegotiationTimeout and we close the connection. + ConnectionHandlerUpgrErr::Timeout | ConnectionHandlerUpgrErr::Timer => { + self.keep_alive = KeepAlive::No; + log::info!("Dial upgrade error: {}", HandlerError::NegotiationTimeout); + } + // There was an error post negotiation, close the connection. + ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)) => { + self.keep_alive = KeepAlive::No; + log::info!("Dial upgrade error: {e}"); + } + ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(negotiation_error)) => { + match negotiation_error { + NegotiationError::Failed => { + // The protocol is not supported + self.protocol_unsupported = true; + log::info!("Dial upgrade error: {}", NegotiationError::Failed); + } + NegotiationError::ProtocolError(e) => { + self.keep_alive = KeepAlive::No; + log::info!( + "Gossipsub error: {}", + HandlerError::NegotiationProtocolError(e) + ); + } + } + } + } } ConnectionEvent::AddressChange(_) | ConnectionEvent::ListenUpgradeError(_) => {} } From b8fed53555374aba80c9036f03868289ac2bc1fa Mon Sep 17 00:00:00 2001 From: Victor Ermolaev Date: Tue, 21 Mar 2023 11:44:49 +0100 Subject: [PATCH 03/35] Address PR comments. --- protocols/gossipsub/src/error_priv.rs | 9 ------ protocols/gossipsub/src/handler.rs | 40 +++++++++++++++------------ 2 files changed, 22 insertions(+), 27 deletions(-) diff --git a/protocols/gossipsub/src/error_priv.rs b/protocols/gossipsub/src/error_priv.rs index 09bbfbb3543..e77a551d285 100644 --- a/protocols/gossipsub/src/error_priv.rs +++ b/protocols/gossipsub/src/error_priv.rs @@ -21,7 +21,6 @@ //! Error types that can result from gossipsub. use libp2p_core::identity::error::SigningError; -use libp2p_core::upgrade::ProtocolError; use thiserror::Error; /// Error associated with publishing a gossipsub message. @@ -89,16 +88,8 @@ impl From for PublishError { /// Errors that can occur in the protocols handler. #[derive(Debug, Error)] pub enum HandlerError { - #[error("The maximum number of inbound substreams created has been exceeded.")] - MaxInboundSubstreams, - #[error("The maximum number of outbound substreams created has been exceeded.")] - MaxOutboundSubstreams, #[error("The message exceeds the maximum transmission size.")] MaxTransmissionSize, - #[error("Protocol negotiation timeout.")] - NegotiationTimeout, - #[error("Protocol negotiation failed.")] - NegotiationProtocolError(ProtocolError), #[error("Failed to encode or decode")] Codec(#[from] quick_protobuf_codec::Error), } diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index c23f3d9ae7d..a9e97f28b25 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -181,6 +181,13 @@ impl Handler { ::InboundOpenInfo, >, ) { + if self.inbound_substreams_created == MAX_SUBSTREAM_CREATION { + // Too many inbound substreams have been created, end the connection. + self.keep_alive = KeepAlive::No; + log::info!("Gossipsub error: The maximum number of inbound substreams created has been exceeded."); + return; + } + let (substream, peer_kind) = protocol; // If the peer doesn't support the protocol, reject all substreams @@ -210,6 +217,12 @@ impl Handler { ::OutboundOpenInfo, >, ) { + if self.outbound_substreams_created == MAX_SUBSTREAM_CREATION { + self.keep_alive = KeepAlive::No; + log::info!("Gossipsub error: The maximum number of outbound substreams created has been exceeded"); + return; + } + let (substream, peer_kind) = protocol; // If the peer doesn't support the protocol, reject all substreams @@ -303,21 +316,14 @@ impl ConnectionHandler for Handler { } } - if self.inbound_substreams_created > MAX_SUBSTREAM_CREATION { - // Too many inbound substreams have been created, end the connection. - self.keep_alive = KeepAlive::No; - log::info!("Gossipsub error: {}", HandlerError::MaxInboundSubstreams); - } + // Invariant: `self.inbound_substreams_created < MAX_SUBSTREAM_CREATION`. // determine if we need to create the stream if !self.send_queue.is_empty() && self.outbound_substream.is_none() && !self.outbound_substream_establishing { - if self.outbound_substreams_created >= MAX_SUBSTREAM_CREATION { - self.keep_alive = KeepAlive::No; - log::info!("Gossipsub error: {}", HandlerError::MaxInboundSubstreams); - } + // Invariant: `self.outbound_substreams_created < MAX_SUBSTREAM_CREATION`. let message = self.send_queue.remove(0); self.send_queue.shrink_to_fit(); self.outbound_substream_establishing = true; @@ -438,7 +444,10 @@ impl ConnectionHandler for Handler { Some(OutboundSubstreamState::WaitingOutput(substream)); } Err(e) => { - error!("Error sending message: {e}"); + log::debug!( + "Outbound substream error while sending output: {e}" + ); + self.outbound_substream = None; break; } } @@ -467,7 +476,7 @@ impl ConnectionHandler for Handler { Some(OutboundSubstreamState::WaitingOutput(substream)) } Poll::Ready(Err(e)) => { - log::info!("NegotaitedSubstream error: {e}"); + log::debug!("Outbound substream error while flushing output: {e}"); break; } Poll::Pending => { @@ -536,11 +545,10 @@ impl ConnectionHandler for Handler { // Timeout errors get mapped to NegotiationTimeout and we close the connection. ConnectionHandlerUpgrErr::Timeout | ConnectionHandlerUpgrErr::Timer => { self.keep_alive = KeepAlive::No; - log::info!("Dial upgrade error: {}", HandlerError::NegotiationTimeout); + log::info!("Dial upgrade error: Protocol negotiation timeout."); } // There was an error post negotiation, close the connection. ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)) => { - self.keep_alive = KeepAlive::No; log::info!("Dial upgrade error: {e}"); } ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(negotiation_error)) => { @@ -551,11 +559,7 @@ impl ConnectionHandler for Handler { log::info!("Dial upgrade error: {}", NegotiationError::Failed); } NegotiationError::ProtocolError(e) => { - self.keep_alive = KeepAlive::No; - log::info!( - "Gossipsub error: {}", - HandlerError::NegotiationProtocolError(e) - ); + log::info!("Gossipsub error: Protocol negotiation failed. {e}"); } } } From f4cfbc3a37fbfcf4f53667494dee280ed37440a4 Mon Sep 17 00:00:00 2001 From: Victor Ermolaev Date: Tue, 21 Mar 2023 13:51:55 +0100 Subject: [PATCH 04/35] Address PR comments. --- protocols/gossipsub/src/error_priv.rs | 13 +++++++ protocols/gossipsub/src/handler.rs | 51 ++++++++++++++------------- 2 files changed, 39 insertions(+), 25 deletions(-) diff --git a/protocols/gossipsub/src/error_priv.rs b/protocols/gossipsub/src/error_priv.rs index e77a551d285..5e0c18ecd7d 100644 --- a/protocols/gossipsub/src/error_priv.rs +++ b/protocols/gossipsub/src/error_priv.rs @@ -21,6 +21,7 @@ //! Error types that can result from gossipsub. use libp2p_core::identity::error::SigningError; +use libp2p_core::upgrade::ProtocolError; use thiserror::Error; /// Error associated with publishing a gossipsub message. @@ -88,8 +89,20 @@ impl From for PublishError { /// Errors that can occur in the protocols handler. #[derive(Debug, Error)] pub enum HandlerError { + #[deprecated(note = "This error will not be emitted")] + #[error("The maximum number of inbound substreams created has been exceeded.")] + MaxInboundSubstreams, + #[deprecated(note = "This error will not be emitted")] + #[error("The maximum number of outbound substreams created has been exceeded.")] + MaxOutboundSubstreams, #[error("The message exceeds the maximum transmission size.")] MaxTransmissionSize, + #[deprecated(note = "This error will not be emitted")] + #[error("Protocol negotiation timeout.")] + NegotiationTimeout, + #[deprecated(note = "This error will not be emitted")] + #[error("Protocol negotiation failed.")] + NegotiationProtocolError(ProtocolError), #[error("Failed to encode or decode")] Codec(#[from] quick_protobuf_codec::Error), } diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index a9e97f28b25..1296e918418 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -181,13 +181,6 @@ impl Handler { ::InboundOpenInfo, >, ) { - if self.inbound_substreams_created == MAX_SUBSTREAM_CREATION { - // Too many inbound substreams have been created, end the connection. - self.keep_alive = KeepAlive::No; - log::info!("Gossipsub error: The maximum number of inbound substreams created has been exceeded."); - return; - } - let (substream, peer_kind) = protocol; // If the peer doesn't support the protocol, reject all substreams @@ -217,12 +210,6 @@ impl Handler { ::OutboundOpenInfo, >, ) { - if self.outbound_substreams_created == MAX_SUBSTREAM_CREATION { - self.keep_alive = KeepAlive::No; - log::info!("Gossipsub error: The maximum number of outbound substreams created has been exceeded"); - return; - } - let (substream, peer_kind) = protocol; // If the peer doesn't support the protocol, reject all substreams @@ -533,9 +520,26 @@ impl ConnectionHandler for Handler { ) { match event { ConnectionEvent::FullyNegotiatedInbound(fully_negotiated_inbound) => { + if self.inbound_substreams_created == MAX_SUBSTREAM_CREATION { + // Too many inbound substreams have been created, end the connection. + self.keep_alive = KeepAlive::No; + log::info!( + "The maximum number of inbound substreams created has been exceeded." + ); + return; + } + self.on_fully_negotiated_inbound(fully_negotiated_inbound) } ConnectionEvent::FullyNegotiatedOutbound(fully_negotiated_outbound) => { + if self.outbound_substreams_created == MAX_SUBSTREAM_CREATION { + self.keep_alive = KeepAlive::No; + log::info!( + "The maximum number of outbound substreams created has been exceeded" + ); + return; + } + self.on_fully_negotiated_outbound(fully_negotiated_outbound) } ConnectionEvent::DialUpgradeError(DialUpgradeError { error, .. }) => { @@ -544,25 +548,22 @@ impl ConnectionHandler for Handler { match error { // Timeout errors get mapped to NegotiationTimeout and we close the connection. ConnectionHandlerUpgrErr::Timeout | ConnectionHandlerUpgrErr::Timer => { - self.keep_alive = KeepAlive::No; log::info!("Dial upgrade error: Protocol negotiation timeout."); } // There was an error post negotiation, close the connection. ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)) => { log::info!("Dial upgrade error: {e}"); } - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(negotiation_error)) => { - match negotiation_error { - NegotiationError::Failed => { - // The protocol is not supported - self.protocol_unsupported = true; - log::info!("Dial upgrade error: {}", NegotiationError::Failed); - } - NegotiationError::ProtocolError(e) => { - log::info!("Gossipsub error: Protocol negotiation failed. {e}"); - } - } + ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select( + NegotiationError::Failed, + )) => { + // The protocol is not supported + self.protocol_unsupported = true; + log::info!("Dial upgrade error: {}", NegotiationError::Failed); } + ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select( + NegotiationError::ProtocolError(e), + )) => log::info!("Protocol negotiation failed. {e}"), } } ConnectionEvent::AddressChange(_) | ConnectionEvent::ListenUpgradeError(_) => {} From e7e96edcb57ed91efc6fe199b924ce76779714c8 Mon Sep 17 00:00:00 2001 From: Victor Ermolaev Date: Tue, 21 Mar 2023 17:21:47 +0100 Subject: [PATCH 05/35] Address PR comments. --- protocols/gossipsub/src/error_priv.rs | 8 +++---- protocols/gossipsub/src/handler.rs | 31 ++++++++++++++------------- 2 files changed, 20 insertions(+), 19 deletions(-) diff --git a/protocols/gossipsub/src/error_priv.rs b/protocols/gossipsub/src/error_priv.rs index 5e0c18ecd7d..53b47e53e96 100644 --- a/protocols/gossipsub/src/error_priv.rs +++ b/protocols/gossipsub/src/error_priv.rs @@ -89,18 +89,18 @@ impl From for PublishError { /// Errors that can occur in the protocols handler. #[derive(Debug, Error)] pub enum HandlerError { - #[deprecated(note = "This error will not be emitted")] + #[deprecated(note = "This error will no longer be emitted")] #[error("The maximum number of inbound substreams created has been exceeded.")] MaxInboundSubstreams, - #[deprecated(note = "This error will not be emitted")] + #[deprecated(note = "This error will no longer be emitted")] #[error("The maximum number of outbound substreams created has been exceeded.")] MaxOutboundSubstreams, #[error("The message exceeds the maximum transmission size.")] MaxTransmissionSize, - #[deprecated(note = "This error will not be emitted")] + #[deprecated(note = "This error will no longer be emitted")] #[error("Protocol negotiation timeout.")] NegotiationTimeout, - #[deprecated(note = "This error will not be emitted")] + #[deprecated(note = "This error will no longer be emitted")] #[error("Protocol negotiation failed.")] NegotiationProtocolError(ProtocolError), #[error("Failed to encode or decode")] diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index 1296e918418..77b39e5603d 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -33,7 +33,6 @@ use libp2p_swarm::handler::{ SubstreamProtocol, }; use libp2p_swarm::NegotiatedSubstream; -use log::{error, trace, warn}; use smallvec::SmallVec; use std::{ pin::Pin, @@ -196,7 +195,7 @@ impl Handler { } // new inbound substream. Replace the current one, if it exists. - trace!("New inbound substream request"); + log::trace!("New inbound substream request"); self.inbound_substream = Some(InboundSubstreamState::WaitingInput(substream)); } @@ -228,7 +227,7 @@ impl Handler { // Should never establish a new outbound substream if one already exists. // If this happens, an outbound message is not sent. if self.outbound_substream.is_some() { - warn!("Established an outbound substream with one already available"); + log::warn!("Established an outbound substream with one already available"); // Add the message back to the send queue self.send_queue.push(message); } else { @@ -339,12 +338,12 @@ impl ConnectionHandler for Handler { Poll::Ready(Some(Err(error))) => { match error { HandlerError::MaxTransmissionSize => { - warn!("Message exceeded the maximum transmission size"); + log::warn!("Message exceeded the maximum transmission size"); self.inbound_substream = Some(InboundSubstreamState::WaitingInput(substream)); } _ => { - warn!("Inbound stream error: {}", error); + log::warn!("Inbound stream error: {}", error); // More serious errors, close this side of the stream. If the // peer is still around, they will re-establish their // connection @@ -355,7 +354,7 @@ impl ConnectionHandler for Handler { } // peer closed the stream Poll::Ready(None) => { - warn!("Peer closed their outbound stream"); + log::warn!("Peer closed their outbound stream"); self.inbound_substream = Some(InboundSubstreamState::Closing(substream)); } @@ -373,7 +372,7 @@ impl ConnectionHandler for Handler { // Don't close the connection but just drop the inbound substream. // In case the remote has more to send, they will open up a new // substream. - warn!("Inbound substream error while closing: {:?}", e); + log::warn!("Inbound substream error while closing: {e}"); } self.inbound_substream = None; if self.outbound_substream.is_none() { @@ -426,7 +425,7 @@ impl ConnectionHandler for Handler { Some(OutboundSubstreamState::PendingFlush(substream)) } Err(HandlerError::MaxTransmissionSize) => { - error!("Message exceeded the maximum transmission size and was not sent."); + log::error!("Message exceeded the maximum transmission size and was not sent."); self.outbound_substream = Some(OutboundSubstreamState::WaitingOutput(substream)); } @@ -440,7 +439,7 @@ impl ConnectionHandler for Handler { } } Poll::Ready(Err(e)) => { - error!("Outbound substream error while sending output: {e:?}"); + log::debug!("Outbound substream error while sending output: {e}"); break; } Poll::Pending => { @@ -485,7 +484,7 @@ impl ConnectionHandler for Handler { break; } Poll::Ready(Err(e)) => { - warn!("Outbound substream error while closing: {e:?}"); + log::debug!("Outbound substream error while closing: {e}"); break; } Poll::Pending => { @@ -548,22 +547,24 @@ impl ConnectionHandler for Handler { match error { // Timeout errors get mapped to NegotiationTimeout and we close the connection. ConnectionHandlerUpgrErr::Timeout | ConnectionHandlerUpgrErr::Timer => { - log::info!("Dial upgrade error: Protocol negotiation timeout."); + log::debug!("Dial upgrade error: Protocol negotiation timeout."); } // There was an error post negotiation, close the connection. - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)) => { - log::info!("Dial upgrade error: {e}"); + ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(_)) => { + unreachable!("Error occurred during stream upgrade"); } ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select( NegotiationError::Failed, )) => { // The protocol is not supported self.protocol_unsupported = true; - log::info!("Dial upgrade error: {}", NegotiationError::Failed); + log::debug!( + "The remote peer does not support gossipsub on this connection" + ); } ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select( NegotiationError::ProtocolError(e), - )) => log::info!("Protocol negotiation failed. {e}"), + )) => log::debug!("Protocol negotiation failed: {e}"), } } ConnectionEvent::AddressChange(_) | ConnectionEvent::ListenUpgradeError(_) => {} From e37ba58bc829580804998d9cab9f601f21372b95 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 21 Mar 2023 18:38:52 +0100 Subject: [PATCH 06/35] Use `void` instead of panic --- Cargo.lock | 1 + protocols/gossipsub/Cargo.toml | 1 + protocols/gossipsub/src/handler.rs | 4 ++-- protocols/gossipsub/src/protocol.rs | 5 +++-- 4 files changed, 7 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f987cf4680a..ae1f545de47 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2359,6 +2359,7 @@ dependencies = [ "smallvec", "thiserror", "unsigned-varint", + "void", "wasm-timer", ] diff --git a/protocols/gossipsub/Cargo.toml b/protocols/gossipsub/Cargo.toml index ef82ee553ac..05abdf73c05 100644 --- a/protocols/gossipsub/Cargo.toml +++ b/protocols/gossipsub/Cargo.toml @@ -33,6 +33,7 @@ serde = { version = "1", optional = true, features = ["derive"] } thiserror = "1.0" wasm-timer = "0.2.5" instant = "0.1.11" +void = "1.0.2" # Metrics dependencies prometheus-client = "0.19.0" diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index 77b39e5603d..7e67dfd3c5c 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -550,8 +550,8 @@ impl ConnectionHandler for Handler { log::debug!("Dial upgrade error: Protocol negotiation timeout."); } // There was an error post negotiation, close the connection. - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(_)) => { - unreachable!("Error occurred during stream upgrade"); + ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)) => { + void::unreachable(e) } ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select( NegotiationError::Failed, diff --git a/protocols/gossipsub/src/protocol.rs b/protocols/gossipsub/src/protocol.rs index f7b04269c92..15824db443a 100644 --- a/protocols/gossipsub/src/protocol.rs +++ b/protocols/gossipsub/src/protocol.rs @@ -37,6 +37,7 @@ use log::{debug, warn}; use quick_protobuf::Writer; use std::pin::Pin; use unsigned_varint::codec; +use void::Void; pub(crate) const SIGNING_PREFIX: &[u8] = b"libp2p-pubsub:"; @@ -147,7 +148,7 @@ where TSocket: AsyncRead + AsyncWrite + Unpin + Send + 'static, { type Output = (Framed, PeerKind); - type Error = HandlerError; + type Error = Void; type Future = Pin> + Send>>; fn upgrade_inbound(self, socket: TSocket, protocol_id: Self::Info) -> Self::Future { @@ -168,7 +169,7 @@ where TSocket: AsyncWrite + AsyncRead + Unpin + Send + 'static, { type Output = (Framed, PeerKind); - type Error = HandlerError; + type Error = Void; type Future = Pin> + Send>>; fn upgrade_outbound(self, socket: TSocket, protocol_id: Self::Info) -> Self::Future { From b6be9ce42d06bcf629c83b0cac9faf92f0e1ac8a Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 21 Mar 2023 18:50:09 +0100 Subject: [PATCH 07/35] Check created streams counter also for failed upgrades --- protocols/gossipsub/src/handler.rs | 44 ++++++++++++++++++------------ 1 file changed, 27 insertions(+), 17 deletions(-) diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index 7e67dfd3c5c..5aa25d20749 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -517,28 +517,38 @@ impl ConnectionHandler for Handler { Self::OutboundOpenInfo, >, ) { + // Note: This will get simpler with https://github.com/libp2p/rust-libp2p/pull/3605. + if matches!( + event, + ConnectionEvent::FullyNegotiatedInbound(_) + | ConnectionEvent::DialUpgradeError(DialUpgradeError { + error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(_)), // Only `Select` is relevant, the others may be for other handlers too. + .. + }) + ) && self.inbound_substreams_created == MAX_SUBSTREAM_CREATION + { + // Too many inbound substreams have been created, disable the handler. + self.keep_alive = KeepAlive::No; + log::info!("The maximum number of inbound substreams created has been exceeded."); + return; + } + + if matches!( + event, + ConnectionEvent::FullyNegotiatedOutbound(_) | ConnectionEvent::DialUpgradeError(_) + ) && self.outbound_substreams_created == MAX_SUBSTREAM_CREATION + { + // Too many outbound substreams have been created, disable the handler. + self.keep_alive = KeepAlive::No; + log::info!("The maximum number of outbound substreams created has been exceeded."); + return; + } + match event { ConnectionEvent::FullyNegotiatedInbound(fully_negotiated_inbound) => { - if self.inbound_substreams_created == MAX_SUBSTREAM_CREATION { - // Too many inbound substreams have been created, end the connection. - self.keep_alive = KeepAlive::No; - log::info!( - "The maximum number of inbound substreams created has been exceeded." - ); - return; - } - self.on_fully_negotiated_inbound(fully_negotiated_inbound) } ConnectionEvent::FullyNegotiatedOutbound(fully_negotiated_outbound) => { - if self.outbound_substreams_created == MAX_SUBSTREAM_CREATION { - self.keep_alive = KeepAlive::No; - log::info!( - "The maximum number of outbound substreams created has been exceeded" - ); - return; - } - self.on_fully_negotiated_outbound(fully_negotiated_outbound) } ConnectionEvent::DialUpgradeError(DialUpgradeError { error, .. }) => { From 415f6488b54595d52d988a1f016452a594a36279 Mon Sep 17 00:00:00 2001 From: Victor Ermolaev Date: Wed, 22 Mar 2023 10:02:30 +0100 Subject: [PATCH 08/35] Update changelog. --- protocols/gossipsub/CHANGELOG.md | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/protocols/gossipsub/CHANGELOG.md b/protocols/gossipsub/CHANGELOG.md index b3accf6ef18..9ec6ea32f1d 100644 --- a/protocols/gossipsub/CHANGELOG.md +++ b/protocols/gossipsub/CHANGELOG.md @@ -2,8 +2,11 @@ - Signed messages now use sequential integers in the sequence number field. See [PR 3551]. +- Gracefully disable handler on stream errors. Deprecate a few variants of `HandlerError`. + See [PR 3625]. -[PR 3551]: https://github.com/libp2p/rust-libp2p/pull/3551 +[PR 3551]: https://github.com/libp2p/rust-libp2p/pull/3551git pull +[PR 3625]: https://github.com/libp2p/rust-libp2p/pull/3325 # 0.44.1 From f87949dbe7dc74eccf09a667008a295e846e3e01 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 22 Mar 2023 22:11:27 +1100 Subject: [PATCH 09/35] Fix typo in changelog --- protocols/gossipsub/CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocols/gossipsub/CHANGELOG.md b/protocols/gossipsub/CHANGELOG.md index 9ec6ea32f1d..60122326842 100644 --- a/protocols/gossipsub/CHANGELOG.md +++ b/protocols/gossipsub/CHANGELOG.md @@ -5,7 +5,7 @@ - Gracefully disable handler on stream errors. Deprecate a few variants of `HandlerError`. See [PR 3625]. -[PR 3551]: https://github.com/libp2p/rust-libp2p/pull/3551git pull +[PR 3551]: https://github.com/libp2p/rust-libp2p/pull/3551 [PR 3625]: https://github.com/libp2p/rust-libp2p/pull/3325 # 0.44.1 From 9e12f9df95f53758eb3989f8d204f5868a957cd6 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 22 Mar 2023 18:49:13 +0100 Subject: [PATCH 10/35] Remove outdated comments --- protocols/gossipsub/src/handler.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index 5aa25d20749..cfe165fdf49 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -555,11 +555,9 @@ impl ConnectionHandler for Handler { self.outbound_substream_establishing = false; match error { - // Timeout errors get mapped to NegotiationTimeout and we close the connection. ConnectionHandlerUpgrErr::Timeout | ConnectionHandlerUpgrErr::Timer => { log::debug!("Dial upgrade error: Protocol negotiation timeout."); } - // There was an error post negotiation, close the connection. ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)) => { void::unreachable(e) } From ee6cb026fbacbdb71689b781dab950e763e97866 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 22 Mar 2023 18:52:51 +0100 Subject: [PATCH 11/35] Extract utility functions for classifying `ConnectionEvent` --- protocols/gossipsub/Cargo.toml | 2 +- protocols/gossipsub/src/handler.rs | 17 ++--------------- swarm/src/handler.rs | 25 +++++++++++++++++++++++++ 3 files changed, 28 insertions(+), 16 deletions(-) diff --git a/protocols/gossipsub/Cargo.toml b/protocols/gossipsub/Cargo.toml index 05abdf73c05..50e19fed673 100644 --- a/protocols/gossipsub/Cargo.toml +++ b/protocols/gossipsub/Cargo.toml @@ -11,7 +11,7 @@ keywords = ["peer-to-peer", "libp2p", "networking"] categories = ["network-programming", "asynchronous"] [dependencies] -libp2p-swarm = { version = "0.42.0", path = "../../swarm" } +libp2p-swarm = { version = "0.42.1", path = "../../swarm" } libp2p-core = { version = "0.39.0", path = "../../core" } libp2p-identity = { version = "0.1.0", path = "../../identity" } bytes = "1.4" diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index cfe165fdf49..602b7549802 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -517,27 +517,14 @@ impl ConnectionHandler for Handler { Self::OutboundOpenInfo, >, ) { - // Note: This will get simpler with https://github.com/libp2p/rust-libp2p/pull/3605. - if matches!( - event, - ConnectionEvent::FullyNegotiatedInbound(_) - | ConnectionEvent::DialUpgradeError(DialUpgradeError { - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(_)), // Only `Select` is relevant, the others may be for other handlers too. - .. - }) - ) && self.inbound_substreams_created == MAX_SUBSTREAM_CREATION - { + if event.is_inbound() && self.inbound_substreams_created == MAX_SUBSTREAM_CREATION { // Too many inbound substreams have been created, disable the handler. self.keep_alive = KeepAlive::No; log::info!("The maximum number of inbound substreams created has been exceeded."); return; } - if matches!( - event, - ConnectionEvent::FullyNegotiatedOutbound(_) | ConnectionEvent::DialUpgradeError(_) - ) && self.outbound_substreams_created == MAX_SUBSTREAM_CREATION - { + if event.is_outbound() && self.outbound_substreams_created == MAX_SUBSTREAM_CREATION { // Too many outbound substreams have been created, disable the handler. self.keep_alive = KeepAlive::No; log::info!("The maximum number of outbound substreams created has been exceeded."); diff --git a/swarm/src/handler.rs b/swarm/src/handler.rs index 4093ecaee32..6fb4be11b58 100644 --- a/swarm/src/handler.rs +++ b/swarm/src/handler.rs @@ -212,6 +212,31 @@ pub enum ConnectionEvent<'a, IP: InboundUpgradeSend, OP: OutboundUpgradeSend, IO ListenUpgradeError(ListenUpgradeError), } +impl<'a, IP: InboundUpgradeSend, OP: OutboundUpgradeSend, IOI, OOI> + ConnectionEvent<'a, IP, OP, IOI, OOI> +{ + /// Whether the event concerns an outbound stream. + pub fn is_outbound(&self) -> bool { + matches!( + self, + Self::FullyNegotiatedOutbound(_) | Self::DialUpgradeError(_) + ) + } + + /// Whether the event concerns an inbound stream. + pub fn is_inbound(&self) -> bool { + // Note: This will get simpler with https://github.com/libp2p/rust-libp2p/pull/3605. + matches!( + self, + Self::FullyNegotiatedInbound(_) + | Self::DialUpgradeError(DialUpgradeError { + error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(_)), // Only `Select` is relevant, the others may be for other handlers too. + .. + }) + ) + } +} + /// [`ConnectionEvent`] variant that informs the handler about /// the output of a successful upgrade on a new inbound substream. /// From 3443a698afa664b4c41b71fa54d8744a29f4db17 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 22 Mar 2023 18:53:51 +0100 Subject: [PATCH 12/35] Set `outbound_substream_establishing` in a single place --- protocols/gossipsub/src/handler.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index 602b7549802..6902fae2f0e 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -216,7 +216,6 @@ impl Handler { return; } - self.outbound_substream_establishing = false; self.outbound_substreams_created += 1; // update the known kind of peer @@ -517,6 +516,10 @@ impl ConnectionHandler for Handler { Self::OutboundOpenInfo, >, ) { + if event.is_outbound() { + self.outbound_substream_establishing = false; + } + if event.is_inbound() && self.inbound_substreams_created == MAX_SUBSTREAM_CREATION { // Too many inbound substreams have been created, disable the handler. self.keep_alive = KeepAlive::No; @@ -539,8 +542,6 @@ impl ConnectionHandler for Handler { self.on_fully_negotiated_outbound(fully_negotiated_outbound) } ConnectionEvent::DialUpgradeError(DialUpgradeError { error, .. }) => { - self.outbound_substream_establishing = false; - match error { ConnectionHandlerUpgrErr::Timeout | ConnectionHandlerUpgrErr::Timer => { log::debug!("Dial upgrade error: Protocol negotiation timeout."); From fef9751ee6567eb82f7ea3db7ef1effe82564e3d Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 22 Mar 2023 18:57:30 +0100 Subject: [PATCH 13/35] Flatten match --- protocols/gossipsub/src/handler.rs | 44 +++++++++++++++++------------- 1 file changed, 25 insertions(+), 19 deletions(-) diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index 6902fae2f0e..43f005e5c00 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -541,27 +541,33 @@ impl ConnectionHandler for Handler { ConnectionEvent::FullyNegotiatedOutbound(fully_negotiated_outbound) => { self.on_fully_negotiated_outbound(fully_negotiated_outbound) } - ConnectionEvent::DialUpgradeError(DialUpgradeError { error, .. }) => { - match error { - ConnectionHandlerUpgrErr::Timeout | ConnectionHandlerUpgrErr::Timer => { - log::debug!("Dial upgrade error: Protocol negotiation timeout."); - } - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)) => { - void::unreachable(e) - } - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select( - NegotiationError::Failed, - )) => { - // The protocol is not supported - self.protocol_unsupported = true; - log::debug!( - "The remote peer does not support gossipsub on this connection" - ); - } + ConnectionEvent::DialUpgradeError(DialUpgradeError { + error: ConnectionHandlerUpgrErr::Timeout | ConnectionHandlerUpgrErr::Timer, + .. + }) => { + log::debug!("Dial upgrade error: Protocol negotiation timeout."); + } + ConnectionEvent::DialUpgradeError(DialUpgradeError { + error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)), + .. + }) => void::unreachable(e), + ConnectionEvent::DialUpgradeError(DialUpgradeError { + error: + ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)), + .. + }) => { + // The protocol is not supported + self.protocol_unsupported = true; + log::debug!("The remote peer does not support gossipsub on this connection"); + } + ConnectionEvent::DialUpgradeError(DialUpgradeError { + error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select( NegotiationError::ProtocolError(e), - )) => log::debug!("Protocol negotiation failed: {e}"), - } + )), + .. + }) => { + log::debug!("Protocol negotiation failed: {e}") } ConnectionEvent::AddressChange(_) | ConnectionEvent::ListenUpgradeError(_) => {} } From 7dec2234e26731692fc418da34d4e4224db2cf48 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 23 Mar 2023 17:10:48 +0100 Subject: [PATCH 14/35] Avoid being stuck in "Poisoned" state for outbound streams --- protocols/gossipsub/src/handler.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index 43f005e5c00..76283a0e3bb 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -439,6 +439,7 @@ impl ConnectionHandler for Handler { } Poll::Ready(Err(e)) => { log::debug!("Outbound substream error while sending output: {e}"); + self.outbound_substream = None; break; } Poll::Pending => { @@ -462,6 +463,7 @@ impl ConnectionHandler for Handler { } Poll::Ready(Err(e)) => { log::debug!("Outbound substream error while flushing output: {e}"); + self.outbound_substream = None; break; } Poll::Pending => { @@ -484,6 +486,7 @@ impl ConnectionHandler for Handler { } Poll::Ready(Err(e)) => { log::debug!("Outbound substream error while closing: {e}"); + self.outbound_substream = None; break; } Poll::Pending => { From 31632130fe9edb9363420cc439e41d84ff4b609b Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 23 Mar 2023 17:19:54 +0100 Subject: [PATCH 15/35] Don't handle error that is never constructed --- protocols/gossipsub/src/error_priv.rs | 1 + protocols/gossipsub/src/handler.rs | 26 ++++++-------------------- 2 files changed, 7 insertions(+), 20 deletions(-) diff --git a/protocols/gossipsub/src/error_priv.rs b/protocols/gossipsub/src/error_priv.rs index 53b47e53e96..cbdd7540adf 100644 --- a/protocols/gossipsub/src/error_priv.rs +++ b/protocols/gossipsub/src/error_priv.rs @@ -95,6 +95,7 @@ pub enum HandlerError { #[deprecated(note = "This error will no longer be emitted")] #[error("The maximum number of outbound substreams created has been exceeded.")] MaxOutboundSubstreams, + #[deprecated(note = "This error will no longer be emitted")] #[error("The message exceeds the maximum transmission size.")] MaxTransmissionSize, #[deprecated(note = "This error will no longer be emitted")] diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index 76283a0e3bb..dce053be9d6 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -335,21 +335,12 @@ impl ConnectionHandler for Handler { return Poll::Ready(ConnectionHandlerEvent::Custom(message)); } Poll::Ready(Some(Err(error))) => { - match error { - HandlerError::MaxTransmissionSize => { - log::warn!("Message exceeded the maximum transmission size"); - self.inbound_substream = - Some(InboundSubstreamState::WaitingInput(substream)); - } - _ => { - log::warn!("Inbound stream error: {}", error); - // More serious errors, close this side of the stream. If the - // peer is still around, they will re-establish their - // connection - self.inbound_substream = - Some(InboundSubstreamState::Closing(substream)); - } - } + log::warn!("Inbound stream error: {}", error); + // Close this side of the stream. If the + // peer is still around, they will re-establish their + // connection + self.inbound_substream = + Some(InboundSubstreamState::Closing(substream)); } // peer closed the stream Poll::Ready(None) => { @@ -423,11 +414,6 @@ impl ConnectionHandler for Handler { self.outbound_substream = Some(OutboundSubstreamState::PendingFlush(substream)) } - Err(HandlerError::MaxTransmissionSize) => { - log::error!("Message exceeded the maximum transmission size and was not sent."); - self.outbound_substream = - Some(OutboundSubstreamState::WaitingOutput(substream)); - } Err(e) => { log::debug!( "Outbound substream error while sending output: {e}" From e28af538318c146d5b19ab7098d9e0017c356ec8 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 23 Mar 2023 17:24:21 +0100 Subject: [PATCH 16/35] Deprecate `HandlerError` entirely --- protocols/gossipsub/src/error.rs | 10 ++-------- protocols/gossipsub/src/error_priv.rs | 11 ----------- protocols/gossipsub/src/handler.rs | 4 ++-- protocols/gossipsub/src/lib.rs | 5 ++++- protocols/gossipsub/src/protocol.rs | 12 ++++++------ 5 files changed, 14 insertions(+), 28 deletions(-) diff --git a/protocols/gossipsub/src/error.rs b/protocols/gossipsub/src/error.rs index aa04144ff79..61ef13bd248 100644 --- a/protocols/gossipsub/src/error.rs +++ b/protocols/gossipsub/src/error.rs @@ -30,16 +30,10 @@ pub type PublishError = crate::error_priv::PublishError; )] pub type SubscriptionError = crate::error_priv::SubscriptionError; -#[deprecated( - since = "0.44.0", - note = "Use re-exports that omit `Gossipsub` prefix, i.e. `libp2p::gossipsub::HandlerError" -)] +#[deprecated(note = "This error will no longer be emitted")] pub type GossipsubHandlerError = crate::error_priv::HandlerError; -#[deprecated( - since = "0.44.0", - note = "Use `libp2p::gossipsub::HandlerError` instead, as the `error` module will become crate-private in the future." -)] +#[deprecated(note = "This error will no longer be emitted")] pub type HandlerError = crate::error_priv::HandlerError; #[deprecated( diff --git a/protocols/gossipsub/src/error_priv.rs b/protocols/gossipsub/src/error_priv.rs index cbdd7540adf..04cc72028cd 100644 --- a/protocols/gossipsub/src/error_priv.rs +++ b/protocols/gossipsub/src/error_priv.rs @@ -89,19 +89,14 @@ impl From for PublishError { /// Errors that can occur in the protocols handler. #[derive(Debug, Error)] pub enum HandlerError { - #[deprecated(note = "This error will no longer be emitted")] #[error("The maximum number of inbound substreams created has been exceeded.")] MaxInboundSubstreams, - #[deprecated(note = "This error will no longer be emitted")] #[error("The maximum number of outbound substreams created has been exceeded.")] MaxOutboundSubstreams, - #[deprecated(note = "This error will no longer be emitted")] #[error("The message exceeds the maximum transmission size.")] MaxTransmissionSize, - #[deprecated(note = "This error will no longer be emitted")] #[error("Protocol negotiation timeout.")] NegotiationTimeout, - #[deprecated(note = "This error will no longer be emitted")] #[error("Protocol negotiation failed.")] NegotiationProtocolError(ProtocolError), #[error("Failed to encode or decode")] @@ -139,12 +134,6 @@ impl std::fmt::Display for ValidationError { impl std::error::Error for ValidationError {} -impl From for HandlerError { - fn from(error: std::io::Error) -> HandlerError { - HandlerError::Codec(quick_protobuf_codec::Error::from(error)) - } -} - impl From for PublishError { fn from(error: std::io::Error) -> PublishError { PublishError::TransformFailed(error) diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index dce053be9d6..79bd9df9599 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -21,7 +21,7 @@ use crate::protocol::{GossipsubCodec, ProtocolConfig}; use crate::rpc_proto::proto; use crate::types::{PeerKind, RawMessage, Rpc}; -use crate::{HandlerError, ValidationError}; +use crate::ValidationError; use asynchronous_codec::Framed; use futures::prelude::*; use futures::StreamExt; @@ -238,7 +238,7 @@ impl Handler { impl ConnectionHandler for Handler { type InEvent = HandlerIn; type OutEvent = HandlerEvent; - type Error = HandlerError; + type Error = crate::error_priv::HandlerError; // TODO: Replace this with `Void`. type InboundOpenInfo = (); type InboundProtocol = ProtocolConfig; type OutboundOpenInfo = proto::RPC; diff --git a/protocols/gossipsub/src/lib.rs b/protocols/gossipsub/src/lib.rs index 222a2f34f93..4a1d63d93da 100644 --- a/protocols/gossipsub/src/lib.rs +++ b/protocols/gossipsub/src/lib.rs @@ -158,9 +158,12 @@ mod types; mod rpc_proto; +#[deprecated(note = "This error will no longer be emitted")] +pub type HandlerError = error_priv::HandlerError; + pub use self::behaviour::{Behaviour, Event, MessageAuthenticity}; pub use self::config::{Config, ConfigBuilder, ValidationMode, Version}; -pub use self::error_priv::{HandlerError, PublishError, SubscriptionError, ValidationError}; +pub use self::error_priv::{PublishError, SubscriptionError, ValidationError}; pub use self::peer_score::{ score_parameter_decay, score_parameter_decay_with_base, PeerScoreParams, PeerScoreThresholds, TopicScoreParams, diff --git a/protocols/gossipsub/src/protocol.rs b/protocols/gossipsub/src/protocol.rs index 15824db443a..3a2de0f5fc2 100644 --- a/protocols/gossipsub/src/protocol.rs +++ b/protocols/gossipsub/src/protocol.rs @@ -24,8 +24,8 @@ use crate::topic::TopicHash; use crate::types::{ ControlAction, MessageId, PeerInfo, PeerKind, RawMessage, Rpc, Subscription, SubscriptionAction, }; +use crate::ValidationError; use crate::{rpc_proto::proto, Config}; -use crate::{HandlerError, ValidationError}; use asynchronous_codec::{Decoder, Encoder, Framed}; use byteorder::{BigEndian, ByteOrder}; use bytes::BytesMut; @@ -273,18 +273,18 @@ impl GossipsubCodec { impl Encoder for GossipsubCodec { type Item = proto::RPC; - type Error = HandlerError; + type Error = quick_protobuf_codec::Error; - fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), HandlerError> { - Ok(self.codec.encode(item, dst)?) + fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { + self.codec.encode(item, dst) } } impl Decoder for GossipsubCodec { type Item = HandlerEvent; - type Error = HandlerError; + type Error = quick_protobuf_codec::Error; - fn decode(&mut self, src: &mut BytesMut) -> Result, HandlerError> { + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { let rpc = match self.codec.decode(src)? { Some(p) => p, None => return Ok(None), From 0507493a1e5232047c8495319e82b00bd80303eb Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 29 Mar 2023 15:03:34 +0200 Subject: [PATCH 17/35] Track # of outbound streams requested, not successfully established The GossipSub protocol uses two streams, namely one from the local to the remote and one from the remote to the local node. The rust-libp2p implementation enforces an upper limit on the churn of both via `MAX_SUBSTREAM_CREATION`. Thus far `self.outbound_substreams_created` was increased for every successful stream, but ignored failed streams. Thus, on a given connection, a GossipSub handler could potentially indefinitely retry creating an outbound stream, where each of those streams fails instead of succeeds to upgrade. With this commit the handler tracks the number of requested outbound streams instead. --- protocols/gossipsub/src/handler.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index 79bd9df9599..5a45584677d 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -97,8 +97,8 @@ pub struct Handler { /// requests. outbound_substream_establishing: bool, - /// The number of outbound substreams we have created. - outbound_substreams_created: usize, + /// The number of outbound substreams we have requested. + outbound_substreams_requested: usize, /// The number of inbound substreams that have been created by the peer. inbound_substreams_created: usize, @@ -161,7 +161,7 @@ impl Handler { inbound_substream: None, outbound_substream: None, outbound_substream_establishing: false, - outbound_substreams_created: 0, + outbound_substreams_requested: 0, inbound_substreams_created: 0, send_queue: SmallVec::new(), peer_kind: None, @@ -216,8 +216,6 @@ impl Handler { return; } - self.outbound_substreams_created += 1; - // update the known kind of peer if self.peer_kind.is_none() { self.peer_kind = Some(peer_kind); @@ -311,6 +309,7 @@ impl ConnectionHandler for Handler { // Invariant: `self.outbound_substreams_created < MAX_SUBSTREAM_CREATION`. let message = self.send_queue.remove(0); self.send_queue.shrink_to_fit(); + self.outbound_substreams_requested += 1; self.outbound_substream_establishing = true; return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { protocol: self.listen_protocol.clone().map_info(|()| message), @@ -516,7 +515,7 @@ impl ConnectionHandler for Handler { return; } - if event.is_outbound() && self.outbound_substreams_created == MAX_SUBSTREAM_CREATION { + if event.is_outbound() && self.outbound_substreams_requested == MAX_SUBSTREAM_CREATION { // Too many outbound substreams have been created, disable the handler. self.keep_alive = KeepAlive::No; log::info!("The maximum number of outbound substreams created has been exceeded."); From b5728952464f570c5636f5e6a4fa2448e28f3cbe Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 29 Mar 2023 15:09:14 +0200 Subject: [PATCH 18/35] Re-enqueue message when outbound stream fails An outbound stream carries the first message for the remote node when upgrading in the `OutboundOpenInfo`. Thus far the GossipSub handler would drop the message when the outbound stream fails to upgrade. This could e.g. lead to a remote peer receiving a GRAFT without a previous SUBSCRIBE. With this commit the message carried in the upgrade's `OutboundOpenInfo` is re-enqueued. --- protocols/gossipsub/src/handler.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index 5a45584677d..cba58981bc3 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -531,9 +531,12 @@ impl ConnectionHandler for Handler { } ConnectionEvent::DialUpgradeError(DialUpgradeError { error: ConnectionHandlerUpgrErr::Timeout | ConnectionHandlerUpgrErr::Timer, - .. + info, }) => { log::debug!("Dial upgrade error: Protocol negotiation timeout."); + + // Re-enqueue message. + self.send_queue.insert(0, info); } ConnectionEvent::DialUpgradeError(DialUpgradeError { error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)), From 12e9b537639a6454d1a71b2b816f7a423d8e2908 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 29 Mar 2023 22:56:17 +0200 Subject: [PATCH 19/35] Use early return instead of if-else --- protocols/gossipsub/src/handler.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index cba58981bc3..ba6dbf0a328 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -227,9 +227,10 @@ impl Handler { log::warn!("Established an outbound substream with one already available"); // Add the message back to the send queue self.send_queue.push(message); - } else { - self.outbound_substream = Some(OutboundSubstreamState::PendingSend(substream, message)); + return; } + + self.outbound_substream = Some(OutboundSubstreamState::PendingSend(substream, message)); } } From fd4958ded1fca4b8bc87e0a3975fac8131349a15 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 29 Mar 2023 23:02:50 +0200 Subject: [PATCH 20/35] Only send messages in `poll` Instead of popping messages early and having to re-queue them on error, we can remove the use of `OutboundOpenInfo` and only send messages in one place. --- protocols/gossipsub/src/handler.rs | 21 +++++---------------- 1 file changed, 5 insertions(+), 16 deletions(-) diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index ba6dbf0a328..049d2aad7d9 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -201,10 +201,7 @@ impl Handler { fn on_fully_negotiated_outbound( &mut self, - FullyNegotiatedOutbound { - protocol, - info: message, - }: FullyNegotiatedOutbound< + FullyNegotiatedOutbound { protocol, .. }: FullyNegotiatedOutbound< ::OutboundProtocol, ::OutboundOpenInfo, >, @@ -225,12 +222,10 @@ impl Handler { // If this happens, an outbound message is not sent. if self.outbound_substream.is_some() { log::warn!("Established an outbound substream with one already available"); - // Add the message back to the send queue - self.send_queue.push(message); return; } - self.outbound_substream = Some(OutboundSubstreamState::PendingSend(substream, message)); + self.outbound_substream = Some(OutboundSubstreamState::WaitingOutput(substream)); } } @@ -240,7 +235,7 @@ impl ConnectionHandler for Handler { type Error = crate::error_priv::HandlerError; // TODO: Replace this with `Void`. type InboundOpenInfo = (); type InboundProtocol = ProtocolConfig; - type OutboundOpenInfo = proto::RPC; + type OutboundOpenInfo = (); type OutboundProtocol = ProtocolConfig; fn listen_protocol(&self) -> SubstreamProtocol { @@ -307,13 +302,10 @@ impl ConnectionHandler for Handler { && self.outbound_substream.is_none() && !self.outbound_substream_establishing { - // Invariant: `self.outbound_substreams_created < MAX_SUBSTREAM_CREATION`. - let message = self.send_queue.remove(0); - self.send_queue.shrink_to_fit(); self.outbound_substreams_requested += 1; self.outbound_substream_establishing = true; return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { - protocol: self.listen_protocol.clone().map_info(|()| message), + protocol: self.listen_protocol.clone(), }); } @@ -532,12 +524,9 @@ impl ConnectionHandler for Handler { } ConnectionEvent::DialUpgradeError(DialUpgradeError { error: ConnectionHandlerUpgrErr::Timeout | ConnectionHandlerUpgrErr::Timer, - info, + .. }) => { log::debug!("Dial upgrade error: Protocol negotiation timeout."); - - // Re-enqueue message. - self.send_queue.insert(0, info); } ConnectionEvent::DialUpgradeError(DialUpgradeError { error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)), From 44dce056fc1abc3f410b63a572f2aa8ba96c6e5f Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 30 Mar 2023 23:22:24 +0200 Subject: [PATCH 21/35] Fix use of `DialUpgradeError` in `is_inbound` check --- swarm/src/handler.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/swarm/src/handler.rs b/swarm/src/handler.rs index 6fb4be11b58..d21e090f22a 100644 --- a/swarm/src/handler.rs +++ b/swarm/src/handler.rs @@ -229,7 +229,7 @@ impl<'a, IP: InboundUpgradeSend, OP: OutboundUpgradeSend, IOI, OOI> matches!( self, Self::FullyNegotiatedInbound(_) - | Self::DialUpgradeError(DialUpgradeError { + | Self::ListenUpgradeError(ListenUpgradeError { error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(_)), // Only `Select` is relevant, the others may be for other handlers too. .. }) From 3432ac0d2f511bfe49309d1215b62d82b0374e86 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 3 Apr 2023 17:47:54 +0200 Subject: [PATCH 22/35] Move changelog entry --- protocols/gossipsub/CHANGELOG.md | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/protocols/gossipsub/CHANGELOG.md b/protocols/gossipsub/CHANGELOG.md index 6ac64b30717..541cc8aa4d8 100644 --- a/protocols/gossipsub/CHANGELOG.md +++ b/protocols/gossipsub/CHANGELOG.md @@ -1,12 +1,16 @@ +## 0.44.3 - unreleased + +- Gracefully disable handler on stream errors. Deprecate a few variants of `HandlerError`. + See [PR 3625]. + +[PR 3625]: https://github.com/libp2p/rust-libp2p/pull/3325 + ## 0.44.2 - Signed messages now use sequential integers in the sequence number field. See [PR 3551]. -- Gracefully disable handler on stream errors. Deprecate a few variants of `HandlerError`. - See [PR 3625]. [PR 3551]: https://github.com/libp2p/rust-libp2p/pull/3551 -[PR 3625]: https://github.com/libp2p/rust-libp2p/pull/3325 ## 0.44.1 From db59d23f8cb445a9bdd83b6f3c89037f05fc1248 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 3 Apr 2023 17:48:17 +0200 Subject: [PATCH 23/35] Bump version --- Cargo.lock | 2 +- protocols/gossipsub/Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7eb0fff15eb..512fc4095bb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2381,7 +2381,7 @@ dependencies = [ [[package]] name = "libp2p-gossipsub" -version = "0.44.2" +version = "0.44.3" dependencies = [ "async-std", "asynchronous-codec", diff --git a/protocols/gossipsub/Cargo.toml b/protocols/gossipsub/Cargo.toml index 631d17e8c5f..983eecbb07b 100644 --- a/protocols/gossipsub/Cargo.toml +++ b/protocols/gossipsub/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-gossipsub" edition = "2021" rust-version = "1.62.0" description = "Gossipsub protocol for libp2p" -version = "0.44.2" +version = "0.44.3" authors = ["Age Manning "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" From b94ec283e1f0abe992bda95807a1c3a2d7df110e Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 3 Apr 2023 17:53:56 +0200 Subject: [PATCH 24/35] Change log level to warn for bad events Hitting the max substream creation means something bad is happening. This is worth reporting as `warn`. --- protocols/gossipsub/src/handler.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index 049d2aad7d9..4f7de244ca3 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -504,14 +504,14 @@ impl ConnectionHandler for Handler { if event.is_inbound() && self.inbound_substreams_created == MAX_SUBSTREAM_CREATION { // Too many inbound substreams have been created, disable the handler. self.keep_alive = KeepAlive::No; - log::info!("The maximum number of inbound substreams created has been exceeded."); + log::warn!("The maximum number of inbound substreams created has been exceeded."); return; } if event.is_outbound() && self.outbound_substreams_requested == MAX_SUBSTREAM_CREATION { // Too many outbound substreams have been created, disable the handler. self.keep_alive = KeepAlive::No; - log::info!("The maximum number of outbound substreams created has been exceeded."); + log::warn!("The maximum number of outbound substreams created has been exceeded."); return; } From c5e3c41cb2924bb9e9c50db5455a3292aa5db1b1 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 3 Apr 2023 17:56:08 +0200 Subject: [PATCH 25/35] Don't end log messages with periods --- protocols/gossipsub/src/handler.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index 4f7de244ca3..1696efd5b92 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -504,14 +504,14 @@ impl ConnectionHandler for Handler { if event.is_inbound() && self.inbound_substreams_created == MAX_SUBSTREAM_CREATION { // Too many inbound substreams have been created, disable the handler. self.keep_alive = KeepAlive::No; - log::warn!("The maximum number of inbound substreams created has been exceeded."); + log::warn!("The maximum number of inbound substreams created has been exceeded"); return; } if event.is_outbound() && self.outbound_substreams_requested == MAX_SUBSTREAM_CREATION { // Too many outbound substreams have been created, disable the handler. self.keep_alive = KeepAlive::No; - log::warn!("The maximum number of outbound substreams created has been exceeded."); + log::warn!("The maximum number of outbound substreams created has been exceeded"); return; } @@ -526,7 +526,7 @@ impl ConnectionHandler for Handler { error: ConnectionHandlerUpgrErr::Timeout | ConnectionHandlerUpgrErr::Timer, .. }) => { - log::debug!("Dial upgrade error: Protocol negotiation timeout."); + log::debug!("Dial upgrade error: Protocol negotiation timeout"); } ConnectionEvent::DialUpgradeError(DialUpgradeError { error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)), From c02a3a34abd0e222a60763a7cea97be51cea06c3 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 4 Apr 2023 12:41:38 +0200 Subject: [PATCH 26/35] Use exhaustive match --- swarm/src/handler.rs | 31 +++++++++++++++++++------------ 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/swarm/src/handler.rs b/swarm/src/handler.rs index d21e090f22a..1917117c44e 100644 --- a/swarm/src/handler.rs +++ b/swarm/src/handler.rs @@ -217,23 +217,30 @@ impl<'a, IP: InboundUpgradeSend, OP: OutboundUpgradeSend, IOI, OOI> { /// Whether the event concerns an outbound stream. pub fn is_outbound(&self) -> bool { - matches!( - self, - Self::FullyNegotiatedOutbound(_) | Self::DialUpgradeError(_) - ) + match self { + ConnectionEvent::DialUpgradeError(_) | ConnectionEvent::FullyNegotiatedOutbound(_) => { + true + } + ConnectionEvent::FullyNegotiatedInbound(_) + | ConnectionEvent::AddressChange(_) + | ConnectionEvent::ListenUpgradeError(_) => false, + } } /// Whether the event concerns an inbound stream. pub fn is_inbound(&self) -> bool { // Note: This will get simpler with https://github.com/libp2p/rust-libp2p/pull/3605. - matches!( - self, - Self::FullyNegotiatedInbound(_) - | Self::ListenUpgradeError(ListenUpgradeError { - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(_)), // Only `Select` is relevant, the others may be for other handlers too. - .. - }) - ) + match self { + ConnectionEvent::FullyNegotiatedInbound(_) + | ConnectionEvent::ListenUpgradeError(ListenUpgradeError { + error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(_)), // Only `Select` is relevant, the others may be for other handlers too. + .. + }) => true, + ConnectionEvent::FullyNegotiatedOutbound(_) + | ConnectionEvent::ListenUpgradeError(_) + | ConnectionEvent::AddressChange(_) + | ConnectionEvent::DialUpgradeError(_) => false, + } } } From e94c2c7a433e896b19f3b24d9390f0b3a75ba5d6 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 4 Apr 2023 13:23:17 +0200 Subject: [PATCH 27/35] Make error message consistently `debug` and use same wording --- protocols/gossipsub/src/handler.rs | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index 1696efd5b92..6b9cfe3fb3c 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -327,7 +327,7 @@ impl ConnectionHandler for Handler { return Poll::Ready(ConnectionHandlerEvent::Custom(message)); } Poll::Ready(Some(Err(error))) => { - log::warn!("Inbound stream error: {}", error); + log::debug!("Failed to read from inbound stream: {error}"); // Close this side of the stream. If the // peer is still around, they will re-establish their // connection @@ -336,7 +336,7 @@ impl ConnectionHandler for Handler { } // peer closed the stream Poll::Ready(None) => { - log::warn!("Peer closed their outbound stream"); + log::debug!("Inbound stream closed by remote"); self.inbound_substream = Some(InboundSubstreamState::Closing(substream)); } @@ -354,7 +354,7 @@ impl ConnectionHandler for Handler { // Don't close the connection but just drop the inbound substream. // In case the remote has more to send, they will open up a new // substream. - log::warn!("Inbound substream error while closing: {e}"); + log::debug!("Inbound substream error while closing: {e}"); } self.inbound_substream = None; if self.outbound_substream.is_none() { @@ -407,16 +407,14 @@ impl ConnectionHandler for Handler { Some(OutboundSubstreamState::PendingFlush(substream)) } Err(e) => { - log::debug!( - "Outbound substream error while sending output: {e}" - ); + log::debug!("Failed to send message on outbound stream: {e}"); self.outbound_substream = None; break; } } } Poll::Ready(Err(e)) => { - log::debug!("Outbound substream error while sending output: {e}"); + log::debug!("Failed to send message on outbound stream: {e}"); self.outbound_substream = None; break; } @@ -440,7 +438,7 @@ impl ConnectionHandler for Handler { Some(OutboundSubstreamState::WaitingOutput(substream)) } Poll::Ready(Err(e)) => { - log::debug!("Outbound substream error while flushing output: {e}"); + log::debug!("Failed to flush outbound stream: {e}"); self.outbound_substream = None; break; } @@ -463,7 +461,7 @@ impl ConnectionHandler for Handler { break; } Poll::Ready(Err(e)) => { - log::debug!("Outbound substream error while closing: {e}"); + log::debug!("Failed to close outbound stream: {e}"); self.outbound_substream = None; break; } From 798ef5ca7be7c4ddc872485ec332e8d488f056cc Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 5 Apr 2023 20:26:53 +0200 Subject: [PATCH 28/35] Update protocols/gossipsub/src/handler.rs --- protocols/gossipsub/src/handler.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index 6b9cfe3fb3c..ab9658f3f8a 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -330,7 +330,7 @@ impl ConnectionHandler for Handler { log::debug!("Failed to read from inbound stream: {error}"); // Close this side of the stream. If the // peer is still around, they will re-establish their - // connection + // outbound stream i.e. our inbound stream. self.inbound_substream = Some(InboundSubstreamState::Closing(substream)); } From bbdf8f5fb6a73b24244131feeb758b9942d8696a Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 5 Apr 2023 20:39:44 +0200 Subject: [PATCH 29/35] chore: bump libp2p-swarm to v0.42.2 --- protocols/gossipsub/Cargo.toml | 2 +- swarm/CHANGELOG.md | 6 ++++++ swarm/Cargo.toml | 2 +- 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/protocols/gossipsub/Cargo.toml b/protocols/gossipsub/Cargo.toml index 983eecbb07b..0e3cea758bf 100644 --- a/protocols/gossipsub/Cargo.toml +++ b/protocols/gossipsub/Cargo.toml @@ -11,7 +11,7 @@ keywords = ["peer-to-peer", "libp2p", "networking"] categories = ["network-programming", "asynchronous"] [dependencies] -libp2p-swarm = { version = "0.42.1", path = "../../swarm" } +libp2p-swarm = { version = "0.42.2", path = "../../swarm" } libp2p-core = { version = "0.39.0", path = "../../core" } libp2p-identity = { version = "0.1.0", path = "../../identity" } bytes = "1.4" diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index 67a31cf8e94..eff4fac0f02 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -1,3 +1,9 @@ +## 0.42.2 - unreleased + +- Add `ConnectionEvent::` `is_outbound` and `is_inbound`. See [PR 3625]. + +[PR 3625]: https://github.com/libp2p/rust-libp2p/pull/3625 + ## 0.42.1 - Deprecate `ConnectionLimits` in favor of `libp2p::connection_limits`. diff --git a/swarm/Cargo.toml b/swarm/Cargo.toml index 0b56b6cc702..4b627da8e01 100644 --- a/swarm/Cargo.toml +++ b/swarm/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-swarm" edition = "2021" rust-version = "1.62.0" description = "The libp2p swarm" -version = "0.42.1" +version = "0.42.2" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" From af215893db2c7cbcb56e2bc828089558604ec813 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 5 Apr 2023 20:42:15 +0200 Subject: [PATCH 30/35] Update Cargo.lock --- Cargo.lock | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 072a412ce2c..9bfd027b719 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2862,7 +2862,7 @@ dependencies = [ [[package]] name = "libp2p-swarm" -version = "0.42.1" +version = "0.42.2" dependencies = [ "async-std", "either", From f999f3e489ed33d0d13ff612ca853a85df727d99 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Thu, 6 Apr 2023 13:25:13 +0200 Subject: [PATCH 31/35] Refactor keep alive mechanism - Split handler into `EnabledHandler` and `DisabledHandler`, reducing the amount of potential state variants (e.g. protocol-not-supported but keep-alive-yes) and enforce the possible states at compile time. - Return `DeniedUpgrade` when handler is disabled (i.e. `DisabledHandler`). - Don't update `KeepAlive` across all handler methods, but instead derive it ad-hoc in `ConnectionHandler::connection_keep_alive()` based on the state of the handler. Again reduces number of possible invalid states at compile time. - Remove INITIAL_KEEP_ALIVE. The default idle timeout is 120s. This should be plenty enough time to establish in and outbound streams. --- Cargo.lock | 1 + protocols/gossipsub/Cargo.toml | 1 + protocols/gossipsub/src/handler.rs | 392 +++++++++++++++-------------- 3 files changed, 204 insertions(+), 190 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9bfd027b719..49f2d05d166 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2449,6 +2449,7 @@ dependencies = [ "base64 0.21.0", "byteorder", "bytes", + "either", "env_logger 0.10.0", "fnv", "futures", diff --git a/protocols/gossipsub/Cargo.toml b/protocols/gossipsub/Cargo.toml index 0e3cea758bf..0218559aab4 100644 --- a/protocols/gossipsub/Cargo.toml +++ b/protocols/gossipsub/Cargo.toml @@ -11,6 +11,7 @@ keywords = ["peer-to-peer", "libp2p", "networking"] categories = ["network-programming", "asynchronous"] [dependencies] +either = "1.5" libp2p-swarm = { version = "0.42.2", path = "../../swarm" } libp2p-core = { version = "0.39.0", path = "../../core" } libp2p-identity = { version = "0.1.0", path = "../../identity" } diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index ab9658f3f8a..2473f07cec0 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -26,7 +26,7 @@ use asynchronous_codec::Framed; use futures::prelude::*; use futures::StreamExt; use instant::Instant; -use libp2p_core::upgrade::{NegotiationError, UpgradeError}; +use libp2p_core::upgrade::{DeniedUpgrade, NegotiationError, UpgradeError}; use libp2p_swarm::handler::{ ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, KeepAlive, @@ -39,9 +39,7 @@ use std::{ task::{Context, Poll}, time::Duration, }; - -/// The initial time (in seconds) we set the keep alive for protocol negotiations to occur. -const INITIAL_KEEP_ALIVE: u64 = 30; +use void::Void; /// The event emitted by the Handler. This informs the behaviour of various events created /// by the handler. @@ -79,10 +77,15 @@ pub enum HandlerIn { /// connection faulty and disconnect. This also prevents against potential substream creation loops. const MAX_SUBSTREAM_CREATION: usize = 5; +pub enum Handler { + Enabled(EnabledHandler), + Disabled(DisabledHandler), +} + /// Protocol Handler that manages a single long-lived substream with a peer. -pub struct Handler { +pub struct EnabledHandler { /// Upgrade configuration for the gossipsub protocol. - listen_protocol: SubstreamProtocol, + listen_protocol: ProtocolConfig, /// The single long-lived outbound substream. outbound_substream: Option, @@ -111,24 +114,29 @@ pub struct Handler { // NOTE: Use this flag rather than checking the substream count each poll. peer_kind_sent: bool, - /// If the peer doesn't support the gossipsub protocol we do not immediately disconnect. - /// Rather, we disable the handler and prevent any incoming or outgoing substreams from being - /// established. - /// - /// This value is set to true to indicate the peer doesn't support gossipsub. - protocol_unsupported: bool, + last_io_activity: Instant, - /// The amount of time we allow idle connections before disconnecting. + /// The amount of time we keep an idle connection alive. idle_timeout: Duration, - /// Flag determining whether to maintain the connection to the peer. - keep_alive: KeepAlive, - /// Keeps track of whether this connection is for a peer in the mesh. This is used to make /// decisions about the keep alive state for this connection. in_mesh: bool, } +pub enum DisabledHandler { + /// If the peer doesn't support the gossipsub protocol we do not immediately disconnect. + /// Rather, we disable the handler and prevent any incoming or outgoing substreams from being + /// established. + ProtocolUnsupported { + /// Keeps track on whether we have sent the peer kind to the behaviour. + peer_kind_sent: bool, + }, + /// The maximum number of inbound or outbound streams have been created and thereby the handler + /// has been disabled. + MaxStreamCreation, +} + /// State of the inbound substream, opened either by us or by the remote. enum InboundSubstreamState { /// Waiting for a message from the remote. The idle state for an inbound substream. @@ -147,8 +155,6 @@ enum OutboundSubstreamState { PendingSend(Framed, proto::RPC), /// Waiting to flush the substream so that the data arrives to the remote. PendingFlush(Framed), - /// The substream is being closed. Used by either substream. - _Closing(Framed), /// An error occurred during processing. Poisoned, } @@ -156,8 +162,8 @@ enum OutboundSubstreamState { impl Handler { /// Builds a new [`Handler`]. pub fn new(protocol_config: ProtocolConfig, idle_timeout: Duration) -> Self { - Handler { - listen_protocol: SubstreamProtocol::new(protocol_config, ()), + Handler::Enabled(EnabledHandler { + listen_protocol: protocol_config, inbound_substream: None, outbound_substream: None, outbound_substream_establishing: false, @@ -166,27 +172,18 @@ impl Handler { send_queue: SmallVec::new(), peer_kind: None, peer_kind_sent: false, - protocol_unsupported: false, + last_io_activity: Instant::now(), idle_timeout, - keep_alive: KeepAlive::Until(Instant::now() + Duration::from_secs(INITIAL_KEEP_ALIVE)), in_mesh: false, - } + }) } +} +impl EnabledHandler { fn on_fully_negotiated_inbound( &mut self, - FullyNegotiatedInbound { protocol, .. }: FullyNegotiatedInbound< - ::InboundProtocol, - ::InboundOpenInfo, - >, + (substream, peer_kind): (Framed, PeerKind), ) { - let (substream, peer_kind) = protocol; - - // If the peer doesn't support the protocol, reject all substreams - if self.protocol_unsupported { - return; - } - self.inbound_substreams_created += 1; // update the known kind of peer @@ -202,90 +199,35 @@ impl Handler { fn on_fully_negotiated_outbound( &mut self, FullyNegotiatedOutbound { protocol, .. }: FullyNegotiatedOutbound< - ::OutboundProtocol, - ::OutboundOpenInfo, + ::OutboundProtocol, + ::OutboundOpenInfo, >, ) { let (substream, peer_kind) = protocol; - // If the peer doesn't support the protocol, reject all substreams - if self.protocol_unsupported { - return; - } - // update the known kind of peer if self.peer_kind.is_none() { self.peer_kind = Some(peer_kind); } - // Should never establish a new outbound substream if one already exists. - // If this happens, an outbound message is not sent. - if self.outbound_substream.is_some() { - log::warn!("Established an outbound substream with one already available"); - return; - } - + assert!( + self.outbound_substream.is_none(), + "Established an outbound substream with one already available" + ); self.outbound_substream = Some(OutboundSubstreamState::WaitingOutput(substream)); } -} - -impl ConnectionHandler for Handler { - type InEvent = HandlerIn; - type OutEvent = HandlerEvent; - type Error = crate::error_priv::HandlerError; // TODO: Replace this with `Void`. - type InboundOpenInfo = (); - type InboundProtocol = ProtocolConfig; - type OutboundOpenInfo = (); - type OutboundProtocol = ProtocolConfig; - - fn listen_protocol(&self) -> SubstreamProtocol { - self.listen_protocol.clone() - } - - fn on_behaviour_event(&mut self, message: HandlerIn) { - if !self.protocol_unsupported { - match message { - HandlerIn::Message(m) => self.send_queue.push(m), - // If we have joined the mesh, keep the connection alive. - HandlerIn::JoinedMesh => { - self.in_mesh = true; - self.keep_alive = KeepAlive::Yes; - } - // If we have left the mesh, start the idle timer. - HandlerIn::LeftMesh => { - self.in_mesh = false; - self.keep_alive = KeepAlive::Until(Instant::now() + self.idle_timeout); - } - } - } - } - - fn connection_keep_alive(&self) -> KeepAlive { - self.keep_alive - } fn poll( &mut self, cx: &mut Context<'_>, ) -> Poll< ConnectionHandlerEvent< - Self::OutboundProtocol, - Self::OutboundOpenInfo, - Self::OutEvent, - Self::Error, + ::OutboundProtocol, + ::OutboundOpenInfo, + ::OutEvent, + ::Error, >, > { - if self.protocol_unsupported && !self.peer_kind_sent { - self.peer_kind_sent = true; - // clear all substreams so the keep alive returns false - self.inbound_substream = None; - self.outbound_substream = None; - self.keep_alive = KeepAlive::No; - return Poll::Ready(ConnectionHandlerEvent::Custom(HandlerEvent::PeerKind( - PeerKind::NotSupported, - ))); - } - if !self.peer_kind_sent { if let Some(peer_kind) = self.peer_kind.as_ref() { self.peer_kind_sent = true; @@ -295,9 +237,7 @@ impl ConnectionHandler for Handler { } } - // Invariant: `self.inbound_substreams_created < MAX_SUBSTREAM_CREATION`. - - // determine if we need to create the stream + // determine if we need to create the outbound stream if !self.send_queue.is_empty() && self.outbound_substream.is_none() && !self.outbound_substream_establishing @@ -305,7 +245,7 @@ impl ConnectionHandler for Handler { self.outbound_substreams_requested += 1; self.outbound_substream_establishing = true; return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { - protocol: self.listen_protocol.clone(), + protocol: SubstreamProtocol::new(self.listen_protocol.clone(), ()), }); } @@ -318,10 +258,7 @@ impl ConnectionHandler for Handler { Some(InboundSubstreamState::WaitingInput(mut substream)) => { match substream.poll_next_unpin(cx) { Poll::Ready(Some(Ok(message))) => { - if !self.in_mesh { - self.keep_alive = - KeepAlive::Until(Instant::now() + self.idle_timeout); - } + self.last_io_activity = Instant::now(); self.inbound_substream = Some(InboundSubstreamState::WaitingInput(substream)); return Poll::Ready(ConnectionHandlerEvent::Custom(message)); @@ -357,9 +294,6 @@ impl ConnectionHandler for Handler { log::debug!("Inbound substream error while closing: {e}"); } self.inbound_substream = None; - if self.outbound_substream.is_none() { - self.keep_alive = KeepAlive::No; - } break; } Poll::Pending => { @@ -419,7 +353,6 @@ impl ConnectionHandler for Handler { break; } Poll::Pending => { - self.keep_alive = KeepAlive::Yes; self.outbound_substream = Some(OutboundSubstreamState::PendingSend(substream, message)); break; @@ -429,11 +362,7 @@ impl ConnectionHandler for Handler { Some(OutboundSubstreamState::PendingFlush(mut substream)) => { match Sink::poll_flush(Pin::new(&mut substream), cx) { Poll::Ready(Ok(())) => { - if !self.in_mesh { - // if not in the mesh, reset the idle timeout - self.keep_alive = - KeepAlive::Until(Instant::now() + self.idle_timeout); - } + self.last_io_activity = Instant::now(); self.outbound_substream = Some(OutboundSubstreamState::WaitingOutput(substream)) } @@ -443,36 +372,12 @@ impl ConnectionHandler for Handler { break; } Poll::Pending => { - self.keep_alive = KeepAlive::Yes; self.outbound_substream = Some(OutboundSubstreamState::PendingFlush(substream)); break; } } } - // Currently never used - manual shutdown may implement this in the future - Some(OutboundSubstreamState::_Closing(mut substream)) => { - match Sink::poll_close(Pin::new(&mut substream), cx) { - Poll::Ready(Ok(())) => { - self.outbound_substream = None; - if self.inbound_substream.is_none() { - self.keep_alive = KeepAlive::No; - } - break; - } - Poll::Ready(Err(e)) => { - log::debug!("Failed to close outbound stream: {e}"); - self.outbound_substream = None; - break; - } - Poll::Pending => { - self.keep_alive = KeepAlive::No; - self.outbound_substream = - Some(OutboundSubstreamState::_Closing(substream)); - break; - } - } - } None => { self.outbound_substream = None; break; @@ -485,6 +390,90 @@ impl ConnectionHandler for Handler { Poll::Pending } +} + +impl ConnectionHandler for Handler { + type InEvent = HandlerIn; + type OutEvent = HandlerEvent; + type Error = Void; + type InboundOpenInfo = (); + type InboundProtocol = either::Either; + type OutboundOpenInfo = (); + type OutboundProtocol = ProtocolConfig; + + fn listen_protocol(&self) -> SubstreamProtocol { + match self { + Handler::Enabled(handler) => { + SubstreamProtocol::new(either::Either::Left(handler.listen_protocol.clone()), ()) + } + Handler::Disabled(_) => { + SubstreamProtocol::new(either::Either::Right(DeniedUpgrade), ()) + } + } + } + + fn on_behaviour_event(&mut self, message: HandlerIn) { + match self { + Handler::Enabled(handler) => match message { + HandlerIn::Message(m) => handler.send_queue.push(m), + HandlerIn::JoinedMesh => { + handler.in_mesh = true; + } + HandlerIn::LeftMesh => { + handler.in_mesh = false; + } + }, + Handler::Disabled(_) => {} + } + } + + fn connection_keep_alive(&self) -> KeepAlive { + match self { + Handler::Enabled(handler) => { + if handler.in_mesh { + return KeepAlive::Yes; + } + + if let Some( + OutboundSubstreamState::PendingSend(_, _) + | OutboundSubstreamState::PendingFlush(_), + ) = handler.outbound_substream + { + return KeepAlive::Yes; + } + + KeepAlive::Until(handler.last_io_activity + handler.idle_timeout) + } + Handler::Disabled(_) => KeepAlive::No, + } + } + + fn poll( + &mut self, + cx: &mut Context<'_>, + ) -> Poll< + ConnectionHandlerEvent< + Self::OutboundProtocol, + Self::OutboundOpenInfo, + Self::OutEvent, + Self::Error, + >, + > { + match self { + Handler::Enabled(handler) => handler.poll(cx), + Handler::Disabled(DisabledHandler::ProtocolUnsupported { peer_kind_sent }) => { + if !*peer_kind_sent { + *peer_kind_sent = true; + return Poll::Ready(ConnectionHandlerEvent::Custom(HandlerEvent::PeerKind( + PeerKind::NotSupported, + ))); + } + + Poll::Pending + } + Handler::Disabled(DisabledHandler::MaxStreamCreation) => Poll::Pending, + } + } fn on_connection_event( &mut self, @@ -495,60 +484,83 @@ impl ConnectionHandler for Handler { Self::OutboundOpenInfo, >, ) { - if event.is_outbound() { - self.outbound_substream_establishing = false; - } + match self { + Handler::Enabled(handler) => { + if event.is_outbound() { + handler.outbound_substream_establishing = false; + } - if event.is_inbound() && self.inbound_substreams_created == MAX_SUBSTREAM_CREATION { - // Too many inbound substreams have been created, disable the handler. - self.keep_alive = KeepAlive::No; - log::warn!("The maximum number of inbound substreams created has been exceeded"); - return; - } + if event.is_inbound() + && handler.inbound_substreams_created == MAX_SUBSTREAM_CREATION + { + log::warn!( + "The maximum number of inbound substreams created has been exceeded" + ); + *self = Handler::Disabled(DisabledHandler::MaxStreamCreation); + return; + } - if event.is_outbound() && self.outbound_substreams_requested == MAX_SUBSTREAM_CREATION { - // Too many outbound substreams have been created, disable the handler. - self.keep_alive = KeepAlive::No; - log::warn!("The maximum number of outbound substreams created has been exceeded"); - return; - } + if event.is_outbound() + && handler.outbound_substreams_requested == MAX_SUBSTREAM_CREATION + { + log::warn!( + "The maximum number of outbound substreams created has been exceeded" + ); + *self = Handler::Disabled(DisabledHandler::MaxStreamCreation); + return; + } - match event { - ConnectionEvent::FullyNegotiatedInbound(fully_negotiated_inbound) => { - self.on_fully_negotiated_inbound(fully_negotiated_inbound) - } - ConnectionEvent::FullyNegotiatedOutbound(fully_negotiated_outbound) => { - self.on_fully_negotiated_outbound(fully_negotiated_outbound) - } - ConnectionEvent::DialUpgradeError(DialUpgradeError { - error: ConnectionHandlerUpgrErr::Timeout | ConnectionHandlerUpgrErr::Timer, - .. - }) => { - log::debug!("Dial upgrade error: Protocol negotiation timeout"); - } - ConnectionEvent::DialUpgradeError(DialUpgradeError { - error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)), - .. - }) => void::unreachable(e), - ConnectionEvent::DialUpgradeError(DialUpgradeError { - error: - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)), - .. - }) => { - // The protocol is not supported - self.protocol_unsupported = true; - log::debug!("The remote peer does not support gossipsub on this connection"); - } - ConnectionEvent::DialUpgradeError(DialUpgradeError { - error: - ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select( - NegotiationError::ProtocolError(e), - )), - .. - }) => { - log::debug!("Protocol negotiation failed: {e}") + match event { + ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound { + protocol, + .. + }) => match protocol { + futures::future::Either::Left(protocol) => { + handler.on_fully_negotiated_inbound(protocol) + } + futures::future::Either::Right(v) => void::unreachable(v), + }, + ConnectionEvent::FullyNegotiatedOutbound(fully_negotiated_outbound) => { + handler.on_fully_negotiated_outbound(fully_negotiated_outbound) + } + ConnectionEvent::DialUpgradeError(DialUpgradeError { + error: ConnectionHandlerUpgrErr::Timeout | ConnectionHandlerUpgrErr::Timer, + .. + }) => { + log::debug!("Dial upgrade error: Protocol negotiation timeout"); + } + ConnectionEvent::DialUpgradeError(DialUpgradeError { + error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)), + .. + }) => void::unreachable(e), + ConnectionEvent::DialUpgradeError(DialUpgradeError { + error: + ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select( + NegotiationError::Failed, + )), + .. + }) => { + // The protocol is not supported + log::debug!( + "The remote peer does not support gossipsub on this connection" + ); + *self = Handler::Disabled(DisabledHandler::ProtocolUnsupported { + peer_kind_sent: false, + }); + } + ConnectionEvent::DialUpgradeError(DialUpgradeError { + error: + ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select( + NegotiationError::ProtocolError(e), + )), + .. + }) => { + log::debug!("Protocol negotiation failed: {e}") + } + ConnectionEvent::AddressChange(_) | ConnectionEvent::ListenUpgradeError(_) => {} + } } - ConnectionEvent::AddressChange(_) | ConnectionEvent::ListenUpgradeError(_) => {} + Handler::Disabled(_) => {} } } } From 397afa2e7553fb7fe76687c2e62fd6f1c8f7da72 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Thu, 6 Apr 2023 13:44:16 +0200 Subject: [PATCH 32/35] Do minor clean up --- protocols/gossipsub/src/handler.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index 2473f07cec0..d0e0b04c0d0 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -23,6 +23,7 @@ use crate::rpc_proto::proto; use crate::types::{PeerKind, RawMessage, Rpc}; use crate::ValidationError; use asynchronous_codec::Framed; +use futures::future::Either; use futures::prelude::*; use futures::StreamExt; use instant::Instant; @@ -70,11 +71,12 @@ pub enum HandlerIn { LeftMesh, } -/// The maximum number of substreams we accept or create before disconnecting from the peer. +/// The maximum number of substreams we accept or create before disabling the handler. /// /// Gossipsub is supposed to have a single long-lived inbound and outbound substream. On failure we /// attempt to recreate these. This imposes an upper bound of new substreams before we consider the -/// connection faulty and disconnect. This also prevents against potential substream creation loops. +/// connection faulty and disable the handler. This also prevents against potential substream +/// creation loops. const MAX_SUBSTREAM_CREATION: usize = 5; pub enum Handler { @@ -515,10 +517,8 @@ impl ConnectionHandler for Handler { protocol, .. }) => match protocol { - futures::future::Either::Left(protocol) => { - handler.on_fully_negotiated_inbound(protocol) - } - futures::future::Either::Right(v) => void::unreachable(v), + Either::Left(protocol) => handler.on_fully_negotiated_inbound(protocol), + Either::Right(v) => void::unreachable(v), }, ConnectionEvent::FullyNegotiatedOutbound(fully_negotiated_outbound) => { handler.on_fully_negotiated_outbound(fully_negotiated_outbound) From b01e86f59fe48708ca172737859402d8e079245d Mon Sep 17 00:00:00 2001 From: Max Inden Date: Thu, 6 Apr 2023 15:14:43 +0200 Subject: [PATCH 33/35] Add debug for dropped message --- protocols/gossipsub/src/handler.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index 748190ffcf6..33ab2b9f555 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -425,7 +425,9 @@ impl ConnectionHandler for Handler { handler.in_mesh = false; } }, - Handler::Disabled(_) => {} + Handler::Disabled(_) => { + log::debug!("Handler is disabled. Dropping message {:?}", message); + } } } From 552cb0891378c7d9cd2df033545a55821d9edd23 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Thu, 6 Apr 2023 15:32:56 +0200 Subject: [PATCH 34/35] Track MAX_SUBSTREAM_ATTEMPTS in on_connection_event --- protocols/gossipsub/src/handler.rs | 61 +++++++++++++++--------------- 1 file changed, 30 insertions(+), 31 deletions(-) diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index 33ab2b9f555..609bb81a306 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -71,13 +71,13 @@ pub enum HandlerIn { LeftMesh, } -/// The maximum number of substreams we accept or create before disabling the handler. +/// The maximum number of inbound or outbound substreams attempts we allow. /// /// Gossipsub is supposed to have a single long-lived inbound and outbound substream. On failure we /// attempt to recreate these. This imposes an upper bound of new substreams before we consider the /// connection faulty and disable the handler. This also prevents against potential substream /// creation loops. -const MAX_SUBSTREAM_CREATION: usize = 5; +const MAX_SUBSTREAM_ATTEMPTS: usize = 5; pub enum Handler { Enabled(EnabledHandler), @@ -103,10 +103,10 @@ pub struct EnabledHandler { outbound_substream_establishing: bool, /// The number of outbound substreams we have requested. - outbound_substreams_requested: usize, + outbound_substream_attempts: usize, /// The number of inbound substreams that have been created by the peer. - inbound_substreams_created: usize, + inbound_substream_attempts: usize, /// The type of peer this handler is associated to. peer_kind: Option, @@ -134,9 +134,9 @@ pub enum DisabledHandler { /// Keeps track on whether we have sent the peer kind to the behaviour. peer_kind_sent: bool, }, - /// The maximum number of inbound or outbound streams have been created and thereby the handler - /// has been disabled. - MaxStreamCreation, + /// The maximum number of inbound or outbound substream attempts have happened and thereby the + /// handler has been disabled. + MaxSubstreamAttempts, } /// State of the inbound substream, opened either by us or by the remote. @@ -169,8 +169,8 @@ impl Handler { inbound_substream: None, outbound_substream: None, outbound_substream_establishing: false, - outbound_substreams_requested: 0, - inbound_substreams_created: 0, + outbound_substream_attempts: 0, + inbound_substream_attempts: 0, send_queue: SmallVec::new(), peer_kind: None, peer_kind_sent: false, @@ -186,8 +186,6 @@ impl EnabledHandler { &mut self, (substream, peer_kind): (Framed, PeerKind), ) { - self.inbound_substreams_created += 1; - // update the known kind of peer if self.peer_kind.is_none() { self.peer_kind = Some(peer_kind); @@ -244,7 +242,6 @@ impl EnabledHandler { && self.outbound_substream.is_none() && !self.outbound_substream_establishing { - self.outbound_substreams_requested += 1; self.outbound_substream_establishing = true; return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { protocol: SubstreamProtocol::new(self.listen_protocol.clone(), ()), @@ -475,7 +472,7 @@ impl ConnectionHandler for Handler { Poll::Pending } - Handler::Disabled(DisabledHandler::MaxStreamCreation) => Poll::Pending, + Handler::Disabled(DisabledHandler::MaxSubstreamAttempts) => Poll::Pending, } } @@ -490,28 +487,30 @@ impl ConnectionHandler for Handler { ) { match self { Handler::Enabled(handler) => { + if event.is_inbound() { + handler.inbound_substream_attempts += 1; + + if handler.inbound_substream_attempts == MAX_SUBSTREAM_ATTEMPTS { + log::warn!( + "The maximum number of inbound substreams attempts has been exceeded" + ); + *self = Handler::Disabled(DisabledHandler::MaxSubstreamAttempts); + return; + } + } + if event.is_outbound() { handler.outbound_substream_establishing = false; - } - if event.is_inbound() - && handler.inbound_substreams_created == MAX_SUBSTREAM_CREATION - { - log::warn!( - "The maximum number of inbound substreams created has been exceeded" - ); - *self = Handler::Disabled(DisabledHandler::MaxStreamCreation); - return; - } + handler.outbound_substream_attempts += 1; - if event.is_outbound() - && handler.outbound_substreams_requested == MAX_SUBSTREAM_CREATION - { - log::warn!( - "The maximum number of outbound substreams created has been exceeded" - ); - *self = Handler::Disabled(DisabledHandler::MaxStreamCreation); - return; + if handler.outbound_substream_attempts == MAX_SUBSTREAM_ATTEMPTS { + log::warn!( + "The maximum number of outbound substream attempts has been exceeded" + ); + *self = Handler::Disabled(DisabledHandler::MaxSubstreamAttempts); + return; + } } match event { From b42e71e16df08ae2abbc99aaf9844d0b93a2e3ff Mon Sep 17 00:00:00 2001 From: Max Inden Date: Thu, 6 Apr 2023 15:34:18 +0200 Subject: [PATCH 35/35] Update swarm/CHANGELOG.md Co-authored-by: Thomas Eizinger --- swarm/CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index eff4fac0f02..03d128d76b7 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -1,6 +1,6 @@ ## 0.42.2 - unreleased -- Add `ConnectionEvent::` `is_outbound` and `is_inbound`. See [PR 3625]. +- Add `ConnectionEvent::{is_outbound,is_inbound}`. See [PR 3625]. [PR 3625]: https://github.com/libp2p/rust-libp2p/pull/3625