Skip to content

Commit

Permalink
Re-design the StreamMuxer trait
Browse files Browse the repository at this point in the history
The current `StreamMuxer` API is based around a poll-based approach
where each substream needs to be passed by value to read/write/close
it.

The muxer itself however is not available in the end-user abstractions
like `ConnectionHandler`. To circumvent this, the `StreamMuxerBox`
type exists which allows a `StreamMuxer` to be cloned. Together with
`SubstreamRef`, this allows each substream to own a reference to the
muxer it was created with and pass itself to the muxer within
implementations of `AsyncRead` and `AsyncWrite`. These implementations
are convenient as they allow an end-user to simply read and write to
the substream without holding a reference to anything else.

We can achieve the same goal by changing the `StreamMuxer` abstraction
to always return a `AsyncReadWriteBox`. Depending on the implementation
of the muxer, this may require more code within its implementation of the
`StreamMuxer` trait.

yamux already provides a `Stream` type that implements `AsyncRead`
and `AsyncWrite` and we can thus directly construct a `AsyncReadWriteBox`.

For mplex, we wrap the actual muxer in an `Arc` and store it within the
`Substream` struct. This allowsus to call the various `poll_` functions
from within the `AsyncRead` and `AsyncWrite` implementations.
  • Loading branch information
thomaseizinger committed May 20, 2022
1 parent 9c90331 commit 2b0f2de
Show file tree
Hide file tree
Showing 20 changed files with 463 additions and 1,340 deletions.
2 changes: 2 additions & 0 deletions core/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
# 0.33.0 [unreleased]

- Have methods on `Transport` take `&mut self` instead of `self`. See [PR 2529].
- Re-design `StreamMuxer` trait to take `&mut self` and remove all associated types. See [PR 2648].

[PR 2529]: https://github.com/libp2p/rust-libp2p/pull/2529
[PR 2648]: https://github.com/libp2p/rust-libp2p/pull/2648

# 0.32.1

Expand Down
150 changes: 8 additions & 142 deletions core/src/either.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::muxing::OutboundSubstreamId;
use crate::{
muxing::{StreamMuxer, StreamMuxerEvent},
transport::{ListenerEvent, Transport, TransportError},
Expand Down Expand Up @@ -201,163 +202,28 @@ where
A: StreamMuxer,
B: StreamMuxer,
{
type Substream = EitherOutput<A::Substream, B::Substream>;
type OutboundSubstream = EitherOutbound<A, B>;

fn poll_event(
&self,
cx: &mut Context<'_>,
) -> Poll<io::Result<StreamMuxerEvent<Self::Substream>>> {
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<StreamMuxerEvent>> {
match self {
EitherOutput::First(inner) => inner.poll_event(cx).map(|result| {
result.map(|event| match event {
StreamMuxerEvent::AddressChange(addr) => StreamMuxerEvent::AddressChange(addr),
StreamMuxerEvent::InboundSubstream(substream) => {
StreamMuxerEvent::InboundSubstream(EitherOutput::First(substream))
}
})
}),
EitherOutput::Second(inner) => inner.poll_event(cx).map(|result| {
result.map(|event| match event {
StreamMuxerEvent::AddressChange(addr) => StreamMuxerEvent::AddressChange(addr),
StreamMuxerEvent::InboundSubstream(substream) => {
StreamMuxerEvent::InboundSubstream(EitherOutput::Second(substream))
}
})
}),
}
}

fn open_outbound(&self) -> Self::OutboundSubstream {
match self {
EitherOutput::First(inner) => EitherOutbound::A(inner.open_outbound()),
EitherOutput::Second(inner) => EitherOutbound::B(inner.open_outbound()),
}
}

fn poll_outbound(
&self,
cx: &mut Context<'_>,
substream: &mut Self::OutboundSubstream,
) -> Poll<io::Result<Self::Substream>> {
match (self, substream) {
(EitherOutput::First(ref inner), EitherOutbound::A(ref mut substream)) => inner
.poll_outbound(cx, substream)
.map(|p| p.map(EitherOutput::First)),
(EitherOutput::Second(ref inner), EitherOutbound::B(ref mut substream)) => inner
.poll_outbound(cx, substream)
.map(|p| p.map(EitherOutput::Second)),
_ => panic!("Wrong API usage"),
}
}

fn destroy_outbound(&self, substream: Self::OutboundSubstream) {
match self {
EitherOutput::First(inner) => match substream {
EitherOutbound::A(substream) => inner.destroy_outbound(substream),
_ => panic!("Wrong API usage"),
},
EitherOutput::Second(inner) => match substream {
EitherOutbound::B(substream) => inner.destroy_outbound(substream),
_ => panic!("Wrong API usage"),
},
}
}

fn read_substream(
&self,
cx: &mut Context<'_>,
sub: &mut Self::Substream,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
match (self, sub) {
(EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => {
inner.read_substream(cx, sub, buf)
}
(EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => {
inner.read_substream(cx, sub, buf)
}
_ => panic!("Wrong API usage"),
}
}

fn write_substream(
&self,
cx: &mut Context<'_>,
sub: &mut Self::Substream,
buf: &[u8],
) -> Poll<io::Result<usize>> {
match (self, sub) {
(EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => {
inner.write_substream(cx, sub, buf)
}
(EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => {
inner.write_substream(cx, sub, buf)
}
_ => panic!("Wrong API usage"),
}
}

fn flush_substream(
&self,
cx: &mut Context<'_>,
sub: &mut Self::Substream,
) -> Poll<io::Result<()>> {
match (self, sub) {
(EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => {
inner.flush_substream(cx, sub)
}
(EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => {
inner.flush_substream(cx, sub)
}
_ => panic!("Wrong API usage"),
EitherOutput::First(inner) => inner.poll(cx),
EitherOutput::Second(inner) => inner.poll(cx),
}
}

fn shutdown_substream(
&self,
cx: &mut Context<'_>,
sub: &mut Self::Substream,
) -> Poll<io::Result<()>> {
match (self, sub) {
(EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => {
inner.shutdown_substream(cx, sub)
}
(EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => {
inner.shutdown_substream(cx, sub)
}
_ => panic!("Wrong API usage"),
}
}

fn destroy_substream(&self, substream: Self::Substream) {
fn open_outbound(&mut self) -> OutboundSubstreamId {
match self {
EitherOutput::First(inner) => match substream {
EitherOutput::First(substream) => inner.destroy_substream(substream),
_ => panic!("Wrong API usage"),
},
EitherOutput::Second(inner) => match substream {
EitherOutput::Second(substream) => inner.destroy_substream(substream),
_ => panic!("Wrong API usage"),
},
EitherOutput::First(inner) => inner.open_outbound(),
EitherOutput::Second(inner) => inner.open_outbound(),
}
}

fn poll_close(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
match self {
EitherOutput::First(inner) => inner.poll_close(cx),
EitherOutput::Second(inner) => inner.poll_close(cx),
}
}
}

#[derive(Debug, Copy, Clone)]
#[must_use = "futures do nothing unless polled"]
pub enum EitherOutbound<A: StreamMuxer, B: StreamMuxer> {
A(A::OutboundSubstream),
B(B::OutboundSubstream),
}

/// Implements `Stream` and dispatches all method calls to either `First` or `Second`.
#[pin_project(project = EitherListenStreamProj)]
#[derive(Debug, Copy, Clone)]
Expand Down
Loading

0 comments on commit 2b0f2de

Please sign in to comment.