Skip to content

Commit

Permalink
Revert Connection::open_{uni, bi} to named 'static futures
Browse files Browse the repository at this point in the history
  • Loading branch information
Ralith committed Sep 21, 2022
1 parent 5b68158 commit 3218229
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 29 deletions.
101 changes: 73 additions & 28 deletions quinn/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,41 +325,22 @@ impl Connection {
/// Streams are cheap and instantaneous to open unless blocked by flow control. As a
/// consequence, the peer won't be notified that a stream has been opened until the stream is
/// actually used.
pub async fn open_uni(&self) -> Result<SendStream, ConnectionError> {
let (id, is_0rtt) = self.open(Dir::Uni).await?;
Ok(SendStream::new(self.0.clone(), id, is_0rtt))
pub fn open_uni(&self) -> OpenUni<'_> {
OpenUni {
conn: &self.0,
notify: self.0.shared.stream_opening[Dir::Uni as usize].notified(),
}
}

/// Initiate a new outgoing bidirectional stream.
///
/// Streams are cheap and instantaneous to open unless blocked by flow control. As a
/// consequence, the peer won't be notified that a stream has been opened until the stream is
/// actually used.
pub async fn open_bi(&self) -> Result<(SendStream, RecvStream), ConnectionError> {
let (id, is_0rtt) = self.open(Dir::Bi).await?;
Ok((
SendStream::new(self.0.clone(), id, is_0rtt),
RecvStream::new(self.0.clone(), id, is_0rtt),
))
}

async fn open(&self, dir: Dir) -> Result<(StreamId, bool), ConnectionError> {
loop {
{
let mut conn = self.0.state.lock("open");
if let Some(ref e) = conn.error {
return Err(e.clone());
}
if let Some(id) = conn.inner.streams().open(dir) {
let is_0rtt = conn.inner.side().is_client() && conn.inner.is_handshaking();
return Ok((id, is_0rtt));
}
// Construct the future while the lock is held to ensure we can't miss a wakeup if
// the `Notify` is signaled immediately after we release the lock. `await` it after
// the lock guard is out of scope.
self.0.shared.stream_opening[dir as usize].notified()
}
.await
pub fn open_bi(&self) -> OpenBi<'_> {
OpenBi {
conn: &self.0,
notify: self.0.shared.stream_opening[Dir::Bi as usize].notified(),
}
}

Expand Down Expand Up @@ -623,6 +604,70 @@ impl Connection {
}
}

pin_project! {
/// Future produced by [`Connection::open_uni`]
pub struct OpenUni<'a> {
conn: &'a ConnectionRef,
#[pin]
notify: Notified<'a>,
}
}

impl Future for OpenUni<'_> {
type Output = Result<SendStream, ConnectionError>;
fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let (conn, id, is_0rtt) = ready!(poll_open(ctx, this.conn, this.notify, Dir::Uni))?;
Poll::Ready(Ok(SendStream::new(conn, id, is_0rtt)))
}
}

pin_project! {
/// Future produced by [`Connection::open_bi`]
pub struct OpenBi<'a> {
conn: &'a ConnectionRef,
#[pin]
notify: Notified<'a>,
}
}

impl Future for OpenBi<'_> {
type Output = Result<(SendStream, RecvStream), ConnectionError>;
fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let (conn, id, is_0rtt) = ready!(poll_open(ctx, this.conn, this.notify, Dir::Bi))?;

Poll::Ready(Ok((
SendStream::new(conn.clone(), id, is_0rtt),
RecvStream::new(conn, id, is_0rtt),
)))
}
}

fn poll_open<'a>(
ctx: &mut Context<'_>,
conn: &'a ConnectionRef,
mut notify: Pin<&mut Notified<'a>>,
dir: Dir,
) -> Poll<Result<(ConnectionRef, StreamId, bool), ConnectionError>> {
let mut state = conn.state.lock("poll_open");
if let Some(ref e) = state.error {
return Poll::Ready(Err(e.clone()));
} else if let Some(id) = state.inner.streams().open(dir) {
let is_0rtt = state.inner.side().is_client() && state.inner.is_handshaking();
drop(state); // Release the lock so clone can take it
return Poll::Ready(Ok((conn.clone(), id, is_0rtt)));
}
loop {
match notify.as_mut().poll(ctx) {
// `state` lock ensures we didn't race with readiness
Poll::Pending => return Poll::Pending,
// Spurious wakeup, get a new future
Poll::Ready(()) => notify.set(conn.shared.stream_incoming[dir as usize].notified()),
}
}
}

/// A stream of unidirectional QUIC streams initiated by a remote peer.
///
/// Incoming streams are *always* opened in the same order that the peer created them, but data can
Expand Down
3 changes: 2 additions & 1 deletion quinn/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ pub use proto::{

pub use crate::connection::{
AcceptBi, AcceptUni, Connecting, Connection, Datagrams, IncomingBiStreams, IncomingUniStreams,
NewConnection, ReadDatagram, SendDatagramError, UnknownStream, ZeroRttAccepted,
NewConnection, OpenBi, OpenUni, ReadDatagram, SendDatagramError, UnknownStream,
ZeroRttAccepted,
};
pub use crate::endpoint::{Endpoint, Incoming};
pub use crate::recv_stream::{ReadError, ReadExactError, ReadToEndError, RecvStream};
Expand Down

0 comments on commit 3218229

Please sign in to comment.