From 5b68158e395b536b5040e48ef6a20fbd1494f911 Mon Sep 17 00:00:00 2001 From: Benjamin Saunders Date: Wed, 11 May 2022 21:45:37 -0700 Subject: [PATCH] Introduce Connection::{accept_{uni, bi}, read_datagram} Provides a more natural alternative to the streams in NewConnection --- quinn/Cargo.toml | 1 + quinn/src/connection.rs | 130 +++++++++++++++++++++++++++++++++++++++- quinn/src/lib.rs | 4 +- 3 files changed, 132 insertions(+), 3 deletions(-) diff --git a/quinn/Cargo.toml b/quinn/Cargo.toml index 21bb498d5b..05e2be9866 100644 --- a/quinn/Cargo.toml +++ b/quinn/Cargo.toml @@ -39,6 +39,7 @@ futures-io = { version = "0.3.19", optional = true } # Implements futures::Stream for async streams such as `Incoming` futures-core = { version = "0.3.19", optional = true } rustc-hash = "1.1" +pin-project-lite = "0.2" proto = { package = "quinn-proto", path = "../quinn-proto", version = "0.8", default-features = false } rustls = { version = "0.20.3", default-features = false, features = ["quic"], optional = true } thiserror = "1.0.21" diff --git a/quinn/src/connection.rs b/quinn/src/connection.rs index 0be6a9072e..a94da0bb44 100644 --- a/quinn/src/connection.rs +++ b/quinn/src/connection.rs @@ -12,10 +12,11 @@ use std::{ use crate::runtime::{AsyncTimer, Runtime}; use bytes::Bytes; +use pin_project_lite::pin_project; use proto::{ConnectionError, ConnectionHandle, ConnectionStats, Dir, StreamEvent, StreamId}; use rustc_hash::FxHashMap; use thiserror::Error; -use tokio::sync::{mpsc, oneshot, Notify}; +use tokio::sync::{futures::Notified, mpsc, oneshot, Notify}; use tracing::debug_span; use udp::UdpState; @@ -362,6 +363,30 @@ impl Connection { } } + /// Accept the next incoming uni-directional stream + pub fn accept_uni(&self) -> AcceptUni<'_> { + AcceptUni { + conn: &self.0, + notify: self.0.shared.stream_incoming[Dir::Uni as usize].notified(), + } + } + + /// Accept the next incoming bidirectional stream + pub fn accept_bi(&self) -> AcceptBi<'_> { + AcceptBi { + conn: &self.0, + notify: self.0.shared.stream_incoming[Dir::Bi as usize].notified(), + } + } + + /// Receive an application datagram + pub fn read_datagram(&self) -> ReadDatagram<'_> { + ReadDatagram { + conn: self.0.clone(), + notify: self.0.shared.datagrams.notified(), + } + } + /// Wait for the connection to be closed for any reason /// /// Despite the return type's name, closed connections are often not an error condition at the @@ -688,6 +713,99 @@ impl futures_core::Stream for IncomingBiStreams { } } +pin_project! { + /// Future produced by [`Connection::accept_uni`] + pub struct AcceptUni<'a> { + conn: &'a ConnectionRef, + #[pin] + notify: Notified<'a>, + } +} + +impl Future for AcceptUni<'_> { + type Output = Result; + + fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll { + let this = self.project(); + let (conn, id, is_0rtt) = ready!(poll_accept(ctx, this.conn, this.notify, Dir::Uni))?; + Poll::Ready(Ok(RecvStream::new(conn, id, is_0rtt))) + } +} + +pin_project! { + /// Future produced by [`Connection::accept_bi`] + pub struct AcceptBi<'a> { + conn: &'a ConnectionRef, + #[pin] + notify: Notified<'a>, + } +} + +impl Future for AcceptBi<'_> { + type Output = Result<(SendStream, RecvStream), ConnectionError>; + + fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll { + let this = self.project(); + let (conn, id, is_0rtt) = ready!(poll_accept(ctx, this.conn, this.notify, Dir::Bi))?; + Poll::Ready(Ok(( + SendStream::new(conn.clone(), id, is_0rtt), + RecvStream::new(conn, id, is_0rtt), + ))) + } +} + +fn poll_accept<'a>( + ctx: &mut Context<'_>, + conn: &'a ConnectionRef, + mut notify: Pin<&mut Notified<'a>>, + dir: Dir, +) -> Poll> { + let mut state = conn.state.lock("poll_accept"); + if let Some(id) = state.inner.streams().accept(dir) { + let is_0rtt = state.inner.is_handshaking(); + state.wake(); // To send additional stream ID credit + drop(state); // Release the lock so clone can take it + return Poll::Ready(Ok((conn.clone(), id, is_0rtt))); + } else if let Some(ref e) = state.error { + return Poll::Ready(Err(e.clone())); + } + loop { + match notify.as_mut().poll(ctx) { + // `state` lock ensures we didn't race with readiness + Poll::Pending => return Poll::Pending, + // Spurious wakeup, get a new future + Poll::Ready(()) => notify.set(conn.shared.stream_incoming[dir as usize].notified()), + } + } +} + +pin_project! { + /// Future produced by [`Connection::read_datagram`] + pub struct ReadDatagram<'a> { + conn: ConnectionRef, + #[pin] + notify: Notified<'a>, + } +} + +impl Future for ReadDatagram<'_> { + type Output = Result; + fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll { + let this = self.project(); + let mut conn = this.conn.state.lock("ReadDatagram::poll"); + if let Some(x) = conn.inner.datagrams().recv() { + Poll::Ready(Ok(x)) + } else if let Some(ref e) = conn.error { + Poll::Ready(Err(e.clone())) + } else if this.notify.poll(ctx).is_pending() { + // `conn` lock ensures we don't race with readiness + Poll::Pending + } else { + unreachable!("ReadDatagram notified with no datagrams pending"); + } + } +} + /// Stream of unordered, unreliable datagrams sent by the peer #[derive(Debug)] pub struct Datagrams(ConnectionRef); @@ -763,6 +881,8 @@ impl ConnectionRef { }), shared: Shared { stream_opening: [Notify::new(), Notify::new()], + stream_incoming: [Notify::new(), Notify::new()], + datagrams: Notify::new(), closed: Notify::new(), }, })) @@ -812,6 +932,8 @@ pub struct ConnectionInner { #[derive(Debug)] pub(crate) struct Shared { stream_opening: [Notify; 2], + stream_incoming: [Notify; 2], + datagrams: Notify, closed: Notify, } @@ -935,16 +1057,19 @@ impl State { } } Stream(StreamEvent::Opened { dir: Dir::Uni }) => { + shared.stream_incoming[Dir::Uni as usize].notify_waiters(); if let Some(x) = self.incoming_uni_streams_reader.take() { x.wake(); } } Stream(StreamEvent::Opened { dir: Dir::Bi }) => { + shared.stream_incoming[Dir::Bi as usize].notify_waiters(); if let Some(x) = self.incoming_bi_streams_reader.take() { x.wake(); } } DatagramReceived => { + shared.datagrams.notify_waiters(); if let Some(x) = self.datagram_reader.take() { x.wake(); } @@ -1050,12 +1175,15 @@ impl State { } shared.stream_opening[Dir::Uni as usize].notify_waiters(); shared.stream_opening[Dir::Bi as usize].notify_waiters(); + shared.stream_incoming[Dir::Uni as usize].notify_waiters(); + shared.stream_incoming[Dir::Bi as usize].notify_waiters(); if let Some(x) = self.incoming_uni_streams_reader.take() { x.wake(); } if let Some(x) = self.incoming_bi_streams_reader.take() { x.wake(); } + shared.datagrams.notify_waiters(); if let Some(x) = self.datagram_reader.take() { x.wake(); } diff --git a/quinn/src/lib.rs b/quinn/src/lib.rs index 7b5ad09eb2..e333768259 100644 --- a/quinn/src/lib.rs +++ b/quinn/src/lib.rs @@ -70,8 +70,8 @@ pub use proto::{ }; pub use crate::connection::{ - Connecting, Connection, Datagrams, IncomingBiStreams, IncomingUniStreams, NewConnection, - SendDatagramError, UnknownStream, ZeroRttAccepted, + AcceptBi, AcceptUni, Connecting, Connection, Datagrams, IncomingBiStreams, IncomingUniStreams, + NewConnection, ReadDatagram, SendDatagramError, UnknownStream, ZeroRttAccepted, }; pub use crate::endpoint::{Endpoint, Incoming}; pub use crate::recv_stream::{ReadError, ReadExactError, ReadToEndError, RecvStream};