diff --git a/muxers/mplex/Cargo.toml b/muxers/mplex/Cargo.toml
index 79a144dd3b7..341d19e4978 100644
--- a/muxers/mplex/Cargo.toml
+++ b/muxers/mplex/Cargo.toml
@@ -11,12 +11,13 @@ categories = ["network-programming", "asynchronous"]
 
 [dependencies]
 bytes = "0.5"
-fnv = "1.0"
 futures = "0.3.1"
 futures_codec = "0.4"
 libp2p-core = { version = "0.22.0", path = "../../core" }
 log = "0.4"
+nohash-hasher = "0.2"
 parking_lot = "0.11"
+rand = "0.7"
 smallvec = "1.4"
 unsigned-varint = { version = "0.5", features = ["futures-codec"] }
 
diff --git a/muxers/mplex/src/codec.rs b/muxers/mplex/src/codec.rs
index 268860a511e..25d653bc0ad 100644
--- a/muxers/mplex/src/codec.rs
+++ b/muxers/mplex/src/codec.rs
@@ -18,11 +18,10 @@
 // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
 // DEALINGS IN THE SOFTWARE.
 
-use libp2p_core::Endpoint;
-use futures_codec::{Decoder, Encoder};
-use std::io::{Error as IoError, ErrorKind as IoErrorKind};
-use std::{fmt, mem};
 use bytes::{BufMut, Bytes, BytesMut};
+use futures_codec::{Decoder, Encoder};
+use libp2p_core::Endpoint;
+use std::{fmt, hash::{Hash, Hasher}, io, mem};
 use unsigned_varint::{codec, encode};
 
 // Maximum size for a packet: 1MB as per the spec.
@@ -46,7 +45,7 @@ pub(crate) const MAX_FRAME_SIZE: usize = 1024 * 1024;
 /// > we initiated the stream, so the local ID has the role `Endpoint::Dialer`.
 /// > Conversely, when receiving a frame with a flag identifying the remote as a "sender",
 /// > the corresponding local ID has the role `Endpoint::Listener`.
-#[derive(Copy, Clone, PartialEq, Eq, Hash, Debug)]
+#[derive(Copy, Clone, PartialEq, Eq, Debug)]
 pub struct LocalStreamId {
     num: u32,
     role: Endpoint,
@@ -61,6 +60,14 @@ impl fmt::Display for LocalStreamId {
     }
 }
 
+impl Hash for LocalStreamId {
+    fn hash<H: Hasher>(&self, state: &mut H) {
+        state.write_u32(self.num);
+    }
+}
+
+impl nohash_hasher::IsEnabled for LocalStreamId {}
+
 /// A unique identifier used by the remote node for a substream.
 ///
 /// `RemoteStreamId`s are received with frames from the remote
@@ -161,7 +168,7 @@ impl Codec {
 
 impl Decoder for Codec {
     type Item = Frame<RemoteStreamId>;
-    type Error = IoError;
+    type Error = io::Error;
 
     fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
         loop {
@@ -182,7 +189,7 @@ impl Decoder for Codec {
                     Some(len) => {
                         if len as usize > MAX_FRAME_SIZE {
                             let msg = format!("Mplex frame length {} exceeds maximum", len);
-                            return Err(IoError::new(IoErrorKind::InvalidData, msg));
+                            return Err(io::Error::new(io::ErrorKind::InvalidData, msg));
                         }
 
                         self.decoder_state = CodecDecodeState::HasHeaderAndLen(header, len as usize);
@@ -213,7 +220,7 @@ impl Decoder for Codec {
                     6 => Frame::Reset { stream_id: RemoteStreamId::dialer(num) },
                     _ => {
                         let msg = format!("Invalid mplex header value 0x{:x}", header);
-                        return Err(IoError::new(IoErrorKind::InvalidData, msg));
+                        return Err(io::Error::new(io::ErrorKind::InvalidData, msg));
                     },
                 };
 
@@ -222,7 +229,7 @@ impl Decoder for Codec {
             },
 
             CodecDecodeState::Poisoned => {
-                return Err(IoError::new(IoErrorKind::InvalidData, "Mplex codec poisoned"));
+                return Err(io::Error::new(io::ErrorKind::InvalidData, "Mplex codec poisoned"));
            }
         }
     }
@@ -231,7 +238,7 @@ impl Decoder for Codec {
 
 impl Encoder for Codec {
     type Item = Frame<LocalStreamId>;
-    type Error = IoError;
+    type Error = io::Error;
 
     fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
         let (header, data) = match item {
@@ -266,7 +273,7 @@ impl Encoder for Codec {
         let data_len_bytes = encode::usize(data_len, &mut data_buf);
 
         if data_len > MAX_FRAME_SIZE {
-            return Err(IoError::new(IoErrorKind::InvalidData, "data size exceed maximum"));
+            return Err(io::Error::new(io::ErrorKind::InvalidData, "data size exceed maximum"));
         }
 
         dst.reserve(header_bytes.len() + data_len_bytes.len() + data_len);
diff --git a/muxers/mplex/src/io.rs b/muxers/mplex/src/io.rs
index 223641a5dc3..9fff47cde4a 100644
--- a/muxers/mplex/src/io.rs
+++ b/muxers/mplex/src/io.rs
@@ -22,22 +22,42 @@ use bytes::Bytes;
 use crate::{MplexConfig, MaxBufferBehaviour};
 use crate::codec::{Codec, Frame, LocalStreamId, RemoteStreamId};
 use log::{debug, trace};
-use fnv::FnvHashMap;
 use futures::{prelude::*, ready, stream::Fuse};
 use futures::task::{AtomicWaker, ArcWake, waker_ref, WakerRef};
 use futures_codec::Framed;
+use nohash_hasher::{IntMap, IntSet};
 use parking_lot::Mutex;
 use smallvec::SmallVec;
 use std::collections::VecDeque;
-use std::{cmp, io, mem, sync::Arc, task::{Context, Poll, Waker}};
+use std::{cmp, fmt, io, mem, sync::Arc, task::{Context, Poll, Waker}};
 
 pub use std::io::{Result, Error, ErrorKind};
 
+/// A connection identifier.
+///
+/// Randomly generated and mainly intended to improve log output
+/// by scoping substream IDs to a connection.
+#[derive(Clone, Copy)]
+struct Id(u32);
+
+impl fmt::Debug for Id {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "{:08x}", self.0)
+    }
+}
+
+impl fmt::Display for Id {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "{:08x}", self.0)
+    }
+}
+
 /// A multiplexed I/O stream.
 pub struct Multiplexed<C> {
-    /// The current operating status.
+    /// A unique ID for the multiplexed stream (i.e. connection).
+    id: Id,
+    /// The current operating status of the multiplex stream.
     status: Status,
-    /// The underlying I/O stream.
+    /// The underlying multiplexed I/O stream.
     io: Fuse<Framed<C, Codec>>,
     /// The configuration.
     config: MplexConfig,
@@ -47,7 +67,7 @@ pub struct Multiplexed<C> {
     open_buffer: VecDeque<LocalStreamId>,
     /// Whether a flush is pending due to one or more new outbound
     /// `Open` frames, before reading frames can proceed.
-    pending_flush_open: bool,
+    pending_flush_open: IntSet<LocalStreamId>,
     /// The stream that currently blocks reading for all streams
     /// due to a full buffer, if any. Only applicable for use
     /// with [`MaxBufferBehaviour::Block`].
@@ -61,7 +81,7 @@ pub struct Multiplexed<C> {
     /// if some or all of the pending frames cannot be sent.
     pending_frames: VecDeque<Frame<LocalStreamId>>,
     /// The managed substreams.
-    substreams: FnvHashMap<LocalStreamId, SubstreamState>,
+    substreams: IntMap<LocalStreamId, SubstreamState>,
     /// The ID for the next outbound substream.
     next_outbound_stream_id: LocalStreamId,
     /// Registry of wakers for pending tasks interested in reading.
@@ -93,13 +113,16 @@ where
 {
     /// Creates a new multiplexed I/O stream.
     pub fn new(io: C, config: MplexConfig) -> Self {
+        let id = Id(rand::random());
+        debug!("New multiplexed stream: {}", id);
         Multiplexed {
+            id,
             config,
             status: Status::Open,
             io: Framed::new(io, Codec::new()).fuse(),
             open_buffer: Default::default(),
             substreams: Default::default(),
-            pending_flush_open: false,
+            pending_flush_open: Default::default(),
             pending_frames: Default::default(),
             blocking_stream: None,
             next_outbound_stream_id: LocalStreamId::dialer(0),
@@ -132,7 +155,7 @@ where
         match ready!(self.io.poll_flush_unpin(&mut Context::from_waker(&waker))) {
             Err(e) => Poll::Ready(self.on_error(e)),
             Ok(()) => {
-                self.pending_flush_open = false;
+                self.pending_flush_open = Default::default();
                 Poll::Ready(Ok(()))
             }
         }
@@ -219,7 +242,7 @@ where
 
         // Check the stream limits.
         if self.substreams.len() >= self.config.max_substreams {
-            debug!("Maximum number of substreams reached: {}", self.config.max_substreams);
+            debug!("{}: Maximum number of substreams reached ({})",
+                self.id, self.config.max_substreams);
             self.notifier_open.register(cx.waker());
             return Poll::Pending
         }
@@ -235,11 +259,11 @@
                     self.substreams.insert(stream_id, SubstreamState::Open {
                         buf: Default::default()
                     });
-                    log::debug!("New outbound substream: {} (total {})", stream_id, self.substreams.len());
+                    debug!("{}: New outbound substream: {} (total {})",
+                        self.id, stream_id, self.substreams.len());
                     // The flush is delayed and the `Open` frame may be sent
                     // together with other frames in the same transport packet.
-                    self.pending_flush_open = true;
+                    self.pending_flush_open.insert(stream_id);
                     Poll::Ready(Ok(stream_id))
                 }
                 Err(e) => Poll::Ready(self.on_error(e)),
@@ -303,14 +327,14 @@ where
                 if self.check_max_pending_frames().is_err() {
                     return
                 }
-                log::trace!("Pending close for stream {}", id);
+                trace!("{}: Pending close for stream {}", self.id, id);
                 self.pending_frames.push_front(Frame::Close { stream_id: id });
             }
             SubstreamState::Open { .. } => {
                 if self.check_max_pending_frames().is_err() {
                     return
                 }
-                log::trace!("Pending reset for stream {}", id);
+                trace!("{}: Pending reset for stream {}", self.id, id);
                 self.pending_frames.push_front(Frame::Reset { stream_id: id });
             }
         }
@@ -407,7 +431,7 @@ where
             frame @ Frame::Open { .. } => {
                 if let Some(id) = self.on_open(frame.remote_id())?
                 {
                     self.open_buffer.push_front(id);
-                    trace!("Buffered new inbound stream {} (total: {})", id, self.open_buffer.len());
+                    trace!("{}: Buffered new inbound stream {} (total: {})", self.id, id, self.open_buffer.len());
                     self.notifier_read.wake_next_stream();
                 }
             }
@@ -440,7 +464,7 @@ where
         self.guard_open()?;
 
         ready!(self.poll_flush(cx))?;
-        trace!("Flushed substream {}", id);
+        trace!("{}: Flushed substream {}", self.id, id);
 
         Poll::Ready(Ok(()))
     }
@@ -472,7 +496,7 @@ where
                     self.substreams.insert(id, SubstreamState::Open { buf });
                     Poll::Pending
                 } else {
-                    debug!("Closed substream {} (half-close)", id);
+                    debug!("{}: Closed substream {} (half-close)", self.id, id);
                    self.substreams.insert(id, SubstreamState::SendClosed { buf });
                     Poll::Ready(Ok(()))
                 }
@@ -482,7 +506,7 @@ where
                     self.substreams.insert(id, SubstreamState::RecvClosed { buf });
                     Poll::Pending
                 } else {
-                    debug!("Closed substream {}", id);
+                    debug!("{}: Closed substream {}", self.id, id);
                     self.substreams.insert(id, SubstreamState::Closed { buf });
                     Poll::Ready(Ok(()))
                 }
@@ -503,7 +527,7 @@ where
         match ready!(self.io.poll_ready_unpin(&mut Context::from_waker(&waker))) {
             Ok(()) => {
                 let frame = frame();
-                trace!("Sending {:?}", frame);
+                trace!("{}: Sending {:?}", self.id, frame);
                 match self.io.start_send_unpin(frame) {
                     Ok(()) => Poll::Ready(Ok(())),
                     Err(e) => Poll::Ready(self.on_error(e))
@@ -528,10 +552,11 @@ where
         }
 
         // Perform any pending flush before reading.
-        if self.pending_flush_open {
-            trace!("Executing pending flush.");
-            ready!(self.poll_flush(cx))?;
-            debug_assert!(!self.pending_flush_open);
+        if let Some(id) = &stream_id {
+            if self.pending_flush_open.remove(id) {
+                trace!("{}: Executing pending flush for {}.", self.id, id);
+                ready!(self.poll_flush(cx))?;
+            }
         }
 
         // Check if there is a blocked stream.
@@ -544,7 +569,7 @@
                 if !self.notifier_read.wake_read_stream(*blocked_id) {
                     // No task dedicated to the blocked stream woken, so schedule
                     // this task again to have a chance at progress.
-                    trace!("No task to read from blocked stream. Waking current task.");
+                    trace!("{}: No task to read from blocked stream. Waking current task.", self.id);
                     cx.waker().clone().wake();
                 } else {
                     if let Some(id) = stream_id {
@@ -570,7 +595,7 @@
         };
         match ready!(self.io.poll_next_unpin(&mut Context::from_waker(&waker))) {
             Some(Ok(frame)) => {
-                trace!("Received {:?}", frame);
+                trace!("{}: Received {:?}", self.id, frame);
                 Poll::Ready(Ok(frame))
             }
             Some(Err(e)) => Poll::Ready(self.on_error(e)),
@@ -583,15 +608,16 @@
         let id = id.into_local();
 
         if self.substreams.contains_key(&id) {
-            debug!("Received unexpected `Open` frame for open substream {}", id);
+            debug!("{}: Received unexpected `Open` frame for open substream {}", self.id, id);
             return self.on_error(io::Error::new(io::ErrorKind::Other,
                 "Protocol error: Received `Open` frame for open substream."))
         }
 
         if self.substreams.len() >= self.config.max_substreams {
-            debug!("Maximum number of substreams exceeded: {}", self.config.max_substreams);
+            debug!("{}: Maximum number of substreams exceeded: {}",
+                self.id, self.config.max_substreams);
             self.check_max_pending_frames()?;
-            debug!("Pending reset for new stream {}", id);
+            debug!("{}: Pending reset for new stream {}", self.id, id);
             self.pending_frames.push_front(Frame::Reset {
                 stream_id: id
             });
@@ -602,7 +628,7 @@ where
                 buf: Default::default()
             });
 
-            log::debug!("New inbound substream: {} (total {})", id, self.substreams.len());
+            debug!("{}: New inbound substream: {} (total {})", self.id, id, self.substreams.len());
 
             Ok(Some(id))
         }
@@ -612,15 +638,16 @@
         if let Some(state) = self.substreams.remove(&id) {
             match state {
                 SubstreamState::Closed { .. } => {
-                    trace!("Ignoring reset for mutually closed substream {}.", id);
+                    trace!("{}: Ignoring reset for mutually closed substream {}.", self.id, id);
                 }
                 SubstreamState::Reset { .. } => {
-                    trace!("Ignoring redundant reset for already reset substream {}", id);
+                    trace!("{}: Ignoring redundant reset for already reset substream {}",
+                        self.id, id);
                 }
                 SubstreamState::RecvClosed { buf } |
                 SubstreamState::SendClosed { buf } |
                 SubstreamState::Open { buf } => {
-                    debug!("Substream {} reset by remote.", id);
+                    debug!("{}: Substream {} reset by remote.", self.id, id);
                     self.substreams.insert(id, SubstreamState::Reset { buf });
                     // Notify tasks interested in reading from that stream,
                     // so they may read the EOF.
@@ -628,7 +655,8 @@
                 }
             }
         } else {
-            trace!("Ignoring `Reset` for unknown substream {}. Possibly dropped earlier.", id);
+            trace!("{}: Ignoring `Reset` for unknown substream {}. Possibly dropped earlier.",
+                self.id, id);
         }
     }
@@ -637,30 +665,35 @@
         if let Some(state) = self.substreams.remove(&id) {
             match state {
                 SubstreamState::RecvClosed { .. } | SubstreamState::Closed { .. } => {
-                    debug!("Received unexpected `Close` frame for closed substream {}", id);
+                    debug!("{}: Received unexpected `Close` frame for closed substream {}",
+                        self.id, id);
                     return self.on_error(
                         io::Error::new(io::ErrorKind::Other,
                         "Protocol error: Received `Close` frame for closed substream."))
                 },
                 SubstreamState::Reset { buf } => {
-                    debug!("Ignoring `Close` frame for already reset substream {}", id);
+                    debug!("{}: Ignoring `Close` frame for already reset substream {}",
+                        self.id, id);
                     self.substreams.insert(id, SubstreamState::Reset { buf });
                 }
                 SubstreamState::SendClosed { buf } => {
-                    debug!("Substream {} closed by remote (SendClosed -> Closed).", id);
+                    debug!("{}: Substream {} closed by remote (SendClosed -> Closed).",
+                        self.id, id);
                     self.substreams.insert(id, SubstreamState::Closed { buf });
                     // Notify tasks interested in reading, so they may read the EOF.
                     self.notifier_read.wake_read_stream(id);
                 },
                 SubstreamState::Open { buf } => {
-                    debug!("Substream {} closed by remote (Open -> RecvClosed)", id);
+                    debug!("{}: Substream {} closed by remote (Open -> RecvClosed)",
+                        self.id, id);
                     self.substreams.insert(id, SubstreamState::RecvClosed { buf });
                     // Notify tasks interested in reading, so they may read the EOF.
                     self.notifier_read.wake_read_stream(id);
                 },
             }
         } else {
-            trace!("Ignoring `Close` for unknown substream {}. Possibly dropped earlier.", id);
+            trace!("{}: Ignoring `Close` for unknown substream {}. Possibly dropped earlier.",
+                self.id, id);
         }
 
         Ok(())
@@ -697,7 +730,7 @@ where
 
     /// Records a fatal error for the multiplexed I/O stream.
     fn on_error<T>(&mut self, e: io::Error) -> io::Result<T> {
-        log::debug!("Multiplexed connection failed: {:?}", e);
+        debug!("{}: Multiplexed connection failed: {:?}", self.id, e);
         self.status = Status::Err(io::Error::new(e.kind(), e.to_string()));
         self.pending_frames = Default::default();
         self.substreams = Default::default();
@@ -738,29 +771,29 @@ where
         let state = if let Some(state) = self.substreams.get_mut(&id) {
             state
         } else {
-            trace!("Dropping data {:?} for unknown substream {}", data, id);
+            trace!("{}: Dropping data {:?} for unknown substream {}", self.id, data, id);
            return Ok(())
         };
 
         let buf = if let Some(buf) = state.recv_buf_open() {
             buf
         } else {
-            trace!("Dropping data {:?} for closed or reset substream {}", data, id);
+            trace!("{}: Dropping data {:?} for closed or reset substream {}", self.id, data, id);
             return Ok(())
         };
 
         debug_assert!(buf.len() <= self.config.max_buffer_len);
-        trace!("Buffering {:?} for stream {} (total: {})", data, id, buf.len() + 1);
+        trace!("{}: Buffering {:?} for stream {} (total: {})", self.id, data, id, buf.len() + 1);
         buf.push(data);
         self.notifier_read.wake_read_stream(id);
 
         if buf.len() > self.config.max_buffer_len {
-            debug!("Frame buffer of stream {} is full.", id);
+            debug!("{}: Frame buffer of stream {} is full.", self.id, id);
             match self.config.max_buffer_behaviour {
                 MaxBufferBehaviour::ResetStream => {
                     let buf = buf.clone();
                     self.check_max_pending_frames()?;
                     self.substreams.insert(id, SubstreamState::Reset { buf });
-                    debug!("Pending reset for stream {}", id);
+                    debug!("{}: Pending reset for stream {}", self.id, id);
                     self.pending_frames.push_front(Frame::Reset {
                         stream_id: id
                     });
@@ -828,7 +861,7 @@ struct NotifierRead {
     next_stream: AtomicWaker,
     /// The wakers of currently pending tasks that last
     /// called `poll_read_stream` for a particular substream.
-    read_stream: Mutex<FnvHashMap<LocalStreamId, Waker>>,
+    read_stream: Mutex<IntMap<LocalStreamId, Waker>>,
 }
 
 impl NotifierRead {
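
For context on the hashing pattern the patch switches to: `nohash_hasher` expects a key type to opt in via the `IsEnabled` marker trait and to hash itself with a single fixed-size `write_*` call, which the new `Hash for LocalStreamId` impl satisfies by writing only `num`. Below is a minimal, self-contained sketch of the same pattern; `StreamKey` is a hypothetical stand-in for `LocalStreamId` and is not code from this patch:

```rust
use std::hash::{Hash, Hasher};

use nohash_hasher::IntMap;

// Hypothetical stand-in for `LocalStreamId`: equality compares the whole key,
// but hashing performs exactly one fixed-size write, which is the contract
// `NoHashHasher` relies on.
#[derive(Copy, Clone, PartialEq, Eq, Debug)]
struct StreamKey(u32);

impl Hash for StreamKey {
    fn hash<H: Hasher>(&self, state: &mut H) {
        state.write_u32(self.0); // the "hash" is the stream number itself
    }
}

// Marker trait opting the key into nohash-hasher's identity hashing.
impl nohash_hasher::IsEnabled for StreamKey {}

fn main() {
    // `IntMap<K, V>` is an alias for `HashMap<K, V, BuildNoHashHasher<K>>`,
    // so insertions and lookups skip byte-oriented hashing entirely.
    let mut substreams: IntMap<StreamKey, &'static str> = IntMap::default();
    substreams.insert(StreamKey(0), "open");
    substreams.insert(StreamKey(1), "recv-closed");
    assert_eq!(substreams.get(&StreamKey(1)), Some(&"recv-closed"));
}
```

Note that `LocalStreamId::hash` deliberately ignores `role`, so a dialer ID and a listener ID with the same `num` hash to the same bucket; this is still correct because the map disambiguates colliding keys through `Eq`, at the cost of an occasional bucket collision.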