diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index 6d20d21ae..c90d9b4b9 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -81,8 +81,8 @@ pub use streams::StreamsState; #[cfg(not(fuzzing))] use streams::StreamsState; pub use streams::{ - BytesSource, Chunks, FinishError, ReadError, ReadableError, RecvStream, SendStream, - StreamEvent, Streams, UnknownStream, WriteError, Written, + BytesSource, Chunks, ClosedStream, FinishError, ReadError, ReadableError, RecvStream, + SendStream, StreamEvent, Streams, WriteError, Written, }; mod timer; diff --git a/quinn-proto/src/connection/streams/mod.rs b/quinn-proto/src/connection/streams/mod.rs index e5728c751..d2edf80e9 100644 --- a/quinn-proto/src/connection/streams/mod.rs +++ b/quinn-proto/src/connection/streams/mod.rs @@ -126,11 +126,11 @@ impl<'a> RecvStream<'a> { /// Stop accepting data on the given receive stream /// /// Discards unread data and notifies the peer to stop transmitting. Once stopped, further - /// attempts to operate on a stream will yield `UnknownStream` errors. - pub fn stop(&mut self, error_code: VarInt) -> Result<(), UnknownStream> { + /// attempts to operate on a stream will yield `ClosedStream` errors. + pub fn stop(&mut self, error_code: VarInt) -> Result<(), ClosedStream> { let mut entry = match self.state.recv.entry(self.id) { hash_map::Entry::Occupied(s) => s, - hash_map::Entry::Vacant(_) => return Err(UnknownStream { _private: () }), + hash_map::Entry::Vacant(_) => return Err(ClosedStream { _private: () }), }; let stream = get_or_insert_recv(self.state.stream_receive_window)(entry.get_mut()); @@ -214,7 +214,7 @@ impl<'a> SendStream<'a> { .send .get_mut(&self.id) .map(get_or_insert_send(max_send_data)) - .ok_or(WriteError::UnknownStream)?; + .ok_or(WriteError::ClosedStream)?; if limit == 0 { trace!( @@ -240,11 +240,11 @@ impl<'a> SendStream<'a> { } /// Check if this stream was stopped, get the reason if it was - pub fn stopped(&mut self) -> Result, UnknownStream> { + pub fn stopped(&mut self) -> Result, ClosedStream> { match self.state.send.get(&self.id).as_ref() { Some(Some(s)) => Ok(s.stop_reason), Some(None) => Ok(None), - None => Err(UnknownStream { _private: () }), + None => Err(ClosedStream { _private: () }), } } @@ -260,7 +260,7 @@ impl<'a> SendStream<'a> { .send .get_mut(&self.id) .map(get_or_insert_send(max_send_data)) - .ok_or(FinishError::UnknownStream)?; + .ok_or(FinishError::ClosedStream)?; let was_pending = stream.is_pending(); stream.finish()?; @@ -275,18 +275,18 @@ impl<'a> SendStream<'a> { /// /// # Panics /// - when applied to a receive stream - pub fn reset(&mut self, error_code: VarInt) -> Result<(), UnknownStream> { + pub fn reset(&mut self, error_code: VarInt) -> Result<(), ClosedStream> { let max_send_data = self.state.max_send_data(self.id); let stream = self .state .send .get_mut(&self.id) .map(get_or_insert_send(max_send_data)) - .ok_or(UnknownStream { _private: () })?; + .ok_or(ClosedStream { _private: () })?; if matches!(stream.state, SendState::ResetSent) { // Redundant reset call - return Err(UnknownStream { _private: () }); + return Err(ClosedStream { _private: () }); } // Restore the portion of the send window consumed by the data that we aren't about to @@ -304,14 +304,14 @@ impl<'a> SendStream<'a> { /// /// # Panics /// - when applied to a receive stream - pub fn set_priority(&mut self, priority: i32) -> Result<(), UnknownStream> { + pub fn set_priority(&mut self, priority: i32) -> Result<(), ClosedStream> { let max_send_data = self.state.max_send_data(self.id); let stream = self .state .send .get_mut(&self.id) .map(get_or_insert_send(max_send_data)) - .ok_or(UnknownStream { _private: () })?; + .ok_or(ClosedStream { _private: () })?; stream.priority = priority; Ok(()) @@ -321,12 +321,12 @@ impl<'a> SendStream<'a> { /// /// # Panics /// - when applied to a receive stream - pub fn priority(&self) -> Result { + pub fn priority(&self) -> Result { let stream = self .state .send .get(&self.id) - .ok_or(UnknownStream { _private: () })?; + .ok_or(ClosedStream { _private: () })?; Ok(stream.as_ref().map(|s| s.priority).unwrap_or_default()) } @@ -438,8 +438,8 @@ impl ShouldTransmit { /// Error indicating that a stream has not been opened or has already been finished or reset #[derive(Debug, Error, Clone, PartialEq, Eq)] -#[error("unknown stream")] -pub struct UnknownStream { +#[error("closed stream")] +pub struct ClosedStream { _private: (), } diff --git a/quinn-proto/src/connection/streams/recv.rs b/quinn-proto/src/connection/streams/recv.rs index 6f46ec865..a8b9fbb0e 100644 --- a/quinn-proto/src/connection/streams/recv.rs +++ b/quinn-proto/src/connection/streams/recv.rs @@ -5,7 +5,7 @@ use thiserror::Error; use tracing::debug; use super::state::get_or_insert_recv; -use super::{Retransmits, ShouldTransmit, StreamHalf, StreamId, StreamsState, UnknownStream}; +use super::{ClosedStream, Retransmits, ShouldTransmit, StreamHalf, StreamId, StreamsState}; use crate::connection::assembler::{Assembler, Chunk, IllegalOrderedRead}; use crate::{frame, TransportError, VarInt}; @@ -73,9 +73,9 @@ impl Recv { Ok((new_bytes, frame.fin && self.stopped)) } - pub(super) fn stop(&mut self) -> Result<(u64, ShouldTransmit), UnknownStream> { + pub(super) fn stop(&mut self) -> Result<(u64, ShouldTransmit), ClosedStream> { if self.stopped { - return Err(UnknownStream { _private: () }); + return Err(ClosedStream { _private: () }); } self.stopped = true; @@ -218,12 +218,12 @@ impl<'a> Chunks<'a> { ) -> Result { let mut entry = match streams.recv.entry(id) { Entry::Occupied(entry) => entry, - Entry::Vacant(_) => return Err(ReadableError::UnknownStream), + Entry::Vacant(_) => return Err(ReadableError::ClosedStream), }; let mut recv = match get_or_insert_recv(streams.stream_receive_window)(entry.get_mut()).stopped { - true => return Err(ReadableError::UnknownStream), + true => return Err(ReadableError::ClosedStream), false => entry.remove().unwrap(), // this can't fail due to the previous get_or_insert_with }; @@ -359,8 +359,8 @@ pub enum ReadError { #[derive(Debug, Error, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] pub enum ReadableError { /// The stream has not been opened or was already stopped, finished, or reset - #[error("unknown stream")] - UnknownStream, + #[error("closed stream")] + ClosedStream, /// Attempted an ordered read following an unordered read /// /// Performing an unordered read allows discontinuities to arise in the receive buffer of a diff --git a/quinn-proto/src/connection/streams/send.rs b/quinn-proto/src/connection/streams/send.rs index 4a26cd01c..060aaf54c 100644 --- a/quinn-proto/src/connection/streams/send.rs +++ b/quinn-proto/src/connection/streams/send.rs @@ -45,7 +45,7 @@ impl Send { self.fin_pending = true; Ok(()) } else { - Err(FinishError::UnknownStream) + Err(FinishError::ClosedStream) } } @@ -55,7 +55,7 @@ impl Send { limit: u64, ) -> Result { if !self.is_writable() { - return Err(WriteError::UnknownStream); + return Err(WriteError::ClosedStream); } if let Some(error_code) = self.stop_reason { return Err(WriteError::Stopped(error_code)); @@ -274,8 +274,8 @@ pub enum WriteError { #[error("stopped by peer: code {0}")] Stopped(VarInt), /// The stream has not been opened or has already been finished or reset - #[error("unknown stream")] - UnknownStream, + #[error("closed stream")] + ClosedStream, } #[derive(Debug, Copy, Clone, Eq, PartialEq)] @@ -300,8 +300,8 @@ pub enum FinishError { #[error("stopped by peer: code {0}")] Stopped(VarInt), /// The stream has not been opened or was already finished or reset - #[error("unknown stream")] - UnknownStream, + #[error("closed stream")] + ClosedStream, } #[cfg(test)] diff --git a/quinn-proto/src/connection/streams/state.rs b/quinn-proto/src/connection/streams/state.rs index 9077dc0b7..76ce2c178 100644 --- a/quinn-proto/src/connection/streams/state.rs +++ b/quinn-proto/src/connection/streams/state.rs @@ -1111,8 +1111,8 @@ mod tests { assert!(!recv.pending.max_data); assert!(recv.stop(0u32.into()).is_err()); - assert_eq!(recv.read(true).err(), Some(ReadableError::UnknownStream)); - assert_eq!(recv.read(false).err(), Some(ReadableError::UnknownStream)); + assert_eq!(recv.read(true).err(), Some(ReadableError::ClosedStream)); + assert_eq!(recv.read(false).err(), Some(ReadableError::ClosedStream)); assert_eq!(client.local_max_data - initial_max, 32); assert_eq!( @@ -1220,7 +1220,7 @@ mod tests { assert_eq!(stream.write(&[]), Err(WriteError::Stopped(error_code))); stream.reset(0u32.into()).unwrap(); - assert_eq!(stream.write(&[]), Err(WriteError::UnknownStream)); + assert_eq!(stream.write(&[]), Err(WriteError::ClosedStream)); // A duplicate frame is a no-op stream.state.received_stop_sending(id, error_code); diff --git a/quinn-proto/src/lib.rs b/quinn-proto/src/lib.rs index cdf4ed657..71777aad6 100644 --- a/quinn-proto/src/lib.rs +++ b/quinn-proto/src/lib.rs @@ -42,9 +42,9 @@ pub use varint::{VarInt, VarIntBoundsExceeded}; mod connection; pub use crate::connection::{ - BytesSource, Chunk, Chunks, Connection, ConnectionError, ConnectionStats, Datagrams, Event, - FinishError, FrameStats, PathStats, ReadError, ReadableError, RecvStream, RttEstimator, - SendDatagramError, SendStream, StreamEvent, Streams, UdpStats, UnknownStream, WriteError, + BytesSource, Chunk, Chunks, ClosedStream, Connection, ConnectionError, ConnectionStats, + Datagrams, Event, FinishError, FrameStats, PathStats, ReadError, ReadableError, RecvStream, + RttEstimator, SendDatagramError, SendStream, StreamEvent, Streams, UdpStats, WriteError, Written, }; diff --git a/quinn/src/connection.rs b/quinn/src/connection.rs index 89c1733c5..9dcff42d0 100644 --- a/quinn/src/connection.rs +++ b/quinn/src/connection.rs @@ -1268,25 +1268,25 @@ const MAX_TRANSMIT_DATAGRAMS: usize = 20; /// Error indicating that a stream has already been finished or reset #[derive(Debug, Error, Clone, PartialEq, Eq)] -#[error("unknown stream")] -pub struct UnknownStream { +#[error("closed stream")] +pub struct ClosedStream { _private: (), } -impl UnknownStream { +impl ClosedStream { pub(crate) fn new() -> Self { Self { _private: () } } } -impl From for UnknownStream { - fn from(_: proto::UnknownStream) -> Self { +impl From for ClosedStream { + fn from(_: proto::ClosedStream) -> Self { Self { _private: () } } } -impl From for io::Error { - fn from(x: UnknownStream) -> Self { +impl From for io::Error { + fn from(x: ClosedStream) -> Self { Self::new(io::ErrorKind::NotConnected, x) } } diff --git a/quinn/src/lib.rs b/quinn/src/lib.rs index f928d9ca3..c48887308 100644 --- a/quinn/src/lib.rs +++ b/quinn/src/lib.rs @@ -71,8 +71,8 @@ pub use rustls; pub use udp; pub use crate::connection::{ - AcceptBi, AcceptUni, Connecting, Connection, OpenBi, OpenUni, ReadDatagram, SendDatagramError, - UnknownStream, ZeroRttAccepted, + AcceptBi, AcceptUni, ClosedStream, Connecting, Connection, OpenBi, OpenUni, ReadDatagram, + SendDatagramError, ZeroRttAccepted, }; pub use crate::endpoint::{Accept, Endpoint}; pub use crate::incoming::{Incoming, IncomingFuture, RetryError}; diff --git a/quinn/src/recv_stream.rs b/quinn/src/recv_stream.rs index ace12035d..9f0ee7399 100644 --- a/quinn/src/recv_stream.rs +++ b/quinn/src/recv_stream.rs @@ -11,7 +11,7 @@ use thiserror::Error; use tokio::io::ReadBuf; use crate::{ - connection::{ConnectionRef, UnknownStream}, + connection::{ClosedStream, ConnectionRef}, VarInt, }; @@ -259,8 +259,8 @@ impl RecvStream { /// Stop accepting data /// /// Discards unread data and notifies the peer to stop transmitting. Once stopped, further - /// attempts to operate on a stream will yield `UnknownStream` errors. - pub fn stop(&mut self, error_code: VarInt) -> Result<(), UnknownStream> { + /// attempts to operate on a stream will yield `ClosedStream` errors. + pub fn stop(&mut self, error_code: VarInt) -> Result<(), ClosedStream> { let mut conn = self.conn.state.lock("RecvStream::stop"); if self.is_0rtt && conn.check_0rtt().is_err() { return Ok(()); @@ -459,7 +459,7 @@ impl Drop for RecvStream { return; } if !self.all_data_read { - // Ignore UnknownStream errors + // Ignore ClosedStream errors let _ = conn.inner.recv_stream(self.stream).stop(0u32.into()); conn.wake(); } @@ -478,8 +478,8 @@ pub enum ReadError { #[error("connection lost")] ConnectionLost(#[from] ConnectionError), /// The stream has already been stopped, finished, or reset - #[error("unknown stream")] - UnknownStream, + #[error("closed stream")] + ClosedStream, /// Attempted an ordered read following an unordered read /// /// Performing an unordered read allows discontinuities to arise in the receive buffer of a @@ -499,7 +499,7 @@ pub enum ReadError { impl From for ReadError { fn from(e: ReadableError) -> Self { match e { - ReadableError::UnknownStream => Self::UnknownStream, + ReadableError::ClosedStream => Self::ClosedStream, ReadableError::IllegalOrderedRead => Self::IllegalOrderedRead, } } @@ -510,7 +510,7 @@ impl From for io::Error { use self::ReadError::*; let kind = match x { Reset { .. } | ZeroRttRejected => io::ErrorKind::ConnectionReset, - ConnectionLost(_) | UnknownStream => io::ErrorKind::NotConnected, + ConnectionLost(_) | ClosedStream => io::ErrorKind::NotConnected, IllegalOrderedRead => io::ErrorKind::InvalidInput, }; Self::new(kind, x) diff --git a/quinn/src/send_stream.rs b/quinn/src/send_stream.rs index 8edbd8769..ceaacb1d3 100644 --- a/quinn/src/send_stream.rs +++ b/quinn/src/send_stream.rs @@ -10,7 +10,7 @@ use proto::{ConnectionError, FinishError, StreamId, Written}; use thiserror::Error; use crate::{ - connection::{ConnectionRef, UnknownStream}, + connection::{ClosedStream, ConnectionRef}, VarInt, }; @@ -120,8 +120,8 @@ impl SendStream { Err(Stopped(error_code)) => { return Poll::Ready(Err(WriteError::Stopped(error_code))); } - Err(UnknownStream) => { - return Poll::Ready(Err(WriteError::UnknownStream)); + Err(ClosedStream) => { + return Poll::Ready(Err(WriteError::ClosedStream)); } }; @@ -140,14 +140,14 @@ impl SendStream { /// May fail if [`finish()`](Self::finish) or [`reset()`](Self::reset) was previously /// called. This error is harmless and serves only to indicate that the caller may have /// incorrect assumptions about the stream's state. - pub fn finish(&mut self) -> Result<(), UnknownStream> { + pub fn finish(&mut self) -> Result<(), ClosedStream> { let mut conn = self.conn.state.lock("finish"); match conn.inner.send_stream(self.stream).finish() { Ok(()) => { conn.wake(); Ok(()) } - Err(FinishError::UnknownStream) => Err(UnknownStream::new()), + Err(FinishError::ClosedStream) => Err(ClosedStream::new()), // Harmless. If the application needs to know about stopped streams at this point, it // should call `stopped`. Err(FinishError::Stopped(_)) => Ok(()), @@ -163,7 +163,7 @@ impl SendStream { /// May fail if [`finish()`](Self::finish) or [`reset()`](Self::reset) was previously /// called. This error is harmless and serves only to indicate that the caller may have /// incorrect assumptions about the stream's state. - pub fn reset(&mut self, error_code: VarInt) -> Result<(), UnknownStream> { + pub fn reset(&mut self, error_code: VarInt) -> Result<(), ClosedStream> { let mut conn = self.conn.state.lock("SendStream::reset"); if self.is_0rtt && conn.check_0rtt().is_err() { return Ok(()); @@ -180,14 +180,14 @@ impl SendStream { /// the priority of a stream with pending data may only take effect after that data has been /// transmitted. Using many different priority levels per connection may have a negative /// impact on performance. - pub fn set_priority(&self, priority: i32) -> Result<(), UnknownStream> { + pub fn set_priority(&self, priority: i32) -> Result<(), ClosedStream> { let mut conn = self.conn.state.lock("SendStream::set_priority"); conn.inner.send_stream(self.stream).set_priority(priority)?; Ok(()) } /// Get the priority of the send stream - pub fn priority(&self) -> Result { + pub fn priority(&self) -> Result { let mut conn = self.conn.state.lock("SendStream::priority"); Ok(conn.inner.send_stream(self.stream).priority()?) } @@ -295,7 +295,7 @@ impl Drop for SendStream { } } // Already finished or reset, which is fine. - Err(FinishError::UnknownStream) => {} + Err(FinishError::ClosedStream) => {} } } } @@ -434,8 +434,8 @@ pub enum WriteError { #[error("connection lost")] ConnectionLost(#[from] ConnectionError), /// The stream has already been finished or reset - #[error("unknown stream")] - UnknownStream, + #[error("closed stream")] + ClosedStream, /// This was a 0-RTT stream and the server rejected it /// /// Can only occur on clients for 0-RTT streams, which can be opened using @@ -446,10 +446,10 @@ pub enum WriteError { ZeroRttRejected, } -impl From for WriteError { +impl From for WriteError { #[inline] - fn from(_: UnknownStream) -> Self { - Self::UnknownStream + fn from(_: ClosedStream) -> Self { + Self::ClosedStream } } @@ -483,7 +483,7 @@ impl From for io::Error { use self::WriteError::*; let kind = match x { Stopped(_) | ZeroRttRejected => io::ErrorKind::ConnectionReset, - ConnectionLost(_) | UnknownStream => io::ErrorKind::NotConnected, + ConnectionLost(_) | ClosedStream => io::ErrorKind::NotConnected, }; Self::new(kind, x) } diff --git a/quinn/tests/many_connections.rs b/quinn/tests/many_connections.rs index 7afae35ff..ff98539ab 100644 --- a/quinn/tests/many_connections.rs +++ b/quinn/tests/many_connections.rs @@ -103,10 +103,9 @@ async fn read_from_peer(mut stream: quinn::RecvStream) -> Result<(), quinn::Conn use quinn::ReadToEndError::*; use ReadError::*; match e { - TooLong - | Read(UnknownStream) - | Read(ZeroRttRejected) - | Read(IllegalOrderedRead) => unreachable!(), + TooLong | Read(ClosedStream) | Read(ZeroRttRejected) | Read(IllegalOrderedRead) => { + unreachable!() + } Read(Reset(error_code)) => panic!("unexpected stream reset: {error_code}"), Read(ConnectionLost(e)) => Err(e), }