Skip to content

Commit

Permalink
mplex: Check for error and shutdown. (#1529)
Browse files Browse the repository at this point in the history
* mplex: Check for error and shutdown.

Issues #1504 and #1523 reported panics caused by polling the sink of
`secio::LenPrefixCodec` after it had entered its terminal state, i.e.
after it had previously encountered an error or was closed. According
to the reports this happened only when using mplex as a stream
multiplexer. It seems that because mplex always stores and keeps the
`Waker` when polling, a wakeup of any of those wakers will resume the
polling even for those cases where the previous poll did not return
`Poll::Pending` but resolved to a value.

To prevent polling after the connection was closed or an error
happened we check for those conditions prior to every poll.

* Keep error when operations fail.

Co-authored-by: Pierre Krieger <pierre.krieger1708@gmail.com>
  • Loading branch information
twittner and tomaka committed Apr 1, 2020
1 parent 6b4bdc1 commit 3f12a5c
Showing 1 changed file with 44 additions and 24 deletions.
68 changes: 44 additions & 24 deletions muxers/mplex/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ pub struct MplexConfig {

impl MplexConfig {
/// Builds the default configuration.
#[inline]
pub fn new() -> MplexConfig {
Default::default()
}
Expand All @@ -62,7 +61,6 @@ impl MplexConfig {
/// generated and the connection closes.
///
/// A limit is necessary in order to avoid DoS attacks.
#[inline]
pub fn max_substreams(&mut self, max: usize) -> &mut Self {
self.max_substreams = max;
self
Expand All @@ -71,7 +69,6 @@ impl MplexConfig {
/// Sets the maximum number of pending incoming messages.
///
/// A limit is necessary in order to avoid DoS attacks.
#[inline]
pub fn max_buffer_len(&mut self, max: usize) -> &mut Self {
self.max_buffer_len = max;
self
Expand All @@ -80,7 +77,6 @@ impl MplexConfig {
/// Sets the behaviour when the maximum buffer length has been reached.
///
/// See the documentation of `MaxBufferBehaviour`.
#[inline]
pub fn max_buffer_len_behaviour(&mut self, behaviour: MaxBufferBehaviour) -> &mut Self {
self.max_buffer_behaviour = behaviour;
self
Expand All @@ -94,7 +90,6 @@ impl MplexConfig {
self
}

#[inline]
fn upgrade<C>(self, i: C) -> Multiplex<C>
where
C: AsyncRead + AsyncWrite + Unpin
Expand Down Expand Up @@ -122,7 +117,6 @@ impl MplexConfig {
}

impl Default for MplexConfig {
#[inline]
fn default() -> MplexConfig {
MplexConfig {
max_substreams: 128,
Expand All @@ -149,7 +143,6 @@ impl UpgradeInfo for MplexConfig {
type Info = &'static [u8];
type InfoIter = iter::Once<Self::Info>;

#[inline]
fn protocol_info(&self) -> Self::InfoIter {
iter::once(b"/mplex/6.7.0")
}
Expand Down Expand Up @@ -334,9 +327,7 @@ where C: AsyncRead + AsyncWrite + Unpin,
fn poll_send<C>(inner: &mut MultiplexInner<C>, cx: &mut Context, elem: codec::Elem) -> Poll<Result<(), IoError>>
where C: AsyncRead + AsyncWrite + Unpin
{
if inner.is_shutdown {
return Poll::Ready(Err(IoError::new(IoErrorKind::Other, "connection is shut down")))
}
ensure_no_error_no_close(inner)?;

inner.notifier_write.insert(cx.waker());

Expand All @@ -348,10 +339,26 @@ where C: AsyncRead + AsyncWrite + Unpin
}
},
Poll::Pending => Poll::Pending,
Poll::Ready(Err(err)) => Poll::Ready(Err(err))
Poll::Ready(Err(err)) => {
inner.error = Err(IoError::new(err.kind(), err.to_string()));
Poll::Ready(Err(err))
}
}
}

fn ensure_no_error_no_close<C>(inner: &mut MultiplexInner<C>) -> Result<(), IoError>
where
C: AsyncRead + AsyncWrite + Unpin
{
if inner.is_shutdown {
return Err(IoError::new(IoErrorKind::Other, "connection is shut down"))
}
if let Err(ref e) = inner.error {
return Err(IoError::new(e.kind(), e.to_string()))
}
Ok(())
}

impl<C> StreamMuxer for Multiplex<C>
where C: AsyncRead + AsyncWrite + Unpin
{
Expand Down Expand Up @@ -418,9 +425,7 @@ where C: AsyncRead + AsyncWrite + Unpin
poll_send(&mut inner, cx, elem.clone())
},
OutboundSubstreamState::Flush => {
if inner.is_shutdown {
return Poll::Ready(Err(IoError::new(IoErrorKind::Other, "connection is shut down")))
}
ensure_no_error_no_close(&mut inner)?;
let inner = &mut *inner; // Avoids borrow errors
inner.notifier_write.insert(cx.waker());
Sink::poll_flush(Pin::new(&mut inner.inner), &mut Context::from_waker(&waker_ref(&inner.notifier_write)))
Expand All @@ -438,6 +443,7 @@ where C: AsyncRead + AsyncWrite + Unpin
inner.buffer.retain(|elem| {
elem.substream_id() != substream.num || elem.endpoint() == Some(Endpoint::Dialer)
});
inner.error = Err(IoError::new(err.kind(), err.to_string()));
return Poll::Ready(Err(err));
},
};
Expand Down Expand Up @@ -465,7 +471,6 @@ where C: AsyncRead + AsyncWrite + Unpin
}
}

#[inline]
fn destroy_outbound(&self, _substream: Self::OutboundSubstream) {
// Nothing to do.
}
Expand Down Expand Up @@ -548,13 +553,14 @@ where C: AsyncRead + AsyncWrite + Unpin

fn flush_substream(&self, cx: &mut Context, _substream: &mut Self::Substream) -> Poll<Result<(), IoError>> {
let mut inner = self.inner.lock();
if inner.is_shutdown {
return Poll::Ready(Err(IoError::new(IoErrorKind::Other, "connection is shut down")))
}

ensure_no_error_no_close(&mut inner)?;
let inner = &mut *inner; // Avoids borrow errors
inner.notifier_write.insert(cx.waker());
Sink::poll_flush(Pin::new(&mut inner.inner), &mut Context::from_waker(&waker_ref(&inner.notifier_write)))
let result = Sink::poll_flush(Pin::new(&mut inner.inner), &mut Context::from_waker(&waker_ref(&inner.notifier_write)));
if let Poll::Ready(Err(err)) = &result {
inner.error = Err(IoError::new(err.kind(), err.to_string()));
}
result
}

fn shutdown_substream(&self, cx: &mut Context, sub: &mut Self::Substream) -> Poll<Result<(), IoError>> {
Expand Down Expand Up @@ -585,28 +591,42 @@ where C: AsyncRead + AsyncWrite + Unpin
self.inner.lock().is_acknowledged
}

#[inline]
fn close(&self, cx: &mut Context) -> Poll<Result<(), IoError>> {
let inner = &mut *self.inner.lock();
if inner.is_shutdown {
return Poll::Ready(Ok(()))
}
if let Err(ref e) = inner.error {
return Poll::Ready(Err(IoError::new(e.kind(), e.to_string())))
}
inner.notifier_write.insert(cx.waker());
match Sink::poll_close(Pin::new(&mut inner.inner), &mut Context::from_waker(&waker_ref(&inner.notifier_write))) {
Poll::Ready(Ok(())) => {
inner.is_shutdown = true;
Poll::Ready(Ok(()))
}
Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
Poll::Ready(Err(err)) => {
inner.error = Err(IoError::new(err.kind(), err.to_string()));
Poll::Ready(Err(err))
}
Poll::Pending => Poll::Pending,
}
}

#[inline]
fn flush_all(&self, cx: &mut Context) -> Poll<Result<(), IoError>> {
let inner = &mut *self.inner.lock();
if inner.is_shutdown {
return Poll::Ready(Ok(()))
}
if let Err(ref e) = inner.error {
return Poll::Ready(Err(IoError::new(e.kind(), e.to_string())))
}
inner.notifier_write.insert(cx.waker());
Sink::poll_flush(Pin::new(&mut inner.inner), &mut Context::from_waker(&waker_ref(&inner.notifier_write)))
let result = Sink::poll_flush(Pin::new(&mut inner.inner), &mut Context::from_waker(&waker_ref(&inner.notifier_write)));
if let Poll::Ready(Err(err)) = &result {
inner.error = Err(IoError::new(err.kind(), err.to_string()));
}
result
}
}

Expand Down

0 comments on commit 3f12a5c

Please sign in to comment.