From 3218229dab01ae7c8c3a0441a5529829c4b4a5ea Mon Sep 17 00:00:00 2001 From: Benjamin Saunders Date: Wed, 11 May 2022 22:02:11 -0700 Subject: [PATCH] Revert Connection::open_{uni, bi} to named 'static futures --- quinn/src/connection.rs | 101 +++++++++++++++++++++++++++++----------- quinn/src/lib.rs | 3 +- 2 files changed, 75 insertions(+), 29 deletions(-) diff --git a/quinn/src/connection.rs b/quinn/src/connection.rs index a94da0bb44..72558cd502 100644 --- a/quinn/src/connection.rs +++ b/quinn/src/connection.rs @@ -325,9 +325,11 @@ impl Connection { /// Streams are cheap and instantaneous to open unless blocked by flow control. As a /// consequence, the peer won't be notified that a stream has been opened until the stream is /// actually used. - pub async fn open_uni(&self) -> Result { - let (id, is_0rtt) = self.open(Dir::Uni).await?; - Ok(SendStream::new(self.0.clone(), id, is_0rtt)) + pub fn open_uni(&self) -> OpenUni<'_> { + OpenUni { + conn: &self.0, + notify: self.0.shared.stream_opening[Dir::Uni as usize].notified(), + } } /// Initiate a new outgoing bidirectional stream. @@ -335,31 +337,10 @@ impl Connection { /// Streams are cheap and instantaneous to open unless blocked by flow control. As a /// consequence, the peer won't be notified that a stream has been opened until the stream is /// actually used. - pub async fn open_bi(&self) -> Result<(SendStream, RecvStream), ConnectionError> { - let (id, is_0rtt) = self.open(Dir::Bi).await?; - Ok(( - SendStream::new(self.0.clone(), id, is_0rtt), - RecvStream::new(self.0.clone(), id, is_0rtt), - )) - } - - async fn open(&self, dir: Dir) -> Result<(StreamId, bool), ConnectionError> { - loop { - { - let mut conn = self.0.state.lock("open"); - if let Some(ref e) = conn.error { - return Err(e.clone()); - } - if let Some(id) = conn.inner.streams().open(dir) { - let is_0rtt = conn.inner.side().is_client() && conn.inner.is_handshaking(); - return Ok((id, is_0rtt)); - } - // Construct the future while the lock is held to ensure we can't miss a wakeup if - // the `Notify` is signaled immediately after we release the lock. `await` it after - // the lock guard is out of scope. - self.0.shared.stream_opening[dir as usize].notified() - } - .await + pub fn open_bi(&self) -> OpenBi<'_> { + OpenBi { + conn: &self.0, + notify: self.0.shared.stream_opening[Dir::Bi as usize].notified(), } } @@ -623,6 +604,70 @@ impl Connection { } } +pin_project! { + /// Future produced by [`Connection::open_uni`] + pub struct OpenUni<'a> { + conn: &'a ConnectionRef, + #[pin] + notify: Notified<'a>, + } +} + +impl Future for OpenUni<'_> { + type Output = Result; + fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll { + let this = self.project(); + let (conn, id, is_0rtt) = ready!(poll_open(ctx, this.conn, this.notify, Dir::Uni))?; + Poll::Ready(Ok(SendStream::new(conn, id, is_0rtt))) + } +} + +pin_project! { + /// Future produced by [`Connection::open_bi`] + pub struct OpenBi<'a> { + conn: &'a ConnectionRef, + #[pin] + notify: Notified<'a>, + } +} + +impl Future for OpenBi<'_> { + type Output = Result<(SendStream, RecvStream), ConnectionError>; + fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll { + let this = self.project(); + let (conn, id, is_0rtt) = ready!(poll_open(ctx, this.conn, this.notify, Dir::Bi))?; + + Poll::Ready(Ok(( + SendStream::new(conn.clone(), id, is_0rtt), + RecvStream::new(conn, id, is_0rtt), + ))) + } +} + +fn poll_open<'a>( + ctx: &mut Context<'_>, + conn: &'a ConnectionRef, + mut notify: Pin<&mut Notified<'a>>, + dir: Dir, +) -> Poll> { + let mut state = conn.state.lock("poll_open"); + if let Some(ref e) = state.error { + return Poll::Ready(Err(e.clone())); + } else if let Some(id) = state.inner.streams().open(dir) { + let is_0rtt = state.inner.side().is_client() && state.inner.is_handshaking(); + drop(state); // Release the lock so clone can take it + return Poll::Ready(Ok((conn.clone(), id, is_0rtt))); + } + loop { + match notify.as_mut().poll(ctx) { + // `state` lock ensures we didn't race with readiness + Poll::Pending => return Poll::Pending, + // Spurious wakeup, get a new future + Poll::Ready(()) => notify.set(conn.shared.stream_incoming[dir as usize].notified()), + } + } +} + /// A stream of unidirectional QUIC streams initiated by a remote peer. /// /// Incoming streams are *always* opened in the same order that the peer created them, but data can diff --git a/quinn/src/lib.rs b/quinn/src/lib.rs index e333768259..e4bd4986be 100644 --- a/quinn/src/lib.rs +++ b/quinn/src/lib.rs @@ -71,7 +71,8 @@ pub use proto::{ pub use crate::connection::{ AcceptBi, AcceptUni, Connecting, Connection, Datagrams, IncomingBiStreams, IncomingUniStreams, - NewConnection, ReadDatagram, SendDatagramError, UnknownStream, ZeroRttAccepted, + NewConnection, OpenBi, OpenUni, ReadDatagram, SendDatagramError, UnknownStream, + ZeroRttAccepted, }; pub use crate::endpoint::{Endpoint, Incoming}; pub use crate::recv_stream::{ReadError, ReadExactError, ReadToEndError, RecvStream};