From 79ce63ffdaf852cb6235a8ab141357af30cd94aa Mon Sep 17 00:00:00 2001
From: Thomas Eizinger <thomas@eizinger.io>
Date: Wed, 31 Aug 2022 14:42:18 +0200
Subject: [PATCH 01/43] Provide separate functions for injecting in- and
 outbound streams

---
 swarm/src/connection.rs                 |  13 +--
 swarm/src/connection/handler_wrapper.rs | 116 ++++++++++++------------
 2 files changed, 58 insertions(+), 71 deletions(-)

diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs
index 24e54aba525..4ccf825f3bc 100644
--- a/swarm/src/connection.rs
+++ b/swarm/src/connection.rs
@@ -51,13 +51,6 @@ pub struct Connected {
     pub peer_id: PeerId,
 }
 
-/// Endpoint for a received substream.
-#[derive(Debug, Copy, Clone, PartialEq, Eq)]
-pub enum SubstreamEndpoint<TDialInfo> {
-    Dialer(TDialInfo),
-    Listener,
-}
-
 /// Event generated by a [`Connection`].
 #[derive(Debug, Clone)]
 pub enum Event<T> {
@@ -169,8 +162,7 @@ where
                             .open_info
                             .pop_front()
                             .expect("`open_info` is not empty");
-                    let endpoint = SubstreamEndpoint::Dialer(user_data);
-                    self.handler.inject_substream(substream, endpoint);
+                    self.handler.inject_outbound_substream(substream, user_data);
                     continue; // Go back to the top, handler can potentially make progress again.
                 }
             }
@@ -179,8 +171,7 @@ where
             match self.muxing.poll_inbound_unpin(cx)? {
                 Poll::Pending => {}
                 Poll::Ready(substream) => {
-                    self.handler
-                        .inject_substream(substream, SubstreamEndpoint::Listener);
+                    self.handler.inject_inbound_substream(substream);
                     continue; // Go back to the top, handler can potentially make progress again.
                 }
             }
diff --git a/swarm/src/connection/handler_wrapper.rs b/swarm/src/connection/handler_wrapper.rs
index 03d09b3fbc1..838241d351b 100644
--- a/swarm/src/connection/handler_wrapper.rs
+++ b/swarm/src/connection/handler_wrapper.rs
@@ -18,7 +18,6 @@
 // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
 // DEALINGS IN THE SOFTWARE.
 
-use crate::connection::SubstreamEndpoint;
 use crate::handler::{
     ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive,
 };
@@ -246,71 +245,68 @@ impl<TConnectionHandler> HandlerWrapper<TConnectionHandler>
 where
     TConnectionHandler: ConnectionHandler,
 {
-    pub fn inject_substream(
+    pub fn inject_outbound_substream(
         &mut self,
         substream: SubstreamBox,
         // The first element of the tuple is the unique upgrade identifier
         // (see `unique_dial_upgrade_id`).
-        endpoint: SubstreamEndpoint<OutboundOpenInfo<TConnectionHandler>>,
+        (upgrade_id, user_data, timeout): OutboundOpenInfo<TConnectionHandler>,
     ) {
-        match endpoint {
-            SubstreamEndpoint::Listener => {
-                if self.negotiating_in.len() == self.max_negotiating_inbound_streams {
-                    log::warn!(
-                        "Incoming substream from {} exceeding maximum number \
-                         of negotiating inbound streams {} on connection. \
-                         Dropping.
See PoolConfig::with_max_negotiating_inbound_streams.", - self.remote_peer_id, - self.max_negotiating_inbound_streams, - ); - return; - } - - let protocol = self.handler.listen_protocol(); - let timeout = *protocol.timeout(); - let (upgrade, user_data) = protocol.into_upgrade(); - let upgrade = upgrade::apply_inbound(substream, SendWrapper(upgrade)); - let timeout = Delay::new(timeout); - self.negotiating_in.push(SubstreamUpgrade { - user_data: Some(user_data), - timeout, - upgrade, - }); + let pos = match self + .queued_dial_upgrades + .iter() + .position(|(id, _)| id == &upgrade_id) + { + Some(p) => p, + None => { + debug_assert!(false, "Received an upgrade with an invalid upgrade ID"); + return; } - SubstreamEndpoint::Dialer((upgrade_id, user_data, timeout)) => { - let pos = match self - .queued_dial_upgrades - .iter() - .position(|(id, _)| id == &upgrade_id) - { - Some(p) => p, - None => { - debug_assert!(false, "Received an upgrade with an invalid upgrade ID"); - return; - } - }; - - let (_, upgrade) = self.queued_dial_upgrades.remove(pos); - let mut version = upgrade::Version::default(); - if let Some(v) = self.substream_upgrade_protocol_override { - if v != version { - log::debug!( - "Substream upgrade protocol override: {:?} -> {:?}", - version, - v - ); - version = v; - } - } - let upgrade = upgrade::apply_outbound(substream, upgrade, version); - let timeout = Delay::new(timeout); - self.negotiating_out.push(SubstreamUpgrade { - user_data: Some(user_data), - timeout, - upgrade, - }); + }; + + let (_, upgrade) = self.queued_dial_upgrades.remove(pos); + let mut version = upgrade::Version::default(); + if let Some(v) = self.substream_upgrade_protocol_override { + if v != version { + log::debug!( + "Substream upgrade protocol override: {:?} -> {:?}", + version, + v + ); + version = v; } } + let upgrade = upgrade::apply_outbound(substream, upgrade, version); + let timeout = Delay::new(timeout); + self.negotiating_out.push(SubstreamUpgrade { + user_data: Some(user_data), + timeout, + upgrade, + }); + } + + pub fn inject_inbound_substream(&mut self, substream: SubstreamBox) { + if self.negotiating_in.len() == self.max_negotiating_inbound_streams { + log::warn!( + "Incoming substream from {} exceeding maximum number \ + of negotiating inbound streams {} on connection. \ + Dropping. 
See PoolConfig::with_max_negotiating_inbound_streams.", + self.remote_peer_id, + self.max_negotiating_inbound_streams, + ); + return; + } + + let protocol = self.handler.listen_protocol(); + let timeout = *protocol.timeout(); + let (upgrade, user_data) = protocol.into_upgrade(); + let upgrade = upgrade::apply_inbound(substream, SendWrapper(upgrade)); + let timeout = Delay::new(timeout); + self.negotiating_in.push(SubstreamUpgrade { + user_data: Some(user_data), + timeout, + upgrade, + }); } pub fn inject_event(&mut self, event: TConnectionHandler::InEvent) { @@ -467,7 +463,7 @@ mod tests { for _ in 0..max_negotiating_inbound_streams { let substream = SubstreamBox::new(PendingSubstream(alive_substreams_counter.clone())); - wrapper.inject_substream(substream, SubstreamEndpoint::Listener); + wrapper.inject_inbound_substream(substream); } assert_eq!( @@ -477,7 +473,7 @@ mod tests { ); let substream = SubstreamBox::new(PendingSubstream(alive_substreams_counter.clone())); - wrapper.inject_substream(substream, SubstreamEndpoint::Listener); + wrapper.inject_inbound_substream(substream); assert_eq!( Arc::strong_count(&alive_substreams_counter), From b829642a7f0d8cfd84b3df3e2b8ad324581c2ac8 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 31 Aug 2022 14:53:19 +0200 Subject: [PATCH 02/43] Inline `HandlerWrapper` into `Connection` --- swarm/src/connection.rs | 309 ++++++++++++-- swarm/src/connection/error.rs | 10 - swarm/src/connection/handler_wrapper.rs | 517 ------------------------ 3 files changed, 275 insertions(+), 561 deletions(-) delete mode 100644 swarm/src/connection/handler_wrapper.rs diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index 4ccf825f3bc..04c997cf895 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -19,7 +19,6 @@ // DEALINGS IN THE SOFTWARE. mod error; -mod handler_wrapper; pub(crate) mod pool; @@ -31,16 +30,23 @@ pub use pool::{ConnectionCounters, ConnectionLimits}; pub use pool::{EstablishedConnection, PendingConnection}; use crate::handler::ConnectionHandler; -use crate::IntoConnectionHandler; -use handler_wrapper::HandlerWrapper; +use crate::upgrade::SendWrapper; +use crate::{ConnectionHandlerEvent, ConnectionHandlerUpgrErr, IntoConnectionHandler, KeepAlive}; +use futures::stream::FuturesUnordered; +use futures::FutureExt; +use futures::StreamExt; +use futures_timer::Delay; +use instant::Instant; use libp2p_core::connection::ConnectedPoint; use libp2p_core::multiaddr::Multiaddr; -use libp2p_core::muxing::{StreamMuxerBox, StreamMuxerEvent, StreamMuxerExt}; -use libp2p_core::upgrade; +use libp2p_core::muxing::{StreamMuxerBox, StreamMuxerEvent, StreamMuxerExt, SubstreamBox}; +use libp2p_core::upgrade::{InboundUpgradeApply, OutboundUpgradeApply}; use libp2p_core::PeerId; +use libp2p_core::{upgrade, UpgradeError}; use std::collections::VecDeque; use std::future::Future; -use std::{error::Error, fmt, io, pin::Pin, task::Context, task::Poll}; +use std::time::Duration; +use std::{fmt, io, pin::Pin, task::Context, task::Poll}; /// Information about a successfully established connection. #[derive(Debug, Clone, PartialEq, Eq)] @@ -67,10 +73,48 @@ where { /// Node that handles the muxing. muxing: StreamMuxerBox, - /// Handler that processes substreams. - handler: HandlerWrapper, - /// List of "open_info" that is waiting for new outbound substreams. - open_info: VecDeque>, + + remote_peer_id: PeerId, + /// The underlying handler. + handler: THandler, + /// Futures that upgrade incoming substreams. 
+ negotiating_in: FuturesUnordered< + SubstreamUpgrade< + THandler::InboundOpenInfo, + InboundUpgradeApply>, + >, + >, + /// Futures that upgrade outgoing substreams. + negotiating_out: FuturesUnordered< + SubstreamUpgrade< + THandler::OutboundOpenInfo, + OutboundUpgradeApply>, + >, + >, + /// The currently planned connection & handler shutdown. + shutdown: Shutdown, + /// The substream upgrade protocol override, if any. + substream_upgrade_protocol_override: Option, + /// The maximum number of inbound streams concurrently negotiating on a + /// connection. New inbound streams exceeding the limit are dropped and thus + /// reset. + /// + /// Note: This only enforces a limit on the number of concurrently + /// negotiating inbound streams. The total number of inbound streams on a + /// connection is the sum of negotiating and negotiated streams. A limit on + /// the total number of streams can be enforced at the + /// [`StreamMuxerBox`](libp2p_core::muxing::StreamMuxerBox) level. + max_negotiating_inbound_streams: usize, + /// Unique identifier assigned to each queued dial upgrade. + unique_dial_upgrade_id: u64, + /// For each outbound substream request, how to upgrade it. The first element of the tuple + /// is the unique identifier (see `unique_dial_upgrade_id`). + pending_dial_upgrades: VecDeque<( + u64, + ::OutboundOpenInfo, + Duration, + ::OutboundProtocol, + )>, } impl fmt::Debug for Connection @@ -81,7 +125,6 @@ where fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Connection") .field("handler", &self.handler) - .field("open_info", &self.open_info) .finish() } } @@ -102,17 +145,17 @@ where substream_upgrade_protocol_override: Option, max_negotiating_inbound_streams: usize, ) -> Self { - let wrapped_handler = HandlerWrapper::new( - peer_id, - endpoint, - handler, - substream_upgrade_protocol_override, - max_negotiating_inbound_streams, - ); Connection { muxing: muxer, - handler: wrapped_handler, - open_info: VecDeque::with_capacity(8), + remote_peer_id: peer_id, + handler: handler.into_handler(&peer_id, &endpoint), + negotiating_in: Default::default(), + negotiating_out: Default::default(), + unique_dial_upgrade_id: 0, + shutdown: Shutdown::None, + substream_upgrade_protocol_override, + max_negotiating_inbound_streams, + pending_dial_upgrades: VecDeque::with_capacity(8), } } @@ -124,7 +167,63 @@ where /// Begins an orderly shutdown of the connection, returning the connection /// handler and a `Future` that resolves when connection shutdown is complete. pub fn close(self) -> (THandler, impl Future>) { - (self.handler.into_connection_handler(), self.muxing.close()) + (self.handler, self.muxing.close()) + } + + fn inject_outbound_substream( + &mut self, + substream: SubstreamBox, + // The first element of the tuple is the unique upgrade identifier + // (see `unique_dial_upgrade_id`). 
+ (_, user_data, timeout, upgrade): ( + u64, + ::OutboundOpenInfo, + Duration, + ::OutboundProtocol, + ), + ) { + let mut version = upgrade::Version::default(); + if let Some(v) = self.substream_upgrade_protocol_override { + if v != version { + log::debug!( + "Substream upgrade protocol override: {:?} -> {:?}", + version, + v + ); + version = v; + } + } + let upgrade = upgrade::apply_outbound(substream, SendWrapper(upgrade), version); + let timeout = Delay::new(timeout); + self.negotiating_out.push(SubstreamUpgrade { + user_data: Some(user_data), + timeout, + upgrade, + }); + } + + fn inject_inbound_substream(&mut self, substream: SubstreamBox) { + if self.negotiating_in.len() == self.max_negotiating_inbound_streams { + log::warn!( + "Incoming substream from {} exceeding maximum number \ + of negotiating inbound streams {} on connection. \ + Dropping. See PoolConfig::with_max_negotiating_inbound_streams.", + self.remote_peer_id, + self.max_negotiating_inbound_streams, + ); + return; + } + + let protocol = self.handler.listen_protocol(); + let timeout = *protocol.timeout(); + let (upgrade, user_data) = protocol.into_upgrade(); + let upgrade = upgrade::apply_inbound(substream, SendWrapper(upgrade)); + let timeout = Delay::new(timeout); + self.negotiating_in.push(SubstreamUpgrade { + user_data: Some(user_data), + timeout, + upgrade, + }); } /// Polls the handler and the substream, forwarding events from the former to the latter and @@ -134,15 +233,89 @@ where cx: &mut Context<'_>, ) -> Poll, ConnectionError>> { loop { - // Poll the handler for new events. - match self.handler.poll(cx)? { - Poll::Pending => {} - Poll::Ready(handler_wrapper::Event::OutboundSubstreamRequest(user_data)) => { - self.open_info.push_back(user_data); - continue; // Poll handler until exhausted. + // Poll the [`ConnectionHandler`]. + if let Poll::Ready(handler_event) = self.handler.poll(cx) { + match handler_event { + ConnectionHandlerEvent::Custom(event) => { + return Poll::Ready(Ok(Event::Handler(event))) + } + ConnectionHandlerEvent::Close(err) => { + return Poll::Ready(Err(ConnectionError::Handler(err))) + } + ConnectionHandlerEvent::OutboundSubstreamRequest { protocol } => { + let id = self.unique_dial_upgrade_id; + let timeout = *protocol.timeout(); + self.unique_dial_upgrade_id += 1; + let (upgrade, info) = protocol.into_upgrade(); + + self.pending_dial_upgrades + .push_back((id, info, timeout, upgrade)); + + continue; + } + } + } + + // In case the [`ConnectionHandler`] can not make any more progress, poll the negotiating outbound streams. + if let Poll::Ready(Some((user_data, res))) = self.negotiating_out.poll_next_unpin(cx) { + match res { + Ok(upgrade) => self + .handler + .inject_fully_negotiated_outbound(upgrade, user_data), + Err(err) => self.handler.inject_dial_upgrade_error(user_data, err), + } + + // After the `inject_*` calls, the [`ConnectionHandler`] might be able to make progress. + continue; + } + + // In case both the [`ConnectionHandler`] and the negotiating outbound streams can not + // make any more progress, poll the negotiating inbound streams. 
+ if let Poll::Ready(Some((user_data, res))) = self.negotiating_in.poll_next_unpin(cx) { + match res { + Ok(upgrade) => self + .handler + .inject_fully_negotiated_inbound(upgrade, user_data), + Err(err) => self.handler.inject_listen_upgrade_error(user_data, err), } - Poll::Ready(handler_wrapper::Event::Custom(event)) => { - return Poll::Ready(Ok(Event::Handler(event))); + + // After the `inject_*` calls, the [`ConnectionHandler`] might be able to make progress. + continue; + } + + // Ask the handler whether it wants the connection (and the handler itself) + // to be kept alive, which determines the planned shutdown, if any. + let keep_alive = self.handler.connection_keep_alive(); + match (&mut self.shutdown, keep_alive) { + (Shutdown::Later(timer, deadline), KeepAlive::Until(t)) => { + if *deadline != t { + *deadline = t; + if let Some(dur) = deadline.checked_duration_since(Instant::now()) { + timer.reset(dur) + } + } + } + (_, KeepAlive::Until(t)) => { + if let Some(dur) = t.checked_duration_since(Instant::now()) { + self.shutdown = Shutdown::Later(Delay::new(dur), t) + } + } + (_, KeepAlive::No) => self.shutdown = Shutdown::Asap, + (_, KeepAlive::Yes) => self.shutdown = Shutdown::None, + }; + + // Check if the connection (and handler) should be shut down. + // As long as we're still negotiating substreams, shutdown is always postponed. + if self.negotiating_in.is_empty() && self.negotiating_out.is_empty() { + match self.shutdown { + Shutdown::None => {} + Shutdown::Asap => return Poll::Ready(Err(ConnectionError::KeepAliveTimeout)), + Shutdown::Later(ref mut delay, _) => match Future::poll(Pin::new(delay), cx) { + Poll::Ready(_) => { + return Poll::Ready(Err(ConnectionError::KeepAliveTimeout)) + } + Poll::Pending => {} + }, } } @@ -154,15 +327,15 @@ where } } - if !self.open_info.is_empty() { + if !self.pending_dial_upgrades.is_empty() { match self.muxing.poll_outbound_unpin(cx)? { Poll::Pending => {} Poll::Ready(substream) => { let user_data = self - .open_info + .pending_dial_upgrades .pop_front() .expect("`open_info` is not empty"); - self.handler.inject_outbound_substream(substream, user_data); + self.inject_outbound_substream(substream, user_data); continue; // Go back to the top, handler can potentially make progress again. } } @@ -171,7 +344,7 @@ where match self.muxing.poll_inbound_unpin(cx)? { Poll::Pending => {} Poll::Ready(substream) => { - self.handler.inject_inbound_substream(substream); + self.inject_inbound_substream(substream); continue; // Go back to the top, handler can potentially make progress again. } } @@ -216,4 +389,72 @@ impl fmt::Display for ConnectionLimit { } /// A `ConnectionLimit` can represent an error if it has been exceeded. 
-impl Error for ConnectionLimit {}
+impl std::error::Error for ConnectionLimit {}
+
+struct SubstreamUpgrade<UserData, Upgrade> {
+    user_data: Option<UserData>,
+    timeout: Delay,
+    upgrade: Upgrade,
+}
+
+impl<UserData, Upgrade> Unpin for SubstreamUpgrade<UserData, Upgrade> {}
+
+impl<UserData, Upgrade, UpgradeOutput, TUpgradeError> Future for SubstreamUpgrade<UserData, Upgrade>
+where
+    Upgrade: Future<Output = Result<UpgradeOutput, UpgradeError<TUpgradeError>>> + Unpin,
+{
+    type Output = (
+        UserData,
+        Result<UpgradeOutput, ConnectionHandlerUpgrErr<TUpgradeError>>,
+    );
+
+    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
+        match self.timeout.poll_unpin(cx) {
+            Poll::Ready(()) => {
+                return Poll::Ready((
+                    self.user_data
+                        .take()
+                        .expect("Future not to be polled again once ready."),
+                    Err(ConnectionHandlerUpgrErr::Timeout),
+                ))
+            }
+
+            Poll::Pending => {}
+        }
+
+        match self.upgrade.poll_unpin(cx) {
+            Poll::Ready(Ok(upgrade)) => Poll::Ready((
+                self.user_data
+                    .take()
+                    .expect("Future not to be polled again once ready."),
+                Ok(upgrade),
+            )),
+            Poll::Ready(Err(err)) => Poll::Ready((
+                self.user_data
+                    .take()
+                    .expect("Future not to be polled again once ready."),
+                Err(ConnectionHandlerUpgrErr::Upgrade(err)),
+            )),
+            Poll::Pending => Poll::Pending,
+        }
+    }
+}
+
+/// The options for a planned connection & handler shutdown.
+///
+/// A shutdown is planned anew based on the return value of
+/// [`ConnectionHandler::connection_keep_alive`] of the underlying handler
+/// after every invocation of [`ConnectionHandler::poll`].
+///
+/// A planned shutdown is always postponed for as long as there are ingoing
+/// or outgoing substreams being negotiated, i.e. it is a graceful, "idle"
+/// shutdown.
+#[derive(Debug)]
+enum Shutdown {
+    /// No shutdown is planned.
+    None,
+    /// A shut down is planned as soon as possible.
+    Asap,
+    /// A shut down is planned for when a `Delay` has elapsed.
+    Later(Delay, Instant),
+}
diff --git a/swarm/src/connection/error.rs b/swarm/src/connection/error.rs
index 8a6d6bbbf00..db51ebca874 100644
--- a/swarm/src/connection/error.rs
+++ b/swarm/src/connection/error.rs
@@ -18,7 +18,6 @@
 // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
 // DEALINGS IN THE SOFTWARE.
 
-use super::handler_wrapper;
 use crate::transport::TransportError;
 use crate::Multiaddr;
 use crate::{connection::ConnectionLimit, ConnectedPoint, PeerId};
@@ -66,15 +65,6 @@ where
     }
 }
 
-impl<TErr> From<handler_wrapper::Error<TErr>> for ConnectionError<TErr> {
-    fn from(error: handler_wrapper::Error<TErr>) -> Self {
-        match error {
-            handler_wrapper::Error::Handler(e) => Self::Handler(e),
-            handler_wrapper::Error::KeepAliveTimeout => Self::KeepAliveTimeout,
-        }
-    }
-}
-
 impl<THandlerErr> From<io::Error> for ConnectionError<THandlerErr> {
     fn from(error: io::Error) -> Self {
         ConnectionError::IO(error)
diff --git a/swarm/src/connection/handler_wrapper.rs b/swarm/src/connection/handler_wrapper.rs
deleted file mode 100644
index 838241d351b..00000000000
--- a/swarm/src/connection/handler_wrapper.rs
+++ /dev/null
@@ -1,517 +0,0 @@
-// Copyright 2018 Parity Technologies (UK) Ltd.
-//
-// Permission is hereby granted, free of charge, to any person obtaining a
-// copy of this software and associated documentation files (the "Software"),
-// to deal in the Software without restriction, including without limitation
-// the rights to use, copy, modify, merge, publish, distribute, sublicense,
-// and/or sell copies of the Software, and to permit persons to whom the
-// Software is furnished to do so, subject to the following conditions:
-//
-// The above copyright notice and this permission notice shall be included in
-// all copies or substantial portions of the Software.
-// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -use crate::handler::{ - ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, -}; -use crate::upgrade::SendWrapper; -use crate::IntoConnectionHandler; - -use futures::prelude::*; -use futures::stream::FuturesUnordered; -use futures_timer::Delay; -use instant::Instant; -use libp2p_core::{ - muxing::SubstreamBox, - upgrade::{self, InboundUpgradeApply, OutboundUpgradeApply, UpgradeError}, - Multiaddr, -}; -use libp2p_core::{ConnectedPoint, PeerId}; -use std::{error, fmt, pin::Pin, task::Context, task::Poll, time::Duration}; - -/// A wrapper for an underlying [`ConnectionHandler`]. -/// -/// It extends [`ConnectionHandler`] with: -/// - Enforced substream upgrade timeouts -/// - Driving substream upgrades -/// - Handling connection timeout -// TODO: add a caching system for protocols that are supported or not -pub struct HandlerWrapper -where - TConnectionHandler: ConnectionHandler, -{ - remote_peer_id: PeerId, - /// The underlying handler. - handler: TConnectionHandler, - /// Futures that upgrade incoming substreams. - negotiating_in: FuturesUnordered< - SubstreamUpgrade< - TConnectionHandler::InboundOpenInfo, - InboundUpgradeApply>, - >, - >, - /// Futures that upgrade outgoing substreams. - negotiating_out: FuturesUnordered< - SubstreamUpgrade< - TConnectionHandler::OutboundOpenInfo, - OutboundUpgradeApply>, - >, - >, - /// For each outbound substream request, how to upgrade it. The first element of the tuple - /// is the unique identifier (see `unique_dial_upgrade_id`). - queued_dial_upgrades: Vec<(u64, SendWrapper)>, - /// Unique identifier assigned to each queued dial upgrade. - unique_dial_upgrade_id: u64, - /// The currently planned connection & handler shutdown. - shutdown: Shutdown, - /// The substream upgrade protocol override, if any. - substream_upgrade_protocol_override: Option, - /// The maximum number of inbound streams concurrently negotiating on a - /// connection. New inbound streams exceeding the limit are dropped and thus - /// reset. - /// - /// Note: This only enforces a limit on the number of concurrently - /// negotiating inbound streams. The total number of inbound streams on a - /// connection is the sum of negotiating and negotiated streams. A limit on - /// the total number of streams can be enforced at the - /// [`StreamMuxerBox`](libp2p_core::muxing::StreamMuxerBox) level. 
- max_negotiating_inbound_streams: usize, -} - -impl std::fmt::Debug for HandlerWrapper { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("HandlerWrapper") - .field("negotiating_in", &self.negotiating_in) - .field("negotiating_out", &self.negotiating_out) - .field("unique_dial_upgrade_id", &self.unique_dial_upgrade_id) - .field("shutdown", &self.shutdown) - .field( - "substream_upgrade_protocol_override", - &self.substream_upgrade_protocol_override, - ) - .finish() - } -} - -impl HandlerWrapper { - pub(crate) fn new( - remote_peer_id: PeerId, - endpoint: ConnectedPoint, - handler: impl IntoConnectionHandler, - substream_upgrade_protocol_override: Option, - max_negotiating_inbound_streams: usize, - ) -> Self { - Self { - remote_peer_id, - handler: handler.into_handler(&remote_peer_id, &endpoint), - negotiating_in: Default::default(), - negotiating_out: Default::default(), - queued_dial_upgrades: Vec::new(), - unique_dial_upgrade_id: 0, - shutdown: Shutdown::None, - substream_upgrade_protocol_override, - max_negotiating_inbound_streams, - } - } - - pub(crate) fn into_connection_handler(self) -> TConnectionHandler { - self.handler - } -} - -struct SubstreamUpgrade { - user_data: Option, - timeout: Delay, - upgrade: Upgrade, -} - -impl Unpin for SubstreamUpgrade {} - -impl Future for SubstreamUpgrade -where - Upgrade: Future>> + Unpin, -{ - type Output = ( - UserData, - Result>, - ); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { - match self.timeout.poll_unpin(cx) { - Poll::Ready(()) => { - return Poll::Ready(( - self.user_data - .take() - .expect("Future not to be polled again once ready."), - Err(ConnectionHandlerUpgrErr::Timeout), - )) - } - - Poll::Pending => {} - } - - match self.upgrade.poll_unpin(cx) { - Poll::Ready(Ok(upgrade)) => Poll::Ready(( - self.user_data - .take() - .expect("Future not to be polled again once ready."), - Ok(upgrade), - )), - Poll::Ready(Err(err)) => Poll::Ready(( - self.user_data - .take() - .expect("Future not to be polled again once ready."), - Err(ConnectionHandlerUpgrErr::Upgrade(err)), - )), - Poll::Pending => Poll::Pending, - } - } -} - -/// The options for a planned connection & handler shutdown. -/// -/// A shutdown is planned anew based on the the return value of -/// [`ConnectionHandler::connection_keep_alive`] of the underlying handler -/// after every invocation of [`ConnectionHandler::poll`]. -/// -/// A planned shutdown is always postponed for as long as there are ingoing -/// or outgoing substreams being negotiated, i.e. it is a graceful, "idle" -/// shutdown. -#[derive(Debug)] -enum Shutdown { - /// No shutdown is planned. - None, - /// A shut down is planned as soon as possible. - Asap, - /// A shut down is planned for when a `Delay` has elapsed. - Later(Delay, Instant), -} - -/// Error generated by the [`HandlerWrapper`]. -#[derive(Debug)] -pub enum Error { - /// The connection handler encountered an error. - Handler(TErr), - /// The connection keep-alive timeout expired. 
- KeepAliveTimeout, -} - -impl From for Error { - fn from(err: TErr) -> Error { - Error::Handler(err) - } -} - -impl fmt::Display for Error -where - TErr: fmt::Display, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Error::Handler(err) => write!(f, "{}", err), - Error::KeepAliveTimeout => { - write!(f, "Connection closed due to expired keep-alive timeout.") - } - } - } -} - -impl error::Error for Error -where - TErr: error::Error + 'static, -{ - fn source(&self) -> Option<&(dyn error::Error + 'static)> { - match self { - Error::Handler(err) => Some(err), - Error::KeepAliveTimeout => None, - } - } -} - -pub type OutboundOpenInfo = ( - u64, - ::OutboundOpenInfo, - Duration, -); - -impl HandlerWrapper -where - TConnectionHandler: ConnectionHandler, -{ - pub fn inject_outbound_substream( - &mut self, - substream: SubstreamBox, - // The first element of the tuple is the unique upgrade identifier - // (see `unique_dial_upgrade_id`). - (upgrade_id, user_data, timeout): OutboundOpenInfo, - ) { - let pos = match self - .queued_dial_upgrades - .iter() - .position(|(id, _)| id == &upgrade_id) - { - Some(p) => p, - None => { - debug_assert!(false, "Received an upgrade with an invalid upgrade ID"); - return; - } - }; - - let (_, upgrade) = self.queued_dial_upgrades.remove(pos); - let mut version = upgrade::Version::default(); - if let Some(v) = self.substream_upgrade_protocol_override { - if v != version { - log::debug!( - "Substream upgrade protocol override: {:?} -> {:?}", - version, - v - ); - version = v; - } - } - let upgrade = upgrade::apply_outbound(substream, upgrade, version); - let timeout = Delay::new(timeout); - self.negotiating_out.push(SubstreamUpgrade { - user_data: Some(user_data), - timeout, - upgrade, - }); - } - - pub fn inject_inbound_substream(&mut self, substream: SubstreamBox) { - if self.negotiating_in.len() == self.max_negotiating_inbound_streams { - log::warn!( - "Incoming substream from {} exceeding maximum number \ - of negotiating inbound streams {} on connection. \ - Dropping. 
See PoolConfig::with_max_negotiating_inbound_streams.", - self.remote_peer_id, - self.max_negotiating_inbound_streams, - ); - return; - } - - let protocol = self.handler.listen_protocol(); - let timeout = *protocol.timeout(); - let (upgrade, user_data) = protocol.into_upgrade(); - let upgrade = upgrade::apply_inbound(substream, SendWrapper(upgrade)); - let timeout = Delay::new(timeout); - self.negotiating_in.push(SubstreamUpgrade { - user_data: Some(user_data), - timeout, - upgrade, - }); - } - - pub fn inject_event(&mut self, event: TConnectionHandler::InEvent) { - self.handler.inject_event(event); - } - - pub fn inject_address_change(&mut self, new_address: &Multiaddr) { - self.handler.inject_address_change(new_address); - } - - fn handle_connection_handler_event( - &mut self, - handler_event: ConnectionHandlerEvent< - TConnectionHandler::OutboundProtocol, - TConnectionHandler::OutboundOpenInfo, - TConnectionHandler::OutEvent, - TConnectionHandler::Error, - >, - ) -> Result< - Event, TConnectionHandler::OutEvent>, - Error, - > { - match handler_event { - ConnectionHandlerEvent::Custom(event) => Ok(Event::Custom(event)), - ConnectionHandlerEvent::OutboundSubstreamRequest { protocol } => { - let id = self.unique_dial_upgrade_id; - let timeout = *protocol.timeout(); - self.unique_dial_upgrade_id += 1; - let (upgrade, info) = protocol.into_upgrade(); - self.queued_dial_upgrades.push((id, SendWrapper(upgrade))); - Ok(Event::OutboundSubstreamRequest((id, info, timeout))) - } - ConnectionHandlerEvent::Close(err) => Err(err.into()), - } - } - - pub fn poll( - &mut self, - cx: &mut Context<'_>, - ) -> Poll< - Result< - Event, TConnectionHandler::OutEvent>, - Error, - >, - > { - loop { - // Poll the [`ConnectionHandler`]. - if let Poll::Ready(handler_event) = self.handler.poll(cx) { - let wrapper_event = self.handle_connection_handler_event(handler_event)?; - return Poll::Ready(Ok(wrapper_event)); - } - - // In case the [`ConnectionHandler`] can not make any more progress, poll the negotiating outbound streams. - if let Poll::Ready(Some((user_data, res))) = self.negotiating_out.poll_next_unpin(cx) { - match res { - Ok(upgrade) => self - .handler - .inject_fully_negotiated_outbound(upgrade, user_data), - Err(err) => self.handler.inject_dial_upgrade_error(user_data, err), - } - - // After the `inject_*` calls, the [`ConnectionHandler`] might be able to make progress. - continue; - } - - // In case both the [`ConnectionHandler`] and the negotiating outbound streams can not - // make any more progress, poll the negotiating inbound streams. - if let Poll::Ready(Some((user_data, res))) = self.negotiating_in.poll_next_unpin(cx) { - match res { - Ok(upgrade) => self - .handler - .inject_fully_negotiated_inbound(upgrade, user_data), - Err(err) => self.handler.inject_listen_upgrade_error(user_data, err), - } - - // After the `inject_*` calls, the [`ConnectionHandler`] might be able to make progress. - continue; - } - - // None of the three can make any more progress, thus breaking the loop. - break; - } - - // Ask the handler whether it wants the connection (and the handler itself) - // to be kept alive, which determines the planned shutdown, if any. 
- match (&mut self.shutdown, self.handler.connection_keep_alive()) { - (Shutdown::Later(timer, deadline), KeepAlive::Until(t)) => { - if *deadline != t { - *deadline = t; - if let Some(dur) = deadline.checked_duration_since(Instant::now()) { - timer.reset(dur) - } - } - } - (_, KeepAlive::Until(t)) => { - if let Some(dur) = t.checked_duration_since(Instant::now()) { - self.shutdown = Shutdown::Later(Delay::new(dur), t) - } - } - (_, KeepAlive::No) => self.shutdown = Shutdown::Asap, - (_, KeepAlive::Yes) => self.shutdown = Shutdown::None, - }; - - // Check if the connection (and handler) should be shut down. - // As long as we're still negotiating substreams, shutdown is always postponed. - if self.negotiating_in.is_empty() && self.negotiating_out.is_empty() { - match self.shutdown { - Shutdown::None => {} - Shutdown::Asap => return Poll::Ready(Err(Error::KeepAliveTimeout)), - Shutdown::Later(ref mut delay, _) => match Future::poll(Pin::new(delay), cx) { - Poll::Ready(_) => return Poll::Ready(Err(Error::KeepAliveTimeout)), - Poll::Pending => {} - }, - } - } - - Poll::Pending - } -} - -/// Event produced by a [`HandlerWrapper`]. -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -pub enum Event { - /// Require a new outbound substream to be opened with the remote. - OutboundSubstreamRequest(TOutboundOpenInfo), - - /// Other event. - Custom(TCustom), -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::handler::PendingConnectionHandler; - use quickcheck::*; - use std::sync::Arc; - - #[test] - fn max_negotiating_inbound_streams() { - fn prop(max_negotiating_inbound_streams: u8) { - let max_negotiating_inbound_streams: usize = max_negotiating_inbound_streams.into(); - let mut wrapper = HandlerWrapper::new( - PeerId::random(), - ConnectedPoint::Listener { - local_addr: Multiaddr::empty(), - send_back_addr: Multiaddr::empty(), - }, - PendingConnectionHandler::new("test".to_string()), - None, - max_negotiating_inbound_streams, - ); - let alive_substreams_counter = Arc::new(()); - - for _ in 0..max_negotiating_inbound_streams { - let substream = - SubstreamBox::new(PendingSubstream(alive_substreams_counter.clone())); - wrapper.inject_inbound_substream(substream); - } - - assert_eq!( - Arc::strong_count(&alive_substreams_counter), - max_negotiating_inbound_streams + 1, - "Expect none of the substreams up to the limit to be dropped." - ); - - let substream = SubstreamBox::new(PendingSubstream(alive_substreams_counter.clone())); - wrapper.inject_inbound_substream(substream); - - assert_eq!( - Arc::strong_count(&alive_substreams_counter), - max_negotiating_inbound_streams + 1, - "Expect substream exceeding the limit to be dropped." - ); - } - - QuickCheck::new().quickcheck(prop as fn(_)); - } - - struct PendingSubstream(Arc<()>); - - impl AsyncRead for PendingSubstream { - fn poll_read( - self: Pin<&mut Self>, - _cx: &mut Context<'_>, - _buf: &mut [u8], - ) -> Poll> { - Poll::Pending - } - } - - impl AsyncWrite for PendingSubstream { - fn poll_write( - self: Pin<&mut Self>, - _cx: &mut Context<'_>, - _buf: &[u8], - ) -> Poll> { - Poll::Pending - } - - fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - Poll::Pending - } - - fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - Poll::Pending - } - } -} From 7182fd6057848c502bb72eea4a330b09783b4fc6 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 31 Aug 2022 14:54:32 +0200 Subject: [PATCH 03/43] Remove unique_dial_upgrade_id This is no longer needed because we process the substreams as they come in. 
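A minimal, self-contained sketch of the invariant this relies on (illustrative
only; the real queue holds the handler's open-info, timeout and upgrade rather
than plain strings): every outbound substream the muxer hands back is paired
with the oldest queued request, so the queue position itself is the
correlation and no unique ID is required.

    use std::collections::VecDeque;

    // FIFO pairing: the next substream the muxer opens belongs to the
    // oldest outstanding upgrade request.
    fn pair_next(pending: &mut VecDeque<&'static str>) -> Option<&'static str> {
        pending.pop_front()
    }

    fn main() {
        let mut pending = VecDeque::from(["/ipfs/ping/1.0.0", "/ipfs/id/1.0.0"]);
        assert_eq!(pair_next(&mut pending), Some("/ipfs/ping/1.0.0"));
        assert_eq!(pair_next(&mut pending), Some("/ipfs/id/1.0.0"));
    }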
--- swarm/src/connection.rs | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index 04c997cf895..68432fc6f36 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -105,12 +105,9 @@ where /// the total number of streams can be enforced at the /// [`StreamMuxerBox`](libp2p_core::muxing::StreamMuxerBox) level. max_negotiating_inbound_streams: usize, - /// Unique identifier assigned to each queued dial upgrade. - unique_dial_upgrade_id: u64, /// For each outbound substream request, how to upgrade it. The first element of the tuple /// is the unique identifier (see `unique_dial_upgrade_id`). pending_dial_upgrades: VecDeque<( - u64, ::OutboundOpenInfo, Duration, ::OutboundProtocol, @@ -151,7 +148,6 @@ where handler: handler.into_handler(&peer_id, &endpoint), negotiating_in: Default::default(), negotiating_out: Default::default(), - unique_dial_upgrade_id: 0, shutdown: Shutdown::None, substream_upgrade_protocol_override, max_negotiating_inbound_streams, @@ -173,10 +169,7 @@ where fn inject_outbound_substream( &mut self, substream: SubstreamBox, - // The first element of the tuple is the unique upgrade identifier - // (see `unique_dial_upgrade_id`). - (_, user_data, timeout, upgrade): ( - u64, + (user_data, timeout, upgrade): ( ::OutboundOpenInfo, Duration, ::OutboundProtocol, @@ -243,13 +236,11 @@ where return Poll::Ready(Err(ConnectionError::Handler(err))) } ConnectionHandlerEvent::OutboundSubstreamRequest { protocol } => { - let id = self.unique_dial_upgrade_id; let timeout = *protocol.timeout(); - self.unique_dial_upgrade_id += 1; let (upgrade, info) = protocol.into_upgrade(); self.pending_dial_upgrades - .push_back((id, info, timeout, upgrade)); + .push_back((info, timeout, upgrade)); continue; } From 7190a799b4288e776450ad0f19f358f547ec5a69 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 31 Aug 2022 15:00:04 +0200 Subject: [PATCH 04/43] Don't use tuples when we can use parameters --- swarm/src/connection.rs | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index 68432fc6f36..135941f0938 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -169,11 +169,9 @@ where fn inject_outbound_substream( &mut self, substream: SubstreamBox, - (user_data, timeout, upgrade): ( - ::OutboundOpenInfo, - Duration, - ::OutboundProtocol, - ), + user_data: ::OutboundOpenInfo, + timeout: Duration, + upgrade: ::OutboundProtocol, ) { let mut version = upgrade::Version::default(); if let Some(v) = self.substream_upgrade_protocol_override { @@ -322,11 +320,11 @@ where match self.muxing.poll_outbound_unpin(cx)? { Poll::Pending => {} Poll::Ready(substream) => { - let user_data = self + let (open_info, timeout, upgrade) = self .pending_dial_upgrades .pop_front() .expect("`open_info` is not empty"); - self.inject_outbound_substream(substream, user_data); + self.inject_outbound_substream(substream, open_info, timeout, upgrade); continue; // Go back to the top, handler can potentially make progress again. 
} } From f689617e48d0d8e6843dfbbea7caed1176367814 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 31 Aug 2022 15:02:34 +0200 Subject: [PATCH 05/43] Delay de-construction of SubstreamUpgrade --- swarm/src/connection.rs | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index 135941f0938..f21b200b299 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -31,7 +31,10 @@ pub use pool::{EstablishedConnection, PendingConnection}; use crate::handler::ConnectionHandler; use crate::upgrade::SendWrapper; -use crate::{ConnectionHandlerEvent, ConnectionHandlerUpgrErr, IntoConnectionHandler, KeepAlive}; +use crate::{ + ConnectionHandlerEvent, ConnectionHandlerUpgrErr, IntoConnectionHandler, KeepAlive, + SubstreamProtocol, +}; use futures::stream::FuturesUnordered; use futures::FutureExt; use futures::StreamExt; @@ -107,11 +110,8 @@ where max_negotiating_inbound_streams: usize, /// For each outbound substream request, how to upgrade it. The first element of the tuple /// is the unique identifier (see `unique_dial_upgrade_id`). - pending_dial_upgrades: VecDeque<( - ::OutboundOpenInfo, - Duration, - ::OutboundProtocol, - )>, + pending_dial_upgrades: + VecDeque>, } impl fmt::Debug for Connection @@ -234,11 +234,7 @@ where return Poll::Ready(Err(ConnectionError::Handler(err))) } ConnectionHandlerEvent::OutboundSubstreamRequest { protocol } => { - let timeout = *protocol.timeout(); - let (upgrade, info) = protocol.into_upgrade(); - - self.pending_dial_upgrades - .push_back((info, timeout, upgrade)); + self.pending_dial_upgrades.push_back(protocol); continue; } @@ -320,10 +316,13 @@ where match self.muxing.poll_outbound_unpin(cx)? { Poll::Pending => {} Poll::Ready(substream) => { - let (open_info, timeout, upgrade) = self + let substream_upgrade = self .pending_dial_upgrades .pop_front() .expect("`open_info` is not empty"); + let timeout = *substream_upgrade.timeout(); + let (upgrade, open_info) = substream_upgrade.into_upgrade(); + self.inject_outbound_substream(substream, open_info, timeout, upgrade); continue; // Go back to the top, handler can potentially make progress again. } From 4d9fa5b238ef8aff3409521542e141a9e46b76a2 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 31 Aug 2022 15:05:16 +0200 Subject: [PATCH 06/43] Only poll for new inbound streams if we are below the limit --- swarm/src/connection.rs | 25 +++++++------------------ 1 file changed, 7 insertions(+), 18 deletions(-) diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index f21b200b299..44602b2aea6 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -77,7 +77,6 @@ where /// Node that handles the muxing. muxing: StreamMuxerBox, - remote_peer_id: PeerId, /// The underlying handler. handler: THandler, /// Futures that upgrade incoming substreams. @@ -144,7 +143,6 @@ where ) -> Self { Connection { muxing: muxer, - remote_peer_id: peer_id, handler: handler.into_handler(&peer_id, &endpoint), negotiating_in: Default::default(), negotiating_out: Default::default(), @@ -194,17 +192,6 @@ where } fn inject_inbound_substream(&mut self, substream: SubstreamBox) { - if self.negotiating_in.len() == self.max_negotiating_inbound_streams { - log::warn!( - "Incoming substream from {} exceeding maximum number \ - of negotiating inbound streams {} on connection. \ - Dropping. 
See PoolConfig::with_max_negotiating_inbound_streams.", - self.remote_peer_id, - self.max_negotiating_inbound_streams, - ); - return; - } - let protocol = self.handler.listen_protocol(); let timeout = *protocol.timeout(); let (upgrade, user_data) = protocol.into_upgrade(); @@ -329,11 +316,13 @@ where } } - match self.muxing.poll_inbound_unpin(cx)? { - Poll::Pending => {} - Poll::Ready(substream) => { - self.inject_inbound_substream(substream); - continue; // Go back to the top, handler can potentially make progress again. + if self.negotiating_in.len() < self.max_negotiating_inbound_streams { + match self.muxing.poll_inbound_unpin(cx)? { + Poll::Pending => {} + Poll::Ready(substream) => { + self.inject_inbound_substream(substream); + continue; // Go back to the top, handler can potentially make progress again. + } } } From 1af9ba316bb22b36c67c3f45c3a9c72eec2275b6 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 31 Aug 2022 15:08:06 +0200 Subject: [PATCH 07/43] Inline `inject_substream` functions --- swarm/src/connection.rs | 71 ++++++++++++++++------------------------- 1 file changed, 28 insertions(+), 43 deletions(-) diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index 44602b2aea6..7e60653775a 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -48,7 +48,6 @@ use libp2p_core::PeerId; use libp2p_core::{upgrade, UpgradeError}; use std::collections::VecDeque; use std::future::Future; -use std::time::Duration; use std::{fmt, io, pin::Pin, task::Context, task::Poll}; /// Information about a successfully established connection. @@ -164,46 +163,6 @@ where (self.handler, self.muxing.close()) } - fn inject_outbound_substream( - &mut self, - substream: SubstreamBox, - user_data: ::OutboundOpenInfo, - timeout: Duration, - upgrade: ::OutboundProtocol, - ) { - let mut version = upgrade::Version::default(); - if let Some(v) = self.substream_upgrade_protocol_override { - if v != version { - log::debug!( - "Substream upgrade protocol override: {:?} -> {:?}", - version, - v - ); - version = v; - } - } - let upgrade = upgrade::apply_outbound(substream, SendWrapper(upgrade), version); - let timeout = Delay::new(timeout); - self.negotiating_out.push(SubstreamUpgrade { - user_data: Some(user_data), - timeout, - upgrade, - }); - } - - fn inject_inbound_substream(&mut self, substream: SubstreamBox) { - let protocol = self.handler.listen_protocol(); - let timeout = *protocol.timeout(); - let (upgrade, user_data) = protocol.into_upgrade(); - let upgrade = upgrade::apply_inbound(substream, SendWrapper(upgrade)); - let timeout = Delay::new(timeout); - self.negotiating_in.push(SubstreamUpgrade { - user_data: Some(user_data), - timeout, - upgrade, - }); - } - /// Polls the handler and the substream, forwarding events from the former to the latter and /// vice versa. 
pub fn poll( @@ -307,10 +266,27 @@ where .pending_dial_upgrades .pop_front() .expect("`open_info` is not empty"); + let timeout = *substream_upgrade.timeout(); let (upgrade, open_info) = substream_upgrade.into_upgrade(); - self.inject_outbound_substream(substream, open_info, timeout, upgrade); + let mut version = upgrade::Version::default(); + if let Some(v) = self.substream_upgrade_protocol_override { + if v != version { + log::debug!( + "Substream upgrade protocol override: {:?} -> {:?}", + version, + v + ); + version = v; + } + } + + self.negotiating_out.push(SubstreamUpgrade { + user_data: Some(open_info), + timeout: Delay::new(timeout), + upgrade: upgrade::apply_outbound(substream, SendWrapper(upgrade), version), + }); continue; // Go back to the top, handler can potentially make progress again. } } @@ -320,7 +296,16 @@ where match self.muxing.poll_inbound_unpin(cx)? { Poll::Pending => {} Poll::Ready(substream) => { - self.inject_inbound_substream(substream); + let protocol = self.handler.listen_protocol(); + + let timeout = *protocol.timeout(); + let (upgrade, user_data) = protocol.into_upgrade(); + + self.negotiating_in.push(SubstreamUpgrade { + user_data: Some(user_data), + timeout: Delay::new(timeout), + upgrade: upgrade::apply_inbound(substream, SendWrapper(upgrade)), + }); continue; // Go back to the top, handler can potentially make progress again. } } From 27274c8d3f60c6274c76e6f49b2341421e012fcf Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 31 Aug 2022 15:20:36 +0200 Subject: [PATCH 08/43] Introduce ctor functions on SubstreamUpgrade to simplify poll-fn --- swarm/src/connection.rs | 85 +++++++++++++++++++++++++++-------------- 1 file changed, 57 insertions(+), 28 deletions(-) diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index 7e60653775a..5f12d86e7b8 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -30,7 +30,7 @@ pub use pool::{ConnectionCounters, ConnectionLimits}; pub use pool::{EstablishedConnection, PendingConnection}; use crate::handler::ConnectionHandler; -use crate::upgrade::SendWrapper; +use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend, SendWrapper}; use crate::{ ConnectionHandlerEvent, ConnectionHandlerUpgrErr, IntoConnectionHandler, KeepAlive, SubstreamProtocol, @@ -267,26 +267,12 @@ where .pop_front() .expect("`open_info` is not empty"); - let timeout = *substream_upgrade.timeout(); - let (upgrade, open_info) = substream_upgrade.into_upgrade(); - - let mut version = upgrade::Version::default(); - if let Some(v) = self.substream_upgrade_protocol_override { - if v != version { - log::debug!( - "Substream upgrade protocol override: {:?} -> {:?}", - version, - v - ); - version = v; - } - } + self.negotiating_out.push(SubstreamUpgrade::new_outbound( + substream, + substream_upgrade, + self.substream_upgrade_protocol_override, + )); - self.negotiating_out.push(SubstreamUpgrade { - user_data: Some(open_info), - timeout: Delay::new(timeout), - upgrade: upgrade::apply_outbound(substream, SendWrapper(upgrade), version), - }); continue; // Go back to the top, handler can potentially make progress again. } } @@ -296,16 +282,11 @@ where match self.muxing.poll_inbound_unpin(cx)? 
{ Poll::Pending => {} Poll::Ready(substream) => { - let protocol = self.handler.listen_protocol(); + let substream_upgrade = self.handler.listen_protocol(); - let timeout = *protocol.timeout(); - let (upgrade, user_data) = protocol.into_upgrade(); + self.negotiating_in + .push(SubstreamUpgrade::new_inbound(substream, substream_upgrade)); - self.negotiating_in.push(SubstreamUpgrade { - user_data: Some(user_data), - timeout: Delay::new(timeout), - upgrade: upgrade::apply_inbound(substream, SendWrapper(upgrade)), - }); continue; // Go back to the top, handler can potentially make progress again. } } @@ -359,6 +340,54 @@ struct SubstreamUpgrade { upgrade: Upgrade, } +impl + SubstreamUpgrade>> +where + Upgrade: Send + OutboundUpgradeSend, +{ + fn new_outbound( + substream: SubstreamBox, + protocol: SubstreamProtocol, + version_override: Option, + ) -> Self { + let timeout = *protocol.timeout(); + let (upgrade, open_info) = protocol.into_upgrade(); + + let effective_version = match version_override { + Some(version_override) if version_override != upgrade::Version::default() => { + version_override + } + _ => upgrade::Version::default(), + }; + + Self { + user_data: Some(open_info), + timeout: Delay::new(timeout), + upgrade: upgrade::apply_outbound(substream, SendWrapper(upgrade), effective_version), + } + } +} + +impl + SubstreamUpgrade>> +where + Upgrade: Send + InboundUpgradeSend, +{ + fn new_inbound( + substream: SubstreamBox, + protocol: SubstreamProtocol, + ) -> Self { + let timeout = *protocol.timeout(); + let (upgrade, open_info) = protocol.into_upgrade(); + + Self { + user_data: Some(open_info), + timeout: Delay::new(timeout), + upgrade: upgrade::apply_inbound(substream, SendWrapper(upgrade)), + } + } +} + impl Unpin for SubstreamUpgrade {} impl Future for SubstreamUpgrade From 2c3a606da0d441bb16a7d2a147b4ca8019e1301e Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 31 Aug 2022 15:21:20 +0200 Subject: [PATCH 09/43] Remove stale comment --- swarm/src/connection.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index 5f12d86e7b8..7a4410d30b6 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -106,8 +106,7 @@ where /// the total number of streams can be enforced at the /// [`StreamMuxerBox`](libp2p_core::muxing::StreamMuxerBox) level. max_negotiating_inbound_streams: usize, - /// For each outbound substream request, how to upgrade it. The first element of the tuple - /// is the unique identifier (see `unique_dial_upgrade_id`). + /// For each outbound substream request, how to upgrade it. pending_dial_upgrades: VecDeque>, } From 772b775f0e98a9b491308bb0075e44ca6f600763 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 31 Aug 2022 15:21:46 +0200 Subject: [PATCH 10/43] Remove empty line --- swarm/src/connection.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index 7a4410d30b6..712a3682e55 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -75,7 +75,6 @@ where { /// Node that handles the muxing. muxing: StreamMuxerBox, - /// The underlying handler. handler: THandler, /// Futures that upgrade incoming substreams. 
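Note on the constructors introduced in PATCH 08: `new_outbound` folds the
protocol-version override into a single expression. A self-contained sketch of
that selection rule, using a two-variant stand-in for libp2p-core's
`upgrade::Version` (only the shape of the logic is reproduced here, not the
real type):

    #[derive(Clone, Copy, Debug, PartialEq, Eq)]
    enum Version {
        V1,
        V1Lazy,
    }

    impl Default for Version {
        fn default() -> Self {
            Version::V1
        }
    }

    // The override only takes effect when it differs from the default;
    // that is also the only case in which the real code emits a debug log.
    fn effective_version(version_override: Option<Version>) -> Version {
        match version_override {
            Some(v) if v != Version::default() => v,
            _ => Version::default(),
        }
    }

    fn main() {
        assert_eq!(effective_version(None), Version::V1);
        assert_eq!(effective_version(Some(Version::V1)), Version::V1);
        assert_eq!(effective_version(Some(Version::V1Lazy)), Version::V1Lazy);
    }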
From 635913791e0173e8bf8d40231ad4b1641bce0c2c Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 31 Aug 2022 15:22:06 +0200 Subject: [PATCH 11/43] Align naming --- swarm/src/connection.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index 712a3682e55..1448c350762 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -260,14 +260,14 @@ where match self.muxing.poll_outbound_unpin(cx)? { Poll::Pending => {} Poll::Ready(substream) => { - let substream_upgrade = self + let protocol = self .pending_dial_upgrades .pop_front() .expect("`open_info` is not empty"); self.negotiating_out.push(SubstreamUpgrade::new_outbound( substream, - substream_upgrade, + protocol, self.substream_upgrade_protocol_override, )); @@ -280,10 +280,10 @@ where match self.muxing.poll_inbound_unpin(cx)? { Poll::Pending => {} Poll::Ready(substream) => { - let substream_upgrade = self.handler.listen_protocol(); + let protocol = self.handler.listen_protocol(); self.negotiating_in - .push(SubstreamUpgrade::new_inbound(substream, substream_upgrade)); + .push(SubstreamUpgrade::new_inbound(substream, protocol)); continue; // Go back to the top, handler can potentially make progress again. } From bb930c4b500687aac5994594d390c6905dd15162 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 31 Aug 2022 15:23:19 +0200 Subject: [PATCH 12/43] Re-introduce log --- swarm/src/connection.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index 1448c350762..ec5502f6d3e 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -353,6 +353,12 @@ where let effective_version = match version_override { Some(version_override) if version_override != upgrade::Version::default() => { + log::debug!( + "Substream upgrade protocol override: {:?} -> {:?}", + upgrade::Version::default(), + version_override + ); + version_override } _ => upgrade::Version::default(), From d8d8165365812679c7451712dcc5bc1d2b848ba1 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 31 Aug 2022 15:43:18 +0200 Subject: [PATCH 13/43] Bring back the test --- swarm/src/connection.rs | 113 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 113 insertions(+) diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index ec5502f6d3e..40bbd16309f 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -453,3 +453,116 @@ enum Shutdown { /// A shut down is planned for when a `Delay` has elapsed. 
     Later(Delay, Instant),
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::handler::DummyConnectionHandler;
+    use futures::AsyncRead;
+    use futures::AsyncWrite;
+    use libp2p_core::StreamMuxer;
+    use quickcheck::*;
+    use std::sync::{Arc, Weak};
+    use void::Void;
+
+    #[test]
+    fn max_negotiating_inbound_streams() {
+        fn prop(max_negotiating_inbound_streams: u8) {
+            let max_negotiating_inbound_streams: usize = max_negotiating_inbound_streams.into();
+
+            let alive_substream_counter = Arc::new(());
+
+            let mut connection = Connection::new(
+                PeerId::random(),
+                ConnectedPoint::Listener {
+                    local_addr: Multiaddr::empty(),
+                    send_back_addr: Multiaddr::empty(),
+                },
+                StreamMuxerBox::new(DummyStreamMuxer {
+                    counter: alive_substream_counter.clone(),
+                }),
+                DummyConnectionHandler {
+                    keep_alive: KeepAlive::Yes,
+                },
+                None,
+                max_negotiating_inbound_streams,
+            );
+
+            let result = Pin::new(&mut connection)
+                .poll(&mut Context::from_waker(futures::task::noop_waker_ref()));
+
+            assert!(result.is_pending());
+            assert_eq!(
+                Arc::weak_count(&alive_substream_counter),
+                max_negotiating_inbound_streams,
+                "Expect no more than the maximum number of allowed streams"
+            );
+        }
+
+        QuickCheck::new().quickcheck(prop as fn(_));
+    }
+
+    struct DummyStreamMuxer {
+        counter: Arc<()>,
+    }
+
+    impl StreamMuxer for DummyStreamMuxer {
+        type Substream = PendingSubstream;
+        type Error = Void;
+
+        fn poll_inbound(
+            self: Pin<&mut Self>,
+            _: &mut Context<'_>,
+        ) -> Poll<Result<Self::Substream, Self::Error>> {
+            Poll::Ready(Ok(PendingSubstream(Arc::downgrade(&self.counter))))
+        }
+
+        fn poll_outbound(
+            self: Pin<&mut Self>,
+            _: &mut Context<'_>,
+        ) -> Poll<Result<Self::Substream, Self::Error>> {
+            Poll::Pending
+        }
+
+        fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+            Poll::Ready(Ok(()))
+        }
+
+        fn poll(
+            self: Pin<&mut Self>,
+            _: &mut Context<'_>,
+        ) -> Poll<Result<StreamMuxerEvent, Self::Error>> {
+            Poll::Pending
+        }
+    }
+
+    struct PendingSubstream(Weak<()>);
+
+    impl AsyncRead for PendingSubstream {
+        fn poll_read(
+            self: Pin<&mut Self>,
+            _cx: &mut Context<'_>,
+            _buf: &mut [u8],
+        ) -> Poll<io::Result<usize>> {
+            Poll::Pending
+        }
+    }
+
+    impl AsyncWrite for PendingSubstream {
+        fn poll_write(
+            self: Pin<&mut Self>,
+            _cx: &mut Context<'_>,
+            _buf: &[u8],
+        ) -> Poll<io::Result<usize>> {
+            Poll::Pending
+        }
+
+        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+            Poll::Pending
+        }
+
+        fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+            Poll::Pending
+        }
+    }
+}

From a98415f6fa3df55117bc374242d4ece7743e1d8b Mon Sep 17 00:00:00 2001
From: Thomas Eizinger <thomas@eizinger.io>
Date: Wed, 31 Aug 2022 15:45:27 +0200
Subject: [PATCH 14/43] Simplify `Connection` constructor

---
 swarm/src/connection.rs      | 16 +++-------------
 swarm/src/connection/pool.rs |  4 +---
 2 files changed, 4 insertions(+), 16 deletions(-)

diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs
index 40bbd16309f..17b4cebe7b7 100644
--- a/swarm/src/connection.rs
+++ b/swarm/src/connection.rs
@@ -31,10 +31,7 @@ pub use pool::{EstablishedConnection, PendingConnection};
 
 use crate::handler::ConnectionHandler;
 use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend, SendWrapper};
-use crate::{
-    ConnectionHandlerEvent, ConnectionHandlerUpgrErr, IntoConnectionHandler, KeepAlive,
-    SubstreamProtocol,
-};
+use crate::{ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, SubstreamProtocol};
 use futures::stream::FuturesUnordered;
 use futures::FutureExt;
 use futures::StreamExt;
@@ -128,16 +125,14 @@ where
     /// Builds a new `Connection` from the given substream multiplexer
     /// and connection handler.
pub fn new( - peer_id: PeerId, - endpoint: ConnectedPoint, muxer: StreamMuxerBox, - handler: impl IntoConnectionHandler, + handler: THandler, substream_upgrade_protocol_override: Option, max_negotiating_inbound_streams: usize, ) -> Self { Connection { muxing: muxer, - handler: handler.into_handler(&peer_id, &endpoint), + handler, negotiating_in: Default::default(), negotiating_out: Default::default(), shutdown: Shutdown::None, @@ -473,11 +468,6 @@ mod tests { let alive_substream_counter = Arc::new(()); let mut connection = Connection::new( - PeerId::random(), - ConnectedPoint::Listener { - local_addr: Multiaddr::empty(), - send_back_addr: Multiaddr::empty(), - }, StreamMuxerBox::new(DummyStreamMuxer { counter: alive_substream_counter.clone(), }), diff --git a/swarm/src/connection/pool.rs b/swarm/src/connection/pool.rs index 62e931e9510..e4d652209bc 100644 --- a/swarm/src/connection/pool.rs +++ b/swarm/src/connection/pool.rs @@ -747,10 +747,8 @@ where ); let connection = super::Connection::new( - obtained_peer_id, - endpoint, muxer, - handler, + handler.into_handler(&obtained_peer_id, &endpoint), self.substream_upgrade_protocol_override, self.max_negotiating_inbound_streams, ); From bbf707086fe10e7d7d0873fa2e0ab997c2bbac88 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 31 Aug 2022 15:49:01 +0200 Subject: [PATCH 15/43] Use `match` to reduce indentation level --- swarm/src/connection.rs | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index 17b4cebe7b7..12f53d1adbc 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -164,19 +164,18 @@ where ) -> Poll, ConnectionError>> { loop { // Poll the [`ConnectionHandler`]. - if let Poll::Ready(handler_event) = self.handler.poll(cx) { - match handler_event { - ConnectionHandlerEvent::Custom(event) => { - return Poll::Ready(Ok(Event::Handler(event))) - } - ConnectionHandlerEvent::Close(err) => { - return Poll::Ready(Err(ConnectionError::Handler(err))) - } - ConnectionHandlerEvent::OutboundSubstreamRequest { protocol } => { - self.pending_dial_upgrades.push_back(protocol); + match self.handler.poll(cx) { + Poll::Pending => {} + Poll::Ready(ConnectionHandlerEvent::Custom(event)) => { + return Poll::Ready(Ok(Event::Handler(event))) + } + Poll::Ready(ConnectionHandlerEvent::Close(err)) => { + return Poll::Ready(Err(ConnectionError::Handler(err))) + } + Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { protocol }) => { + self.pending_dial_upgrades.push_back(protocol); - continue; - } + continue; } } From 8fea3a796c10e52fa70aa8cc4ea67a06d3252a28 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 31 Aug 2022 16:03:38 +0200 Subject: [PATCH 16/43] Fix rustdoc links --- swarm/src/connection/pool.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/swarm/src/connection/pool.rs b/swarm/src/connection/pool.rs index e4d652209bc..c43d5efb61e 100644 --- a/swarm/src/connection/pool.rs +++ b/swarm/src/connection/pool.rs @@ -19,6 +19,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +use crate::connection::Connection; use crate::{ behaviour::{THandlerInEvent, THandlerOutEvent}, connection::{ @@ -89,7 +90,7 @@ where /// The maximum number of inbound streams concurrently negotiating on a connection. /// - /// See [`super::handler_wrapper::HandlerWrapper::max_negotiating_inbound_streams`]. + /// See [`Connection::max_negotiating_inbound_streams`]. 
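// NOTE (illustrative, not part of the patch): the limit documented above
// amounts to a simple admission check before polling the muxer for
// another inbound stream, along the lines of:
//
//     if negotiating_in.len() < max_negotiating_inbound_streams {
//         // only then poll the muxer for a new inbound stream
//     }
//
// While at the limit, new inbound streams are simply left with the muxer.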
max_negotiating_inbound_streams: usize, /// The executor to use for running the background tasks. If `None`, @@ -746,7 +747,7 @@ where }, ); - let connection = super::Connection::new( + let connection = Connection::new( muxer, handler.into_handler(&obtained_peer_id, &endpoint), self.substream_upgrade_protocol_override, @@ -1163,7 +1164,7 @@ pub struct PoolConfig { /// The maximum number of inbound streams concurrently negotiating on a connection. /// - /// See [super::handler_wrapper::HandlerWrapper::max_negotiating_inbound_streams]. + /// See [`Connection::max_negotiating_inbound_streams`]. max_negotiating_inbound_streams: usize, } @@ -1238,7 +1239,7 @@ impl PoolConfig { /// The maximum number of inbound streams concurrently negotiating on a connection. /// - /// See [`super::handler_wrapper::HandlerWrapper::max_negotiating_inbound_streams`]. + /// See [`Connection::max_negotiating_inbound_streams`]. pub fn with_max_negotiating_inbound_streams(mut self, v: usize) -> Self { self.max_negotiating_inbound_streams = v; self From 65a06d2ab3d64b23bc6eff02675d245fb3e46b70 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 31 Aug 2022 16:49:55 +0200 Subject: [PATCH 17/43] Replace `if let` with `match` to reduce indentation --- swarm/src/connection.rs | 38 ++++++++++++++++++++------------------ 1 file changed, 20 insertions(+), 18 deletions(-) diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index 12f53d1adbc..09542c1adeb 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -180,30 +180,32 @@ where } // In case the [`ConnectionHandler`] can not make any more progress, poll the negotiating outbound streams. - if let Poll::Ready(Some((user_data, res))) = self.negotiating_out.poll_next_unpin(cx) { - match res { - Ok(upgrade) => self - .handler - .inject_fully_negotiated_outbound(upgrade, user_data), - Err(err) => self.handler.inject_dial_upgrade_error(user_data, err), + match self.negotiating_out.poll_next_unpin(cx) { + Poll::Pending | Poll::Ready(None) => {} + Poll::Ready(Some((user_data, Ok(upgrade)))) => { + self.handler + .inject_fully_negotiated_outbound(upgrade, user_data); + continue; + } + Poll::Ready(Some((user_data, Err(err)))) => { + self.handler.inject_dial_upgrade_error(user_data, err); + continue; } - - // After the `inject_*` calls, the [`ConnectionHandler`] might be able to make progress. - continue; } // In case both the [`ConnectionHandler`] and the negotiating outbound streams can not // make any more progress, poll the negotiating inbound streams. - if let Poll::Ready(Some((user_data, res))) = self.negotiating_in.poll_next_unpin(cx) { - match res { - Ok(upgrade) => self - .handler - .inject_fully_negotiated_inbound(upgrade, user_data), - Err(err) => self.handler.inject_listen_upgrade_error(user_data, err), + match self.negotiating_in.poll_next_unpin(cx) { + Poll::Pending | Poll::Ready(None) => {} + Poll::Ready(Some((user_data, Ok(upgrade)))) => { + self.handler + .inject_fully_negotiated_inbound(upgrade, user_data); + continue; + } + Poll::Ready(Some((user_data, Err(err)))) => { + self.handler.inject_listen_upgrade_error(user_data, err); + continue; } - - // After the `inject_*` calls, the [`ConnectionHandler`] might be able to make progress. 
-            continue;
         }
 
         // Ask the handler whether it wants the connection (and the handler itself)

From 91370a9075c5614c9008e4015380549c35651a36 Mon Sep 17 00:00:00 2001
From: Thomas Eizinger
Date: Wed, 31 Aug 2022 21:20:07 +0200
Subject: [PATCH 18/43] Reduce diff

---
 swarm/src/connection.rs | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs
index 09542c1adeb..7df8f86d384 100644
--- a/swarm/src/connection.rs
+++ b/swarm/src/connection.rs
@@ -166,17 +166,16 @@ where
             // Poll the [`ConnectionHandler`].
             match self.handler.poll(cx) {
                 Poll::Pending => {}
+                Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { protocol }) => {
+                    self.pending_dial_upgrades.push_back(protocol);
+                    continue;
+                }
                 Poll::Ready(ConnectionHandlerEvent::Custom(event)) => {
                     return Poll::Ready(Ok(Event::Handler(event)))
                 }
                 Poll::Ready(ConnectionHandlerEvent::Close(err)) => {
                     return Poll::Ready(Err(ConnectionError::Handler(err)))
                 }
-                Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { protocol }) => {
-                    self.pending_dial_upgrades.push_back(protocol);
-
-                    continue;
-                }
             }
 
             // In case the [`ConnectionHandler`] can not make any more progress, poll the negotiating outbound streams.

From 0621961e91c93cfa9dc475987b893d9f100cc89a Mon Sep 17 00:00:00 2001
From: Thomas Eizinger
Date: Wed, 31 Aug 2022 21:21:11 +0200
Subject: [PATCH 19/43] Reduce diff, 2nd attempt

---
 swarm/src/connection.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs
index 7df8f86d384..191b559ce3d 100644
--- a/swarm/src/connection.rs
+++ b/swarm/src/connection.rs
@@ -168,7 +168,7 @@ where
                 Poll::Pending => {}
                 Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { protocol }) => {
                     self.pending_dial_upgrades.push_back(protocol);
-                    continue;
+                    continue; // Poll handler until exhausted.
                 }
                 Poll::Ready(ConnectionHandlerEvent::Custom(event)) => {
                     return Poll::Ready(Ok(Event::Handler(event)))

From 9197de0e3392671aaf2b926c098283a32292da2a Mon Sep 17 00:00:00 2001
From: Thomas Eizinger
Date: Wed, 31 Aug 2022 21:22:56 +0200
Subject: [PATCH 20/43] Reduce diff, 3rd attempt

---
 swarm/src/connection.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs
index 191b559ce3d..e5d16c6c036 100644
--- a/swarm/src/connection.rs
+++ b/swarm/src/connection.rs
@@ -171,10 +171,10 @@ where
                     continue; // Poll handler until exhausted.
                 }
                 Poll::Ready(ConnectionHandlerEvent::Custom(event)) => {
-                    return Poll::Ready(Ok(Event::Handler(event)))
+                    return Poll::Ready(Ok(Event::Handler(event)));
                 }
                 Poll::Ready(ConnectionHandlerEvent::Close(err)) => {
-                    return Poll::Ready(Err(ConnectionError::Handler(err)))
+                    return Poll::Ready(Err(ConnectionError::Handler(err)));
                 }
             }

From 82d3bd35dfb77ba0e15871df8747a537a144f20f Mon Sep 17 00:00:00 2001
From: Thomas Eizinger
Date: Mon, 5 Sep 2022 17:33:40 +0200
Subject: [PATCH 21/43] Don't use dial terminology for substreams

---
 swarm/src/connection.rs | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs
index e5d16c6c036..3bc1b41a1c7 100644
--- a/swarm/src/connection.rs
+++ b/swarm/src/connection.rs
@@ -103,7 +103,7 @@ where
     /// [`StreamMuxerBox`](libp2p_core::muxing::StreamMuxerBox) level.
     max_negotiating_inbound_streams: usize,
     /// For each outbound substream request, how to upgrade it.
- pending_dial_upgrades: + pending_outbound_stream_upgrades: VecDeque>, } @@ -141,7 +141,7 @@ where shutdown: Shutdown::None, substream_upgrade_protocol_override, max_negotiating_inbound_streams, - pending_dial_upgrades: VecDeque::with_capacity(8), + pending_outbound_stream_upgrades: VecDeque::with_capacity(8), } } @@ -167,7 +167,7 @@ where match self.handler.poll(cx) { Poll::Pending => {} Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { protocol }) => { - self.pending_dial_upgrades.push_back(protocol); + self.pending_outbound_stream_upgrades.push_back(protocol); continue; // Poll handler until exhausted. } Poll::Ready(ConnectionHandlerEvent::Custom(event)) => { @@ -251,12 +251,12 @@ where } } - if !self.pending_dial_upgrades.is_empty() { + if !self.pending_outbound_stream_upgrades.is_empty() { match self.muxing.poll_outbound_unpin(cx)? { Poll::Pending => {} Poll::Ready(substream) => { let protocol = self - .pending_dial_upgrades + .pending_outbound_stream_upgrades .pop_front() .expect("`open_info` is not empty"); From e67745538e4532e70d206b12bd27c06d03d2e219 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 5 Sep 2022 17:53:00 +0200 Subject: [PATCH 22/43] Fix typo --- swarm/src/connection.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index 3bc1b41a1c7..a70a9e6585c 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -258,7 +258,7 @@ where let protocol = self .pending_outbound_stream_upgrades .pop_front() - .expect("`open_info` is not empty"); + .expect("`pending_outbound_stream_upgrades` is not empty"); self.negotiating_out.push(SubstreamUpgrade::new_outbound( substream, From 6b98c50209b59957e5b85413bc2397c2bdb83f5d Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 5 Sep 2022 18:08:25 +0200 Subject: [PATCH 23/43] yamux: Buffer inbound streams in `StreamMuxer::poll` --- muxers/yamux/src/lib.rs | 33 +++++++++++++++++++++++++++++++-- 1 file changed, 31 insertions(+), 2 deletions(-) diff --git a/muxers/yamux/src/lib.rs b/muxers/yamux/src/lib.rs index 5b109f2b3b0..6aa3d7a5451 100644 --- a/muxers/yamux/src/lib.rs +++ b/muxers/yamux/src/lib.rs @@ -24,10 +24,12 @@ use futures::{ future, prelude::*, + ready, stream::{BoxStream, LocalBoxStream}, }; use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent}; use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; +use std::collections::VecDeque; use std::{ fmt, io, iter, mem, pin::Pin, @@ -42,8 +44,20 @@ pub struct Yamux { incoming: S, /// Handle to control the connection. control: yamux::Control, + /// Temporarily buffers inbound streams in case our node is performing backpressure on the remote. + /// + /// The only way how yamux can make progress is by driving the [`Incoming`] stream. However, the + /// [`StreamMuxer`] interface is designed to allow a caller to selectively make progress via + /// [`StreamMuxer::poll_inbound`] and [`StreamMuxer::poll_outbound`] whilst the move general + /// [`StreamMuxer::poll`] is designed to make progress on existing streams etc. + /// + /// This buffer stores inbound streams that are created whilst [`StreaMuxer::poll`] is called. + /// Once the buffer is full, new inbound streams are dropped. 
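// NOTE (illustrative, not part of the patch): the buffer semantics
// described above, reduced to a plain bounded queue:
//
//     fn buffer_or_drop<T>(buf: &mut VecDeque<T>, item: T, max: usize) {
//         if buf.len() >= max {
//             drop(item); // apply backpressure by discarding, not by growing
//         } else {
//             buf.push_back(item);
//         }
//     }
//
// `max` corresponds to the `MAX_BUFFERED_INBOUND_STREAMS` constant
// introduced below.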
+ inbound_stream_buffer: VecDeque, } +const MAX_BUFFERED_INBOUND_STREAMS: usize = 25; + impl fmt::Debug for Yamux { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.write_str("Yamux") @@ -69,6 +83,7 @@ where _marker: std::marker::PhantomData, }, control: ctrl, + inbound_stream_buffer: VecDeque::default(), } } } @@ -88,6 +103,7 @@ where _marker: std::marker::PhantomData, }, control: ctrl, + inbound_stream_buffer: VecDeque::default(), } } } @@ -105,6 +121,10 @@ where mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { + if let Some(stream) = self.inbound_stream_buffer.pop_front() { + return Poll::Ready(Ok(stream)); + } + self.incoming.poll_next_unpin(cx).map(|maybe_stream| { let stream = maybe_stream .transpose()? @@ -125,9 +145,18 @@ where fn poll( self: Pin<&mut Self>, - _: &mut Context<'_>, + cx: &mut Context<'_>, ) -> Poll> { - Poll::Pending + loop { + let inbound_stream = ready!(self.poll_inbound(cx))?; + + if self.inbound_stream_buffer.len() >= MAX_BUFFERED_INBOUND_STREAMS { + drop(inbound_stream); + continue; + } + + self.inbound_stream_buffer.push_back(inbound_stream); + } } fn poll_close(mut self: Pin<&mut Self>, c: &mut Context<'_>) -> Poll> { From 890cecdf9650b84dfb3e1cb4da49676539b88922 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 5 Sep 2022 18:51:07 +0200 Subject: [PATCH 24/43] Fix compile error --- muxers/yamux/src/lib.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/muxers/yamux/src/lib.rs b/muxers/yamux/src/lib.rs index 6aa3d7a5451..10605cec2ad 100644 --- a/muxers/yamux/src/lib.rs +++ b/muxers/yamux/src/lib.rs @@ -27,7 +27,7 @@ use futures::{ ready, stream::{BoxStream, LocalBoxStream}, }; -use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent}; +use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent, StreamMuxerExt}; use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; use std::collections::VecDeque; use std::{ @@ -147,15 +147,17 @@ where self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { + let this = self.get_mut(); + loop { - let inbound_stream = ready!(self.poll_inbound(cx))?; + let inbound_stream = ready!(this.poll_inbound_unpin(cx))?; - if self.inbound_stream_buffer.len() >= MAX_BUFFERED_INBOUND_STREAMS { + if this.inbound_stream_buffer.len() >= MAX_BUFFERED_INBOUND_STREAMS { drop(inbound_stream); continue; } - self.inbound_stream_buffer.push_back(inbound_stream); + this.inbound_stream_buffer.push_back(inbound_stream); } } From 44e9163411edbdbff6a20eba6a975f7fcd16ced2 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 9 Sep 2022 00:44:32 +1000 Subject: [PATCH 25/43] Fix autonat tests We need to call the general `poll` impl last, otherwise we are buffering too many streams. --- swarm/src/connection.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index a70a9e6585c..030616d3478 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -243,14 +243,6 @@ where } } - match self.muxing.poll_unpin(cx)? { - Poll::Pending => {} - Poll::Ready(StreamMuxerEvent::AddressChange(address)) => { - self.handler.inject_address_change(&address); - return Poll::Ready(Ok(Event::AddressChange(address))); - } - } - if !self.pending_outbound_stream_upgrades.is_empty() { match self.muxing.poll_outbound_unpin(cx)? { Poll::Pending => {} @@ -285,6 +277,14 @@ where } } + match self.muxing.poll_unpin(cx)? 
{
+            Poll::Pending => {}
+            Poll::Ready(StreamMuxerEvent::AddressChange(address)) => {
+                self.handler.inject_address_change(&address);
+                return Poll::Ready(Ok(Event::AddressChange(address)));
+            }
+        }
+
         return Poll::Pending; // Nothing can make progress, return `Pending`.
     }
 }

From 649021cfec8b2d1489b5d5d7c7f500bf4c920d39 Mon Sep 17 00:00:00 2001
From: Thomas Eizinger
Date: Fri, 9 Sep 2022 00:51:48 +1000
Subject: [PATCH 26/43] Fix rustdoc link

---
 muxers/yamux/src/lib.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/muxers/yamux/src/lib.rs b/muxers/yamux/src/lib.rs
index 10605cec2ad..a556be483f2 100644
--- a/muxers/yamux/src/lib.rs
+++ b/muxers/yamux/src/lib.rs
@@ -51,7 +51,7 @@ pub struct Yamux<S> {
     /// [`StreamMuxer::poll_inbound`] and [`StreamMuxer::poll_outbound`] whilst the move general
     /// [`StreamMuxer::poll`] is designed to make progress on existing streams etc.
     ///
-    /// This buffer stores inbound streams that are created whilst [`StreaMuxer::poll`] is called.
+    /// This buffer stores inbound streams that are created whilst [`StreamMuxer::poll`] is called.
     /// Once the buffer is full, new inbound streams are dropped.
     inbound_stream_buffer: VecDeque<yamux::Stream>,

From 9ca11b01d95829eea33cf5a2a6a74f483a573640 Mon Sep 17 00:00:00 2001
From: Thomas Eizinger
Date: Thu, 15 Sep 2022 16:43:31 +1000
Subject: [PATCH 27/43] Fix silly endless loop mistake and add log

---
 muxers/yamux/Cargo.toml             |  1 +
 muxers/yamux/src/lib.rs             | 28 +++++++++++++++++++---------
 protocols/kad/src/behaviour/test.rs |  3 +++
 3 files changed, 23 insertions(+), 9 deletions(-)

diff --git a/muxers/yamux/Cargo.toml b/muxers/yamux/Cargo.toml
index 02dfb832d8d..cf9466be434 100644
--- a/muxers/yamux/Cargo.toml
+++ b/muxers/yamux/Cargo.toml
@@ -16,3 +16,4 @@ libp2p-core = { version = "0.35.0", path = "../../core", default-features = fals
 parking_lot = "0.12"
 thiserror = "1.0"
 yamux = "0.10.0"
+log = "0.4"

diff --git a/muxers/yamux/src/lib.rs b/muxers/yamux/src/lib.rs
index a556be483f2..d7aca6f3ef2 100644
--- a/muxers/yamux/src/lib.rs
+++ b/muxers/yamux/src/lib.rs
@@ -27,7 +27,7 @@ use futures::{
     ready,
     stream::{BoxStream, LocalBoxStream},
 };
-use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent, StreamMuxerExt};
+use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent};
 use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
 use std::collections::VecDeque;
 use std::{
@@ -125,13 +125,7 @@ where
             return Poll::Ready(Ok(stream));
         }
 
-        self.incoming.poll_next_unpin(cx).map(|maybe_stream| {
-            let stream = maybe_stream
-                .transpose()?
-                .ok_or(YamuxError(ConnectionError::Closed))?;
-
-            Ok(stream)
-        })
+        self.poll_inner(cx)
     }
 
     fn poll_outbound(
@@ -150,9 +144,10 @@ where
         let this = self.get_mut();
 
         loop {
-            let inbound_stream = ready!(this.poll_inbound_unpin(cx))?;
+            let inbound_stream = ready!(this.poll_inner(cx))?;
 
             if this.inbound_stream_buffer.len() >= MAX_BUFFERED_INBOUND_STREAMS {
+                log::warn!("dropping {inbound_stream} because buffer is full");
                 drop(inbound_stream);
                 continue;
             }
@@ -180,6 +175,21 @@ where
     }
 }
 
+impl<S> Yamux<S>
+where
+    S: Stream<Item = Result<yamux::Stream, YamuxError>> + Unpin,
+{
+    fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<Result<yamux::Stream, YamuxError>> {
+        self.incoming.poll_next_unpin(cx).map(|maybe_stream| {
+            let stream = maybe_stream
+                .transpose()?
+                .ok_or(YamuxError(ConnectionError::Closed))?;
+
+            Ok(stream)
+        })
+    }
+}
+
 /// The yamux configuration.
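// NOTE (inferred from the diff above, not part of the patch): the
// "endless loop" arose because `poll` previously went through
// `poll_inbound`, which pops `inbound_stream_buffer` first — so `poll`
// would pop a buffered stream and immediately push the very same stream
// back, spinning forever without ever returning `Poll::Pending`. Routing
// `poll` through the new `poll_inner` (the raw accept path), while only
// `poll_inbound` consults the buffer, breaks that cycle.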
 #[derive(Debug, Clone)]
 pub struct YamuxConfig {

diff --git a/protocols/kad/src/behaviour/test.rs b/protocols/kad/src/behaviour/test.rs
index 1f67be5a19d..d22ac398d3d 100644
--- a/protocols/kad/src/behaviour/test.rs
+++ b/protocols/kad/src/behaviour/test.rs
@@ -46,6 +46,7 @@ use std::{
     time::Duration,
     u64,
 };
+use env_logger::init;
 
 type TestSwarm = Swarm<Kademlia<MemoryStore>>;
 
@@ -853,6 +854,8 @@ fn get_record_many() {
 /// network where X is equal to the configured replication factor.
 #[test]
 fn add_provider() {
+    init();
+
     fn prop(keys: Vec<record::Key>, seed: Seed) {
         let mut rng = StdRng::from_seed(seed.0);
         let replication_factor =

From 34ef6537098db360feff434ae30152aa22297ebb Mon Sep 17 00:00:00 2001
From: Thomas Eizinger
Date: Thu, 15 Sep 2022 16:44:05 +1000
Subject: [PATCH 28/43] Revert "Fix autonat tests"

This reverts commit 44e9163411edbdbff6a20eba6a975f7fcd16ced2.

---
 swarm/src/connection.rs | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs
index 030616d3478..a70a9e6585c 100644
--- a/swarm/src/connection.rs
+++ b/swarm/src/connection.rs
@@ -243,6 +243,14 @@ where
             }
         }
 
+        match self.muxing.poll_unpin(cx)? {
+            Poll::Pending => {}
+            Poll::Ready(StreamMuxerEvent::AddressChange(address)) => {
+                self.handler.inject_address_change(&address);
+                return Poll::Ready(Ok(Event::AddressChange(address)));
+            }
+        }
+
         if !self.pending_outbound_stream_upgrades.is_empty() {
             match self.muxing.poll_outbound_unpin(cx)? {
                 Poll::Pending => {}
@@ -277,14 +285,6 @@ where
             }
         }
 
-        match self.muxing.poll_unpin(cx)? {
-            Poll::Pending => {}
-            Poll::Ready(StreamMuxerEvent::AddressChange(address)) => {
-                self.handler.inject_address_change(&address);
-                return Poll::Ready(Ok(Event::AddressChange(address)));
-            }
-        }
-
         return Poll::Pending; // Nothing can make progress, return `Pending`.
     }
 }

From 44c2e3684d43c2b41aab498aefb696c77ee486a9 Mon Sep 17 00:00:00 2001
From: Thomas Eizinger
Date: Thu, 15 Sep 2022 17:52:05 +1000
Subject: [PATCH 29/43] Remove accidental log init

---
 protocols/kad/src/behaviour/test.rs | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/protocols/kad/src/behaviour/test.rs b/protocols/kad/src/behaviour/test.rs
index d22ac398d3d..759c9d6973d 100644
--- a/protocols/kad/src/behaviour/test.rs
+++ b/protocols/kad/src/behaviour/test.rs
@@ -25,6 +25,7 @@ use super::*;
 use crate::kbucket::Distance;
 use crate::record::{store::MemoryStore, Key};
 use crate::K_VALUE;
+use env_logger::init;
 use futures::{executor::block_on, future::poll_fn, prelude::*};
 use futures_timer::Delay;
 use libp2p_core::{
@@ -46,7 +47,6 @@ use std::{
     time::Duration,
     u64,
 };
-use env_logger::init;
 
 type TestSwarm = Swarm<Kademlia<MemoryStore>>;
 
@@ -854,8 +854,6 @@ fn get_record_many() {
 /// network where X is equal to the configured replication factor.
#[test] fn add_provider() { - init(); - fn prop(keys: Vec, seed: Seed) { let mut rng = StdRng::from_seed(seed.0); let replication_factor = From 0ec5a586d4f078fd4416bbc6a702b068c3d57457 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 15 Sep 2022 17:52:41 +1000 Subject: [PATCH 30/43] Remove import --- protocols/kad/src/behaviour/test.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/protocols/kad/src/behaviour/test.rs b/protocols/kad/src/behaviour/test.rs index 759c9d6973d..1f67be5a19d 100644 --- a/protocols/kad/src/behaviour/test.rs +++ b/protocols/kad/src/behaviour/test.rs @@ -25,7 +25,6 @@ use super::*; use crate::kbucket::Distance; use crate::record::{store::MemoryStore, Key}; use crate::K_VALUE; -use env_logger::init; use futures::{executor::block_on, future::poll_fn, prelude::*}; use futures_timer::Delay; use libp2p_core::{ From 1ff710df2abe3e6123e779cd4fca170aa0b025fc Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 15 Sep 2022 20:03:13 +1000 Subject: [PATCH 31/43] Have timer start on request of substream for outbound streams --- swarm/src/connection.rs | 153 ++++++++++++++++++++++++++++------------ 1 file changed, 106 insertions(+), 47 deletions(-) diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index a70a9e6585c..2f1608da33e 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -43,9 +43,9 @@ use libp2p_core::muxing::{StreamMuxerBox, StreamMuxerEvent, StreamMuxerExt, Subs use libp2p_core::upgrade::{InboundUpgradeApply, OutboundUpgradeApply}; use libp2p_core::PeerId; use libp2p_core::{upgrade, UpgradeError}; -use std::collections::VecDeque; use std::future::Future; -use std::{fmt, io, pin::Pin, task::Context, task::Poll}; +use std::time::Duration; +use std::{fmt, io, mem, pin::Pin, task::Context, task::Poll}; /// Information about a successfully established connection. #[derive(Debug, Clone, PartialEq, Eq)] @@ -102,9 +102,10 @@ where /// the total number of streams can be enforced at the /// [`StreamMuxerBox`](libp2p_core::muxing::StreamMuxerBox) level. max_negotiating_inbound_streams: usize, - /// For each outbound substream request, how to upgrade it. - pending_outbound_stream_upgrades: - VecDeque>, + /// TODO + requested_substreams: FuturesUnordered< + SubstreamRequested, + >, } impl fmt::Debug for Connection @@ -141,7 +142,7 @@ where shutdown: Shutdown::None, substream_upgrade_protocol_override, max_negotiating_inbound_streams, - pending_outbound_stream_upgrades: VecDeque::with_capacity(8), + requested_substreams: Default::default(), } } @@ -159,15 +160,38 @@ where /// Polls the handler and the substream, forwarding events from the former to the latter and /// vice versa. pub fn poll( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll, ConnectionError>> { + let Self { + requested_substreams, + muxing, + handler, + negotiating_out, + negotiating_in, + shutdown, + max_negotiating_inbound_streams, + substream_upgrade_protocol_override, + } = self.get_mut(); + loop { + match requested_substreams.poll_next_unpin(cx) { + Poll::Ready(Some(Ok(()))) => continue, + Poll::Ready(Some(Err(user_data))) => { + handler.inject_dial_upgrade_error(user_data, ConnectionHandlerUpgrErr::Timeout); + continue; + } + Poll::Ready(None) | Poll::Pending => {} + } + // Poll the [`ConnectionHandler`]. 
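// NOTE (illustrative, not part of the patch): each entry in
// `requested_substreams` races its `Delay` against the muxer handing us
// a stream. Resolving with `Ok(())` means the request was fulfilled (the
// entry was `extract`ed and marked `Done`); resolving with
// `Err(user_data)` means the timer fired first, which is reported below
// as `ConnectionHandlerUpgrErr::Timeout` — i.e. the upgrade timeout now
// starts ticking when the substream is requested, not when the muxer
// eventually opens it.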
- match self.handler.poll(cx) { + match handler.poll(cx) { Poll::Pending => {} Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { protocol }) => { - self.pending_outbound_stream_upgrades.push_back(protocol); + let timeout = *protocol.timeout(); + let (upgrade, user_data) = protocol.into_upgrade(); + + requested_substreams.push(SubstreamRequested::new(user_data, timeout, upgrade)); continue; // Poll handler until exhausted. } Poll::Ready(ConnectionHandlerEvent::Custom(event)) => { @@ -179,38 +203,36 @@ where } // In case the [`ConnectionHandler`] can not make any more progress, poll the negotiating outbound streams. - match self.negotiating_out.poll_next_unpin(cx) { + match negotiating_out.poll_next_unpin(cx) { Poll::Pending | Poll::Ready(None) => {} Poll::Ready(Some((user_data, Ok(upgrade)))) => { - self.handler - .inject_fully_negotiated_outbound(upgrade, user_data); + handler.inject_fully_negotiated_outbound(upgrade, user_data); continue; } Poll::Ready(Some((user_data, Err(err)))) => { - self.handler.inject_dial_upgrade_error(user_data, err); + handler.inject_dial_upgrade_error(user_data, err); continue; } } // In case both the [`ConnectionHandler`] and the negotiating outbound streams can not // make any more progress, poll the negotiating inbound streams. - match self.negotiating_in.poll_next_unpin(cx) { + match negotiating_in.poll_next_unpin(cx) { Poll::Pending | Poll::Ready(None) => {} Poll::Ready(Some((user_data, Ok(upgrade)))) => { - self.handler - .inject_fully_negotiated_inbound(upgrade, user_data); + handler.inject_fully_negotiated_inbound(upgrade, user_data); continue; } Poll::Ready(Some((user_data, Err(err)))) => { - self.handler.inject_listen_upgrade_error(user_data, err); + handler.inject_listen_upgrade_error(user_data, err); continue; } } // Ask the handler whether it wants the connection (and the handler itself) // to be kept alive, which determines the planned shutdown, if any. - let keep_alive = self.handler.connection_keep_alive(); - match (&mut self.shutdown, keep_alive) { + let keep_alive = handler.connection_keep_alive(); + match (&mut *shutdown, keep_alive) { (Shutdown::Later(timer, deadline), KeepAlive::Until(t)) => { if *deadline != t { *deadline = t; @@ -221,20 +243,20 @@ where } (_, KeepAlive::Until(t)) => { if let Some(dur) = t.checked_duration_since(Instant::now()) { - self.shutdown = Shutdown::Later(Delay::new(dur), t) + *shutdown = Shutdown::Later(Delay::new(dur), t) } } - (_, KeepAlive::No) => self.shutdown = Shutdown::Asap, - (_, KeepAlive::Yes) => self.shutdown = Shutdown::None, + (_, KeepAlive::No) => *shutdown = Shutdown::Asap, + (_, KeepAlive::Yes) => *shutdown = Shutdown::None, }; // Check if the connection (and handler) should be shut down. // As long as we're still negotiating substreams, shutdown is always postponed. - if self.negotiating_in.is_empty() && self.negotiating_out.is_empty() { - match self.shutdown { + if negotiating_in.is_empty() && negotiating_out.is_empty() { + match shutdown { Shutdown::None => {} Shutdown::Asap => return Poll::Ready(Err(ConnectionError::KeepAliveTimeout)), - Shutdown::Later(ref mut delay, _) => match Future::poll(Pin::new(delay), cx) { + Shutdown::Later(delay, _) => match Future::poll(Pin::new(delay), cx) { Poll::Ready(_) => { return Poll::Ready(Err(ConnectionError::KeepAliveTimeout)) } @@ -243,27 +265,26 @@ where } } - match self.muxing.poll_unpin(cx)? { + match muxing.poll_unpin(cx)? 
{ Poll::Pending => {} Poll::Ready(StreamMuxerEvent::AddressChange(address)) => { - self.handler.inject_address_change(&address); + handler.inject_address_change(&address); return Poll::Ready(Ok(Event::AddressChange(address))); } } - if !self.pending_outbound_stream_upgrades.is_empty() { - match self.muxing.poll_outbound_unpin(cx)? { + if let Some(requested_substream) = requested_substreams.iter_mut().next() { + match muxing.poll_outbound_unpin(cx)? { Poll::Pending => {} Poll::Ready(substream) => { - let protocol = self - .pending_outbound_stream_upgrades - .pop_front() - .expect("`pending_outbound_stream_upgrades` is not empty"); + let (user_data, timeout, upgrade) = requested_substream.extract(); - self.negotiating_out.push(SubstreamUpgrade::new_outbound( + negotiating_out.push(SubstreamUpgrade::new_outbound( substream, - protocol, - self.substream_upgrade_protocol_override, + user_data, + timeout, + upgrade, + *substream_upgrade_protocol_override, )); continue; // Go back to the top, handler can potentially make progress again. @@ -271,14 +292,13 @@ where } } - if self.negotiating_in.len() < self.max_negotiating_inbound_streams { - match self.muxing.poll_inbound_unpin(cx)? { + if negotiating_in.len() < *max_negotiating_inbound_streams { + match muxing.poll_inbound_unpin(cx)? { Poll::Pending => {} Poll::Ready(substream) => { - let protocol = self.handler.listen_protocol(); + let protocol = handler.listen_protocol(); - self.negotiating_in - .push(SubstreamUpgrade::new_inbound(substream, protocol)); + negotiating_in.push(SubstreamUpgrade::new_inbound(substream, protocol)); continue; // Go back to the top, handler can potentially make progress again. } @@ -340,12 +360,11 @@ where { fn new_outbound( substream: SubstreamBox, - protocol: SubstreamProtocol, + user_data: UserData, + timeout: Delay, + upgrade: Upgrade, version_override: Option, ) -> Self { - let timeout = *protocol.timeout(); - let (upgrade, open_info) = protocol.into_upgrade(); - let effective_version = match version_override { Some(version_override) if version_override != upgrade::Version::default() => { log::debug!( @@ -360,8 +379,8 @@ where }; Self { - user_data: Some(open_info), - timeout: Delay::new(timeout), + user_data: Some(user_data), + timeout, upgrade: upgrade::apply_outbound(substream, SendWrapper(upgrade), effective_version), } } @@ -430,6 +449,46 @@ where } } +enum SubstreamRequested { + Waiting { + user_data: UserData, + timeout: Delay, + upgrade: Upgrade, + }, + Done, +} + +impl SubstreamRequested { + fn new(user_data: UserData, timeout: Duration, upgrade: Upgrade) -> Self { + Self::Waiting { + user_data, + timeout: Delay::new(timeout), + upgrade, + } + } + + fn extract(&mut self) -> (UserData, Delay, Upgrade) { + match mem::replace(self, Self::Done) { + SubstreamRequested::Waiting { + user_data, + timeout, + upgrade, + } => (user_data, timeout, upgrade), + SubstreamRequested::Done => panic!("cannot extract from a completed future"), + } + } +} + +impl Unpin for SubstreamRequested {} + +impl Future for SubstreamRequested { + type Output = Result<(), UserData>; + + fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll { + todo!() + } +} + /// The options for a planned connection & handler shutdown. 
 ///
 /// A shutdown is planned anew based on the return value of

From ce4d9a70d96629a9a26caf74993133bc1b667512 Mon Sep 17 00:00:00 2001
From: Thomas Eizinger
Date: Thu, 15 Sep 2022 20:09:36 +1000
Subject: [PATCH 32/43] Add stub for missing test

---
 swarm/src/connection.rs | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs
index 2f1608da33e..28b22eca064 100644
--- a/swarm/src/connection.rs
+++ b/swarm/src/connection.rs
@@ -551,6 +551,11 @@ mod tests {
         QuickCheck::new().quickcheck(prop as fn(_));
     }
 
+    #[test]
+    fn outbound_stream_timeout_starts_on_request() {
+        unimplemented!()
+    }
+
     struct DummyStreamMuxer {
         counter: Arc<()>,
     }

From e0ba9ba7ca3e77dbfdd55575dc8aa6c29c488eed Mon Sep 17 00:00:00 2001
From: Thomas Eizinger
Date: Thu, 15 Sep 2022 23:43:44 +1000
Subject: [PATCH 33/43] Implement missing test and add waker to SubstreamRequested

---
 swarm/src/connection.rs | 183 +++++++++++++++++++++++++++++++++++++++-
 1 file changed, 179 insertions(+), 4 deletions(-)

diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs
index 28b22eca064..d28e83d00af 100644
--- a/swarm/src/connection.rs
+++ b/swarm/src/connection.rs
@@ -44,6 +44,7 @@ use libp2p_core::upgrade::{InboundUpgradeApply, OutboundUpgradeApply};
 use libp2p_core::PeerId;
 use libp2p_core::{upgrade, UpgradeError};
 use std::future::Future;
+use std::task::Waker;
 use std::time::Duration;
 use std::{fmt, io, mem, pin::Pin, task::Context, task::Poll};
 
@@ -454,6 +455,7 @@ enum SubstreamRequested<UserData, Upgrade> {
         user_data: UserData,
         timeout: Delay,
         upgrade: Upgrade,
+        waker: Option<Waker>,
     },
     Done,
 }
@@ -464,6 +466,7 @@ impl<UserData, Upgrade> SubstreamRequested<UserData, Upgrade> {
             user_data,
             timeout: Delay::new(timeout),
             upgrade,
+            waker: None,
         }
     }
 
@@ -473,7 +476,14 @@ impl<UserData, Upgrade> SubstreamRequested<UserData, Upgrade> {
                 user_data,
                 timeout,
                 upgrade,
-            } => (user_data, timeout, upgrade),
+                waker,
+            } => {
+                if let Some(waker) = waker {
+                    waker.wake();
+                }
+
+                (user_data, timeout, upgrade)
+            }
             SubstreamRequested::Done => panic!("cannot extract from a completed future"),
         }
     }
@@ -484,8 +494,29 @@ impl<UserData, Upgrade> Unpin for SubstreamRequested<UserData, Upgrade> {}
 impl<UserData, Upgrade> Future for SubstreamRequested<UserData, Upgrade> {
     type Output = Result<(), UserData>;
 
-    fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
-        todo!()
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        let this = self.get_mut();
+
+        match mem::replace(this, Self::Done) {
+            SubstreamRequested::Waiting {
+                user_data,
+                upgrade,
+                mut timeout,
+                ..
+ } => match timeout.poll_unpin(cx) { + Poll::Ready(()) => Poll::Ready(Err(user_data)), + Poll::Pending => { + *this = Self::Waiting { + user_data, + upgrade, + timeout, + waker: Some(cx.waker().clone()), + }; + Poll::Pending + } + }, + SubstreamRequested::Done => Poll::Ready(Ok(())), + } } } @@ -514,6 +545,7 @@ mod tests { use crate::handler::DummyConnectionHandler; use futures::AsyncRead; use futures::AsyncWrite; + use libp2p_core::upgrade::DeniedUpgrade; use libp2p_core::StreamMuxer; use quickcheck::*; use std::sync::{Arc, Weak}; @@ -553,7 +585,27 @@ mod tests { #[test] fn outbound_stream_timeout_starts_on_request() { - unimplemented!() + let upgrade_timeout = Duration::from_secs(1); + let mut connection = Connection::new( + StreamMuxerBox::new(PendingStreamMuxer), + MockConnectionHandler::new(upgrade_timeout.clone()), + None, + 2, + ); + + connection.handler.open_new_outbound(); + let _ = Pin::new(&mut connection) + .poll(&mut Context::from_waker(futures::task::noop_waker_ref())); + + std::thread::sleep(upgrade_timeout + Duration::from_secs(1)); + + let _ = Pin::new(&mut connection) + .poll(&mut Context::from_waker(futures::task::noop_waker_ref())); + + assert!(matches!( + connection.handler.error.unwrap(), + ConnectionHandlerUpgrErr::Timeout + )) } struct DummyStreamMuxer { @@ -590,6 +642,39 @@ mod tests { } } + /// A [`StreamMuxer`] which never returns a stream. + struct PendingStreamMuxer; + + impl StreamMuxer for PendingStreamMuxer { + type Substream = PendingSubstream; + type Error = Void; + + fn poll_inbound( + self: Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll> { + Poll::Pending + } + + fn poll_outbound( + self: Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll> { + Poll::Pending + } + + fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Pending + } + + fn poll( + self: Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll> { + Poll::Pending + } + } + struct PendingSubstream(Weak<()>); impl AsyncRead for PendingSubstream { @@ -619,4 +704,94 @@ mod tests { Poll::Pending } } + + struct MockConnectionHandler { + outbound_requested: bool, + error: Option>, + upgrade_timeout: Duration, + } + + impl MockConnectionHandler { + fn new(upgrade_timeout: Duration) -> Self { + Self { + outbound_requested: false, + error: None, + upgrade_timeout, + } + } + + fn open_new_outbound(&mut self) { + self.outbound_requested = true; + } + } + + impl ConnectionHandler for MockConnectionHandler { + type InEvent = Void; + type OutEvent = Void; + type Error = Void; + type InboundProtocol = DeniedUpgrade; + type OutboundProtocol = DeniedUpgrade; + type InboundOpenInfo = (); + type OutboundOpenInfo = (); + + fn listen_protocol( + &self, + ) -> SubstreamProtocol { + SubstreamProtocol::new(DeniedUpgrade, ()).with_timeout(self.upgrade_timeout) + } + + fn inject_fully_negotiated_inbound( + &mut self, + protocol: ::Output, + _: Self::InboundOpenInfo, + ) { + void::unreachable(protocol) + } + + fn inject_fully_negotiated_outbound( + &mut self, + protocol: ::Output, + _: Self::OutboundOpenInfo, + ) { + void::unreachable(protocol) + } + + fn inject_event(&mut self, event: Self::InEvent) { + void::unreachable(event) + } + + fn inject_dial_upgrade_error( + &mut self, + _: Self::OutboundOpenInfo, + error: ConnectionHandlerUpgrErr<::Error>, + ) { + self.error = Some(error) + } + + fn connection_keep_alive(&self) -> KeepAlive { + KeepAlive::Yes + } + + fn poll( + &mut self, + _: &mut Context<'_>, + ) -> Poll< + ConnectionHandlerEvent< + Self::OutboundProtocol, + Self::OutboundOpenInfo, + 
Self::OutEvent, + Self::Error, + >, + > { + if self.outbound_requested { + self.outbound_requested = false; + return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { + protocol: SubstreamProtocol::new(DeniedUpgrade, ()) + .with_timeout(self.upgrade_timeout), + }); + } + + Poll::Pending + } + } } From 9ea9ad8520a94661c19263428067bbc3987419c2 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 15 Sep 2022 23:46:13 +1000 Subject: [PATCH 34/43] Add docs --- swarm/src/connection.rs | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index d28e83d00af..d69e99d230a 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -455,7 +455,11 @@ enum SubstreamRequested { user_data: UserData, timeout: Delay, upgrade: Upgrade, - waker: Option, + /// A waker to notify our [`FuturesUnordered`] that we have extracted the data. + /// + /// This will ensure that we will get polled again in the next iteration which allows us to + /// resolve with `Ok(())` and be removed from the [`FuturesUnordered`]. + extracted_waker: Option, }, Done, } @@ -466,7 +470,7 @@ impl SubstreamRequested { user_data, timeout: Delay::new(timeout), upgrade, - waker: None, + extracted_waker: None, } } @@ -476,7 +480,7 @@ impl SubstreamRequested { user_data, timeout, upgrade, - waker, + extracted_waker: waker, } => { if let Some(waker) = waker { waker.wake(); @@ -510,7 +514,7 @@ impl Future for SubstreamRequested { user_data, upgrade, timeout, - waker: Some(cx.waker().clone()), + extracted_waker: Some(cx.waker().clone()), }; Poll::Pending } From a2f220ab05c1a5ae44a49436a6baf4d38d7fe860 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 15 Sep 2022 23:46:29 +1000 Subject: [PATCH 35/43] Improve panic message --- swarm/src/connection.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index d69e99d230a..4b52409eec7 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -488,7 +488,7 @@ impl SubstreamRequested { (user_data, timeout, upgrade) } - SubstreamRequested::Done => panic!("cannot extract from a completed future"), + SubstreamRequested::Done => panic!("cannot extract twice"), } } } From e7cfb46877c3e234f13372993a10064015288b8c Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 15 Sep 2022 23:47:58 +1000 Subject: [PATCH 36/43] Add docs --- swarm/src/connection.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index 4b52409eec7..a751ef2e826 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -103,7 +103,10 @@ where /// the total number of streams can be enforced at the /// [`StreamMuxerBox`](libp2p_core::muxing::StreamMuxerBox) level. max_negotiating_inbound_streams: usize, - /// TODO + /// Contains all upgrades that are waiting for a new outbound substream. + /// + /// The upgrade timeout is already ticking here so this may fail in case the remote is not quick + /// enough in providing us with a new stream. 
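// NOTE (illustrative sketch, not part of the patch): the race described
// above, modelled as a self-contained future over the `futures` and
// `futures-timer` crates (all names here are hypothetical):
//
//     use futures::FutureExt;
//     use futures_timer::Delay;
//     use std::{future::Future, pin::Pin, task::{Context, Poll}};
//
//     struct Requested {
//         timeout: Delay,
//         fulfilled: bool, // set once the muxer hands over a stream
//     }
//
//     impl Future for Requested {
//         type Output = Result<(), ()>;
//
//         fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
//             if self.fulfilled {
//                 return Poll::Ready(Ok(()));
//             }
//             // The timer has been ticking since construction, i.e. since
//             // the moment the substream was *requested*.
//             self.timeout.poll_unpin(cx).map(Err)
//         }
//     }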
requested_substreams: FuturesUnordered< SubstreamRequested, >, From ee686ede1435ce6337ea62538ab8e318bfebfbdc Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 15 Sep 2022 23:56:20 +1000 Subject: [PATCH 37/43] Ensure no shutdown as long as we want to open new streams --- swarm/src/connection.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index a751ef2e826..e5064e312ab 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -256,7 +256,7 @@ where // Check if the connection (and handler) should be shut down. // As long as we're still negotiating substreams, shutdown is always postponed. - if negotiating_in.is_empty() && negotiating_out.is_empty() { + if negotiating_in.is_empty() && negotiating_out.is_empty() && requested_substreams.is_empty() { match shutdown { Shutdown::None => {} Shutdown::Asap => return Poll::Ready(Err(ConnectionError::KeepAliveTimeout)), From 9bfa0cbb9c9bf1ce6ba9c2b4091786172549da34 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 16 Sep 2022 00:03:14 +1000 Subject: [PATCH 38/43] Bump libp2p-yamux version --- Cargo.toml | 2 +- muxers/yamux/CHANGELOG.md | 7 +++++++ muxers/yamux/Cargo.toml | 2 +- 3 files changed, 9 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b64c2f68756..fe6690dec8c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -96,7 +96,7 @@ libp2p-swarm = { version = "0.39.0", path = "swarm" } libp2p-swarm-derive = { version = "0.30.0", path = "swarm-derive" } libp2p-uds = { version = "0.34.0", path = "transports/uds", optional = true } libp2p-wasm-ext = { version = "0.35.0", path = "transports/wasm-ext", default-features = false, optional = true } -libp2p-yamux = { version = "0.39.0", path = "muxers/yamux", optional = true } +libp2p-yamux = { version = "0.39.1", path = "muxers/yamux", optional = true } multiaddr = { version = "0.14.0" } parking_lot = "0.12.0" pin-project = "1.0.0" diff --git a/muxers/yamux/CHANGELOG.md b/muxers/yamux/CHANGELOG.md index 5544ad15ab4..4591fc3e6a5 100644 --- a/muxers/yamux/CHANGELOG.md +++ b/muxers/yamux/CHANGELOG.md @@ -1,3 +1,10 @@ +# 0.39.1 + +- Drive connection also via `StreamMuxer::poll`. Any received streams will be buffered up to a maximum of 25 streams. + See [PR 2861]. + +[PR 2861]: https://github.com/libp2p/rust-libp2p/pull/2861/ + # 0.39.0 - Update to `libp2p-core` `v0.35.0` diff --git a/muxers/yamux/Cargo.toml b/muxers/yamux/Cargo.toml index cf9466be434..657770f31a9 100644 --- a/muxers/yamux/Cargo.toml +++ b/muxers/yamux/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-yamux" edition = "2021" rust-version = "1.56.1" description = "Yamux multiplexing protocol for libp2p" -version = "0.39.0" +version = "0.39.1" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" From ee612447290ca9d68f57e213bbcc39c6d488de40 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 16 Sep 2022 00:05:27 +1000 Subject: [PATCH 39/43] Add changelog entry to libp2p-swarm --- swarm/CHANGELOG.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index 75add0d524a..5e79a6fa989 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -3,7 +3,12 @@ - Remove deprecated `NetworkBehaviourEventProcess`. See [libp2p-swarm v0.38.0 changelog entry] for migration path. +- Enforce backpressure on incoming streams via `StreamMuxer` interface. 
In case we hit the configured limit of maximum + number of inbound streams, we will stop polling the `StreamMuxer` for new inbound streams. Depending on the muxer + implementation in use, this may lead to instant dropping of inbound streams. See [PR 2861]. + [libp2p-swarm v0.38.0 changelog entry]: https://github.com/libp2p/rust-libp2p/blob/master/swarm/CHANGELOG.md#0380 +[PR 2861]: https://github.com/libp2p/rust-libp2p/pull/2861/ # 0.38.0 From dcedb4e34c2dd0e0493ce622d88fa8f4d9f2cc2e Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 19 Sep 2022 16:17:06 +1000 Subject: [PATCH 40/43] Update muxers/yamux/src/lib.rs Co-authored-by: Max Inden --- muxers/yamux/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/muxers/yamux/src/lib.rs b/muxers/yamux/src/lib.rs index d7aca6f3ef2..231ad353485 100644 --- a/muxers/yamux/src/lib.rs +++ b/muxers/yamux/src/lib.rs @@ -48,7 +48,7 @@ pub struct Yamux { /// /// The only way how yamux can make progress is by driving the [`Incoming`] stream. However, the /// [`StreamMuxer`] interface is designed to allow a caller to selectively make progress via - /// [`StreamMuxer::poll_inbound`] and [`StreamMuxer::poll_outbound`] whilst the move general + /// [`StreamMuxer::poll_inbound`] and [`StreamMuxer::poll_outbound`] whilst the more general /// [`StreamMuxer::poll`] is designed to make progress on existing streams etc. /// /// This buffer stores inbound streams that are created whilst [`StreamMuxer::poll`] is called. From a25abeb70d2290195e87b8321bcdf54d0701b9ed Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 19 Sep 2022 16:20:08 +1000 Subject: [PATCH 41/43] Update Cargo.toml --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 86b6075516b..3f82880528b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -99,7 +99,7 @@ libp2p-swarm = { version = "0.39.0", path = "swarm" } libp2p-swarm-derive = { version = "0.30.1", path = "swarm-derive" } libp2p-uds = { version = "0.35.0", path = "transports/uds", optional = true } libp2p-wasm-ext = { version = "0.36.0", path = "transports/wasm-ext", default-features = false, optional = true } -libp2p-yamux = { version = "0.40.1", path = "muxers/yamux", optional = true } +libp2p-yamux = { version = "0.40.0", path = "muxers/yamux", optional = true } multiaddr = { version = "0.14.0" } parking_lot = "0.12.0" pin-project = "1.0.0" From b8ad471dcf217574d4c6fd1349b32cc774df01fc Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 19 Sep 2022 16:20:36 +1000 Subject: [PATCH 42/43] Update muxers/yamux/Cargo.toml --- muxers/yamux/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/muxers/yamux/Cargo.toml b/muxers/yamux/Cargo.toml index c181ce46e16..c5a59e327c6 100644 --- a/muxers/yamux/Cargo.toml +++ b/muxers/yamux/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-yamux" edition = "2021" rust-version = "1.56.1" description = "Yamux multiplexing protocol for libp2p" -version = "0.40.1" +version = "0.40.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" From 0cb8a8739d3dfd9d97eec5fc0d8e5d39eefa834d Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Mon, 19 Sep 2022 23:03:08 +1000 Subject: [PATCH 43/43] Fmt --- swarm/src/connection.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index e5064e312ab..dc9a2eb92e3 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -256,7 +256,10 @@ where // 
Check if the connection (and handler) should be shut down. // As long as we're still negotiating substreams, shutdown is always postponed. - if negotiating_in.is_empty() && negotiating_out.is_empty() && requested_substreams.is_empty() { + if negotiating_in.is_empty() + && negotiating_out.is_empty() + && requested_substreams.is_empty() + { match shutdown { Shutdown::None => {} Shutdown::Asap => return Poll::Ready(Err(ConnectionError::KeepAliveTimeout)),