Skip to content

Commit

Permalink
Revert Connection::open_{uni, bi} to named 'static futures
Browse files Browse the repository at this point in the history
  • Loading branch information
Ralith committed Aug 28, 2022
1 parent 6c8be8c commit 4c1186a
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 38 deletions.
108 changes: 71 additions & 37 deletions quinn/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,45 +326,22 @@ impl Connection {
/// Streams are cheap and instantaneous to open unless blocked by flow control. As a
/// consequence, the peer won't be notified that a stream has been opened until the stream is
/// actually used.
pub async fn open_uni(&self) -> Result<SendStream, ConnectionError> {
let (id, is_0rtt) = self.open(Dir::Uni).await?;
Ok(SendStream::new(self.0.clone(), id, is_0rtt))
pub fn open_uni(&self) -> OpenUni {
OpenUni {
conn: Some(self.0.clone()),
notify: self.0.lock("open_uni").stream_opening[Dir::Uni as usize].wait(),
}
}

/// Initiate a new outgoing bidirectional stream.
///
/// Streams are cheap and instantaneous to open unless blocked by flow control. As a
/// consequence, the peer won't be notified that a stream has been opened until the stream is
/// actually used.
pub async fn open_bi(&self) -> Result<(SendStream, RecvStream), ConnectionError> {
let (id, is_0rtt) = self.open(Dir::Bi).await?;
Ok((
SendStream::new(self.0.clone(), id, is_0rtt),
RecvStream::new(self.0.clone(), id, is_0rtt),
))
}

async fn open(&self, dir: Dir) -> Result<(StreamId, bool), ConnectionError> {
loop {
let opening;
{
let mut conn = self.0.lock("open");
if let Some(ref e) = conn.error {
return Err(e.clone());
}
if let Some(id) = conn.inner.streams().open(dir) {
let is_0rtt = conn.inner.side().is_client() && conn.inner.is_handshaking();
return Ok((id, is_0rtt));
}
// Clone the `Arc<Notify>` so we can wait on the underlying `Notify` without holding
// the lock. Store it in the outer scope to ensure it outlives the lock guard.
opening = conn.stream_opening[dir as usize].clone();
// Construct the future while the lock is held to ensure we can't miss a wakeup if
// the `Notify` is signaled immediately after we release the lock. `await` it after
// the lock guard is out of scope.
opening.notified()
}
.await
pub fn open_bi(&self) -> OpenBi {
OpenBi {
conn: Some(self.0.clone()),
notify: self.0.lock("open_bi").stream_opening[Dir::Bi as usize].wait(),
}
}

Expand Down Expand Up @@ -623,6 +600,63 @@ impl Clone for Connection {
}
}

/// Future produced by [`Connection::open_uni`]
pub struct OpenUni {
conn: Option<ConnectionRef>,
notify: notify::Waiter,
}

impl Future for OpenUni {
type Output = Result<SendStream, ConnectionError>;
fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
let this = &mut *self;
let (conn, id, is_0rtt) =
ready!(poll_open(ctx, &mut this.conn, &mut this.notify, Dir::Uni))?;
Poll::Ready(Ok(SendStream::new(conn, id, is_0rtt)))
}
}

/// Future produced by [`Connection::open_bi`]
pub struct OpenBi {
conn: Option<ConnectionRef>,
notify: notify::Waiter,
}

impl Future for OpenBi {
type Output = Result<(SendStream, RecvStream), ConnectionError>;
fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
let this = &mut *self;
let (conn, id, is_0rtt) =
ready!(poll_open(ctx, &mut this.conn, &mut this.notify, Dir::Bi))?;

Poll::Ready(Ok((
SendStream::new(conn.clone(), id, is_0rtt),
RecvStream::new(conn, id, is_0rtt),
)))
}
}

fn poll_open(
ctx: &mut Context<'_>,
conn_storage: &mut Option<ConnectionRef>,
notify: &mut notify::Waiter,
dir: Dir,
) -> Poll<Result<(ConnectionRef, StreamId, bool), ConnectionError>> {
let mut conn = conn_storage.as_ref().unwrap().lock("poll_open");
if let Some(ref e) = conn.error {
Poll::Ready(Err(e.clone()))
} else if let Some(id) = conn.inner.streams().open(dir) {
let is_0rtt = conn.inner.side().is_client() && conn.inner.is_handshaking();
drop(conn); // Release the borrow so it can be passed to `RecvStream`
let conn = conn_storage.take().expect("polled after completion");
Poll::Ready(Ok((conn, id, is_0rtt)))
} else {
// `conn` lock ensures we don't race with readiness
notify.register(ctx);
Poll::Pending
}
}

/// A stream of unidirectional QUIC streams initiated by a remote peer.
///
/// Incoming streams are *always* opened in the same order that the peer created them, but data can
Expand Down Expand Up @@ -855,7 +889,7 @@ impl ConnectionRef {
endpoint_events,
blocked_writers: FxHashMap::default(),
blocked_readers: FxHashMap::default(),
stream_opening: [Arc::new(Notify::new()), Arc::new(Notify::new())],
stream_opening: [NotifyOwned::new(), NotifyOwned::new()],
incoming_uni_streams_reader: None,
stream_incoming: [NotifyOwned::new(), NotifyOwned::new()],
incoming_bi_streams_reader: None,
Expand Down Expand Up @@ -919,7 +953,7 @@ pub struct ConnectionInner {
endpoint_events: mpsc::UnboundedSender<(ConnectionHandle, EndpointEvent)>,
pub(crate) blocked_writers: FxHashMap<StreamId, Waker>,
pub(crate) blocked_readers: FxHashMap<StreamId, Waker>,
stream_opening: [Arc<Notify>; 2],
stream_opening: [NotifyOwned; 2],
incoming_uni_streams_reader: Option<Waker>,
incoming_bi_streams_reader: Option<Waker>,
stream_incoming: [NotifyOwned; 2],
Expand Down Expand Up @@ -1049,7 +1083,7 @@ impl ConnectionInner {
}
}
Stream(StreamEvent::Available { dir }) => {
self.stream_opening[dir as usize].notify_one();
self.stream_opening[dir as usize].notify_all();
}
Stream(StreamEvent::Finished { id }) => {
if let Some(finishing) = self.finishing.remove(&id) {
Expand Down Expand Up @@ -1141,8 +1175,8 @@ impl ConnectionInner {
for (_, reader) in self.blocked_readers.drain() {
reader.wake()
}
self.stream_opening[Dir::Uni as usize].notify_waiters();
self.stream_opening[Dir::Bi as usize].notify_waiters();
self.stream_opening[Dir::Uni as usize].notify_all();
self.stream_opening[Dir::Bi as usize].notify_all();
self.stream_incoming[Dir::Uni as usize].notify_all();
self.stream_incoming[Dir::Bi as usize].notify_all();
if let Some(x) = self.incoming_uni_streams_reader.take() {
Expand Down
3 changes: 2 additions & 1 deletion quinn/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ pub use proto::{

pub use crate::connection::{
AcceptBi, AcceptUni, Connecting, Connection, Datagrams, IncomingBiStreams, IncomingUniStreams,
NewConnection, ReadDatagram, SendDatagramError, UnknownStream, ZeroRttAccepted,
NewConnection, OpenBi, OpenUni, ReadDatagram, SendDatagramError, UnknownStream,
ZeroRttAccepted,
};
pub use crate::endpoint::{Endpoint, Incoming};
pub use crate::recv_stream::{ReadError, ReadExactError, ReadToEndError, RecvStream};
Expand Down

0 comments on commit 4c1186a

Please sign in to comment.