Skip to content

Commit

Permalink
mplex: Check for error and shutdown.
Browse files Browse the repository at this point in the history
Issues libp2p#1504 and libp2p#1523 reported panics caused by polling the sink of
`secio::LenPrefixCodec` after it had entered its terminal state, i.e.
after it had previously encountered an error or was closed. According
to the reports this happened only when using mplex as a stream
multiplexer. It seems that because mplex always stores and keeps the
`Waker` when polling, a wakeup of any of those wakers will resume the
polling even for those cases where the previous poll did not return
`Poll::Pending` but resolved to a value.

To prevent polling after the connection was closed or an error
happened we check for those conditions prior to every poll.
  • Loading branch information
twittner committed Mar 30, 2020
1 parent 0d3e4f2 commit 4f94e5a
Showing 1 changed file with 25 additions and 20 deletions.
45 changes: 25 additions & 20 deletions muxers/mplex/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ pub struct MplexConfig {

impl MplexConfig {
/// Builds the default configuration.
#[inline]
pub fn new() -> MplexConfig {
Default::default()
}
Expand All @@ -62,7 +61,6 @@ impl MplexConfig {
/// generated and the connection closes.
///
/// A limit is necessary in order to avoid DoS attacks.
#[inline]
pub fn max_substreams(&mut self, max: usize) -> &mut Self {
self.max_substreams = max;
self
Expand All @@ -71,7 +69,6 @@ impl MplexConfig {
/// Sets the maximum number of pending incoming messages.
///
/// A limit is necessary in order to avoid DoS attacks.
#[inline]
pub fn max_buffer_len(&mut self, max: usize) -> &mut Self {
self.max_buffer_len = max;
self
Expand All @@ -80,7 +77,6 @@ impl MplexConfig {
/// Sets the behaviour when the maximum buffer length has been reached.
///
/// See the documentation of `MaxBufferBehaviour`.
#[inline]
pub fn max_buffer_len_behaviour(&mut self, behaviour: MaxBufferBehaviour) -> &mut Self {
self.max_buffer_behaviour = behaviour;
self
Expand All @@ -94,7 +90,6 @@ impl MplexConfig {
self
}

#[inline]
fn upgrade<C>(self, i: C) -> Multiplex<C>
where
C: AsyncRead + AsyncWrite + Unpin
Expand Down Expand Up @@ -122,7 +117,6 @@ impl MplexConfig {
}

impl Default for MplexConfig {
#[inline]
fn default() -> MplexConfig {
MplexConfig {
max_substreams: 128,
Expand All @@ -149,7 +143,6 @@ impl UpgradeInfo for MplexConfig {
type Info = &'static [u8];
type InfoIter = iter::Once<Self::Info>;

#[inline]
fn protocol_info(&self) -> Self::InfoIter {
iter::once(b"/mplex/6.7.0")
}
Expand Down Expand Up @@ -334,9 +327,7 @@ where C: AsyncRead + AsyncWrite + Unpin,
fn poll_send<C>(inner: &mut MultiplexInner<C>, cx: &mut Context, elem: codec::Elem) -> Poll<Result<(), IoError>>
where C: AsyncRead + AsyncWrite + Unpin
{
if inner.is_shutdown {
return Poll::Ready(Err(IoError::new(IoErrorKind::Other, "connection is shut down")))
}
ensure_no_error_no_close(inner)?;

inner.notifier_write.insert(cx.waker());

Expand All @@ -352,6 +343,19 @@ where C: AsyncRead + AsyncWrite + Unpin
}
}

fn ensure_no_error_no_close<C>(inner: &mut MultiplexInner<C>) -> Result<(), IoError>
where
C: AsyncRead + AsyncWrite + Unpin
{
if inner.is_shutdown {
return Err(IoError::new(IoErrorKind::Other, "connection is shut down"))
}
if let Err(ref e) = inner.error {
return Err(IoError::new(e.kind(), e.to_string()))
}
Ok(())
}

impl<C> StreamMuxer for Multiplex<C>
where C: AsyncRead + AsyncWrite + Unpin
{
Expand Down Expand Up @@ -418,9 +422,7 @@ where C: AsyncRead + AsyncWrite + Unpin
poll_send(&mut inner, cx, elem.clone())
},
OutboundSubstreamState::Flush => {
if inner.is_shutdown {
return Poll::Ready(Err(IoError::new(IoErrorKind::Other, "connection is shut down")))
}
ensure_no_error_no_close(&mut inner)?;
let inner = &mut *inner; // Avoids borrow errors
inner.notifier_write.insert(cx.waker());
Sink::poll_flush(Pin::new(&mut inner.inner), &mut Context::from_waker(&waker_ref(&inner.notifier_write)))
Expand Down Expand Up @@ -465,7 +467,6 @@ where C: AsyncRead + AsyncWrite + Unpin
}
}

#[inline]
fn destroy_outbound(&self, _substream: Self::OutboundSubstream) {
// Nothing to do.
}
Expand Down Expand Up @@ -548,10 +549,7 @@ where C: AsyncRead + AsyncWrite + Unpin

fn flush_substream(&self, cx: &mut Context, _substream: &mut Self::Substream) -> Poll<Result<(), IoError>> {
let mut inner = self.inner.lock();
if inner.is_shutdown {
return Poll::Ready(Err(IoError::new(IoErrorKind::Other, "connection is shut down")))
}

ensure_no_error_no_close(&mut inner)?;
let inner = &mut *inner; // Avoids borrow errors
inner.notifier_write.insert(cx.waker());
Sink::poll_flush(Pin::new(&mut inner.inner), &mut Context::from_waker(&waker_ref(&inner.notifier_write)))
Expand Down Expand Up @@ -585,9 +583,14 @@ where C: AsyncRead + AsyncWrite + Unpin
self.inner.lock().is_acknowledged
}

#[inline]
fn close(&self, cx: &mut Context) -> Poll<Result<(), IoError>> {
let inner = &mut *self.inner.lock();
if inner.is_shutdown {
return Poll::Ready(Ok(()))
}
if let Err(ref e) = inner.error {
return Poll::Ready(Err(IoError::new(e.kind(), e.to_string())))
}
inner.notifier_write.insert(cx.waker());
match Sink::poll_close(Pin::new(&mut inner.inner), &mut Context::from_waker(&waker_ref(&inner.notifier_write))) {
Poll::Ready(Ok(())) => {
Expand All @@ -599,12 +602,14 @@ where C: AsyncRead + AsyncWrite + Unpin
}
}

#[inline]
fn flush_all(&self, cx: &mut Context) -> Poll<Result<(), IoError>> {
let inner = &mut *self.inner.lock();
if inner.is_shutdown {
return Poll::Ready(Ok(()))
}
if let Err(ref e) = inner.error {
return Poll::Ready(Err(IoError::new(e.kind(), e.to_string())))
}
inner.notifier_write.insert(cx.waker());
Sink::poll_flush(Pin::new(&mut inner.inner), &mut Context::from_waker(&waker_ref(&inner.notifier_write)))
}
Expand Down

0 comments on commit 4f94e5a

Please sign in to comment.