From cef505685c418a1d8a6a11af91f2eb2414700211 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 16 Aug 2022 04:50:17 +0200 Subject: [PATCH] core/muxing: Generalise `StreamMuxer::poll_address_change` to `poll` (#2797) This is to allow general-purpose background work to be performed by implementations. --- core/CHANGELOG.md | 6 +++-- core/src/either.rs | 21 ++++++++--------- core/src/muxing.rs | 45 +++++++++++++++++++++--------------- core/src/muxing/boxed.rs | 38 ++++++++++++++---------------- core/src/muxing/singleton.rs | 16 ++++++------- muxers/mplex/src/lib.rs | 10 ++++---- muxers/yamux/src/lib.rs | 7 +++--- swarm/src/connection.rs | 43 ++++++++++++++++++++-------------- 8 files changed, 99 insertions(+), 87 deletions(-) diff --git a/core/CHANGELOG.md b/core/CHANGELOG.md index dc4fd0829c0..c377d042106 100644 --- a/core/CHANGELOG.md +++ b/core/CHANGELOG.md @@ -1,16 +1,18 @@ # 0.35.0 [unreleased] -- Remove `StreamMuxer::poll_event` in favor of individual functions: `poll_inbound`, `poll_outbound` - and `poll_address_change`. Consequently, `StreamMuxerEvent` is also removed. See [PR 2724]. - Drop `Unpin` requirement from `SubstreamBox`. See [PR 2762] and [PR 2776]. - Drop `Sync` requirement on `StreamMuxer` for constructing `StreamMuxerBox`. See [PR 2775]. - Use `Pin<&mut Self>` as the receiver type for all `StreamMuxer` poll functions. See [PR 2765]. +- Change `StreamMuxer` interface to be entirely poll-based. All functions on `StreamMuxer` now + require a `Context` and return `Poll`. This gives callers fine-grained control over what they + would like to make progress on. See [PR 2724] and [PR 2797]. [PR 2724]: https://github.com/libp2p/rust-libp2p/pull/2724 [PR 2762]: https://github.com/libp2p/rust-libp2p/pull/2762 [PR 2775]: https://github.com/libp2p/rust-libp2p/pull/2775 [PR 2776]: https://github.com/libp2p/rust-libp2p/pull/2776 [PR 2765]: https://github.com/libp2p/rust-libp2p/pull/2765 +[PR 2797]: https://github.com/libp2p/rust-libp2p/pull/2797 # 0.34.0 diff --git a/core/src/either.rs b/core/src/either.rs index 42984519488..a34552bf28f 100644 --- a/core/src/either.rs +++ b/core/src/either.rs @@ -18,6 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +use crate::muxing::StreamMuxerEvent; use crate::{ muxing::StreamMuxer, transport::{ListenerId, Transport, TransportError, TransportEvent}, @@ -236,22 +237,20 @@ where } } - fn poll_address_change( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match self.project() { - EitherOutputProj::First(inner) => inner.poll_address_change(cx).map_err(EitherError::A), - EitherOutputProj::Second(inner) => { - inner.poll_address_change(cx).map_err(EitherError::B) - } + EitherOutputProj::First(inner) => inner.poll_close(cx).map_err(EitherError::A), + EitherOutputProj::Second(inner) => inner.poll_close(cx).map_err(EitherError::B), } } - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { match self.project() { - EitherOutputProj::First(inner) => inner.poll_close(cx).map_err(EitherError::A), - EitherOutputProj::Second(inner) => inner.poll_close(cx).map_err(EitherError::B), + EitherOutputProj::First(inner) => inner.poll(cx).map_err(EitherError::A), + EitherOutputProj::Second(inner) => inner.poll(cx).map_err(EitherError::B), } } } diff --git a/core/src/muxing.rs b/core/src/muxing.rs index 2d1e1068044..9763436e94a 100644 --- a/core/src/muxing.rs +++ b/core/src/muxing.rs @@ -75,6 +75,10 @@ pub trait StreamMuxer { type Error: std::error::Error; /// Poll for new inbound substreams. + /// + /// This function should be called whenever callers are ready to accept more inbound streams. In + /// other words, callers may exercise back-pressure on incoming streams by not calling this + /// function if a certain limit is hit. fn poll_inbound( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -86,25 +90,33 @@ pub trait StreamMuxer { cx: &mut Context<'_>, ) -> Poll>; - /// Poll for an address change of the underlying connection. + /// Poll to close this [`StreamMuxer`]. /// - /// Not all implementations may support this feature. - fn poll_address_change( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll>; - - /// Closes this `StreamMuxer`. - /// - /// After this has returned `Poll::Ready(Ok(()))`, the muxer has become useless. All - /// subsequent reads must return either `EOF` or an error. All subsequent writes, shutdowns, - /// or polls must generate an error or be ignored. + /// After this has returned `Poll::Ready(Ok(()))`, the muxer has become useless and may be safely + /// dropped. /// /// > **Note**: You are encouraged to call this method and wait for it to return `Ready`, so /// > that the remote is properly informed of the shutdown. However, apart from /// > properly informing the remote, there is no difference between this and /// > immediately dropping the muxer. fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; + + /// Poll to allow the underlying connection to make progress. + /// + /// In contrast to all other `poll`-functions on [`StreamMuxer`], this function MUST be called + /// unconditionally. Because it will be called regardless, this function can be used by + /// implementations to return events about the underlying connection that the caller MUST deal + /// with. + fn poll( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>; +} + +/// An event produced by a [`StreamMuxer`]. +pub enum StreamMuxerEvent { + /// The address of the remote has changed. + AddressChange(Multiaddr), } /// Extension trait for [`StreamMuxer`]. @@ -131,15 +143,12 @@ pub trait StreamMuxerExt: StreamMuxer + Sized { Pin::new(self).poll_outbound(cx) } - /// Convenience function for calling [`StreamMuxer::poll_address_change`] for [`StreamMuxer`]s that are `Unpin`. - fn poll_address_change_unpin( - &mut self, - cx: &mut Context<'_>, - ) -> Poll> + /// Convenience function for calling [`StreamMuxer::poll`] for [`StreamMuxer`]s that are `Unpin`. + fn poll_unpin(&mut self, cx: &mut Context<'_>) -> Poll> where Self: Unpin, { - Pin::new(self).poll_address_change(cx) + Pin::new(self).poll(cx) } /// Convenience function for calling [`StreamMuxer::poll_close`] for [`StreamMuxer`]s that are `Unpin`. diff --git a/core/src/muxing/boxed.rs b/core/src/muxing/boxed.rs index 0f5b6e5822e..99f7a87c6a5 100644 --- a/core/src/muxing/boxed.rs +++ b/core/src/muxing/boxed.rs @@ -1,6 +1,5 @@ -use crate::StreamMuxer; +use crate::muxing::{StreamMuxer, StreamMuxerEvent}; use futures::{AsyncRead, AsyncWrite}; -use multiaddr::Multiaddr; use pin_project::pin_project; use std::error::Error; use std::fmt; @@ -38,11 +37,6 @@ where type Substream = SubstreamBox; type Error = io::Error; - #[inline] - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().inner.poll_close(cx).map_err(into_io_error) - } - fn poll_inbound( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -65,14 +59,16 @@ where .map_err(into_io_error) } - fn poll_address_change( + #[inline] + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().inner.poll_close(cx).map_err(into_io_error) + } + + fn poll( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll> { - self.project() - .inner - .poll_address_change(cx) - .map_err(into_io_error) + ) -> Poll> { + self.project().inner.poll(cx).map_err(into_io_error) } } @@ -109,11 +105,6 @@ impl StreamMuxer for StreamMuxerBox { type Substream = SubstreamBox; type Error = io::Error; - #[inline] - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().poll_close(cx) - } - fn poll_inbound( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -128,11 +119,16 @@ impl StreamMuxer for StreamMuxerBox { self.project().poll_outbound(cx) } - fn poll_address_change( + #[inline] + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().poll_close(cx) + } + + fn poll( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll> { - self.project().poll_address_change(cx) + ) -> Poll> { + self.project().poll(cx) } } diff --git a/core/src/muxing/singleton.rs b/core/src/muxing/singleton.rs index 193cfb6303f..3ba2c1cb366 100644 --- a/core/src/muxing/singleton.rs +++ b/core/src/muxing/singleton.rs @@ -18,10 +18,10 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::{connection::Endpoint, muxing::StreamMuxer}; +use crate::connection::Endpoint; +use crate::muxing::{StreamMuxer, StreamMuxerEvent}; use futures::prelude::*; -use multiaddr::Multiaddr; use std::cell::Cell; use std::pin::Pin; use std::{io, task::Context, task::Poll}; @@ -88,14 +88,14 @@ where } } - fn poll_address_change( + fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll( self: Pin<&mut Self>, _: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { Poll::Pending } - - fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } } diff --git a/muxers/mplex/src/lib.rs b/muxers/mplex/src/lib.rs index 14f9cda65d9..501c4dd6735 100644 --- a/muxers/mplex/src/lib.rs +++ b/muxers/mplex/src/lib.rs @@ -27,10 +27,8 @@ pub use config::{MaxBufferBehaviour, MplexConfig}; use bytes::Bytes; use codec::LocalStreamId; use futures::{future, prelude::*, ready}; -use libp2p_core::{ - upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}, - Multiaddr, StreamMuxer, -}; +use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent}; +use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; use parking_lot::Mutex; use std::{cmp, iter, pin::Pin, sync::Arc, task::Context, task::Poll}; @@ -105,10 +103,10 @@ where .map_ok(|stream_id| Substream::new(stream_id, self.io.clone())) } - fn poll_address_change( + fn poll( self: Pin<&mut Self>, _: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { Poll::Pending } diff --git a/muxers/yamux/src/lib.rs b/muxers/yamux/src/lib.rs index 07327e203b3..5b109f2b3b0 100644 --- a/muxers/yamux/src/lib.rs +++ b/muxers/yamux/src/lib.rs @@ -26,9 +26,8 @@ use futures::{ prelude::*, stream::{BoxStream, LocalBoxStream}, }; -use libp2p_core::muxing::StreamMuxer; +use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent}; use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; -use libp2p_core::Multiaddr; use std::{ fmt, io, iter, mem, pin::Pin, @@ -124,10 +123,10 @@ where .map_err(YamuxError) } - fn poll_address_change( + fn poll( self: Pin<&mut Self>, _: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { Poll::Pending } diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index f92186618ae..24e54aba525 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -35,7 +35,7 @@ use crate::IntoConnectionHandler; use handler_wrapper::HandlerWrapper; use libp2p_core::connection::ConnectedPoint; use libp2p_core::multiaddr::Multiaddr; -use libp2p_core::muxing::{StreamMuxerBox, StreamMuxerExt}; +use libp2p_core::muxing::{StreamMuxerBox, StreamMuxerEvent, StreamMuxerExt}; use libp2p_core::upgrade; use libp2p_core::PeerId; use std::collections::VecDeque; @@ -153,27 +153,36 @@ where } } - if !self.open_info.is_empty() { - if let Poll::Ready(substream) = self.muxing.poll_outbound_unpin(cx)? { - let user_data = self - .open_info - .pop_front() - .expect("`open_info` is not empty"); - let endpoint = SubstreamEndpoint::Dialer(user_data); - self.handler.inject_substream(substream, endpoint); - continue; // Go back to the top, handler can potentially make progress again. + match self.muxing.poll_unpin(cx)? { + Poll::Pending => {} + Poll::Ready(StreamMuxerEvent::AddressChange(address)) => { + self.handler.inject_address_change(&address); + return Poll::Ready(Ok(Event::AddressChange(address))); } } - if let Poll::Ready(substream) = self.muxing.poll_inbound_unpin(cx)? { - self.handler - .inject_substream(substream, SubstreamEndpoint::Listener); - continue; // Go back to the top, handler can potentially make progress again. + if !self.open_info.is_empty() { + match self.muxing.poll_outbound_unpin(cx)? { + Poll::Pending => {} + Poll::Ready(substream) => { + let user_data = self + .open_info + .pop_front() + .expect("`open_info` is not empty"); + let endpoint = SubstreamEndpoint::Dialer(user_data); + self.handler.inject_substream(substream, endpoint); + continue; // Go back to the top, handler can potentially make progress again. + } + } } - if let Poll::Ready(address) = self.muxing.poll_address_change_unpin(cx)? { - self.handler.inject_address_change(&address); - return Poll::Ready(Ok(Event::AddressChange(address))); + match self.muxing.poll_inbound_unpin(cx)? { + Poll::Pending => {} + Poll::Ready(substream) => { + self.handler + .inject_substream(substream, SubstreamEndpoint::Listener); + continue; // Go back to the top, handler can potentially make progress again. + } } return Poll::Pending; // Nothing can make progress, return `Pending`.