Skip to content

Commit

Permalink
core/muxing: Generalise StreamMuxer::poll_address_change to poll (#…
Browse files Browse the repository at this point in the history
…2797)

This is to allow general-purpose background work to be performed
by implementations.
  • Loading branch information
thomaseizinger authored Aug 16, 2022
1 parent 06aaea6 commit cef5056
Show file tree
Hide file tree
Showing 8 changed files with 99 additions and 87 deletions.
6 changes: 4 additions & 2 deletions core/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
# 0.35.0 [unreleased]

- Remove `StreamMuxer::poll_event` in favor of individual functions: `poll_inbound`, `poll_outbound`
and `poll_address_change`. Consequently, `StreamMuxerEvent` is also removed. See [PR 2724].
- Drop `Unpin` requirement from `SubstreamBox`. See [PR 2762] and [PR 2776].
- Drop `Sync` requirement on `StreamMuxer` for constructing `StreamMuxerBox`. See [PR 2775].
- Use `Pin<&mut Self>` as the receiver type for all `StreamMuxer` poll functions. See [PR 2765].
- Change `StreamMuxer` interface to be entirely poll-based. All functions on `StreamMuxer` now
require a `Context` and return `Poll`. This gives callers fine-grained control over what they
would like to make progress on. See [PR 2724] and [PR 2797].

[PR 2724]: https://github.com/libp2p/rust-libp2p/pull/2724
[PR 2762]: https://github.com/libp2p/rust-libp2p/pull/2762
[PR 2775]: https://github.com/libp2p/rust-libp2p/pull/2775
[PR 2776]: https://github.com/libp2p/rust-libp2p/pull/2776
[PR 2765]: https://github.com/libp2p/rust-libp2p/pull/2765
[PR 2797]: https://github.com/libp2p/rust-libp2p/pull/2797

# 0.34.0

Expand Down
21 changes: 10 additions & 11 deletions core/src/either.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::muxing::StreamMuxerEvent;
use crate::{
muxing::StreamMuxer,
transport::{ListenerId, Transport, TransportError, TransportEvent},
Expand Down Expand Up @@ -236,22 +237,20 @@ where
}
}

fn poll_address_change(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Multiaddr, Self::Error>> {
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match self.project() {
EitherOutputProj::First(inner) => inner.poll_address_change(cx).map_err(EitherError::A),
EitherOutputProj::Second(inner) => {
inner.poll_address_change(cx).map_err(EitherError::B)
}
EitherOutputProj::First(inner) => inner.poll_close(cx).map_err(EitherError::A),
EitherOutputProj::Second(inner) => inner.poll_close(cx).map_err(EitherError::B),
}
}

fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
fn poll(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<StreamMuxerEvent, Self::Error>> {
match self.project() {
EitherOutputProj::First(inner) => inner.poll_close(cx).map_err(EitherError::A),
EitherOutputProj::Second(inner) => inner.poll_close(cx).map_err(EitherError::B),
EitherOutputProj::First(inner) => inner.poll(cx).map_err(EitherError::A),
EitherOutputProj::Second(inner) => inner.poll(cx).map_err(EitherError::B),
}
}
}
Expand Down
45 changes: 27 additions & 18 deletions core/src/muxing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ pub trait StreamMuxer {
type Error: std::error::Error;

/// Poll for new inbound substreams.
///
/// This function should be called whenever callers are ready to accept more inbound streams. In
/// other words, callers may exercise back-pressure on incoming streams by not calling this
/// function if a certain limit is hit.
fn poll_inbound(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
Expand All @@ -86,25 +90,33 @@ pub trait StreamMuxer {
cx: &mut Context<'_>,
) -> Poll<Result<Self::Substream, Self::Error>>;

/// Poll for an address change of the underlying connection.
/// Poll to close this [`StreamMuxer`].
///
/// Not all implementations may support this feature.
fn poll_address_change(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Multiaddr, Self::Error>>;

/// Closes this `StreamMuxer`.
///
/// After this has returned `Poll::Ready(Ok(()))`, the muxer has become useless. All
/// subsequent reads must return either `EOF` or an error. All subsequent writes, shutdowns,
/// or polls must generate an error or be ignored.
/// After this has returned `Poll::Ready(Ok(()))`, the muxer has become useless and may be safely
/// dropped.
///
/// > **Note**: You are encouraged to call this method and wait for it to return `Ready`, so
/// > that the remote is properly informed of the shutdown. However, apart from
/// > properly informing the remote, there is no difference between this and
/// > immediately dropping the muxer.
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;

/// Poll to allow the underlying connection to make progress.
///
/// In contrast to all other `poll`-functions on [`StreamMuxer`], this function MUST be called
/// unconditionally. Because it will be called regardless, this function can be used by
/// implementations to return events about the underlying connection that the caller MUST deal
/// with.
fn poll(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<StreamMuxerEvent, Self::Error>>;
}

/// An event produced by a [`StreamMuxer`].
pub enum StreamMuxerEvent {
/// The address of the remote has changed.
AddressChange(Multiaddr),
}

/// Extension trait for [`StreamMuxer`].
Expand All @@ -131,15 +143,12 @@ pub trait StreamMuxerExt: StreamMuxer + Sized {
Pin::new(self).poll_outbound(cx)
}

/// Convenience function for calling [`StreamMuxer::poll_address_change`] for [`StreamMuxer`]s that are `Unpin`.
fn poll_address_change_unpin(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Result<Multiaddr, Self::Error>>
/// Convenience function for calling [`StreamMuxer::poll`] for [`StreamMuxer`]s that are `Unpin`.
fn poll_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<StreamMuxerEvent, Self::Error>>
where
Self: Unpin,
{
Pin::new(self).poll_address_change(cx)
Pin::new(self).poll(cx)
}

/// Convenience function for calling [`StreamMuxer::poll_close`] for [`StreamMuxer`]s that are `Unpin`.
Expand Down
38 changes: 17 additions & 21 deletions core/src/muxing/boxed.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crate::StreamMuxer;
use crate::muxing::{StreamMuxer, StreamMuxerEvent};
use futures::{AsyncRead, AsyncWrite};
use multiaddr::Multiaddr;
use pin_project::pin_project;
use std::error::Error;
use std::fmt;
Expand Down Expand Up @@ -38,11 +37,6 @@ where
type Substream = SubstreamBox;
type Error = io::Error;

#[inline]
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().inner.poll_close(cx).map_err(into_io_error)
}

fn poll_inbound(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
Expand All @@ -65,14 +59,16 @@ where
.map_err(into_io_error)
}

fn poll_address_change(
#[inline]
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().inner.poll_close(cx).map_err(into_io_error)
}

fn poll(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Multiaddr, Self::Error>> {
self.project()
.inner
.poll_address_change(cx)
.map_err(into_io_error)
) -> Poll<Result<StreamMuxerEvent, Self::Error>> {
self.project().inner.poll(cx).map_err(into_io_error)
}
}

Expand Down Expand Up @@ -109,11 +105,6 @@ impl StreamMuxer for StreamMuxerBox {
type Substream = SubstreamBox;
type Error = io::Error;

#[inline]
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().poll_close(cx)
}

fn poll_inbound(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
Expand All @@ -128,11 +119,16 @@ impl StreamMuxer for StreamMuxerBox {
self.project().poll_outbound(cx)
}

fn poll_address_change(
#[inline]
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().poll_close(cx)
}

fn poll(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Multiaddr, Self::Error>> {
self.project().poll_address_change(cx)
) -> Poll<Result<StreamMuxerEvent, Self::Error>> {
self.project().poll(cx)
}
}

Expand Down
16 changes: 8 additions & 8 deletions core/src/muxing/singleton.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::{connection::Endpoint, muxing::StreamMuxer};
use crate::connection::Endpoint;
use crate::muxing::{StreamMuxer, StreamMuxerEvent};

use futures::prelude::*;
use multiaddr::Multiaddr;
use std::cell::Cell;
use std::pin::Pin;
use std::{io, task::Context, task::Poll};
Expand Down Expand Up @@ -88,14 +88,14 @@ where
}
}

fn poll_address_change(
fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
Poll::Ready(Ok(()))
}

fn poll(
self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Result<Multiaddr, Self::Error>> {
) -> Poll<Result<StreamMuxerEvent, Self::Error>> {
Poll::Pending
}

fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
Poll::Ready(Ok(()))
}
}
10 changes: 4 additions & 6 deletions muxers/mplex/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,8 @@ pub use config::{MaxBufferBehaviour, MplexConfig};
use bytes::Bytes;
use codec::LocalStreamId;
use futures::{future, prelude::*, ready};
use libp2p_core::{
upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo},
Multiaddr, StreamMuxer,
};
use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent};
use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
use parking_lot::Mutex;
use std::{cmp, iter, pin::Pin, sync::Arc, task::Context, task::Poll};

Expand Down Expand Up @@ -105,10 +103,10 @@ where
.map_ok(|stream_id| Substream::new(stream_id, self.io.clone()))
}

fn poll_address_change(
fn poll(
self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Result<Multiaddr, Self::Error>> {
) -> Poll<Result<StreamMuxerEvent, Self::Error>> {
Poll::Pending
}

Expand Down
7 changes: 3 additions & 4 deletions muxers/yamux/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,8 @@ use futures::{
prelude::*,
stream::{BoxStream, LocalBoxStream},
};
use libp2p_core::muxing::StreamMuxer;
use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent};
use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
use libp2p_core::Multiaddr;
use std::{
fmt, io, iter, mem,
pin::Pin,
Expand Down Expand Up @@ -124,10 +123,10 @@ where
.map_err(YamuxError)
}

fn poll_address_change(
fn poll(
self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Result<Multiaddr, Self::Error>> {
) -> Poll<Result<StreamMuxerEvent, Self::Error>> {
Poll::Pending
}

Expand Down
43 changes: 26 additions & 17 deletions swarm/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use crate::IntoConnectionHandler;
use handler_wrapper::HandlerWrapper;
use libp2p_core::connection::ConnectedPoint;
use libp2p_core::multiaddr::Multiaddr;
use libp2p_core::muxing::{StreamMuxerBox, StreamMuxerExt};
use libp2p_core::muxing::{StreamMuxerBox, StreamMuxerEvent, StreamMuxerExt};
use libp2p_core::upgrade;
use libp2p_core::PeerId;
use std::collections::VecDeque;
Expand Down Expand Up @@ -153,27 +153,36 @@ where
}
}

if !self.open_info.is_empty() {
if let Poll::Ready(substream) = self.muxing.poll_outbound_unpin(cx)? {
let user_data = self
.open_info
.pop_front()
.expect("`open_info` is not empty");
let endpoint = SubstreamEndpoint::Dialer(user_data);
self.handler.inject_substream(substream, endpoint);
continue; // Go back to the top, handler can potentially make progress again.
match self.muxing.poll_unpin(cx)? {
Poll::Pending => {}
Poll::Ready(StreamMuxerEvent::AddressChange(address)) => {
self.handler.inject_address_change(&address);
return Poll::Ready(Ok(Event::AddressChange(address)));
}
}

if let Poll::Ready(substream) = self.muxing.poll_inbound_unpin(cx)? {
self.handler
.inject_substream(substream, SubstreamEndpoint::Listener);
continue; // Go back to the top, handler can potentially make progress again.
if !self.open_info.is_empty() {
match self.muxing.poll_outbound_unpin(cx)? {
Poll::Pending => {}
Poll::Ready(substream) => {
let user_data = self
.open_info
.pop_front()
.expect("`open_info` is not empty");
let endpoint = SubstreamEndpoint::Dialer(user_data);
self.handler.inject_substream(substream, endpoint);
continue; // Go back to the top, handler can potentially make progress again.
}
}
}

if let Poll::Ready(address) = self.muxing.poll_address_change_unpin(cx)? {
self.handler.inject_address_change(&address);
return Poll::Ready(Ok(Event::AddressChange(address)));
match self.muxing.poll_inbound_unpin(cx)? {
Poll::Pending => {}
Poll::Ready(substream) => {
self.handler
.inject_substream(substream, SubstreamEndpoint::Listener);
continue; // Go back to the top, handler can potentially make progress again.
}
}

return Poll::Pending; // Nothing can make progress, return `Pending`.
Expand Down

0 comments on commit cef5056

Please sign in to comment.