Skip to content

Commit

Permalink
Introduce RecvStream::reset
Browse files Browse the repository at this point in the history
  • Loading branch information
Ralith committed May 19, 2024
1 parent 1b913cb commit 22c56a5
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 1 deletion.
23 changes: 23 additions & 0 deletions quinn-proto/src/connection/streams/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,29 @@ impl<'a> RecvStream<'a> {

Ok(())
}

/// Check whether this stream has been reset by the peer, returning the reset error code if so
pub fn reset(&mut self) -> Result<Option<VarInt>, ClosedStream> {
let hash_map::Entry::Occupied(entry) = self.state.recv.entry(self.id) else {
return Err(ClosedStream { _private: () });
};
let Some(s) = entry.get().as_ref() else {
return Ok(None);
};
if s.stopped {
return Err(ClosedStream { _private: () });
}
let Some(code) = s.reset_code() else {
return Ok(None);
};

// Clean up state after application observes the reset, since there's no reason for the
// application to attempt to read or stop the stream once it knows it's reset
entry.remove_entry();
self.state.stream_freed(self.id, StreamHalf::Recv);

Ok(Some(code))
}
}

/// Access to streams
Expand Down
7 changes: 7 additions & 0 deletions quinn-proto/src/connection/streams/recv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,13 @@ impl Recv {
Ok(true)
}

pub(super) fn reset_code(&self) -> Option<VarInt> {
match self.state {
RecvState::ResetRecvd { error_code, .. } => Some(error_code),
_ => None,
}
}

/// Compute the amount of flow control credit consumed, or return an error if more was consumed
/// than issued
fn credit_consumed_by(
Expand Down
86 changes: 85 additions & 1 deletion quinn/src/recv_stream.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::{
future::Future,
future::{poll_fn, Future},
io,
pin::Pin,
task::{Context, Poll},
Expand Down Expand Up @@ -284,6 +284,54 @@ impl RecvStream {
self.stream
}

/// Completes when the stream is reset by the peer or otherwise closed
///
/// **Does not** actively cause the stream to become reset. Contrast [`stop()`](Self::stop),
/// which notifies the peer that the stream will no longer be read and should be reset.
///
/// Yields `Some` with the reset error code when the stream is reset by the peer. Yields `None`
/// when the stream was previously [`stop()`](Self::stop)ed, or when the stream was
/// [`finish()`](crate::SendStream::finish)ed by the peer and all data has been received, after
/// which it is no longer meaningful for the stream to be reset.
///
/// Notification of a reset is delivered exactly once. After a reset error code has been
/// delivered by *any* means, including calls to `reset()` or any operation that returns
/// [`ReadError::Reset`], future calls to `reset()` will return `Ok(None)`, and future calls to
/// methods that can return [`ReadError`] will return [`ReadError::ClosedStream`].
pub async fn reset(&mut self) -> Result<Option<VarInt>, ResetError> {
poll_fn(|cx| {
let mut conn = self.conn.state.lock("RecvStream::reset");
if self.is_0rtt {
if conn.check_0rtt().is_err() {
return Poll::Ready(Err(ResetError::ZeroRttRejected));
}
}

if let Some(code) = self.reset.take() {
return Poll::Ready(Ok(Some(code)));
}

match conn.inner.recv_stream(self.stream).reset() {
Err(_) => Poll::Ready(Ok(None)),
Ok(Some(error_code)) => {
// Stream state has just now been freed, so the connection may need to issue new
// stream ID flow control credit
conn.wake();
Poll::Ready(Ok(Some(error_code)))
}
Ok(None) => {
// Resets always notify readers, since a reset is an immediate read error. We
// could introduce a dedicated channel to reduce the risk of spurious wakeups,
// but that increased complexity is probably not justified, as an application
// that is expecting a reset is not likely to receive large amounts of data.
conn.blocked_readers.insert(self.stream, cx.waker().clone());
Poll::Pending
}
}
})
.await
}

/// Handle common logic related to reading out of a receive stream
///
/// This takes an `FnMut` closure that takes care of the actual reading process, matching
Expand Down Expand Up @@ -505,6 +553,15 @@ impl From<ReadableError> for ReadError {
}
}

impl From<ResetError> for ReadError {
fn from(e: ResetError) -> Self {
match e {
ResetError::ConnectionLost(e) => Self::ConnectionLost(e),
ResetError::ZeroRttRejected => Self::ZeroRttRejected,
}
}
}

impl From<ReadError> for io::Error {
fn from(x: ReadError) -> Self {
use self::ReadError::*;
Expand All @@ -517,6 +574,33 @@ impl From<ReadError> for io::Error {
}
}

/// Errors that arise while waiting for a stream to be reset
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum ResetError {
/// The connection was lost
#[error("connection lost")]
ConnectionLost(#[from] ConnectionError),
/// This was a 0-RTT stream and the server rejected it
///
/// Can only occur on clients for 0-RTT streams, which can be opened using
/// [`Connecting::into_0rtt()`].
///
/// [`Connecting::into_0rtt()`]: crate::Connecting::into_0rtt()
#[error("0-RTT rejected")]
ZeroRttRejected,
}

impl From<ResetError> for io::Error {
fn from(x: ResetError) -> Self {
use ResetError::*;
let kind = match x {
ZeroRttRejected => io::ErrorKind::ConnectionReset,
ConnectionLost(_) => io::ErrorKind::NotConnected,
};
Self::new(kind, x)
}
}

/// Future produced by [`RecvStream::read()`].
///
/// [`RecvStream::read()`]: crate::RecvStream::read
Expand Down

0 comments on commit 22c56a5

Please sign in to comment.