From b17ccdd5ca5f49022ad0fe92dfc77fffcc729d5f Mon Sep 17 00:00:00 2001 From: "Roman S. Borschel" Date: Wed, 30 Sep 2020 15:14:35 +0200 Subject: [PATCH 1/3] Require remaining negotiation data to be flushed. There appears to still be an edge-case whereby the `remaining` data to send w.r.t. protocol negotiation to send is successfully written before a `poll_read` on a `Negotiated` stream, but where the subsequent `poll_flush()` is pending. Now `remaining` is empty and the next `poll_read()` will go straight to reading from the underlying I/O stream, despite the flush not having happened yet, which can lead to a form of deadlock during protocol negotiation. Rather than complicating the existing code further in order to accommodate for this case, it seems preferable to simplify the code by giving up on this optimisation that only affects the last negotiation protocol message sent by the "listener". So we give up on the ability to combine data sent by the "listener" immediately after protocol negotiation together with the final negotiation frame in the same transport-level frame/packet. --- misc/multistream-select/src/dialer_select.rs | 3 +- .../src/length_delimited.rs | 22 ++- .../multistream-select/src/listener_select.rs | 39 ++--- misc/multistream-select/src/negotiated.rs | 151 +++--------------- misc/multistream-select/src/protocol.rs | 38 ++--- 5 files changed, 63 insertions(+), 190 deletions(-) diff --git a/misc/multistream-select/src/dialer_select.rs b/misc/multistream-select/src/dialer_select.rs index 87149a6aa9c..efbdc105c9d 100644 --- a/misc/multistream-select/src/dialer_select.rs +++ b/misc/multistream-select/src/dialer_select.rs @@ -241,8 +241,7 @@ where } Message::Protocol(ref p) if p.as_ref() == protocol.as_ref() => { log::debug!("Dialer: Received confirmation for protocol: {}", p); - let (io, remaining) = io.into_inner(); - let io = Negotiated::completed(io, remaining); + let io = Negotiated::completed(io.into_inner()); return Poll::Ready(Ok((protocol, io))); } Message::NotAvailable => { diff --git a/misc/multistream-select/src/length_delimited.rs b/misc/multistream-select/src/length_delimited.rs index da7bc632794..593c915ac2b 100644 --- a/misc/multistream-select/src/length_delimited.rs +++ b/misc/multistream-select/src/length_delimited.rs @@ -76,22 +76,18 @@ impl LengthDelimited { } } - /// Drops the [`LengthDelimited`] resource, yielding the underlying I/O stream - /// together with the remaining write buffer containing the uvi-framed data - /// that has not yet been written to the underlying I/O stream. - /// - /// The returned remaining write buffer may be prepended to follow-up - /// protocol data to send with a single `write`. Either way, if non-empty, - /// the write buffer _must_ eventually be written to the I/O stream - /// _before_ any follow-up data, in order to maintain a correct data stream. + /// Drops the [`LengthDelimited`] resource, yielding the underlying I/O stream. /// /// # Panic /// - /// Will panic if called while there is data in the read buffer. The read buffer is - /// guaranteed to be empty whenever `Stream::poll` yields a new `Bytes` frame. - pub fn into_inner(self) -> (R, BytesMut) { + /// Will panic if called while there is data in the read or write buffer. + /// The read buffer is guaranteed to be empty whenever `Stream::poll` yields + /// a new `Bytes` frame. The write buffer is guaranteed to be empty after + /// flushing. + pub fn into_inner(self) -> R { assert!(self.read_buffer.is_empty()); - (self.inner, self.write_buffer) + assert!(self.write_buffer.is_empty()); + self.inner } /// Converts the [`LengthDelimited`] into a [`LengthDelimitedReader`], dropping the @@ -303,7 +299,7 @@ impl LengthDelimitedReader { /// yield a new `Message`. The write buffer is guaranteed to be empty whenever /// [`LengthDelimited::poll_write_buffer`] yields [`Poll::Ready`] or after /// the [`Sink`] has been completely flushed via [`Sink::poll_flush`]. - pub fn into_inner(self) -> (R, BytesMut) { + pub fn into_inner(self) -> R { self.inner.into_inner() } } diff --git a/misc/multistream-select/src/listener_select.rs b/misc/multistream-select/src/listener_select.rs index 7a71c9efda9..2d66d666b7b 100644 --- a/misc/multistream-select/src/listener_select.rs +++ b/misc/multistream-select/src/listener_select.rs @@ -88,7 +88,10 @@ where message: Message, protocol: Option }, - Flush { io: MessageIO }, + Flush { + io: MessageIO, + protocol: Option + }, Done } @@ -141,7 +144,7 @@ where } *this.state = match version { - Version::V1 => State::Flush { io }, + Version::V1 => State::Flush { io, protocol: None }, Version::V1Lazy => State::RecvMessage { io }, } } @@ -204,28 +207,28 @@ where return Poll::Ready(Err(From::from(err))); } - // If a protocol has been selected, finish negotiation. - // Otherwise flush the sink and expect to receive another - // message. - *this.state = match protocol { - Some(protocol) => { - log::debug!("Listener: sent confirmed protocol: {}", - String::from_utf8_lossy(protocol.as_ref())); - let (io, remaining) = io.into_inner(); - let io = Negotiated::completed(io, remaining); - return Poll::Ready(Ok((protocol, io))); - } - None => State::Flush { io } - }; + *this.state = State::Flush { io, protocol }; } - State::Flush { mut io } => { + State::Flush { mut io, protocol } => { match Pin::new(&mut io).poll_flush(cx) { Poll::Pending => { - *this.state = State::Flush { io }; + *this.state = State::Flush { io, protocol }; return Poll::Pending }, - Poll::Ready(Ok(())) => *this.state = State::RecvMessage { io }, + Poll::Ready(Ok(())) => { + // If a protocol has been selected, finish negotiation. + // Otherwise expect to receive another message. + match protocol { + Some(protocol) => { + log::debug!("Listener: sent confirmed protocol: {}", + String::from_utf8_lossy(protocol.as_ref())); + let io = Negotiated::completed(io.into_inner()); + return Poll::Ready(Ok((protocol, io))) + } + None => *this.state = State::RecvMessage { io } + } + } Poll::Ready(Err(err)) => return Poll::Ready(Err(From::from(err))), } } diff --git a/misc/multistream-select/src/negotiated.rs b/misc/multistream-select/src/negotiated.rs index f3f9ed9e90f..e96a4259633 100644 --- a/misc/multistream-select/src/negotiated.rs +++ b/misc/multistream-select/src/negotiated.rs @@ -20,7 +20,6 @@ use crate::protocol::{Protocol, MessageReader, Message, Version, ProtocolError}; -use bytes::{BytesMut, Buf}; use futures::{prelude::*, io::{IoSlice, IoSliceMut}, ready}; use pin_project::pin_project; use std::{error::Error, fmt, io, mem, pin::Pin, task::{Context, Poll}}; @@ -74,10 +73,9 @@ where } impl Negotiated { - /// Creates a `Negotiated` in state [`State::Completed`], possibly - /// with `remaining` data to be sent. - pub(crate) fn completed(io: TInner, remaining: BytesMut) -> Self { - Negotiated { state: State::Completed { io, remaining } } + /// Creates a `Negotiated` in state [`State::Completed`]. + pub(crate) fn completed(io: TInner) -> Self { + Negotiated { state: State::Completed { io } } } /// Creates a `Negotiated` in state [`State::Expecting`] that is still @@ -107,10 +105,7 @@ impl Negotiated { let mut this = self.project(); match this.state.as_mut().project() { - StateProj::Completed { remaining, .. } => { - debug_assert!(remaining.is_empty()); - return Poll::Ready(Ok(())) - } + StateProj::Completed { .. } => return Poll::Ready(Ok(())) _ => {} } @@ -139,8 +134,7 @@ impl Negotiated { if let Message::Protocol(p) = &msg { if p.as_ref() == protocol.as_ref() { log::debug!("Negotiated: Received confirmation for protocol: {}", p); - let (io, remaining) = io.into_inner(); - *this.state = State::Completed { io, remaining }; + *this.state = State::Completed { io: io.into_inner() }; return Poll::Ready(Ok(())); } } @@ -165,7 +159,8 @@ impl Negotiated { #[derive(Debug)] enum State { /// In this state, a `Negotiated` is still expecting to - /// receive confirmation of the protocol it as settled on. + /// receive confirmation of the protocol it has optimistically + /// settled on. Expecting { /// The underlying I/O stream. #[pin] @@ -176,11 +171,9 @@ enum State { version: Version }, - /// In this state, a protocol has been agreed upon and may - /// only be pending the sending of the final acknowledgement, - /// which is prepended to / combined with the next write for - /// efficiency. - Completed { #[pin] io: R, remaining: BytesMut }, + /// In this state, a protocol has been agreed upon and I/O + /// on the underlying stream can commence. + Completed { #[pin] io: R }, /// Temporary state while moving the `io` resource from /// `Expecting` to `Completed`. @@ -196,12 +189,9 @@ where { loop { match self.as_mut().project().state.project() { - StateProj::Completed { io, remaining } => { - // If protocol negotiation is complete and there is no - // remaining data to be flushed, commence with reading. - if remaining.is_empty() { - return io.poll_read(cx, buf) - } + StateProj::Completed { io } => { + // If protocol negotiation is complete, commence with reading. + return io.poll_read(cx, buf) }, _ => {} } @@ -230,12 +220,9 @@ where { loop { match self.as_mut().project().state.project() { - StateProj::Completed { io, remaining } => { - // If protocol negotiation is complete and there is no - // remaining data to be flushed, commence with reading. - if remaining.is_empty() { - return io.poll_read_vectored(cx, bufs) - } + StateProj::Completed { io } => { + // If protocol negotiation is complete, commence with reading. + return io.poll_read_vectored(cx, bufs) }, _ => {} } @@ -257,16 +244,7 @@ where { fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { match self.project().state.project() { - StateProj::Completed { mut io, remaining } => { - while !remaining.is_empty() { - let n = ready!(io.as_mut().poll_write(cx, &remaining)?); - if n == 0 { - return Poll::Ready(Err(io::ErrorKind::WriteZero.into())) - } - remaining.advance(n); - } - io.poll_write(cx, buf) - }, + StateProj::Completed { io } => io.poll_write(cx, buf), StateProj::Expecting { io, .. } => io.poll_write(cx, buf), StateProj::Invalid => panic!("Negotiated: Invalid state"), } @@ -274,16 +252,7 @@ where fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match self.project().state.project() { - StateProj::Completed { mut io, remaining } => { - while !remaining.is_empty() { - let n = ready!(io.as_mut().poll_write(cx, &remaining)?); - if n == 0 { - return Poll::Ready(Err(io::ErrorKind::WriteZero.into())) - } - remaining.advance(n); - } - io.poll_flush(cx) - }, + StateProj::Completed { io } => io.poll_flush(cx), StateProj::Expecting { io, .. } => io.poll_flush(cx), StateProj::Invalid => panic!("Negotiated: Invalid state"), } @@ -307,16 +276,7 @@ where -> Poll> { match self.project().state.project() { - StateProj::Completed { mut io, remaining } => { - while !remaining.is_empty() { - let n = ready!(io.as_mut().poll_write(cx, &remaining)?); - if n == 0 { - return Poll::Ready(Err(io::ErrorKind::WriteZero.into())) - } - remaining.advance(n); - } - io.poll_write_vectored(cx, bufs) - }, + StateProj::Completed { io } => io.poll_write_vectored(cx, bufs), StateProj::Expecting { io, .. } => io.poll_write_vectored(cx, bufs), StateProj::Invalid => panic!("Negotiated: Invalid state"), } @@ -373,76 +333,3 @@ impl fmt::Display for NegotiationError { } } } - -#[cfg(test)] -mod tests { - use super::*; - use quickcheck::*; - use std::{io::Write, task::Poll}; - - /// An I/O resource with a fixed write capacity (total and per write op). - struct Capped { buf: Vec, step: usize } - - impl AsyncRead for Capped { - fn poll_read(self: Pin<&mut Self>, _: &mut Context<'_>, _: &mut [u8]) -> Poll> { - unreachable!() - } - } - - impl AsyncWrite for Capped { - fn poll_write(mut self: Pin<&mut Self>, _: &mut Context<'_>, buf: &[u8]) -> Poll> { - if self.buf.len() + buf.len() > self.buf.capacity() { - return Poll::Ready(Err(io::ErrorKind::WriteZero.into())) - } - let len = usize::min(self.step, buf.len()); - let n = Write::write(&mut self.buf, &buf[.. len]).unwrap(); - Poll::Ready(Ok(n)) - } - - fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - } - - #[test] - fn write_remaining() { - fn prop(rem: Vec, new: Vec, free: u8, step: u8) -> TestResult { - let cap = rem.len() + free as usize; - let step = u8::min(free, step) as usize + 1; - let buf = Capped { buf: Vec::with_capacity(cap), step }; - let rem = BytesMut::from(&rem[..]); - let mut io = Negotiated::completed(buf, rem.clone()); - let mut written = 0; - loop { - // Write until `new` has been fully written or the capped buffer runs - // over capacity and yields WriteZero. - match future::poll_fn(|cx| Pin::new(&mut io).poll_write(cx, &new[written..])).now_or_never().unwrap() { - Ok(n) => - if let State::Completed { remaining, .. } = &io.state { - assert!(remaining.is_empty()); - written += n; - if written == new.len() { - return TestResult::passed() - } - } else { - return TestResult::failed() - } - Err(e) if e.kind() == io::ErrorKind::WriteZero => { - if let State::Completed { .. } = &io.state { - assert!(rem.len() + new.len() > cap); - return TestResult::passed() - } else { - return TestResult::failed() - } - } - Err(e) => panic!("Unexpected error: {:?}", e), - } - } - } - quickcheck(prop as fn(_,_,_,_) -> _) - } -} diff --git a/misc/multistream-select/src/protocol.rs b/misc/multistream-select/src/protocol.rs index 846a4167305..c15ddbc098d 100644 --- a/misc/multistream-select/src/protocol.rs +++ b/misc/multistream-select/src/protocol.rs @@ -289,23 +289,16 @@ impl MessageIO { MessageReader { inner: self.inner.into_reader() } } - /// Drops the [`MessageIO`] resource, yielding the underlying I/O stream - /// together with the remaining write buffer containing the protocol - /// negotiation frame data that has not yet been written to the I/O stream. - /// - /// The returned remaining write buffer may be prepended to follow-up - /// protocol data to send with a single `write`. Either way, if non-empty, - /// the write buffer _must_ eventually be written to the I/O stream - /// _before_ any follow-up data, in order for protocol negotiation to - /// complete cleanly. + /// Drops the [`MessageIO`] resource, yielding the underlying I/O stream. /// /// # Panics /// - /// Panics if the read buffer is not empty, meaning that an incoming - /// protocol negotiation frame has been partially read. The read buffer - /// is guaranteed to be empty whenever `MessageIO::poll` returned - /// a message. - pub fn into_inner(self) -> (R, BytesMut) { + /// Panics if the read buffer or write buffer is not empty, meaning that an incoming + /// protocol negotiation frame has been partially read or an outgoing frame + /// has not yet been flushed. The read buffer is guaranteed to be empty whenever + /// `MessageIO::poll` returned a message. The write buffer is guaranteed to be empty + /// when the sink has been flushed. + pub fn into_inner(self) -> R { self.inner.into_inner() } } @@ -365,19 +358,14 @@ impl MessageReader { /// together with the remaining write buffer containing the protocol /// negotiation frame data that has not yet been written to the I/O stream. /// - /// The returned remaining write buffer may be prepended to follow-up - /// protocol data to send with a single `write`. Either way, if non-empty, - /// the write buffer _must_ eventually be written to the I/O stream - /// _before_ any follow-up data, in order for protocol negotiation to - /// complete cleanly. - /// /// # Panics /// - /// Panics if the read buffer is not empty, meaning that an incoming - /// protocol negotiation frame has been partially read. The read buffer - /// is guaranteed to be empty whenever `MessageReader::poll` returned - /// a message. - pub fn into_inner(self) -> (R, BytesMut) { + /// Panics if the read buffer or write buffer is not empty, meaning that either + /// an incoming protocol negotiation frame has been partially read, or an + /// outgoing frame has not yet been flushed. The read buffer is guaranteed to + /// be empty whenever `MessageReader::poll` returned a message. The write + /// buffer is guaranteed to be empty whenever the sink has been flushed. + pub fn into_inner(self) -> R { self.inner.into_inner() } } From 1067636b2ecbfb6df3d41f946d26503c3798c660 Mon Sep 17 00:00:00 2001 From: "Roman S. Borschel" Date: Wed, 30 Sep 2020 15:33:07 +0200 Subject: [PATCH 2/3] Update changelog. --- misc/multistream-select/CHANGELOG.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/misc/multistream-select/CHANGELOG.md b/misc/multistream-select/CHANGELOG.md index e8dedd3770a..7eb9593d353 100644 --- a/misc/multistream-select/CHANGELOG.md +++ b/misc/multistream-select/CHANGELOG.md @@ -1,5 +1,10 @@ # 0.8.3 [unreleased] +- Fix a potential deadlock during protocol negotiation due + to a missing flush, potentially resulting in sporadic protocol + upgrade timeouts. + [PR 1781](https://github.com/libp2p/rust-libp2p/pull/1781). + - Update dependencies. # 0.8.2 [2020-06-22] From 77e315e194d9b2aa7e79d253dba8b014389c8b66 Mon Sep 17 00:00:00 2001 From: "Roman S. Borschel" Date: Wed, 30 Sep 2020 15:40:46 +0200 Subject: [PATCH 3/3] Add missing comma. --- misc/multistream-select/src/negotiated.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/misc/multistream-select/src/negotiated.rs b/misc/multistream-select/src/negotiated.rs index e96a4259633..bfc1c9d176f 100644 --- a/misc/multistream-select/src/negotiated.rs +++ b/misc/multistream-select/src/negotiated.rs @@ -105,7 +105,7 @@ impl Negotiated { let mut this = self.project(); match this.state.as_mut().project() { - StateProj::Completed { .. } => return Poll::Ready(Ok(())) + StateProj::Completed { .. } => return Poll::Ready(Ok(())), _ => {} }