Skip to content

Commit

Permalink
Introduce Connection::{accept_{uni, bi}, read_datagram}
Browse files Browse the repository at this point in the history
Provides a more natural alternative to the streams in NewConnection
  • Loading branch information
Ralith authored and djc committed Sep 27, 2022
1 parent 3e2935f commit 6ba0051
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 3 deletions.
1 change: 1 addition & 0 deletions quinn/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ futures-io = { version = "0.3.19", optional = true }
# Implements futures::Stream for async streams such as `Incoming`
futures-core = { version = "0.3.19", optional = true }
rustc-hash = "1.1"
pin-project-lite = "0.2"
proto = { package = "quinn-proto", path = "../quinn-proto", version = "0.8", default-features = false }
rustls = { version = "0.20.3", default-features = false, features = ["quic"], optional = true }
thiserror = "1.0.21"
Expand Down
132 changes: 131 additions & 1 deletion quinn/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ use std::{

use crate::runtime::{AsyncTimer, Runtime};
use bytes::Bytes;
use pin_project_lite::pin_project;
use proto::{ConnectionError, ConnectionHandle, ConnectionStats, Dir, StreamEvent, StreamId};
use rustc_hash::FxHashMap;
use thiserror::Error;
use tokio::sync::{mpsc, oneshot, Notify};
use tokio::sync::{futures::Notified, mpsc, oneshot, Notify};
use tracing::debug_span;
use udp::UdpState;

Expand Down Expand Up @@ -362,6 +363,30 @@ impl Connection {
}
}

/// Accept the next incoming uni-directional stream
pub fn accept_uni(&self) -> AcceptUni<'_> {
AcceptUni {
conn: &self.0,
notify: self.0.shared.stream_incoming[Dir::Uni as usize].notified(),
}
}

/// Accept the next incoming bidirectional stream
pub fn accept_bi(&self) -> AcceptBi<'_> {
AcceptBi {
conn: &self.0,
notify: self.0.shared.stream_incoming[Dir::Bi as usize].notified(),
}
}

/// Receive an application datagram
pub fn read_datagram(&self) -> ReadDatagram<'_> {
ReadDatagram {
conn: self.0.clone(),
notify: self.0.shared.datagrams.notified(),
}
}

/// Wait for the connection to be closed for any reason
///
/// Despite the return type's name, closed connections are often not an error condition at the
Expand Down Expand Up @@ -688,6 +713,103 @@ impl futures_core::Stream for IncomingBiStreams {
}
}

pin_project! {
/// Future produced by [`Connection::accept_uni`]
pub struct AcceptUni<'a> {
conn: &'a ConnectionRef,
#[pin]
notify: Notified<'a>,
}
}

impl Future for AcceptUni<'_> {
type Output = Result<RecvStream, ConnectionError>;

fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let (conn, id, is_0rtt) = ready!(poll_accept(ctx, this.conn, this.notify, Dir::Uni))?;
Poll::Ready(Ok(RecvStream::new(conn, id, is_0rtt)))
}
}

pin_project! {
/// Future produced by [`Connection::accept_bi`]
pub struct AcceptBi<'a> {
conn: &'a ConnectionRef,
#[pin]
notify: Notified<'a>,
}
}

impl Future for AcceptBi<'_> {
type Output = Result<(SendStream, RecvStream), ConnectionError>;

fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let (conn, id, is_0rtt) = ready!(poll_accept(ctx, this.conn, this.notify, Dir::Bi))?;
Poll::Ready(Ok((
SendStream::new(conn.clone(), id, is_0rtt),
RecvStream::new(conn, id, is_0rtt),
)))
}
}

fn poll_accept<'a>(
ctx: &mut Context<'_>,
conn: &'a ConnectionRef,
mut notify: Pin<&mut Notified<'a>>,
dir: Dir,
) -> Poll<Result<(ConnectionRef, StreamId, bool), ConnectionError>> {
let mut state = conn.state.lock("poll_accept");
// Check for incoming streams before checking `state.error` so that already-received streams,
// which are necessarily finite, can be drained from a closed connection.
if let Some(id) = state.inner.streams().accept(dir) {
let is_0rtt = state.inner.is_handshaking();
state.wake(); // To send additional stream ID credit
drop(state); // Release the lock so clone can take it
return Poll::Ready(Ok((conn.clone(), id, is_0rtt)));
} else if let Some(ref e) = state.error {
return Poll::Ready(Err(e.clone()));
}
loop {
match notify.as_mut().poll(ctx) {
// `state` lock ensures we didn't race with readiness
Poll::Pending => return Poll::Pending,
// Spurious wakeup, get a new future
Poll::Ready(()) => notify.set(conn.shared.stream_incoming[dir as usize].notified()),
}
}
}

pin_project! {
/// Future produced by [`Connection::read_datagram`]
pub struct ReadDatagram<'a> {
conn: ConnectionRef,
#[pin]
notify: Notified<'a>,
}
}

impl Future for ReadDatagram<'_> {
type Output = Result<Bytes, ConnectionError>;
fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let mut conn = this.conn.state.lock("ReadDatagram::poll");
// Check for buffered datagrams before checking `state.error` so that already-received
// datagrams, which are necessarily finite, can be drained from a closed connection.
if let Some(x) = conn.inner.datagrams().recv() {
Poll::Ready(Ok(x))
} else if let Some(ref e) = conn.error {
Poll::Ready(Err(e.clone()))
} else if this.notify.poll(ctx).is_pending() {
// `conn` lock ensures we don't race with readiness
Poll::Pending
} else {
unreachable!("ReadDatagram notified with no datagrams pending");
}
}
}

/// Stream of unordered, unreliable datagrams sent by the peer
#[derive(Debug)]
pub struct Datagrams(ConnectionRef);
Expand Down Expand Up @@ -809,6 +931,8 @@ pub struct ConnectionInner {
#[derive(Debug, Default)]
pub(crate) struct Shared {
stream_opening: [Notify; 2],
stream_incoming: [Notify; 2],
datagrams: Notify,
closed: Notify,
}

Expand Down Expand Up @@ -932,16 +1056,19 @@ impl State {
}
}
Stream(StreamEvent::Opened { dir: Dir::Uni }) => {
shared.stream_incoming[Dir::Uni as usize].notify_waiters();
if let Some(x) = self.incoming_uni_streams_reader.take() {
x.wake();
}
}
Stream(StreamEvent::Opened { dir: Dir::Bi }) => {
shared.stream_incoming[Dir::Bi as usize].notify_waiters();
if let Some(x) = self.incoming_bi_streams_reader.take() {
x.wake();
}
}
DatagramReceived => {
shared.datagrams.notify_waiters();
if let Some(x) = self.datagram_reader.take() {
x.wake();
}
Expand Down Expand Up @@ -1047,12 +1174,15 @@ impl State {
}
shared.stream_opening[Dir::Uni as usize].notify_waiters();
shared.stream_opening[Dir::Bi as usize].notify_waiters();
shared.stream_incoming[Dir::Uni as usize].notify_waiters();
shared.stream_incoming[Dir::Bi as usize].notify_waiters();
if let Some(x) = self.incoming_uni_streams_reader.take() {
x.wake();
}
if let Some(x) = self.incoming_bi_streams_reader.take() {
x.wake();
}
shared.datagrams.notify_waiters();
if let Some(x) = self.datagram_reader.take() {
x.wake();
}
Expand Down
4 changes: 2 additions & 2 deletions quinn/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ pub use proto::{
};

pub use crate::connection::{
Connecting, Connection, Datagrams, IncomingBiStreams, IncomingUniStreams, NewConnection,
SendDatagramError, UnknownStream, ZeroRttAccepted,
AcceptBi, AcceptUni, Connecting, Connection, Datagrams, IncomingBiStreams, IncomingUniStreams,
NewConnection, ReadDatagram, SendDatagramError, UnknownStream, ZeroRttAccepted,
};
pub use crate::endpoint::{Endpoint, Incoming};
pub use crate::recv_stream::{ReadError, ReadExactError, ReadToEndError, RecvStream};
Expand Down

0 comments on commit 6ba0051

Please sign in to comment.