diff --git a/muxers/yamux/CHANGELOG.md b/muxers/yamux/CHANGELOG.md index 54eb19865a2..ce64ed1c182 100644 --- a/muxers/yamux/CHANGELOG.md +++ b/muxers/yamux/CHANGELOG.md @@ -4,7 +4,11 @@ - Remove `OpenSubstreamToken` as it is dead code. See [PR 2873]. +- Drive connection also via `StreamMuxer::poll`. Any received streams will be buffered up to a maximum of 25 streams. + See [PR 2861]. + [PR 2873]: https://github.com/libp2p/rust-libp2p/pull/2873/ +[PR 2861]: https://github.com/libp2p/rust-libp2p/pull/2861/ # 0.39.0 diff --git a/muxers/yamux/Cargo.toml b/muxers/yamux/Cargo.toml index 1ee7b4ae667..c5a59e327c6 100644 --- a/muxers/yamux/Cargo.toml +++ b/muxers/yamux/Cargo.toml @@ -16,3 +16,4 @@ libp2p-core = { version = "0.36.0", path = "../../core", default-features = fals parking_lot = "0.12" thiserror = "1.0" yamux = "0.10.0" +log = "0.4" diff --git a/muxers/yamux/src/lib.rs b/muxers/yamux/src/lib.rs index 1c4c9e7c7c9..50e8a3ecc5e 100644 --- a/muxers/yamux/src/lib.rs +++ b/muxers/yamux/src/lib.rs @@ -24,10 +24,12 @@ use futures::{ future, prelude::*, + ready, stream::{BoxStream, LocalBoxStream}, }; use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent}; use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; +use std::collections::VecDeque; use std::{ fmt, io, iter, mem, pin::Pin, @@ -42,8 +44,20 @@ pub struct Yamux { incoming: S, /// Handle to control the connection. control: yamux::Control, + /// Temporarily buffers inbound streams in case our node is performing backpressure on the remote. + /// + /// The only way how yamux can make progress is by driving the [`Incoming`] stream. However, the + /// [`StreamMuxer`] interface is designed to allow a caller to selectively make progress via + /// [`StreamMuxer::poll_inbound`] and [`StreamMuxer::poll_outbound`] whilst the more general + /// [`StreamMuxer::poll`] is designed to make progress on existing streams etc. + /// + /// This buffer stores inbound streams that are created whilst [`StreamMuxer::poll`] is called. + /// Once the buffer is full, new inbound streams are dropped. + inbound_stream_buffer: VecDeque, } +const MAX_BUFFERED_INBOUND_STREAMS: usize = 25; + impl fmt::Debug for Yamux { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.write_str("Yamux") @@ -65,6 +79,7 @@ where _marker: std::marker::PhantomData, }, control: ctrl, + inbound_stream_buffer: VecDeque::default(), } } } @@ -84,6 +99,7 @@ where _marker: std::marker::PhantomData, }, control: ctrl, + inbound_stream_buffer: VecDeque::default(), } } } @@ -101,13 +117,11 @@ where mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - self.incoming.poll_next_unpin(cx).map(|maybe_stream| { - let stream = maybe_stream - .transpose()? - .ok_or(YamuxError(ConnectionError::Closed))?; + if let Some(stream) = self.inbound_stream_buffer.pop_front() { + return Poll::Ready(Ok(stream)); + } - Ok(stream) - }) + self.poll_inner(cx) } fn poll_outbound( @@ -121,9 +135,21 @@ where fn poll( self: Pin<&mut Self>, - _: &mut Context<'_>, + cx: &mut Context<'_>, ) -> Poll> { - Poll::Pending + let this = self.get_mut(); + + loop { + let inbound_stream = ready!(this.poll_inner(cx))?; + + if this.inbound_stream_buffer.len() >= MAX_BUFFERED_INBOUND_STREAMS { + log::warn!("dropping {inbound_stream} because buffer is full"); + drop(inbound_stream); + continue; + } + + this.inbound_stream_buffer.push_back(inbound_stream); + } } fn poll_close(mut self: Pin<&mut Self>, c: &mut Context<'_>) -> Poll> { @@ -145,6 +171,21 @@ where } } +impl Yamux +where + S: Stream> + Unpin, +{ + fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll> { + self.incoming.poll_next_unpin(cx).map(|maybe_stream| { + let stream = maybe_stream + .transpose()? + .ok_or(YamuxError(ConnectionError::Closed))?; + + Ok(stream) + }) + } +} + /// The yamux configuration. #[derive(Debug, Clone)] pub struct YamuxConfig { diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index abf54657cda..3937c265d31 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -5,7 +5,12 @@ - Update to `libp2p-core` `v0.36.0`. +- Enforce backpressure on incoming streams via `StreamMuxer` interface. In case we hit the configured limit of maximum + number of inbound streams, we will stop polling the `StreamMuxer` for new inbound streams. Depending on the muxer + implementation in use, this may lead to instant dropping of inbound streams. See [PR 2861]. + [libp2p-swarm v0.38.0 changelog entry]: https://github.com/libp2p/rust-libp2p/blob/master/swarm/CHANGELOG.md#0380 +[PR 2861]: https://github.com/libp2p/rust-libp2p/pull/2861/ # 0.38.0 diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index 24e54aba525..dc9a2eb92e3 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -19,7 +19,6 @@ // DEALINGS IN THE SOFTWARE. mod error; -mod handler_wrapper; pub(crate) mod pool; @@ -31,16 +30,23 @@ pub use pool::{ConnectionCounters, ConnectionLimits}; pub use pool::{EstablishedConnection, PendingConnection}; use crate::handler::ConnectionHandler; -use crate::IntoConnectionHandler; -use handler_wrapper::HandlerWrapper; +use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend, SendWrapper}; +use crate::{ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, SubstreamProtocol}; +use futures::stream::FuturesUnordered; +use futures::FutureExt; +use futures::StreamExt; +use futures_timer::Delay; +use instant::Instant; use libp2p_core::connection::ConnectedPoint; use libp2p_core::multiaddr::Multiaddr; -use libp2p_core::muxing::{StreamMuxerBox, StreamMuxerEvent, StreamMuxerExt}; -use libp2p_core::upgrade; +use libp2p_core::muxing::{StreamMuxerBox, StreamMuxerEvent, StreamMuxerExt, SubstreamBox}; +use libp2p_core::upgrade::{InboundUpgradeApply, OutboundUpgradeApply}; use libp2p_core::PeerId; -use std::collections::VecDeque; +use libp2p_core::{upgrade, UpgradeError}; use std::future::Future; -use std::{error::Error, fmt, io, pin::Pin, task::Context, task::Poll}; +use std::task::Waker; +use std::time::Duration; +use std::{fmt, io, mem, pin::Pin, task::Context, task::Poll}; /// Information about a successfully established connection. #[derive(Debug, Clone, PartialEq, Eq)] @@ -51,13 +57,6 @@ pub struct Connected { pub peer_id: PeerId, } -/// Endpoint for a received substream. -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -pub enum SubstreamEndpoint { - Dialer(TDialInfo), - Listener, -} - /// Event generated by a [`Connection`]. #[derive(Debug, Clone)] pub enum Event { @@ -74,10 +73,43 @@ where { /// Node that handles the muxing. muxing: StreamMuxerBox, - /// Handler that processes substreams. - handler: HandlerWrapper, - /// List of "open_info" that is waiting for new outbound substreams. - open_info: VecDeque>, + /// The underlying handler. + handler: THandler, + /// Futures that upgrade incoming substreams. + negotiating_in: FuturesUnordered< + SubstreamUpgrade< + THandler::InboundOpenInfo, + InboundUpgradeApply>, + >, + >, + /// Futures that upgrade outgoing substreams. + negotiating_out: FuturesUnordered< + SubstreamUpgrade< + THandler::OutboundOpenInfo, + OutboundUpgradeApply>, + >, + >, + /// The currently planned connection & handler shutdown. + shutdown: Shutdown, + /// The substream upgrade protocol override, if any. + substream_upgrade_protocol_override: Option, + /// The maximum number of inbound streams concurrently negotiating on a + /// connection. New inbound streams exceeding the limit are dropped and thus + /// reset. + /// + /// Note: This only enforces a limit on the number of concurrently + /// negotiating inbound streams. The total number of inbound streams on a + /// connection is the sum of negotiating and negotiated streams. A limit on + /// the total number of streams can be enforced at the + /// [`StreamMuxerBox`](libp2p_core::muxing::StreamMuxerBox) level. + max_negotiating_inbound_streams: usize, + /// Contains all upgrades that are waiting for a new outbound substream. + /// + /// The upgrade timeout is already ticking here so this may fail in case the remote is not quick + /// enough in providing us with a new stream. + requested_substreams: FuturesUnordered< + SubstreamRequested, + >, } impl fmt::Debug for Connection @@ -88,7 +120,6 @@ where fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Connection") .field("handler", &self.handler) - .field("open_info", &self.open_info) .finish() } } @@ -102,24 +133,20 @@ where /// Builds a new `Connection` from the given substream multiplexer /// and connection handler. pub fn new( - peer_id: PeerId, - endpoint: ConnectedPoint, muxer: StreamMuxerBox, - handler: impl IntoConnectionHandler, + handler: THandler, substream_upgrade_protocol_override: Option, max_negotiating_inbound_streams: usize, ) -> Self { - let wrapped_handler = HandlerWrapper::new( - peer_id, - endpoint, + Connection { + muxing: muxer, handler, + negotiating_in: Default::default(), + negotiating_out: Default::default(), + shutdown: Shutdown::None, substream_upgrade_protocol_override, max_negotiating_inbound_streams, - ); - Connection { - muxing: muxer, - handler: wrapped_handler, - open_info: VecDeque::with_capacity(8), + requested_substreams: Default::default(), } } @@ -131,57 +158,157 @@ where /// Begins an orderly shutdown of the connection, returning the connection /// handler and a `Future` that resolves when connection shutdown is complete. pub fn close(self) -> (THandler, impl Future>) { - (self.handler.into_connection_handler(), self.muxing.close()) + (self.handler, self.muxing.close()) } /// Polls the handler and the substream, forwarding events from the former to the latter and /// vice versa. pub fn poll( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll, ConnectionError>> { + let Self { + requested_substreams, + muxing, + handler, + negotiating_out, + negotiating_in, + shutdown, + max_negotiating_inbound_streams, + substream_upgrade_protocol_override, + } = self.get_mut(); + loop { - // Poll the handler for new events. - match self.handler.poll(cx)? { + match requested_substreams.poll_next_unpin(cx) { + Poll::Ready(Some(Ok(()))) => continue, + Poll::Ready(Some(Err(user_data))) => { + handler.inject_dial_upgrade_error(user_data, ConnectionHandlerUpgrErr::Timeout); + continue; + } + Poll::Ready(None) | Poll::Pending => {} + } + + // Poll the [`ConnectionHandler`]. + match handler.poll(cx) { Poll::Pending => {} - Poll::Ready(handler_wrapper::Event::OutboundSubstreamRequest(user_data)) => { - self.open_info.push_back(user_data); + Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { protocol }) => { + let timeout = *protocol.timeout(); + let (upgrade, user_data) = protocol.into_upgrade(); + + requested_substreams.push(SubstreamRequested::new(user_data, timeout, upgrade)); continue; // Poll handler until exhausted. } - Poll::Ready(handler_wrapper::Event::Custom(event)) => { + Poll::Ready(ConnectionHandlerEvent::Custom(event)) => { return Poll::Ready(Ok(Event::Handler(event))); } + Poll::Ready(ConnectionHandlerEvent::Close(err)) => { + return Poll::Ready(Err(ConnectionError::Handler(err))); + } } - match self.muxing.poll_unpin(cx)? { + // In case the [`ConnectionHandler`] can not make any more progress, poll the negotiating outbound streams. + match negotiating_out.poll_next_unpin(cx) { + Poll::Pending | Poll::Ready(None) => {} + Poll::Ready(Some((user_data, Ok(upgrade)))) => { + handler.inject_fully_negotiated_outbound(upgrade, user_data); + continue; + } + Poll::Ready(Some((user_data, Err(err)))) => { + handler.inject_dial_upgrade_error(user_data, err); + continue; + } + } + + // In case both the [`ConnectionHandler`] and the negotiating outbound streams can not + // make any more progress, poll the negotiating inbound streams. + match negotiating_in.poll_next_unpin(cx) { + Poll::Pending | Poll::Ready(None) => {} + Poll::Ready(Some((user_data, Ok(upgrade)))) => { + handler.inject_fully_negotiated_inbound(upgrade, user_data); + continue; + } + Poll::Ready(Some((user_data, Err(err)))) => { + handler.inject_listen_upgrade_error(user_data, err); + continue; + } + } + + // Ask the handler whether it wants the connection (and the handler itself) + // to be kept alive, which determines the planned shutdown, if any. + let keep_alive = handler.connection_keep_alive(); + match (&mut *shutdown, keep_alive) { + (Shutdown::Later(timer, deadline), KeepAlive::Until(t)) => { + if *deadline != t { + *deadline = t; + if let Some(dur) = deadline.checked_duration_since(Instant::now()) { + timer.reset(dur) + } + } + } + (_, KeepAlive::Until(t)) => { + if let Some(dur) = t.checked_duration_since(Instant::now()) { + *shutdown = Shutdown::Later(Delay::new(dur), t) + } + } + (_, KeepAlive::No) => *shutdown = Shutdown::Asap, + (_, KeepAlive::Yes) => *shutdown = Shutdown::None, + }; + + // Check if the connection (and handler) should be shut down. + // As long as we're still negotiating substreams, shutdown is always postponed. + if negotiating_in.is_empty() + && negotiating_out.is_empty() + && requested_substreams.is_empty() + { + match shutdown { + Shutdown::None => {} + Shutdown::Asap => return Poll::Ready(Err(ConnectionError::KeepAliveTimeout)), + Shutdown::Later(delay, _) => match Future::poll(Pin::new(delay), cx) { + Poll::Ready(_) => { + return Poll::Ready(Err(ConnectionError::KeepAliveTimeout)) + } + Poll::Pending => {} + }, + } + } + + match muxing.poll_unpin(cx)? { Poll::Pending => {} Poll::Ready(StreamMuxerEvent::AddressChange(address)) => { - self.handler.inject_address_change(&address); + handler.inject_address_change(&address); return Poll::Ready(Ok(Event::AddressChange(address))); } } - if !self.open_info.is_empty() { - match self.muxing.poll_outbound_unpin(cx)? { + if let Some(requested_substream) = requested_substreams.iter_mut().next() { + match muxing.poll_outbound_unpin(cx)? { Poll::Pending => {} Poll::Ready(substream) => { - let user_data = self - .open_info - .pop_front() - .expect("`open_info` is not empty"); - let endpoint = SubstreamEndpoint::Dialer(user_data); - self.handler.inject_substream(substream, endpoint); + let (user_data, timeout, upgrade) = requested_substream.extract(); + + negotiating_out.push(SubstreamUpgrade::new_outbound( + substream, + user_data, + timeout, + upgrade, + *substream_upgrade_protocol_override, + )); + continue; // Go back to the top, handler can potentially make progress again. } } } - match self.muxing.poll_inbound_unpin(cx)? { - Poll::Pending => {} - Poll::Ready(substream) => { - self.handler - .inject_substream(substream, SubstreamEndpoint::Listener); - continue; // Go back to the top, handler can potentially make progress again. + if negotiating_in.len() < *max_negotiating_inbound_streams { + match muxing.poll_inbound_unpin(cx)? { + Poll::Pending => {} + Poll::Ready(substream) => { + let protocol = handler.listen_protocol(); + + negotiating_in.push(SubstreamUpgrade::new_inbound(substream, protocol)); + + continue; // Go back to the top, handler can potentially make progress again. + } } } @@ -225,4 +352,456 @@ impl fmt::Display for ConnectionLimit { } /// A `ConnectionLimit` can represent an error if it has been exceeded. -impl Error for ConnectionLimit {} +impl std::error::Error for ConnectionLimit {} + +struct SubstreamUpgrade { + user_data: Option, + timeout: Delay, + upgrade: Upgrade, +} + +impl + SubstreamUpgrade>> +where + Upgrade: Send + OutboundUpgradeSend, +{ + fn new_outbound( + substream: SubstreamBox, + user_data: UserData, + timeout: Delay, + upgrade: Upgrade, + version_override: Option, + ) -> Self { + let effective_version = match version_override { + Some(version_override) if version_override != upgrade::Version::default() => { + log::debug!( + "Substream upgrade protocol override: {:?} -> {:?}", + upgrade::Version::default(), + version_override + ); + + version_override + } + _ => upgrade::Version::default(), + }; + + Self { + user_data: Some(user_data), + timeout, + upgrade: upgrade::apply_outbound(substream, SendWrapper(upgrade), effective_version), + } + } +} + +impl + SubstreamUpgrade>> +where + Upgrade: Send + InboundUpgradeSend, +{ + fn new_inbound( + substream: SubstreamBox, + protocol: SubstreamProtocol, + ) -> Self { + let timeout = *protocol.timeout(); + let (upgrade, open_info) = protocol.into_upgrade(); + + Self { + user_data: Some(open_info), + timeout: Delay::new(timeout), + upgrade: upgrade::apply_inbound(substream, SendWrapper(upgrade)), + } + } +} + +impl Unpin for SubstreamUpgrade {} + +impl Future for SubstreamUpgrade +where + Upgrade: Future>> + Unpin, +{ + type Output = ( + UserData, + Result>, + ); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + match self.timeout.poll_unpin(cx) { + Poll::Ready(()) => { + return Poll::Ready(( + self.user_data + .take() + .expect("Future not to be polled again once ready."), + Err(ConnectionHandlerUpgrErr::Timeout), + )) + } + + Poll::Pending => {} + } + + match self.upgrade.poll_unpin(cx) { + Poll::Ready(Ok(upgrade)) => Poll::Ready(( + self.user_data + .take() + .expect("Future not to be polled again once ready."), + Ok(upgrade), + )), + Poll::Ready(Err(err)) => Poll::Ready(( + self.user_data + .take() + .expect("Future not to be polled again once ready."), + Err(ConnectionHandlerUpgrErr::Upgrade(err)), + )), + Poll::Pending => Poll::Pending, + } + } +} + +enum SubstreamRequested { + Waiting { + user_data: UserData, + timeout: Delay, + upgrade: Upgrade, + /// A waker to notify our [`FuturesUnordered`] that we have extracted the data. + /// + /// This will ensure that we will get polled again in the next iteration which allows us to + /// resolve with `Ok(())` and be removed from the [`FuturesUnordered`]. + extracted_waker: Option, + }, + Done, +} + +impl SubstreamRequested { + fn new(user_data: UserData, timeout: Duration, upgrade: Upgrade) -> Self { + Self::Waiting { + user_data, + timeout: Delay::new(timeout), + upgrade, + extracted_waker: None, + } + } + + fn extract(&mut self) -> (UserData, Delay, Upgrade) { + match mem::replace(self, Self::Done) { + SubstreamRequested::Waiting { + user_data, + timeout, + upgrade, + extracted_waker: waker, + } => { + if let Some(waker) = waker { + waker.wake(); + } + + (user_data, timeout, upgrade) + } + SubstreamRequested::Done => panic!("cannot extract twice"), + } + } +} + +impl Unpin for SubstreamRequested {} + +impl Future for SubstreamRequested { + type Output = Result<(), UserData>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + + match mem::replace(this, Self::Done) { + SubstreamRequested::Waiting { + user_data, + upgrade, + mut timeout, + .. + } => match timeout.poll_unpin(cx) { + Poll::Ready(()) => Poll::Ready(Err(user_data)), + Poll::Pending => { + *this = Self::Waiting { + user_data, + upgrade, + timeout, + extracted_waker: Some(cx.waker().clone()), + }; + Poll::Pending + } + }, + SubstreamRequested::Done => Poll::Ready(Ok(())), + } + } +} + +/// The options for a planned connection & handler shutdown. +/// +/// A shutdown is planned anew based on the the return value of +/// [`ConnectionHandler::connection_keep_alive`] of the underlying handler +/// after every invocation of [`ConnectionHandler::poll`]. +/// +/// A planned shutdown is always postponed for as long as there are ingoing +/// or outgoing substreams being negotiated, i.e. it is a graceful, "idle" +/// shutdown. +#[derive(Debug)] +enum Shutdown { + /// No shutdown is planned. + None, + /// A shut down is planned as soon as possible. + Asap, + /// A shut down is planned for when a `Delay` has elapsed. + Later(Delay, Instant), +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::handler::DummyConnectionHandler; + use futures::AsyncRead; + use futures::AsyncWrite; + use libp2p_core::upgrade::DeniedUpgrade; + use libp2p_core::StreamMuxer; + use quickcheck::*; + use std::sync::{Arc, Weak}; + use void::Void; + + #[test] + fn max_negotiating_inbound_streams() { + fn prop(max_negotiating_inbound_streams: u8) { + let max_negotiating_inbound_streams: usize = max_negotiating_inbound_streams.into(); + + let alive_substream_counter = Arc::new(()); + + let mut connection = Connection::new( + StreamMuxerBox::new(DummyStreamMuxer { + counter: alive_substream_counter.clone(), + }), + DummyConnectionHandler { + keep_alive: KeepAlive::Yes, + }, + None, + max_negotiating_inbound_streams, + ); + + let result = Pin::new(&mut connection) + .poll(&mut Context::from_waker(futures::task::noop_waker_ref())); + + assert!(result.is_pending()); + assert_eq!( + Arc::weak_count(&alive_substream_counter), + max_negotiating_inbound_streams, + "Expect no more than the maximum number of allowed streams" + ); + } + + QuickCheck::new().quickcheck(prop as fn(_)); + } + + #[test] + fn outbound_stream_timeout_starts_on_request() { + let upgrade_timeout = Duration::from_secs(1); + let mut connection = Connection::new( + StreamMuxerBox::new(PendingStreamMuxer), + MockConnectionHandler::new(upgrade_timeout.clone()), + None, + 2, + ); + + connection.handler.open_new_outbound(); + let _ = Pin::new(&mut connection) + .poll(&mut Context::from_waker(futures::task::noop_waker_ref())); + + std::thread::sleep(upgrade_timeout + Duration::from_secs(1)); + + let _ = Pin::new(&mut connection) + .poll(&mut Context::from_waker(futures::task::noop_waker_ref())); + + assert!(matches!( + connection.handler.error.unwrap(), + ConnectionHandlerUpgrErr::Timeout + )) + } + + struct DummyStreamMuxer { + counter: Arc<()>, + } + + impl StreamMuxer for DummyStreamMuxer { + type Substream = PendingSubstream; + type Error = Void; + + fn poll_inbound( + self: Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(PendingSubstream(Arc::downgrade(&self.counter)))) + } + + fn poll_outbound( + self: Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll> { + Poll::Pending + } + + fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll( + self: Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll> { + Poll::Pending + } + } + + /// A [`StreamMuxer`] which never returns a stream. + struct PendingStreamMuxer; + + impl StreamMuxer for PendingStreamMuxer { + type Substream = PendingSubstream; + type Error = Void; + + fn poll_inbound( + self: Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll> { + Poll::Pending + } + + fn poll_outbound( + self: Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll> { + Poll::Pending + } + + fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Pending + } + + fn poll( + self: Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll> { + Poll::Pending + } + } + + struct PendingSubstream(Weak<()>); + + impl AsyncRead for PendingSubstream { + fn poll_read( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + _buf: &mut [u8], + ) -> Poll> { + Poll::Pending + } + } + + impl AsyncWrite for PendingSubstream { + fn poll_write( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + _buf: &[u8], + ) -> Poll> { + Poll::Pending + } + + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Pending + } + + fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Pending + } + } + + struct MockConnectionHandler { + outbound_requested: bool, + error: Option>, + upgrade_timeout: Duration, + } + + impl MockConnectionHandler { + fn new(upgrade_timeout: Duration) -> Self { + Self { + outbound_requested: false, + error: None, + upgrade_timeout, + } + } + + fn open_new_outbound(&mut self) { + self.outbound_requested = true; + } + } + + impl ConnectionHandler for MockConnectionHandler { + type InEvent = Void; + type OutEvent = Void; + type Error = Void; + type InboundProtocol = DeniedUpgrade; + type OutboundProtocol = DeniedUpgrade; + type InboundOpenInfo = (); + type OutboundOpenInfo = (); + + fn listen_protocol( + &self, + ) -> SubstreamProtocol { + SubstreamProtocol::new(DeniedUpgrade, ()).with_timeout(self.upgrade_timeout) + } + + fn inject_fully_negotiated_inbound( + &mut self, + protocol: ::Output, + _: Self::InboundOpenInfo, + ) { + void::unreachable(protocol) + } + + fn inject_fully_negotiated_outbound( + &mut self, + protocol: ::Output, + _: Self::OutboundOpenInfo, + ) { + void::unreachable(protocol) + } + + fn inject_event(&mut self, event: Self::InEvent) { + void::unreachable(event) + } + + fn inject_dial_upgrade_error( + &mut self, + _: Self::OutboundOpenInfo, + error: ConnectionHandlerUpgrErr<::Error>, + ) { + self.error = Some(error) + } + + fn connection_keep_alive(&self) -> KeepAlive { + KeepAlive::Yes + } + + fn poll( + &mut self, + _: &mut Context<'_>, + ) -> Poll< + ConnectionHandlerEvent< + Self::OutboundProtocol, + Self::OutboundOpenInfo, + Self::OutEvent, + Self::Error, + >, + > { + if self.outbound_requested { + self.outbound_requested = false; + return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { + protocol: SubstreamProtocol::new(DeniedUpgrade, ()) + .with_timeout(self.upgrade_timeout), + }); + } + + Poll::Pending + } + } +} diff --git a/swarm/src/connection/error.rs b/swarm/src/connection/error.rs index 8a6d6bbbf00..db51ebca874 100644 --- a/swarm/src/connection/error.rs +++ b/swarm/src/connection/error.rs @@ -18,7 +18,6 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use super::handler_wrapper; use crate::transport::TransportError; use crate::Multiaddr; use crate::{connection::ConnectionLimit, ConnectedPoint, PeerId}; @@ -66,15 +65,6 @@ where } } -impl From> for ConnectionError { - fn from(error: handler_wrapper::Error) -> Self { - match error { - handler_wrapper::Error::Handler(e) => Self::Handler(e), - handler_wrapper::Error::KeepAliveTimeout => Self::KeepAliveTimeout, - } - } -} - impl From for ConnectionError { fn from(error: io::Error) -> Self { ConnectionError::IO(error) diff --git a/swarm/src/connection/handler_wrapper.rs b/swarm/src/connection/handler_wrapper.rs deleted file mode 100644 index 03d09b3fbc1..00000000000 --- a/swarm/src/connection/handler_wrapper.rs +++ /dev/null @@ -1,521 +0,0 @@ -// Copyright 2018 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -use crate::connection::SubstreamEndpoint; -use crate::handler::{ - ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, -}; -use crate::upgrade::SendWrapper; -use crate::IntoConnectionHandler; - -use futures::prelude::*; -use futures::stream::FuturesUnordered; -use futures_timer::Delay; -use instant::Instant; -use libp2p_core::{ - muxing::SubstreamBox, - upgrade::{self, InboundUpgradeApply, OutboundUpgradeApply, UpgradeError}, - Multiaddr, -}; -use libp2p_core::{ConnectedPoint, PeerId}; -use std::{error, fmt, pin::Pin, task::Context, task::Poll, time::Duration}; - -/// A wrapper for an underlying [`ConnectionHandler`]. -/// -/// It extends [`ConnectionHandler`] with: -/// - Enforced substream upgrade timeouts -/// - Driving substream upgrades -/// - Handling connection timeout -// TODO: add a caching system for protocols that are supported or not -pub struct HandlerWrapper -where - TConnectionHandler: ConnectionHandler, -{ - remote_peer_id: PeerId, - /// The underlying handler. - handler: TConnectionHandler, - /// Futures that upgrade incoming substreams. - negotiating_in: FuturesUnordered< - SubstreamUpgrade< - TConnectionHandler::InboundOpenInfo, - InboundUpgradeApply>, - >, - >, - /// Futures that upgrade outgoing substreams. - negotiating_out: FuturesUnordered< - SubstreamUpgrade< - TConnectionHandler::OutboundOpenInfo, - OutboundUpgradeApply>, - >, - >, - /// For each outbound substream request, how to upgrade it. The first element of the tuple - /// is the unique identifier (see `unique_dial_upgrade_id`). - queued_dial_upgrades: Vec<(u64, SendWrapper)>, - /// Unique identifier assigned to each queued dial upgrade. - unique_dial_upgrade_id: u64, - /// The currently planned connection & handler shutdown. - shutdown: Shutdown, - /// The substream upgrade protocol override, if any. - substream_upgrade_protocol_override: Option, - /// The maximum number of inbound streams concurrently negotiating on a - /// connection. New inbound streams exceeding the limit are dropped and thus - /// reset. - /// - /// Note: This only enforces a limit on the number of concurrently - /// negotiating inbound streams. The total number of inbound streams on a - /// connection is the sum of negotiating and negotiated streams. A limit on - /// the total number of streams can be enforced at the - /// [`StreamMuxerBox`](libp2p_core::muxing::StreamMuxerBox) level. - max_negotiating_inbound_streams: usize, -} - -impl std::fmt::Debug for HandlerWrapper { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("HandlerWrapper") - .field("negotiating_in", &self.negotiating_in) - .field("negotiating_out", &self.negotiating_out) - .field("unique_dial_upgrade_id", &self.unique_dial_upgrade_id) - .field("shutdown", &self.shutdown) - .field( - "substream_upgrade_protocol_override", - &self.substream_upgrade_protocol_override, - ) - .finish() - } -} - -impl HandlerWrapper { - pub(crate) fn new( - remote_peer_id: PeerId, - endpoint: ConnectedPoint, - handler: impl IntoConnectionHandler, - substream_upgrade_protocol_override: Option, - max_negotiating_inbound_streams: usize, - ) -> Self { - Self { - remote_peer_id, - handler: handler.into_handler(&remote_peer_id, &endpoint), - negotiating_in: Default::default(), - negotiating_out: Default::default(), - queued_dial_upgrades: Vec::new(), - unique_dial_upgrade_id: 0, - shutdown: Shutdown::None, - substream_upgrade_protocol_override, - max_negotiating_inbound_streams, - } - } - - pub(crate) fn into_connection_handler(self) -> TConnectionHandler { - self.handler - } -} - -struct SubstreamUpgrade { - user_data: Option, - timeout: Delay, - upgrade: Upgrade, -} - -impl Unpin for SubstreamUpgrade {} - -impl Future for SubstreamUpgrade -where - Upgrade: Future>> + Unpin, -{ - type Output = ( - UserData, - Result>, - ); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { - match self.timeout.poll_unpin(cx) { - Poll::Ready(()) => { - return Poll::Ready(( - self.user_data - .take() - .expect("Future not to be polled again once ready."), - Err(ConnectionHandlerUpgrErr::Timeout), - )) - } - - Poll::Pending => {} - } - - match self.upgrade.poll_unpin(cx) { - Poll::Ready(Ok(upgrade)) => Poll::Ready(( - self.user_data - .take() - .expect("Future not to be polled again once ready."), - Ok(upgrade), - )), - Poll::Ready(Err(err)) => Poll::Ready(( - self.user_data - .take() - .expect("Future not to be polled again once ready."), - Err(ConnectionHandlerUpgrErr::Upgrade(err)), - )), - Poll::Pending => Poll::Pending, - } - } -} - -/// The options for a planned connection & handler shutdown. -/// -/// A shutdown is planned anew based on the the return value of -/// [`ConnectionHandler::connection_keep_alive`] of the underlying handler -/// after every invocation of [`ConnectionHandler::poll`]. -/// -/// A planned shutdown is always postponed for as long as there are ingoing -/// or outgoing substreams being negotiated, i.e. it is a graceful, "idle" -/// shutdown. -#[derive(Debug)] -enum Shutdown { - /// No shutdown is planned. - None, - /// A shut down is planned as soon as possible. - Asap, - /// A shut down is planned for when a `Delay` has elapsed. - Later(Delay, Instant), -} - -/// Error generated by the [`HandlerWrapper`]. -#[derive(Debug)] -pub enum Error { - /// The connection handler encountered an error. - Handler(TErr), - /// The connection keep-alive timeout expired. - KeepAliveTimeout, -} - -impl From for Error { - fn from(err: TErr) -> Error { - Error::Handler(err) - } -} - -impl fmt::Display for Error -where - TErr: fmt::Display, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Error::Handler(err) => write!(f, "{}", err), - Error::KeepAliveTimeout => { - write!(f, "Connection closed due to expired keep-alive timeout.") - } - } - } -} - -impl error::Error for Error -where - TErr: error::Error + 'static, -{ - fn source(&self) -> Option<&(dyn error::Error + 'static)> { - match self { - Error::Handler(err) => Some(err), - Error::KeepAliveTimeout => None, - } - } -} - -pub type OutboundOpenInfo = ( - u64, - ::OutboundOpenInfo, - Duration, -); - -impl HandlerWrapper -where - TConnectionHandler: ConnectionHandler, -{ - pub fn inject_substream( - &mut self, - substream: SubstreamBox, - // The first element of the tuple is the unique upgrade identifier - // (see `unique_dial_upgrade_id`). - endpoint: SubstreamEndpoint>, - ) { - match endpoint { - SubstreamEndpoint::Listener => { - if self.negotiating_in.len() == self.max_negotiating_inbound_streams { - log::warn!( - "Incoming substream from {} exceeding maximum number \ - of negotiating inbound streams {} on connection. \ - Dropping. See PoolConfig::with_max_negotiating_inbound_streams.", - self.remote_peer_id, - self.max_negotiating_inbound_streams, - ); - return; - } - - let protocol = self.handler.listen_protocol(); - let timeout = *protocol.timeout(); - let (upgrade, user_data) = protocol.into_upgrade(); - let upgrade = upgrade::apply_inbound(substream, SendWrapper(upgrade)); - let timeout = Delay::new(timeout); - self.negotiating_in.push(SubstreamUpgrade { - user_data: Some(user_data), - timeout, - upgrade, - }); - } - SubstreamEndpoint::Dialer((upgrade_id, user_data, timeout)) => { - let pos = match self - .queued_dial_upgrades - .iter() - .position(|(id, _)| id == &upgrade_id) - { - Some(p) => p, - None => { - debug_assert!(false, "Received an upgrade with an invalid upgrade ID"); - return; - } - }; - - let (_, upgrade) = self.queued_dial_upgrades.remove(pos); - let mut version = upgrade::Version::default(); - if let Some(v) = self.substream_upgrade_protocol_override { - if v != version { - log::debug!( - "Substream upgrade protocol override: {:?} -> {:?}", - version, - v - ); - version = v; - } - } - let upgrade = upgrade::apply_outbound(substream, upgrade, version); - let timeout = Delay::new(timeout); - self.negotiating_out.push(SubstreamUpgrade { - user_data: Some(user_data), - timeout, - upgrade, - }); - } - } - } - - pub fn inject_event(&mut self, event: TConnectionHandler::InEvent) { - self.handler.inject_event(event); - } - - pub fn inject_address_change(&mut self, new_address: &Multiaddr) { - self.handler.inject_address_change(new_address); - } - - fn handle_connection_handler_event( - &mut self, - handler_event: ConnectionHandlerEvent< - TConnectionHandler::OutboundProtocol, - TConnectionHandler::OutboundOpenInfo, - TConnectionHandler::OutEvent, - TConnectionHandler::Error, - >, - ) -> Result< - Event, TConnectionHandler::OutEvent>, - Error, - > { - match handler_event { - ConnectionHandlerEvent::Custom(event) => Ok(Event::Custom(event)), - ConnectionHandlerEvent::OutboundSubstreamRequest { protocol } => { - let id = self.unique_dial_upgrade_id; - let timeout = *protocol.timeout(); - self.unique_dial_upgrade_id += 1; - let (upgrade, info) = protocol.into_upgrade(); - self.queued_dial_upgrades.push((id, SendWrapper(upgrade))); - Ok(Event::OutboundSubstreamRequest((id, info, timeout))) - } - ConnectionHandlerEvent::Close(err) => Err(err.into()), - } - } - - pub fn poll( - &mut self, - cx: &mut Context<'_>, - ) -> Poll< - Result< - Event, TConnectionHandler::OutEvent>, - Error, - >, - > { - loop { - // Poll the [`ConnectionHandler`]. - if let Poll::Ready(handler_event) = self.handler.poll(cx) { - let wrapper_event = self.handle_connection_handler_event(handler_event)?; - return Poll::Ready(Ok(wrapper_event)); - } - - // In case the [`ConnectionHandler`] can not make any more progress, poll the negotiating outbound streams. - if let Poll::Ready(Some((user_data, res))) = self.negotiating_out.poll_next_unpin(cx) { - match res { - Ok(upgrade) => self - .handler - .inject_fully_negotiated_outbound(upgrade, user_data), - Err(err) => self.handler.inject_dial_upgrade_error(user_data, err), - } - - // After the `inject_*` calls, the [`ConnectionHandler`] might be able to make progress. - continue; - } - - // In case both the [`ConnectionHandler`] and the negotiating outbound streams can not - // make any more progress, poll the negotiating inbound streams. - if let Poll::Ready(Some((user_data, res))) = self.negotiating_in.poll_next_unpin(cx) { - match res { - Ok(upgrade) => self - .handler - .inject_fully_negotiated_inbound(upgrade, user_data), - Err(err) => self.handler.inject_listen_upgrade_error(user_data, err), - } - - // After the `inject_*` calls, the [`ConnectionHandler`] might be able to make progress. - continue; - } - - // None of the three can make any more progress, thus breaking the loop. - break; - } - - // Ask the handler whether it wants the connection (and the handler itself) - // to be kept alive, which determines the planned shutdown, if any. - match (&mut self.shutdown, self.handler.connection_keep_alive()) { - (Shutdown::Later(timer, deadline), KeepAlive::Until(t)) => { - if *deadline != t { - *deadline = t; - if let Some(dur) = deadline.checked_duration_since(Instant::now()) { - timer.reset(dur) - } - } - } - (_, KeepAlive::Until(t)) => { - if let Some(dur) = t.checked_duration_since(Instant::now()) { - self.shutdown = Shutdown::Later(Delay::new(dur), t) - } - } - (_, KeepAlive::No) => self.shutdown = Shutdown::Asap, - (_, KeepAlive::Yes) => self.shutdown = Shutdown::None, - }; - - // Check if the connection (and handler) should be shut down. - // As long as we're still negotiating substreams, shutdown is always postponed. - if self.negotiating_in.is_empty() && self.negotiating_out.is_empty() { - match self.shutdown { - Shutdown::None => {} - Shutdown::Asap => return Poll::Ready(Err(Error::KeepAliveTimeout)), - Shutdown::Later(ref mut delay, _) => match Future::poll(Pin::new(delay), cx) { - Poll::Ready(_) => return Poll::Ready(Err(Error::KeepAliveTimeout)), - Poll::Pending => {} - }, - } - } - - Poll::Pending - } -} - -/// Event produced by a [`HandlerWrapper`]. -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -pub enum Event { - /// Require a new outbound substream to be opened with the remote. - OutboundSubstreamRequest(TOutboundOpenInfo), - - /// Other event. - Custom(TCustom), -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::handler::PendingConnectionHandler; - use quickcheck::*; - use std::sync::Arc; - - #[test] - fn max_negotiating_inbound_streams() { - fn prop(max_negotiating_inbound_streams: u8) { - let max_negotiating_inbound_streams: usize = max_negotiating_inbound_streams.into(); - let mut wrapper = HandlerWrapper::new( - PeerId::random(), - ConnectedPoint::Listener { - local_addr: Multiaddr::empty(), - send_back_addr: Multiaddr::empty(), - }, - PendingConnectionHandler::new("test".to_string()), - None, - max_negotiating_inbound_streams, - ); - let alive_substreams_counter = Arc::new(()); - - for _ in 0..max_negotiating_inbound_streams { - let substream = - SubstreamBox::new(PendingSubstream(alive_substreams_counter.clone())); - wrapper.inject_substream(substream, SubstreamEndpoint::Listener); - } - - assert_eq!( - Arc::strong_count(&alive_substreams_counter), - max_negotiating_inbound_streams + 1, - "Expect none of the substreams up to the limit to be dropped." - ); - - let substream = SubstreamBox::new(PendingSubstream(alive_substreams_counter.clone())); - wrapper.inject_substream(substream, SubstreamEndpoint::Listener); - - assert_eq!( - Arc::strong_count(&alive_substreams_counter), - max_negotiating_inbound_streams + 1, - "Expect substream exceeding the limit to be dropped." - ); - } - - QuickCheck::new().quickcheck(prop as fn(_)); - } - - struct PendingSubstream(Arc<()>); - - impl AsyncRead for PendingSubstream { - fn poll_read( - self: Pin<&mut Self>, - _cx: &mut Context<'_>, - _buf: &mut [u8], - ) -> Poll> { - Poll::Pending - } - } - - impl AsyncWrite for PendingSubstream { - fn poll_write( - self: Pin<&mut Self>, - _cx: &mut Context<'_>, - _buf: &[u8], - ) -> Poll> { - Poll::Pending - } - - fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - Poll::Pending - } - - fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - Poll::Pending - } - } -} diff --git a/swarm/src/connection/pool.rs b/swarm/src/connection/pool.rs index 62e931e9510..c43d5efb61e 100644 --- a/swarm/src/connection/pool.rs +++ b/swarm/src/connection/pool.rs @@ -19,6 +19,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +use crate::connection::Connection; use crate::{ behaviour::{THandlerInEvent, THandlerOutEvent}, connection::{ @@ -89,7 +90,7 @@ where /// The maximum number of inbound streams concurrently negotiating on a connection. /// - /// See [`super::handler_wrapper::HandlerWrapper::max_negotiating_inbound_streams`]. + /// See [`Connection::max_negotiating_inbound_streams`]. max_negotiating_inbound_streams: usize, /// The executor to use for running the background tasks. If `None`, @@ -746,11 +747,9 @@ where }, ); - let connection = super::Connection::new( - obtained_peer_id, - endpoint, + let connection = Connection::new( muxer, - handler, + handler.into_handler(&obtained_peer_id, &endpoint), self.substream_upgrade_protocol_override, self.max_negotiating_inbound_streams, ); @@ -1165,7 +1164,7 @@ pub struct PoolConfig { /// The maximum number of inbound streams concurrently negotiating on a connection. /// - /// See [super::handler_wrapper::HandlerWrapper::max_negotiating_inbound_streams]. + /// See [`Connection::max_negotiating_inbound_streams`]. max_negotiating_inbound_streams: usize, } @@ -1240,7 +1239,7 @@ impl PoolConfig { /// The maximum number of inbound streams concurrently negotiating on a connection. /// - /// See [`super::handler_wrapper::HandlerWrapper::max_negotiating_inbound_streams`]. + /// See [`Connection::max_negotiating_inbound_streams`]. pub fn with_max_negotiating_inbound_streams(mut self, v: usize) -> Self { self.max_negotiating_inbound_streams = v; self