From 5237870ad7846ffd666b2685380e4316e0bf73a3 Mon Sep 17 00:00:00 2001 From: Benjamin Saunders Date: Sun, 19 May 2024 11:19:33 -0700 Subject: [PATCH] Introduce RecvStream::reset --- quinn-proto/src/connection/streams/mod.rs | 23 ++++++ quinn-proto/src/connection/streams/recv.rs | 7 ++ quinn/src/recv_stream.rs | 84 +++++++++++++++++++++- 3 files changed, 113 insertions(+), 1 deletion(-) diff --git a/quinn-proto/src/connection/streams/mod.rs b/quinn-proto/src/connection/streams/mod.rs index e58613ed58..5a3ac5f1f6 100644 --- a/quinn-proto/src/connection/streams/mod.rs +++ b/quinn-proto/src/connection/streams/mod.rs @@ -156,6 +156,29 @@ impl<'a> RecvStream<'a> { Ok(()) } + + /// Check whether this stream has been reset by the peer, returning the reset error code if so + pub fn reset(&mut self) -> Result, ClosedStream> { + let hash_map::Entry::Occupied(entry) = self.state.recv.entry(self.id) else { + return Err(ClosedStream { _private: () }); + }; + let Some(s) = entry.get().as_ref() else { + return Ok(None); + }; + if s.stopped { + return Err(ClosedStream { _private: () }); + } + let Some(code) = s.reset_code() else { + return Ok(None); + }; + + // Clean up state after application observes the reset, since there's no reason for the + // application to attempt to read or stop the stream once it knows it's reset + entry.remove_entry(); + self.state.stream_freed(self.id, StreamHalf::Recv); + + Ok(Some(code)) + } } /// Access to streams diff --git a/quinn-proto/src/connection/streams/recv.rs b/quinn-proto/src/connection/streams/recv.rs index a8b9fbb0e7..b89e88d178 100644 --- a/quinn-proto/src/connection/streams/recv.rs +++ b/quinn-proto/src/connection/streams/recv.rs @@ -173,6 +173,13 @@ impl Recv { Ok(true) } + pub(super) fn reset_code(&self) -> Option { + match self.state { + RecvState::ResetRecvd { error_code, .. } => Some(error_code), + _ => None, + } + } + /// Compute the amount of flow control credit consumed, or return an error if more was consumed /// than issued fn credit_consumed_by( diff --git a/quinn/src/recv_stream.rs b/quinn/src/recv_stream.rs index 9f0ee73992..d97e87d149 100644 --- a/quinn/src/recv_stream.rs +++ b/quinn/src/recv_stream.rs @@ -1,5 +1,5 @@ use std::{ - future::Future, + future::{poll_fn, Future}, io, pin::Pin, task::{Context, Poll}, @@ -284,6 +284,52 @@ impl RecvStream { self.stream } + /// Completes when the stream is reset by the peer or otherwise closed + /// + /// **Does not** actively cause the stream to become reset. Contrast [`stop()`](Self::stop), + /// which notifies the peer that the stream will no longer be read and should be reset. + /// + /// Yields `Some` with the reset error code when the stream is reset by the peer. Yields `None` + /// when the stream was previously [`stop()`](Self::stop)ed, or when the stream was + /// [`finish()`](crate::SendStream::finish)ed by the peer and all data has been received, after + /// which it is no longer meaningful for the stream to be reset. + /// + /// Notification of a reset is delivered exactly once. After a reset error code has been + /// delivered by *any* means, including calls to `reset()` or any operation that returns + /// [`ReadError::Reset`], future calls to `reset()` will return `Ok(None)`, and future calls to + /// methods that can return [`ReadError`] will return [`ReadError::ClosedStream`]. + pub async fn reset(&mut self) -> Result, ResetError> { + poll_fn(|cx| { + let mut conn = self.conn.state.lock("RecvStream::reset"); + if self.is_0rtt && conn.check_0rtt().is_err() { + return Poll::Ready(Err(ResetError::ZeroRttRejected)); + } + + if let Some(code) = self.reset.take() { + return Poll::Ready(Ok(Some(code))); + } + + match conn.inner.recv_stream(self.stream).reset() { + Err(_) => Poll::Ready(Ok(None)), + Ok(Some(error_code)) => { + // Stream state has just now been freed, so the connection may need to issue new + // stream ID flow control credit + conn.wake(); + Poll::Ready(Ok(Some(error_code))) + } + Ok(None) => { + // Resets always notify readers, since a reset is an immediate read error. We + // could introduce a dedicated channel to reduce the risk of spurious wakeups, + // but that increased complexity is probably not justified, as an application + // that is expecting a reset is not likely to receive large amounts of data. + conn.blocked_readers.insert(self.stream, cx.waker().clone()); + Poll::Pending + } + } + }) + .await + } + /// Handle common logic related to reading out of a receive stream /// /// This takes an `FnMut` closure that takes care of the actual reading process, matching @@ -505,6 +551,15 @@ impl From for ReadError { } } +impl From for ReadError { + fn from(e: ResetError) -> Self { + match e { + ResetError::ConnectionLost(e) => Self::ConnectionLost(e), + ResetError::ZeroRttRejected => Self::ZeroRttRejected, + } + } +} + impl From for io::Error { fn from(x: ReadError) -> Self { use self::ReadError::*; @@ -517,6 +572,33 @@ impl From for io::Error { } } +/// Errors that arise while waiting for a stream to be reset +#[derive(Debug, Error, Clone, PartialEq, Eq)] +pub enum ResetError { + /// The connection was lost + #[error("connection lost")] + ConnectionLost(#[from] ConnectionError), + /// This was a 0-RTT stream and the server rejected it + /// + /// Can only occur on clients for 0-RTT streams, which can be opened using + /// [`Connecting::into_0rtt()`]. + /// + /// [`Connecting::into_0rtt()`]: crate::Connecting::into_0rtt() + #[error("0-RTT rejected")] + ZeroRttRejected, +} + +impl From for io::Error { + fn from(x: ResetError) -> Self { + use ResetError::*; + let kind = match x { + ZeroRttRejected => io::ErrorKind::ConnectionReset, + ConnectionLost(_) => io::ErrorKind::NotConnected, + }; + Self::new(kind, x) + } +} + /// Future produced by [`RecvStream::read()`]. /// /// [`RecvStream::read()`]: crate::RecvStream::read