Skip to content

Commit

Permalink
[mplex] Refactoring with Patches (#1769)
Browse files Browse the repository at this point in the history
* Refactor Mplex.

Thereby addressing the following issues:

  * Send a `Reset` frame when open substreams get dropped (313).
  * Avoid stalls caused by a read operation on one substream
    reading (and buffering) frames for another substream without
    notifying the corresponding task. I.e. the tracked read-interest
    must be scoped to a substream.
  * Remove dropped substreams from the tracked set of open
    substreams, to avoid artificially running into substream
    limits.

* Update CHANGELOG.

* Refine behaviour of dropping substreams.

By taking the substream state into account. The refined
behaviour is modeled after the behaviour of Yamux.

* Tweak docs and recv buffer retention.

* Further small tweaks.

 * Make the pending frames a FIFO queue.
 * Take more care to avoid keeping read-wakers around
   and to notify them when streams close.

* Prefer wake over unregister.

It is probably safer to always wake pending wakers.

* Update muxers/mplex/src/codec.rs

Co-authored-by: Max Inden <mail@max-inden.de>

* Update muxers/mplex/src/io.rs

Co-authored-by: Max Inden <mail@max-inden.de>

* Some review feedback and cosmetics.

* Update muxers/mplex/src/io.rs

Co-authored-by: Max Inden <mail@max-inden.de>

* Revise read control flow for clarity.

While seemingly duplicating some control flow between
`poll_next_strean` and `poll_read_stream`, the individual
control flow of each read operation is easier to follow.

* CI

* Rename Status::Ok to Status::Open.

* Rename pending_flush to pending_flush_open.

* Finishing touches.

* Tweak changelog.

Co-authored-by: Max Inden <mail@max-inden.de>
  • Loading branch information
romanb and mxinden authored Sep 28, 2020
1 parent 9365be7 commit 0b18b86
Show file tree
Hide file tree
Showing 7 changed files with 1,099 additions and 607 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ libp2p-floodsub = { version = "0.22.0", path = "protocols/floodsub", optional =
libp2p-gossipsub = { version = "0.22.1", path = "./protocols/gossipsub", optional = true }
libp2p-identify = { version = "0.22.0", path = "protocols/identify", optional = true }
libp2p-kad = { version = "0.23.1", path = "protocols/kad", optional = true }
libp2p-mplex = { version = "0.22.1", path = "muxers/mplex", optional = true }
libp2p-mplex = { version = "0.23.0", path = "muxers/mplex", optional = true }
libp2p-noise = { version = "0.24.1", path = "protocols/noise", optional = true }
libp2p-ping = { version = "0.22.0", path = "protocols/ping", optional = true }
libp2p-plaintext = { version = "0.23.0", path = "protocols/plaintext", optional = true }
Expand Down
18 changes: 17 additions & 1 deletion muxers/mplex/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,20 @@
# 0.22.1 [unreleased]
# 0.23.0 [unreleased]

- Address a potential stall when reading from substreams.

- Send a `Reset` or `Close` to the remote when a substream is dropped,
as appropriate for the current state of the substream,
removing that substream from the tracked open substreams,
to avoid artificially running into substream limits.

- Change the semantics of the `max_substreams` configuration. Now,
outbound substream attempts beyond the configured limit are delayed,
with a task wakeup once an existing substream closes, i.e. the limit
results in back-pressure for new outbound substreams. New inbound
substreams beyond the limit are immediately answered with a `Reset`.
If too many (by some internal threshold) pending frames accumulate,
e.g. as a result of an aggressive number of inbound substreams being
opened beyond the configured limit, the connection is closed ("DoS protection").

- Update dependencies.

Expand Down
2 changes: 1 addition & 1 deletion muxers/mplex/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "libp2p-mplex"
edition = "2018"
description = "Mplex multiplexing protocol for libp2p"
version = "0.22.1"
version = "0.23.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand Down
174 changes: 113 additions & 61 deletions muxers/mplex/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
use libp2p_core::Endpoint;
use futures_codec::{Decoder, Encoder};
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::mem;
use std::{fmt, mem};
use bytes::{BufMut, Bytes, BytesMut};
use unsigned_varint::{codec, encode};

Expand All @@ -30,52 +30,104 @@ use unsigned_varint::{codec, encode};
// send a 4 TB-long packet full of zeroes that we kill our process with an OOM error.
pub(crate) const MAX_FRAME_SIZE: usize = 1024 * 1024;

#[derive(Debug, Clone)]
pub enum Elem {
Open { substream_id: u32 },
Data { substream_id: u32, endpoint: Endpoint, data: Bytes },
Close { substream_id: u32, endpoint: Endpoint },
Reset { substream_id: u32, endpoint: Endpoint },
/// A unique identifier used by the local node for a substream.
///
/// `LocalStreamId`s are sent with frames to the remote, where
/// they are received as `RemoteStreamId`s.
///
/// > **Note**: Streams are identified by a number and a role encoded as a flag
/// > on each frame that is either odd (for receivers) or even (for initiators).
/// > `Open` frames do not have a flag, but are sent unidirectionally. As a
/// > consequence, we need to remember if a stream was initiated by us or remotely
/// > and we store the information from our point of view as a `LocalStreamId`,
/// > i.e. receiving an `Open` frame results in a local ID with role `Endpoint::Listener`,
/// > whilst sending an `Open` frame results in a local ID with role `Endpoint::Dialer`.
/// > Receiving a frame with a flag identifying the remote as a "receiver" means that
/// > we initiated the stream, so the local ID has the role `Endpoint::Dialer`.
/// > Conversely, when receiving a frame with a flag identifying the remote as a "sender",
/// > the corresponding local ID has the role `Endpoint::Listener`.
#[derive(Copy, Clone, PartialEq, Eq, Hash, Debug)]
pub struct LocalStreamId {
num: u32,
role: Endpoint,
}

impl Elem {
/// Returns the ID of the substream of the message.
pub fn substream_id(&self) -> u32 {
match *self {
Elem::Open { substream_id } => substream_id,
Elem::Data { substream_id, .. } => substream_id,
Elem::Close { substream_id, .. } => substream_id,
Elem::Reset { substream_id, .. } => substream_id,
impl fmt::Display for LocalStreamId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self.role {
Endpoint::Dialer => write!(f, "({}/initiator)", self.num),
Endpoint::Listener => write!(f, "({}/receiver)", self.num),
}
}
}

pub fn endpoint(&self) -> Option<Endpoint> {
match *self {
Elem::Open { .. } => None,
Elem::Data { endpoint, .. } => Some(endpoint),
Elem::Close { endpoint, .. } => Some(endpoint),
Elem::Reset { endpoint, .. } => Some(endpoint)
/// A unique identifier used by the remote node for a substream.
///
/// `RemoteStreamId`s are received with frames from the remote
/// and mapped by the receiver to `LocalStreamId`s via
/// [`RemoteStreamId::into_local()`].
#[derive(Copy, Clone, PartialEq, Eq, Hash, Debug)]
pub struct RemoteStreamId {
num: u32,
role: Endpoint,
}

impl LocalStreamId {
pub fn dialer(num: u32) -> Self {
Self { num, role: Endpoint::Dialer }
}

pub fn next(self) -> Self {
Self {
num: self.num.checked_add(1).expect("Mplex substream ID overflowed"),
.. self
}
}
}

impl RemoteStreamId {
fn dialer(num: u32) -> Self {
Self { num, role: Endpoint::Dialer }
}

fn listener(num: u32) -> Self {
Self { num, role: Endpoint::Listener }
}

/// Returns true if this message is `Close` or `Reset`.
#[inline]
pub fn is_close_or_reset_msg(&self) -> bool {
match self {
Elem::Close { .. } | Elem::Reset { .. } => true,
_ => false,
/// Converts this `RemoteStreamId` into the corresponding `LocalStreamId`
/// that identifies the same substream.
pub fn into_local(self) -> LocalStreamId {
LocalStreamId {
num: self.num,
role: !self.role,
}
}
}

/// An Mplex protocol frame.
#[derive(Debug, Clone)]
pub enum Frame<T> {
Open { stream_id: T },
Data { stream_id: T, data: Bytes },
Close { stream_id: T },
Reset { stream_id: T },
}

/// Returns true if this message is `Open`.
#[inline]
pub fn is_open_msg(&self) -> bool {
if let Elem::Open { .. } = self {
true
} else {
false
impl Frame<RemoteStreamId> {
fn remote_id(&self) -> RemoteStreamId {
match *self {
Frame::Open { stream_id } => stream_id,
Frame::Data { stream_id, .. } => stream_id,
Frame::Close { stream_id, .. } => stream_id,
Frame::Reset { stream_id, .. } => stream_id,
}
}

/// Gets the `LocalStreamId` corresponding to the `RemoteStreamId`
/// received with this frame.
pub fn local_id(&self) -> LocalStreamId {
self.remote_id().into_local()
}
}

pub struct Codec {
Expand All @@ -101,7 +153,7 @@ impl Codec {
}

impl Decoder for Codec {
type Item = Elem;
type Item = Frame<RemoteStreamId>;
type Error = IoError;

fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
Expand Down Expand Up @@ -143,15 +195,15 @@ impl Decoder for Codec {
}

let buf = src.split_to(len);
let substream_id = (header >> 3) as u32;
let num = (header >> 3) as u32;
let out = match header & 7 {
0 => Elem::Open { substream_id },
1 => Elem::Data { substream_id, endpoint: Endpoint::Listener, data: buf.freeze() },
2 => Elem::Data { substream_id, endpoint: Endpoint::Dialer, data: buf.freeze() },
3 => Elem::Close { substream_id, endpoint: Endpoint::Listener },
4 => Elem::Close { substream_id, endpoint: Endpoint::Dialer },
5 => Elem::Reset { substream_id, endpoint: Endpoint::Listener },
6 => Elem::Reset { substream_id, endpoint: Endpoint::Dialer },
0 => Frame::Open { stream_id: RemoteStreamId::dialer(num) },
1 => Frame::Data { stream_id: RemoteStreamId::listener(num), data: buf.freeze() },
2 => Frame::Data { stream_id: RemoteStreamId::dialer(num), data: buf.freeze() },
3 => Frame::Close { stream_id: RemoteStreamId::listener(num) },
4 => Frame::Close { stream_id: RemoteStreamId::dialer(num) },
5 => Frame::Reset { stream_id: RemoteStreamId::listener(num) },
6 => Frame::Reset { stream_id: RemoteStreamId::dialer(num) },
_ => {
let msg = format!("Invalid mplex header value 0x{:x}", header);
return Err(IoError::new(IoErrorKind::InvalidData, msg));
Expand All @@ -171,31 +223,31 @@ impl Decoder for Codec {
}

impl Encoder for Codec {
type Item = Elem;
type Item = Frame<LocalStreamId>;
type Error = IoError;

fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
let (header, data) = match item {
Elem::Open { substream_id } => {
(u64::from(substream_id) << 3, Bytes::new())
Frame::Open { stream_id } => {
(u64::from(stream_id.num) << 3, Bytes::new())
},
Elem::Data { substream_id, endpoint: Endpoint::Listener, data } => {
(u64::from(substream_id) << 3 | 1, data)
Frame::Data { stream_id: LocalStreamId { num, role: Endpoint::Listener }, data } => {
(u64::from(num) << 3 | 1, data)
},
Elem::Data { substream_id, endpoint: Endpoint::Dialer, data } => {
(u64::from(substream_id) << 3 | 2, data)
Frame::Data { stream_id: LocalStreamId { num, role: Endpoint::Dialer }, data } => {
(u64::from(num) << 3 | 2, data)
},
Elem::Close { substream_id, endpoint: Endpoint::Listener } => {
(u64::from(substream_id) << 3 | 3, Bytes::new())
Frame::Close { stream_id: LocalStreamId { num, role: Endpoint::Listener } } => {
(u64::from(num) << 3 | 3, Bytes::new())
},
Elem::Close { substream_id, endpoint: Endpoint::Dialer } => {
(u64::from(substream_id) << 3 | 4, Bytes::new())
Frame::Close { stream_id: LocalStreamId { num, role: Endpoint::Dialer } } => {
(u64::from(num) << 3 | 4, Bytes::new())
},
Elem::Reset { substream_id, endpoint: Endpoint::Listener } => {
(u64::from(substream_id) << 3 | 5, Bytes::new())
Frame::Reset { stream_id: LocalStreamId { num, role: Endpoint::Listener } } => {
(u64::from(num) << 3 | 5, Bytes::new())
},
Elem::Reset { substream_id, endpoint: Endpoint::Dialer } => {
(u64::from(substream_id) << 3 | 6, Bytes::new())
Frame::Reset { stream_id: LocalStreamId { num, role: Endpoint::Dialer } } => {
(u64::from(num) << 3 | 6, Bytes::new())
},
};

Expand Down Expand Up @@ -225,17 +277,17 @@ mod tests {
#[test]
fn encode_large_messages_fails() {
let mut enc = Codec::new();
let endpoint = Endpoint::Dialer;
let role = Endpoint::Dialer;
let data = Bytes::from(&[123u8; MAX_FRAME_SIZE + 1][..]);
let bad_msg = Elem::Data{ substream_id: 123, endpoint, data };
let bad_msg = Frame::Data { stream_id: LocalStreamId { num: 123, role }, data };
let mut out = BytesMut::new();
match enc.encode(bad_msg, &mut out) {
Err(e) => assert_eq!(e.to_string(), "data size exceed maximum"),
_ => panic!("Can't send a message bigger than MAX_FRAME_SIZE")
}

let data = Bytes::from(&[123u8; MAX_FRAME_SIZE][..]);
let ok_msg = Elem::Data{ substream_id: 123, endpoint, data };
let ok_msg = Frame::Data { stream_id: LocalStreamId { num: 123, role }, data };
assert!(enc.encode(ok_msg, &mut out).is_ok());
}
}
106 changes: 106 additions & 0 deletions muxers/mplex/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::codec::MAX_FRAME_SIZE;
use std::cmp;

/// Configuration for the multiplexer.
#[derive(Debug, Clone)]
pub struct MplexConfig {
/// Maximum number of simultaneously-open substreams.
pub(crate) max_substreams: usize,
/// Maximum number of frames in the internal buffer.
pub(crate) max_buffer_len: usize,
/// Behaviour when the buffer size limit is reached.
pub(crate) max_buffer_behaviour: MaxBufferBehaviour,
/// When sending data, split it into frames whose maximum size is this value
/// (max 1MByte, as per the Mplex spec).
pub(crate) split_send_size: usize,
}

impl MplexConfig {
/// Builds the default configuration.
pub fn new() -> MplexConfig {
Default::default()
}

/// Sets the maximum number of simultaneously open substreams.
///
/// When the limit is reached, opening of outbound substreams
/// is delayed until another substream closes, whereas new
/// inbound substreams are immediately answered with a `Reset`.
/// If the number of inbound substreams that need to be reset
/// accumulates too quickly (judged by internal bounds), the
/// connection is closed, the connection is closed with an error
/// due to the misbehaved remote.
pub fn max_substreams(&mut self, max: usize) -> &mut Self {
self.max_substreams = max;
self
}

/// Sets the maximum number of frames buffered that have
/// not yet been consumed.
///
/// A limit is necessary in order to avoid DoS attacks.
pub fn max_buffer_len(&mut self, max: usize) -> &mut Self {
self.max_buffer_len = max;
self
}

/// Sets the behaviour when the maximum buffer length has been reached.
///
/// See the documentation of `MaxBufferBehaviour`.
pub fn max_buffer_len_behaviour(&mut self, behaviour: MaxBufferBehaviour) -> &mut Self {
self.max_buffer_behaviour = behaviour;
self
}

/// Sets the frame size used when sending data. Capped at 1Mbyte as per the
/// Mplex spec.
pub fn split_send_size(&mut self, size: usize) -> &mut Self {
let size = cmp::min(size, MAX_FRAME_SIZE);
self.split_send_size = size;
self
}
}

/// Behaviour when the maximum length of the buffer is reached.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum MaxBufferBehaviour {
/// Produce an error on all the substreams.
CloseAll,
/// No new message will be read from the underlying connection if the buffer is full.
///
/// This can potentially introduce a deadlock if you are waiting for a message from a substream
/// before processing the messages received on another substream.
Block,
}

impl Default for MplexConfig {
fn default() -> MplexConfig {
MplexConfig {
max_substreams: 128,
max_buffer_len: 4096,
max_buffer_behaviour: MaxBufferBehaviour::CloseAll,
split_send_size: 1024,
}
}
}

Loading

0 comments on commit 0b18b86

Please sign in to comment.