Skip to content

Commit

Permalink
Rename UnknownStream to ClosedStream for clarity
Browse files Browse the repository at this point in the history
  • Loading branch information
Ralith committed May 3, 2024
1 parent 130d956 commit 0e9a196
Show file tree
Hide file tree
Showing 11 changed files with 72 additions and 73 deletions.
4 changes: 2 additions & 2 deletions quinn-proto/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ pub use streams::StreamsState;
#[cfg(not(fuzzing))]
use streams::StreamsState;
pub use streams::{
BytesSource, Chunks, FinishError, ReadError, ReadableError, RecvStream, SendStream,
StreamEvent, Streams, UnknownStream, WriteError, Written,
BytesSource, Chunks, ClosedStream, FinishError, ReadError, ReadableError, RecvStream,
SendStream, StreamEvent, Streams, WriteError, Written,
};

mod timer;
Expand Down
32 changes: 16 additions & 16 deletions quinn-proto/src/connection/streams/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,11 @@ impl<'a> RecvStream<'a> {
/// Stop accepting data on the given receive stream
///
/// Discards unread data and notifies the peer to stop transmitting. Once stopped, further
/// attempts to operate on a stream will yield `UnknownStream` errors.
pub fn stop(&mut self, error_code: VarInt) -> Result<(), UnknownStream> {
/// attempts to operate on a stream will yield `ClosedStream` errors.
pub fn stop(&mut self, error_code: VarInt) -> Result<(), ClosedStream> {
let mut entry = match self.state.recv.entry(self.id) {
hash_map::Entry::Occupied(s) => s,
hash_map::Entry::Vacant(_) => return Err(UnknownStream { _private: () }),
hash_map::Entry::Vacant(_) => return Err(ClosedStream { _private: () }),
};
let stream = get_or_insert_recv(self.state.stream_receive_window)(entry.get_mut());

Expand Down Expand Up @@ -214,7 +214,7 @@ impl<'a> SendStream<'a> {
.send
.get_mut(&self.id)
.map(get_or_insert_send(max_send_data))
.ok_or(WriteError::UnknownStream)?;
.ok_or(WriteError::ClosedStream)?;

if limit == 0 {
trace!(
Expand All @@ -240,11 +240,11 @@ impl<'a> SendStream<'a> {
}

/// Check if this stream was stopped, get the reason if it was
pub fn stopped(&mut self) -> Result<Option<VarInt>, UnknownStream> {
pub fn stopped(&mut self) -> Result<Option<VarInt>, ClosedStream> {
match self.state.send.get(&self.id).as_ref() {
Some(Some(s)) => Ok(s.stop_reason),
Some(None) => Ok(None),
None => Err(UnknownStream { _private: () }),
None => Err(ClosedStream { _private: () }),
}
}

Expand All @@ -260,7 +260,7 @@ impl<'a> SendStream<'a> {
.send
.get_mut(&self.id)
.map(get_or_insert_send(max_send_data))
.ok_or(FinishError::UnknownStream)?;
.ok_or(FinishError::ClosedStream)?;

let was_pending = stream.is_pending();
stream.finish()?;
Expand All @@ -275,18 +275,18 @@ impl<'a> SendStream<'a> {
///
/// # Panics
/// - when applied to a receive stream
pub fn reset(&mut self, error_code: VarInt) -> Result<(), UnknownStream> {
pub fn reset(&mut self, error_code: VarInt) -> Result<(), ClosedStream> {
let max_send_data = self.state.max_send_data(self.id);
let stream = self
.state
.send
.get_mut(&self.id)
.map(get_or_insert_send(max_send_data))
.ok_or(UnknownStream { _private: () })?;
.ok_or(ClosedStream { _private: () })?;

if matches!(stream.state, SendState::ResetSent) {
// Redundant reset call
return Err(UnknownStream { _private: () });
return Err(ClosedStream { _private: () });
}

// Restore the portion of the send window consumed by the data that we aren't about to
Expand All @@ -304,14 +304,14 @@ impl<'a> SendStream<'a> {
///
/// # Panics
/// - when applied to a receive stream
pub fn set_priority(&mut self, priority: i32) -> Result<(), UnknownStream> {
pub fn set_priority(&mut self, priority: i32) -> Result<(), ClosedStream> {
let max_send_data = self.state.max_send_data(self.id);
let stream = self
.state
.send
.get_mut(&self.id)
.map(get_or_insert_send(max_send_data))
.ok_or(UnknownStream { _private: () })?;
.ok_or(ClosedStream { _private: () })?;

stream.priority = priority;
Ok(())
Expand All @@ -321,12 +321,12 @@ impl<'a> SendStream<'a> {
///
/// # Panics
/// - when applied to a receive stream
pub fn priority(&self) -> Result<i32, UnknownStream> {
pub fn priority(&self) -> Result<i32, ClosedStream> {
let stream = self
.state
.send
.get(&self.id)
.ok_or(UnknownStream { _private: () })?;
.ok_or(ClosedStream { _private: () })?;

Ok(stream.as_ref().map(|s| s.priority).unwrap_or_default())
}
Expand Down Expand Up @@ -438,8 +438,8 @@ impl ShouldTransmit {

/// Error indicating that a stream has not been opened or has already been finished or reset
#[derive(Debug, Error, Clone, PartialEq, Eq)]
#[error("unknown stream")]
pub struct UnknownStream {
#[error("closed stream")]
pub struct ClosedStream {
_private: (),
}

Expand Down
14 changes: 7 additions & 7 deletions quinn-proto/src/connection/streams/recv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use thiserror::Error;
use tracing::debug;

use super::state::get_or_insert_recv;
use super::{Retransmits, ShouldTransmit, StreamHalf, StreamId, StreamsState, UnknownStream};
use super::{ClosedStream, Retransmits, ShouldTransmit, StreamHalf, StreamId, StreamsState};
use crate::connection::assembler::{Assembler, Chunk, IllegalOrderedRead};
use crate::{frame, TransportError, VarInt};

Expand Down Expand Up @@ -73,9 +73,9 @@ impl Recv {
Ok((new_bytes, frame.fin && self.stopped))
}

pub(super) fn stop(&mut self) -> Result<(u64, ShouldTransmit), UnknownStream> {
pub(super) fn stop(&mut self) -> Result<(u64, ShouldTransmit), ClosedStream> {
if self.stopped {
return Err(UnknownStream { _private: () });
return Err(ClosedStream { _private: () });
}

self.stopped = true;
Expand Down Expand Up @@ -218,12 +218,12 @@ impl<'a> Chunks<'a> {
) -> Result<Self, ReadableError> {
let mut entry = match streams.recv.entry(id) {
Entry::Occupied(entry) => entry,
Entry::Vacant(_) => return Err(ReadableError::UnknownStream),
Entry::Vacant(_) => return Err(ReadableError::ClosedStream),
};

let mut recv =
match get_or_insert_recv(streams.stream_receive_window)(entry.get_mut()).stopped {
true => return Err(ReadableError::UnknownStream),
true => return Err(ReadableError::ClosedStream),
false => entry.remove().unwrap(), // this can't fail due to the previous get_or_insert_with
};

Expand Down Expand Up @@ -359,8 +359,8 @@ pub enum ReadError {
#[derive(Debug, Error, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub enum ReadableError {
/// The stream has not been opened or was already stopped, finished, or reset
#[error("unknown stream")]
UnknownStream,
#[error("closed stream")]
ClosedStream,
/// Attempted an ordered read following an unordered read
///
/// Performing an unordered read allows discontinuities to arise in the receive buffer of a
Expand Down
12 changes: 6 additions & 6 deletions quinn-proto/src/connection/streams/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl Send {
self.fin_pending = true;
Ok(())
} else {
Err(FinishError::UnknownStream)
Err(FinishError::ClosedStream)
}
}

Expand All @@ -55,7 +55,7 @@ impl Send {
limit: u64,
) -> Result<Written, WriteError> {
if !self.is_writable() {
return Err(WriteError::UnknownStream);
return Err(WriteError::ClosedStream);
}
if let Some(error_code) = self.stop_reason {
return Err(WriteError::Stopped(error_code));
Expand Down Expand Up @@ -274,8 +274,8 @@ pub enum WriteError {
#[error("stopped by peer: code {0}")]
Stopped(VarInt),
/// The stream has not been opened or has already been finished or reset
#[error("unknown stream")]
UnknownStream,
#[error("closed stream")]
ClosedStream,
}

#[derive(Debug, Copy, Clone, Eq, PartialEq)]
Expand All @@ -300,8 +300,8 @@ pub enum FinishError {
#[error("stopped by peer: code {0}")]
Stopped(VarInt),
/// The stream has not been opened or was already finished or reset
#[error("unknown stream")]
UnknownStream,
#[error("closed stream")]
ClosedStream,
}

#[cfg(test)]
Expand Down
6 changes: 3 additions & 3 deletions quinn-proto/src/connection/streams/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1111,8 +1111,8 @@ mod tests {
assert!(!recv.pending.max_data);

assert!(recv.stop(0u32.into()).is_err());
assert_eq!(recv.read(true).err(), Some(ReadableError::UnknownStream));
assert_eq!(recv.read(false).err(), Some(ReadableError::UnknownStream));
assert_eq!(recv.read(true).err(), Some(ReadableError::ClosedStream));
assert_eq!(recv.read(false).err(), Some(ReadableError::ClosedStream));

assert_eq!(client.local_max_data - initial_max, 32);
assert_eq!(
Expand Down Expand Up @@ -1220,7 +1220,7 @@ mod tests {
assert_eq!(stream.write(&[]), Err(WriteError::Stopped(error_code)));

stream.reset(0u32.into()).unwrap();
assert_eq!(stream.write(&[]), Err(WriteError::UnknownStream));
assert_eq!(stream.write(&[]), Err(WriteError::ClosedStream));

// A duplicate frame is a no-op
stream.state.received_stop_sending(id, error_code);
Expand Down
6 changes: 3 additions & 3 deletions quinn-proto/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ pub use varint::{VarInt, VarIntBoundsExceeded};

mod connection;
pub use crate::connection::{
BytesSource, Chunk, Chunks, Connection, ConnectionError, ConnectionStats, Datagrams, Event,
FinishError, FrameStats, PathStats, ReadError, ReadableError, RecvStream, RttEstimator,
SendDatagramError, SendStream, StreamEvent, Streams, UdpStats, UnknownStream, WriteError,
BytesSource, Chunk, Chunks, ClosedStream, Connection, ConnectionError, ConnectionStats,
Datagrams, Event, FinishError, FrameStats, PathStats, ReadError, ReadableError, RecvStream,
RttEstimator, SendDatagramError, SendStream, StreamEvent, Streams, UdpStats, WriteError,
Written,
};

Expand Down
14 changes: 7 additions & 7 deletions quinn/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1268,25 +1268,25 @@ const MAX_TRANSMIT_DATAGRAMS: usize = 20;

/// Error indicating that a stream has already been finished or reset
#[derive(Debug, Error, Clone, PartialEq, Eq)]
#[error("unknown stream")]
pub struct UnknownStream {
#[error("closed stream")]
pub struct ClosedStream {
_private: (),
}

impl UnknownStream {
impl ClosedStream {
pub(crate) fn new() -> Self {
Self { _private: () }
}
}

impl From<proto::UnknownStream> for UnknownStream {
fn from(_: proto::UnknownStream) -> Self {
impl From<proto::ClosedStream> for ClosedStream {
fn from(_: proto::ClosedStream) -> Self {
Self { _private: () }
}
}

impl From<UnknownStream> for io::Error {
fn from(x: UnknownStream) -> Self {
impl From<ClosedStream> for io::Error {
fn from(x: ClosedStream) -> Self {
Self::new(io::ErrorKind::NotConnected, x)
}
}
4 changes: 2 additions & 2 deletions quinn/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ pub use rustls;
pub use udp;

pub use crate::connection::{
AcceptBi, AcceptUni, Connecting, Connection, OpenBi, OpenUni, ReadDatagram, SendDatagramError,
UnknownStream, ZeroRttAccepted,
AcceptBi, AcceptUni, ClosedStream, Connecting, Connection, OpenBi, OpenUni, ReadDatagram,
SendDatagramError, ZeroRttAccepted,
};
pub use crate::endpoint::{Accept, Endpoint};
pub use crate::incoming::{Incoming, IncomingFuture, RetryError};
Expand Down
16 changes: 8 additions & 8 deletions quinn/src/recv_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use thiserror::Error;
use tokio::io::ReadBuf;

use crate::{
connection::{ConnectionRef, UnknownStream},
connection::{ClosedStream, ConnectionRef},
VarInt,
};

Expand Down Expand Up @@ -259,8 +259,8 @@ impl RecvStream {
/// Stop accepting data
///
/// Discards unread data and notifies the peer to stop transmitting. Once stopped, further
/// attempts to operate on a stream will yield `UnknownStream` errors.
pub fn stop(&mut self, error_code: VarInt) -> Result<(), UnknownStream> {
/// attempts to operate on a stream will yield `ClosedStream` errors.
pub fn stop(&mut self, error_code: VarInt) -> Result<(), ClosedStream> {
let mut conn = self.conn.state.lock("RecvStream::stop");
if self.is_0rtt && conn.check_0rtt().is_err() {
return Ok(());
Expand Down Expand Up @@ -459,7 +459,7 @@ impl Drop for RecvStream {
return;
}
if !self.all_data_read {
// Ignore UnknownStream errors
// Ignore ClosedStream errors
let _ = conn.inner.recv_stream(self.stream).stop(0u32.into());
conn.wake();
}
Expand All @@ -478,8 +478,8 @@ pub enum ReadError {
#[error("connection lost")]
ConnectionLost(#[from] ConnectionError),
/// The stream has already been stopped, finished, or reset
#[error("unknown stream")]
UnknownStream,
#[error("closed stream")]
ClosedStream,
/// Attempted an ordered read following an unordered read
///
/// Performing an unordered read allows discontinuities to arise in the receive buffer of a
Expand All @@ -499,7 +499,7 @@ pub enum ReadError {
impl From<ReadableError> for ReadError {
fn from(e: ReadableError) -> Self {
match e {
ReadableError::UnknownStream => Self::UnknownStream,
ReadableError::ClosedStream => Self::ClosedStream,
ReadableError::IllegalOrderedRead => Self::IllegalOrderedRead,
}
}
Expand All @@ -510,7 +510,7 @@ impl From<ReadError> for io::Error {
use self::ReadError::*;
let kind = match x {
Reset { .. } | ZeroRttRejected => io::ErrorKind::ConnectionReset,
ConnectionLost(_) | UnknownStream => io::ErrorKind::NotConnected,
ConnectionLost(_) | ClosedStream => io::ErrorKind::NotConnected,
IllegalOrderedRead => io::ErrorKind::InvalidInput,
};
Self::new(kind, x)
Expand Down
Loading

0 comments on commit 0e9a196

Please sign in to comment.