Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[mplex] Refactoring with Patches #1769

Merged
merged 17 commits into from
Sep 28, 2020
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ libp2p-floodsub = { version = "0.22.0", path = "protocols/floodsub", optional =
libp2p-gossipsub = { version = "0.22.1", path = "./protocols/gossipsub", optional = true }
libp2p-identify = { version = "0.22.0", path = "protocols/identify", optional = true }
libp2p-kad = { version = "0.23.1", path = "protocols/kad", optional = true }
libp2p-mplex = { version = "0.22.1", path = "muxers/mplex", optional = true }
libp2p-mplex = { version = "0.23.0", path = "muxers/mplex", optional = true }
libp2p-noise = { version = "0.24.1", path = "protocols/noise", optional = true }
libp2p-ping = { version = "0.22.0", path = "protocols/ping", optional = true }
libp2p-plaintext = { version = "0.23.0", path = "protocols/plaintext", optional = true }
Expand Down
17 changes: 16 additions & 1 deletion muxers/mplex/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,19 @@
# 0.22.1 [unreleased]
# 0.23.0 [unreleased]

- Address a potential stall when reading from substreams.

- Send a `Reset` to the remote when a substream is dropped
and remove that substream from the tracked open substreams,
to avoid artificially running into substream limits.

- Change the semantics of the `max_substreams` configuration. Now,
outbound substream attempts beyond the configured limit are delayed,
with a task wakeup once an existing substream closes, i.e. the limit
results in back-pressure for new outbound substreams. New inbound
substreams beyond the limit are immediately answered with a `Reset`.
If too many (by some internal threshold) pending frames accumulate,
e.g. as a result of an aggressive number of inbound substreams being
opened beyond the configured limit, the connection is closed ("DoS protection").

- Update dependencies.

Expand Down
2 changes: 1 addition & 1 deletion muxers/mplex/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "libp2p-mplex"
edition = "2018"
description = "Mplex multiplexing protocol for libp2p"
version = "0.22.1"
version = "0.23.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand Down
179 changes: 124 additions & 55 deletions muxers/mplex/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
use libp2p_core::Endpoint;
use futures_codec::{Decoder, Encoder};
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::mem;
use std::{fmt, mem};
use bytes::{BufMut, Bytes, BytesMut};
use unsigned_varint::{codec, encode};

Expand All @@ -30,47 +30,116 @@ use unsigned_varint::{codec, encode};
// send a 4 TB-long packet full of zeroes that we kill our process with an OOM error.
pub(crate) const MAX_FRAME_SIZE: usize = 1024 * 1024;

#[derive(Debug, Clone)]
pub enum Elem {
Open { substream_id: u32 },
Data { substream_id: u32, endpoint: Endpoint, data: Bytes },
Close { substream_id: u32, endpoint: Endpoint },
Reset { substream_id: u32, endpoint: Endpoint },
/// A unique identifier used by the local node for a substream.
///
/// `LocalStreamId`s are sent with frames to the remote, where
/// they are received as `RemoteStreamId`s.
///
/// > **Note**: Streams are identified by a number and a role encoded as a flag
/// > on each frame that is either odd (for receivers) or even (for initiators).
/// > `Open` frames do not have a flag, but are sent unidirectionally. As a
/// > consequence, we need to remember if a stream was initiated by us or remotely
/// > and we store the information from our point of view as a `LocalStreamId`,
/// > i.e. receiving an `Open` frame results in a local ID with role `Endpoint::Listener`,
/// > whilst sending an `Open` frame results in a local ID with role `Endpoint::Dialer`.
/// > Receiving a frame with a flag identifying the remote as a "receiver" means that
/// > we initiated the stream, so the local ID has the role `Endpoint::Dialer`.
/// > Conversely, when receiving a frame with a flag identifying the remote as a "sender",
/// > the corresponding local ID has the role `Endpoint::Listener`.
#[derive(Copy, Clone, PartialEq, Eq, Hash, Debug)]
pub struct LocalStreamId {
num: u32,
role: Endpoint,
}

impl Elem {
/// Returns the ID of the substream of the message.
pub fn substream_id(&self) -> u32 {
match *self {
Elem::Open { substream_id } => substream_id,
Elem::Data { substream_id, .. } => substream_id,
Elem::Close { substream_id, .. } => substream_id,
Elem::Reset { substream_id, .. } => substream_id,
impl fmt::Display for LocalStreamId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self.role {
Endpoint::Dialer => write!(f, "({}/initiator)", self.num),
Endpoint::Listener => write!(f, "({}/receiver)", self.num),
}
}
}

/// A unique identifier used by the remote node for a substream.
///
/// `RemoteStreamId`s are received with frames from the remote
/// and mapped by the receiver to `LocalStreamId`s via
/// [`RemoteStreamId::into_local()`].
#[derive(Copy, Clone, PartialEq, Eq, Hash, Debug)]
pub struct RemoteStreamId {
num: u32,
role: Endpoint,
}

impl LocalStreamId {
pub fn dialer(num: u32) -> Self {
Self { num, role: Endpoint::Dialer }
}

pub fn next(self) -> Self {
Self {
num: self.num.checked_add(1).expect("Mplex substream ID overflowed"),
.. self
}
}
}

impl RemoteStreamId {
fn dialer(num: u32) -> Self {
Self { num, role: Endpoint::Dialer }
}

fn listener(num: u32) -> Self {
Self { num, role: Endpoint::Listener }
}

/// Converts this `RemoteStreamId` into the corresponding `LocalStreamId`
/// that identifies the same substream.
pub fn into_local(self) -> LocalStreamId {
LocalStreamId {
num: self.num,
role: !self.role,
}
}
}

/// An Mplex protocol frame.
#[derive(Debug, Clone)]
pub enum Frame<T> {
Open { stream_id: T },
Data { stream_id: T, data: Bytes },
Close { stream_id: T },
Reset { stream_id: T },
}

pub fn endpoint(&self) -> Option<Endpoint> {
impl Frame<RemoteStreamId> {
fn remote_id(&self) -> RemoteStreamId {
match *self {
Elem::Open { .. } => None,
Elem::Data { endpoint, .. } => Some(endpoint),
Elem::Close { endpoint, .. } => Some(endpoint),
Elem::Reset { endpoint, .. } => Some(endpoint)
Frame::Open { stream_id } => stream_id,
Frame::Data { stream_id, .. } => stream_id,
Frame::Close { stream_id, .. } => stream_id,
Frame::Reset { stream_id, .. } => stream_id,
}
}

/// Returns true if this message is `Close` or `Reset`.
#[inline]
pub fn is_close_or_reset_msg(&self) -> bool {
/// Gets the `LocalStreamId` corresponding to the `RemoteStreamId`
/// received with this frame.
pub fn local_id(&self) -> LocalStreamId {
self.remote_id().into_local()
}

/// Returns true if this is a `Data` frame.
pub fn is_data(&self) -> bool {
match self {
Elem::Close { .. } | Elem::Reset { .. } => true,
Frame::Data { .. } => true,
_ => false,
}
}

/// Returns true if this message is `Open`.
#[inline]
pub fn is_open_msg(&self) -> bool {
if let Elem::Open { .. } = self {
/// Returns true if this is an `Open` frame.
pub fn is_open(&self) -> bool {
if let Frame::Open { .. } = self {
true
} else {
false
Expand Down Expand Up @@ -101,7 +170,7 @@ impl Codec {
}

impl Decoder for Codec {
type Item = Elem;
type Item = Frame<RemoteStreamId>;
type Error = IoError;

fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
Expand Down Expand Up @@ -143,15 +212,15 @@ impl Decoder for Codec {
}

let buf = src.split_to(len);
let substream_id = (header >> 3) as u32;
let num = (header >> 3) as u32;
let out = match header & 7 {
0 => Elem::Open { substream_id },
1 => Elem::Data { substream_id, endpoint: Endpoint::Listener, data: buf.freeze() },
2 => Elem::Data { substream_id, endpoint: Endpoint::Dialer, data: buf.freeze() },
3 => Elem::Close { substream_id, endpoint: Endpoint::Listener },
4 => Elem::Close { substream_id, endpoint: Endpoint::Dialer },
5 => Elem::Reset { substream_id, endpoint: Endpoint::Listener },
6 => Elem::Reset { substream_id, endpoint: Endpoint::Dialer },
0 => Frame::Open { stream_id: RemoteStreamId::dialer(num) },
1 => Frame::Data { stream_id: RemoteStreamId::listener(num), data: buf.freeze() },
2 => Frame::Data { stream_id: RemoteStreamId::dialer(num), data: buf.freeze() },
3 => Frame::Close { stream_id: RemoteStreamId::listener(num) },
4 => Frame::Close { stream_id: RemoteStreamId::dialer(num) },
5 => Frame::Reset { stream_id: RemoteStreamId::listener(num) },
6 => Frame::Reset { stream_id: RemoteStreamId::dialer(num) },
_ => {
let msg = format!("Invalid mplex header value 0x{:x}", header);
return Err(IoError::new(IoErrorKind::InvalidData, msg));
Expand All @@ -171,31 +240,31 @@ impl Decoder for Codec {
}

impl Encoder for Codec {
type Item = Elem;
type Item = Frame<LocalStreamId>;
type Error = IoError;

fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
let (header, data) = match item {
Elem::Open { substream_id } => {
(u64::from(substream_id) << 3, Bytes::new())
Frame::Open { stream_id } => {
(u64::from(stream_id.num) << 3, Bytes::new())
},
Elem::Data { substream_id, endpoint: Endpoint::Listener, data } => {
(u64::from(substream_id) << 3 | 1, data)
Frame::Data { stream_id: LocalStreamId { num, role: Endpoint::Listener }, data } => {
(u64::from(num) << 3 | 1, data)
},
Elem::Data { substream_id, endpoint: Endpoint::Dialer, data } => {
(u64::from(substream_id) << 3 | 2, data)
Frame::Data { stream_id: LocalStreamId { num, role: Endpoint::Dialer }, data } => {
(u64::from(num) << 3 | 2, data)
},
Elem::Close { substream_id, endpoint: Endpoint::Listener } => {
(u64::from(substream_id) << 3 | 3, Bytes::new())
Frame::Close { stream_id: LocalStreamId { num, role: Endpoint::Listener } } => {
(u64::from(num) << 3 | 3, Bytes::new())
},
Elem::Close { substream_id, endpoint: Endpoint::Dialer } => {
(u64::from(substream_id) << 3 | 4, Bytes::new())
Frame::Close { stream_id: LocalStreamId { num, role: Endpoint::Dialer } } => {
(u64::from(num) << 3 | 4, Bytes::new())
},
Elem::Reset { substream_id, endpoint: Endpoint::Listener } => {
(u64::from(substream_id) << 3 | 5, Bytes::new())
Frame::Reset { stream_id: LocalStreamId { num, role: Endpoint::Listener } } => {
(u64::from(num) << 3 | 5, Bytes::new())
},
Elem::Reset { substream_id, endpoint: Endpoint::Dialer } => {
(u64::from(substream_id) << 3 | 6, Bytes::new())
Frame::Reset { stream_id: LocalStreamId { num, role: Endpoint::Dialer } } => {
(u64::from(num) << 3 | 6, Bytes::new())
},
};

Expand Down Expand Up @@ -225,17 +294,17 @@ mod tests {
#[test]
fn encode_large_messages_fails() {
let mut enc = Codec::new();
let endpoint = Endpoint::Dialer;
let role = Endpoint::Dialer;
let data = Bytes::from(&[123u8; MAX_FRAME_SIZE + 1][..]);
let bad_msg = Elem::Data{ substream_id: 123, endpoint, data };
let bad_msg = Frame::Data { stream_id: LocalStreamId { num: 123, role }, data };
let mut out = BytesMut::new();
match enc.encode(bad_msg, &mut out) {
Err(e) => assert_eq!(e.to_string(), "data size exceed maximum"),
_ => panic!("Can't send a message bigger than MAX_FRAME_SIZE")
}

let data = Bytes::from(&[123u8; MAX_FRAME_SIZE][..]);
let ok_msg = Elem::Data{ substream_id: 123, endpoint, data };
let ok_msg = Frame::Data { stream_id: LocalStreamId { num: 123, role }, data };
assert!(enc.encode(ok_msg, &mut out).is_ok());
}
}
106 changes: 106 additions & 0 deletions muxers/mplex/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::codec::MAX_FRAME_SIZE;
use std::cmp;

/// Configuration for the multiplexer.
#[derive(Debug, Clone)]
pub struct MplexConfig {
/// Maximum number of simultaneously-open substreams.
pub(crate) max_substreams: usize,
/// Maximum number of frames in the internal buffer.
pub(crate) max_buffer_len: usize,
/// Behaviour when the buffer size limit is reached.
pub(crate) max_buffer_behaviour: MaxBufferBehaviour,
/// When sending data, split it into frames whose maximum size is this value
/// (max 1MByte, as per the Mplex spec).
pub(crate) split_send_size: usize,
}

impl MplexConfig {
/// Builds the default configuration.
pub fn new() -> MplexConfig {
Default::default()
}

/// Sets the maximum number of simultaneously open substreams.
///
/// When the limit is reached, opening of outbound substreams
/// is delayed until another substream closes, whereas new
/// inbound substreams are immediately answered with a `Reset`.
/// If the number of inbound substreams that need to be reset
/// accumulates too quickly (judged by internal bounds), the
/// connection is closed, the connection is closed with an error
/// due to the misbehaved remote.
pub fn max_substreams(&mut self, max: usize) -> &mut Self {
self.max_substreams = max;
self
}

/// Sets the maximum number of frames buffered that have
/// not yet been consumed.
///
/// A limit is necessary in order to avoid DoS attacks.
pub fn max_buffer_len(&mut self, max: usize) -> &mut Self {
self.max_buffer_len = max;
self
}

/// Sets the behaviour when the maximum buffer length has been reached.
///
/// See the documentation of `MaxBufferBehaviour`.
pub fn max_buffer_len_behaviour(&mut self, behaviour: MaxBufferBehaviour) -> &mut Self {
self.max_buffer_behaviour = behaviour;
self
}

/// Sets the frame size used when sending data. Capped at 1Mbyte as per the
/// Mplex spec.
pub fn split_send_size(&mut self, size: usize) -> &mut Self {
let size = cmp::min(size, MAX_FRAME_SIZE);
self.split_send_size = size;
self
}
}

/// Behaviour when the maximum length of the buffer is reached.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum MaxBufferBehaviour {
/// Produce an error on all the substreams.
CloseAll,
/// No new message will be read from the underlying connection if the buffer is full.
///
/// This can potentially introduce a deadlock if you are waiting for a message from a substream
/// before processing the messages received on another substream.
Block,
}

impl Default for MplexConfig {
fn default() -> MplexConfig {
MplexConfig {
max_substreams: 128,
max_buffer_len: 4096,
max_buffer_behaviour: MaxBufferBehaviour::CloseAll,
split_send_size: 1024,
}
}
}

Loading