diff --git a/muxers/mplex/src/lib.rs b/muxers/mplex/src/lib.rs index 64c532f853f..d8350606888 100644 --- a/muxers/mplex/src/lib.rs +++ b/muxers/mplex/src/lib.rs @@ -53,7 +53,6 @@ pub struct MplexConfig { impl MplexConfig { /// Builds the default configuration. - #[inline] pub fn new() -> MplexConfig { Default::default() } @@ -62,7 +61,6 @@ impl MplexConfig { /// generated and the connection closes. /// /// A limit is necessary in order to avoid DoS attacks. - #[inline] pub fn max_substreams(&mut self, max: usize) -> &mut Self { self.max_substreams = max; self @@ -71,7 +69,6 @@ impl MplexConfig { /// Sets the maximum number of pending incoming messages. /// /// A limit is necessary in order to avoid DoS attacks. - #[inline] pub fn max_buffer_len(&mut self, max: usize) -> &mut Self { self.max_buffer_len = max; self @@ -80,7 +77,6 @@ impl MplexConfig { /// Sets the behaviour when the maximum buffer length has been reached. /// /// See the documentation of `MaxBufferBehaviour`. - #[inline] pub fn max_buffer_len_behaviour(&mut self, behaviour: MaxBufferBehaviour) -> &mut Self { self.max_buffer_behaviour = behaviour; self @@ -94,7 +90,6 @@ impl MplexConfig { self } - #[inline] fn upgrade(self, i: C) -> Multiplex where C: AsyncRead + AsyncWrite + Unpin @@ -122,7 +117,6 @@ impl MplexConfig { } impl Default for MplexConfig { - #[inline] fn default() -> MplexConfig { MplexConfig { max_substreams: 128, @@ -149,7 +143,6 @@ impl UpgradeInfo for MplexConfig { type Info = &'static [u8]; type InfoIter = iter::Once; - #[inline] fn protocol_info(&self) -> Self::InfoIter { iter::once(b"/mplex/6.7.0") } @@ -334,9 +327,7 @@ where C: AsyncRead + AsyncWrite + Unpin, fn poll_send(inner: &mut MultiplexInner, cx: &mut Context, elem: codec::Elem) -> Poll> where C: AsyncRead + AsyncWrite + Unpin { - if inner.is_shutdown { - return Poll::Ready(Err(IoError::new(IoErrorKind::Other, "connection is shut down"))) - } + ensure_no_error_no_close(inner)?; inner.notifier_write.insert(cx.waker()); @@ -348,10 +339,26 @@ where C: AsyncRead + AsyncWrite + Unpin } }, Poll::Pending => Poll::Pending, - Poll::Ready(Err(err)) => Poll::Ready(Err(err)) + Poll::Ready(Err(err)) => { + inner.error = Err(IoError::new(err.kind(), err.to_string())); + Poll::Ready(Err(err)) + } } } +fn ensure_no_error_no_close(inner: &mut MultiplexInner) -> Result<(), IoError> +where + C: AsyncRead + AsyncWrite + Unpin +{ + if inner.is_shutdown { + return Err(IoError::new(IoErrorKind::Other, "connection is shut down")) + } + if let Err(ref e) = inner.error { + return Err(IoError::new(e.kind(), e.to_string())) + } + Ok(()) +} + impl StreamMuxer for Multiplex where C: AsyncRead + AsyncWrite + Unpin { @@ -418,9 +425,7 @@ where C: AsyncRead + AsyncWrite + Unpin poll_send(&mut inner, cx, elem.clone()) }, OutboundSubstreamState::Flush => { - if inner.is_shutdown { - return Poll::Ready(Err(IoError::new(IoErrorKind::Other, "connection is shut down"))) - } + ensure_no_error_no_close(&mut inner)?; let inner = &mut *inner; // Avoids borrow errors inner.notifier_write.insert(cx.waker()); Sink::poll_flush(Pin::new(&mut inner.inner), &mut Context::from_waker(&waker_ref(&inner.notifier_write))) @@ -438,6 +443,7 @@ where C: AsyncRead + AsyncWrite + Unpin inner.buffer.retain(|elem| { elem.substream_id() != substream.num || elem.endpoint() == Some(Endpoint::Dialer) }); + inner.error = Err(IoError::new(err.kind(), err.to_string())); return Poll::Ready(Err(err)); }, }; @@ -465,7 +471,6 @@ where C: AsyncRead + AsyncWrite + Unpin } } - #[inline] fn destroy_outbound(&self, _substream: Self::OutboundSubstream) { // Nothing to do. } @@ -548,13 +553,14 @@ where C: AsyncRead + AsyncWrite + Unpin fn flush_substream(&self, cx: &mut Context, _substream: &mut Self::Substream) -> Poll> { let mut inner = self.inner.lock(); - if inner.is_shutdown { - return Poll::Ready(Err(IoError::new(IoErrorKind::Other, "connection is shut down"))) - } - + ensure_no_error_no_close(&mut inner)?; let inner = &mut *inner; // Avoids borrow errors inner.notifier_write.insert(cx.waker()); - Sink::poll_flush(Pin::new(&mut inner.inner), &mut Context::from_waker(&waker_ref(&inner.notifier_write))) + let result = Sink::poll_flush(Pin::new(&mut inner.inner), &mut Context::from_waker(&waker_ref(&inner.notifier_write))); + if let Poll::Ready(Err(err)) = &result { + inner.error = Err(IoError::new(err.kind(), err.to_string())); + } + result } fn shutdown_substream(&self, cx: &mut Context, sub: &mut Self::Substream) -> Poll> { @@ -585,28 +591,42 @@ where C: AsyncRead + AsyncWrite + Unpin self.inner.lock().is_acknowledged } - #[inline] fn close(&self, cx: &mut Context) -> Poll> { let inner = &mut *self.inner.lock(); + if inner.is_shutdown { + return Poll::Ready(Ok(())) + } + if let Err(ref e) = inner.error { + return Poll::Ready(Err(IoError::new(e.kind(), e.to_string()))) + } inner.notifier_write.insert(cx.waker()); match Sink::poll_close(Pin::new(&mut inner.inner), &mut Context::from_waker(&waker_ref(&inner.notifier_write))) { Poll::Ready(Ok(())) => { inner.is_shutdown = true; Poll::Ready(Ok(())) } - Poll::Ready(Err(err)) => Poll::Ready(Err(err)), + Poll::Ready(Err(err)) => { + inner.error = Err(IoError::new(err.kind(), err.to_string())); + Poll::Ready(Err(err)) + } Poll::Pending => Poll::Pending, } } - #[inline] fn flush_all(&self, cx: &mut Context) -> Poll> { let inner = &mut *self.inner.lock(); if inner.is_shutdown { return Poll::Ready(Ok(())) } + if let Err(ref e) = inner.error { + return Poll::Ready(Err(IoError::new(e.kind(), e.to_string()))) + } inner.notifier_write.insert(cx.waker()); - Sink::poll_flush(Pin::new(&mut inner.inner), &mut Context::from_waker(&waker_ref(&inner.notifier_write))) + let result = Sink::poll_flush(Pin::new(&mut inner.inner), &mut Context::from_waker(&waker_ref(&inner.notifier_write))); + if let Poll::Ready(Err(err)) = &result { + inner.error = Err(IoError::new(err.kind(), err.to_string())); + } + result } }