diff --git a/misc/multistream-select/CHANGELOG.md b/misc/multistream-select/CHANGELOG.md index e8dedd3770a..7eb9593d353 100644 --- a/misc/multistream-select/CHANGELOG.md +++ b/misc/multistream-select/CHANGELOG.md @@ -1,5 +1,10 @@ # 0.8.3 [unreleased] +- Fix a potential deadlock during protocol negotiation due + to a missing flush, potentially resulting in sporadic protocol + upgrade timeouts. + [PR 1781](https://github.com/libp2p/rust-libp2p/pull/1781). + - Update dependencies. # 0.8.2 [2020-06-22] diff --git a/misc/multistream-select/src/dialer_select.rs b/misc/multistream-select/src/dialer_select.rs index 87149a6aa9c..efbdc105c9d 100644 --- a/misc/multistream-select/src/dialer_select.rs +++ b/misc/multistream-select/src/dialer_select.rs @@ -241,8 +241,7 @@ where } Message::Protocol(ref p) if p.as_ref() == protocol.as_ref() => { log::debug!("Dialer: Received confirmation for protocol: {}", p); - let (io, remaining) = io.into_inner(); - let io = Negotiated::completed(io, remaining); + let io = Negotiated::completed(io.into_inner()); return Poll::Ready(Ok((protocol, io))); } Message::NotAvailable => { diff --git a/misc/multistream-select/src/length_delimited.rs b/misc/multistream-select/src/length_delimited.rs index da7bc632794..593c915ac2b 100644 --- a/misc/multistream-select/src/length_delimited.rs +++ b/misc/multistream-select/src/length_delimited.rs @@ -76,22 +76,18 @@ impl LengthDelimited { } } - /// Drops the [`LengthDelimited`] resource, yielding the underlying I/O stream - /// together with the remaining write buffer containing the uvi-framed data - /// that has not yet been written to the underlying I/O stream. - /// - /// The returned remaining write buffer may be prepended to follow-up - /// protocol data to send with a single `write`. Either way, if non-empty, - /// the write buffer _must_ eventually be written to the I/O stream - /// _before_ any follow-up data, in order to maintain a correct data stream. + /// Drops the [`LengthDelimited`] resource, yielding the underlying I/O stream. /// /// # Panic /// - /// Will panic if called while there is data in the read buffer. The read buffer is - /// guaranteed to be empty whenever `Stream::poll` yields a new `Bytes` frame. - pub fn into_inner(self) -> (R, BytesMut) { + /// Will panic if called while there is data in the read or write buffer. + /// The read buffer is guaranteed to be empty whenever `Stream::poll` yields + /// a new `Bytes` frame. The write buffer is guaranteed to be empty after + /// flushing. + pub fn into_inner(self) -> R { assert!(self.read_buffer.is_empty()); - (self.inner, self.write_buffer) + assert!(self.write_buffer.is_empty()); + self.inner } /// Converts the [`LengthDelimited`] into a [`LengthDelimitedReader`], dropping the @@ -303,7 +299,7 @@ impl LengthDelimitedReader { /// yield a new `Message`. The write buffer is guaranteed to be empty whenever /// [`LengthDelimited::poll_write_buffer`] yields [`Poll::Ready`] or after /// the [`Sink`] has been completely flushed via [`Sink::poll_flush`]. - pub fn into_inner(self) -> (R, BytesMut) { + pub fn into_inner(self) -> R { self.inner.into_inner() } } diff --git a/misc/multistream-select/src/listener_select.rs b/misc/multistream-select/src/listener_select.rs index 7a71c9efda9..2d66d666b7b 100644 --- a/misc/multistream-select/src/listener_select.rs +++ b/misc/multistream-select/src/listener_select.rs @@ -88,7 +88,10 @@ where message: Message, protocol: Option }, - Flush { io: MessageIO }, + Flush { + io: MessageIO, + protocol: Option + }, Done } @@ -141,7 +144,7 @@ where } *this.state = match version { - Version::V1 => State::Flush { io }, + Version::V1 => State::Flush { io, protocol: None }, Version::V1Lazy => State::RecvMessage { io }, } } @@ -204,28 +207,28 @@ where return Poll::Ready(Err(From::from(err))); } - // If a protocol has been selected, finish negotiation. - // Otherwise flush the sink and expect to receive another - // message. - *this.state = match protocol { - Some(protocol) => { - log::debug!("Listener: sent confirmed protocol: {}", - String::from_utf8_lossy(protocol.as_ref())); - let (io, remaining) = io.into_inner(); - let io = Negotiated::completed(io, remaining); - return Poll::Ready(Ok((protocol, io))); - } - None => State::Flush { io } - }; + *this.state = State::Flush { io, protocol }; } - State::Flush { mut io } => { + State::Flush { mut io, protocol } => { match Pin::new(&mut io).poll_flush(cx) { Poll::Pending => { - *this.state = State::Flush { io }; + *this.state = State::Flush { io, protocol }; return Poll::Pending }, - Poll::Ready(Ok(())) => *this.state = State::RecvMessage { io }, + Poll::Ready(Ok(())) => { + // If a protocol has been selected, finish negotiation. + // Otherwise expect to receive another message. + match protocol { + Some(protocol) => { + log::debug!("Listener: sent confirmed protocol: {}", + String::from_utf8_lossy(protocol.as_ref())); + let io = Negotiated::completed(io.into_inner()); + return Poll::Ready(Ok((protocol, io))) + } + None => *this.state = State::RecvMessage { io } + } + } Poll::Ready(Err(err)) => return Poll::Ready(Err(From::from(err))), } } diff --git a/misc/multistream-select/src/negotiated.rs b/misc/multistream-select/src/negotiated.rs index f3f9ed9e90f..bfc1c9d176f 100644 --- a/misc/multistream-select/src/negotiated.rs +++ b/misc/multistream-select/src/negotiated.rs @@ -20,7 +20,6 @@ use crate::protocol::{Protocol, MessageReader, Message, Version, ProtocolError}; -use bytes::{BytesMut, Buf}; use futures::{prelude::*, io::{IoSlice, IoSliceMut}, ready}; use pin_project::pin_project; use std::{error::Error, fmt, io, mem, pin::Pin, task::{Context, Poll}}; @@ -74,10 +73,9 @@ where } impl Negotiated { - /// Creates a `Negotiated` in state [`State::Completed`], possibly - /// with `remaining` data to be sent. - pub(crate) fn completed(io: TInner, remaining: BytesMut) -> Self { - Negotiated { state: State::Completed { io, remaining } } + /// Creates a `Negotiated` in state [`State::Completed`]. + pub(crate) fn completed(io: TInner) -> Self { + Negotiated { state: State::Completed { io } } } /// Creates a `Negotiated` in state [`State::Expecting`] that is still @@ -107,10 +105,7 @@ impl Negotiated { let mut this = self.project(); match this.state.as_mut().project() { - StateProj::Completed { remaining, .. } => { - debug_assert!(remaining.is_empty()); - return Poll::Ready(Ok(())) - } + StateProj::Completed { .. } => return Poll::Ready(Ok(())), _ => {} } @@ -139,8 +134,7 @@ impl Negotiated { if let Message::Protocol(p) = &msg { if p.as_ref() == protocol.as_ref() { log::debug!("Negotiated: Received confirmation for protocol: {}", p); - let (io, remaining) = io.into_inner(); - *this.state = State::Completed { io, remaining }; + *this.state = State::Completed { io: io.into_inner() }; return Poll::Ready(Ok(())); } } @@ -165,7 +159,8 @@ impl Negotiated { #[derive(Debug)] enum State { /// In this state, a `Negotiated` is still expecting to - /// receive confirmation of the protocol it as settled on. + /// receive confirmation of the protocol it has optimistically + /// settled on. Expecting { /// The underlying I/O stream. #[pin] @@ -176,11 +171,9 @@ enum State { version: Version }, - /// In this state, a protocol has been agreed upon and may - /// only be pending the sending of the final acknowledgement, - /// which is prepended to / combined with the next write for - /// efficiency. - Completed { #[pin] io: R, remaining: BytesMut }, + /// In this state, a protocol has been agreed upon and I/O + /// on the underlying stream can commence. + Completed { #[pin] io: R }, /// Temporary state while moving the `io` resource from /// `Expecting` to `Completed`. @@ -196,12 +189,9 @@ where { loop { match self.as_mut().project().state.project() { - StateProj::Completed { io, remaining } => { - // If protocol negotiation is complete and there is no - // remaining data to be flushed, commence with reading. - if remaining.is_empty() { - return io.poll_read(cx, buf) - } + StateProj::Completed { io } => { + // If protocol negotiation is complete, commence with reading. + return io.poll_read(cx, buf) }, _ => {} } @@ -230,12 +220,9 @@ where { loop { match self.as_mut().project().state.project() { - StateProj::Completed { io, remaining } => { - // If protocol negotiation is complete and there is no - // remaining data to be flushed, commence with reading. - if remaining.is_empty() { - return io.poll_read_vectored(cx, bufs) - } + StateProj::Completed { io } => { + // If protocol negotiation is complete, commence with reading. + return io.poll_read_vectored(cx, bufs) }, _ => {} } @@ -257,16 +244,7 @@ where { fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { match self.project().state.project() { - StateProj::Completed { mut io, remaining } => { - while !remaining.is_empty() { - let n = ready!(io.as_mut().poll_write(cx, &remaining)?); - if n == 0 { - return Poll::Ready(Err(io::ErrorKind::WriteZero.into())) - } - remaining.advance(n); - } - io.poll_write(cx, buf) - }, + StateProj::Completed { io } => io.poll_write(cx, buf), StateProj::Expecting { io, .. } => io.poll_write(cx, buf), StateProj::Invalid => panic!("Negotiated: Invalid state"), } @@ -274,16 +252,7 @@ where fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match self.project().state.project() { - StateProj::Completed { mut io, remaining } => { - while !remaining.is_empty() { - let n = ready!(io.as_mut().poll_write(cx, &remaining)?); - if n == 0 { - return Poll::Ready(Err(io::ErrorKind::WriteZero.into())) - } - remaining.advance(n); - } - io.poll_flush(cx) - }, + StateProj::Completed { io } => io.poll_flush(cx), StateProj::Expecting { io, .. } => io.poll_flush(cx), StateProj::Invalid => panic!("Negotiated: Invalid state"), } @@ -307,16 +276,7 @@ where -> Poll> { match self.project().state.project() { - StateProj::Completed { mut io, remaining } => { - while !remaining.is_empty() { - let n = ready!(io.as_mut().poll_write(cx, &remaining)?); - if n == 0 { - return Poll::Ready(Err(io::ErrorKind::WriteZero.into())) - } - remaining.advance(n); - } - io.poll_write_vectored(cx, bufs) - }, + StateProj::Completed { io } => io.poll_write_vectored(cx, bufs), StateProj::Expecting { io, .. } => io.poll_write_vectored(cx, bufs), StateProj::Invalid => panic!("Negotiated: Invalid state"), } @@ -373,76 +333,3 @@ impl fmt::Display for NegotiationError { } } } - -#[cfg(test)] -mod tests { - use super::*; - use quickcheck::*; - use std::{io::Write, task::Poll}; - - /// An I/O resource with a fixed write capacity (total and per write op). - struct Capped { buf: Vec, step: usize } - - impl AsyncRead for Capped { - fn poll_read(self: Pin<&mut Self>, _: &mut Context<'_>, _: &mut [u8]) -> Poll> { - unreachable!() - } - } - - impl AsyncWrite for Capped { - fn poll_write(mut self: Pin<&mut Self>, _: &mut Context<'_>, buf: &[u8]) -> Poll> { - if self.buf.len() + buf.len() > self.buf.capacity() { - return Poll::Ready(Err(io::ErrorKind::WriteZero.into())) - } - let len = usize::min(self.step, buf.len()); - let n = Write::write(&mut self.buf, &buf[.. len]).unwrap(); - Poll::Ready(Ok(n)) - } - - fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - } - - #[test] - fn write_remaining() { - fn prop(rem: Vec, new: Vec, free: u8, step: u8) -> TestResult { - let cap = rem.len() + free as usize; - let step = u8::min(free, step) as usize + 1; - let buf = Capped { buf: Vec::with_capacity(cap), step }; - let rem = BytesMut::from(&rem[..]); - let mut io = Negotiated::completed(buf, rem.clone()); - let mut written = 0; - loop { - // Write until `new` has been fully written or the capped buffer runs - // over capacity and yields WriteZero. - match future::poll_fn(|cx| Pin::new(&mut io).poll_write(cx, &new[written..])).now_or_never().unwrap() { - Ok(n) => - if let State::Completed { remaining, .. } = &io.state { - assert!(remaining.is_empty()); - written += n; - if written == new.len() { - return TestResult::passed() - } - } else { - return TestResult::failed() - } - Err(e) if e.kind() == io::ErrorKind::WriteZero => { - if let State::Completed { .. } = &io.state { - assert!(rem.len() + new.len() > cap); - return TestResult::passed() - } else { - return TestResult::failed() - } - } - Err(e) => panic!("Unexpected error: {:?}", e), - } - } - } - quickcheck(prop as fn(_,_,_,_) -> _) - } -} diff --git a/misc/multistream-select/src/protocol.rs b/misc/multistream-select/src/protocol.rs index 846a4167305..c15ddbc098d 100644 --- a/misc/multistream-select/src/protocol.rs +++ b/misc/multistream-select/src/protocol.rs @@ -289,23 +289,16 @@ impl MessageIO { MessageReader { inner: self.inner.into_reader() } } - /// Drops the [`MessageIO`] resource, yielding the underlying I/O stream - /// together with the remaining write buffer containing the protocol - /// negotiation frame data that has not yet been written to the I/O stream. - /// - /// The returned remaining write buffer may be prepended to follow-up - /// protocol data to send with a single `write`. Either way, if non-empty, - /// the write buffer _must_ eventually be written to the I/O stream - /// _before_ any follow-up data, in order for protocol negotiation to - /// complete cleanly. + /// Drops the [`MessageIO`] resource, yielding the underlying I/O stream. /// /// # Panics /// - /// Panics if the read buffer is not empty, meaning that an incoming - /// protocol negotiation frame has been partially read. The read buffer - /// is guaranteed to be empty whenever `MessageIO::poll` returned - /// a message. - pub fn into_inner(self) -> (R, BytesMut) { + /// Panics if the read buffer or write buffer is not empty, meaning that an incoming + /// protocol negotiation frame has been partially read or an outgoing frame + /// has not yet been flushed. The read buffer is guaranteed to be empty whenever + /// `MessageIO::poll` returned a message. The write buffer is guaranteed to be empty + /// when the sink has been flushed. + pub fn into_inner(self) -> R { self.inner.into_inner() } } @@ -365,19 +358,14 @@ impl MessageReader { /// together with the remaining write buffer containing the protocol /// negotiation frame data that has not yet been written to the I/O stream. /// - /// The returned remaining write buffer may be prepended to follow-up - /// protocol data to send with a single `write`. Either way, if non-empty, - /// the write buffer _must_ eventually be written to the I/O stream - /// _before_ any follow-up data, in order for protocol negotiation to - /// complete cleanly. - /// /// # Panics /// - /// Panics if the read buffer is not empty, meaning that an incoming - /// protocol negotiation frame has been partially read. The read buffer - /// is guaranteed to be empty whenever `MessageReader::poll` returned - /// a message. - pub fn into_inner(self) -> (R, BytesMut) { + /// Panics if the read buffer or write buffer is not empty, meaning that either + /// an incoming protocol negotiation frame has been partially read, or an + /// outgoing frame has not yet been flushed. The read buffer is guaranteed to + /// be empty whenever `MessageReader::poll` returned a message. The write + /// buffer is guaranteed to be empty whenever the sink has been flushed. + pub fn into_inner(self) -> R { self.inner.into_inner() } }