Skip to content

Commit

Permalink
feat!: Add SendError for send-specific errors
Browse files Browse the repository at this point in the history
The `SendError` covers only the error possibilities when sending a
message, which makes it easier to handle by users. All APIs that simply
connect and send messages (or just send messages) have been changed to
return `SendError`.

One method that could not easily be changed is `try_send_message`, since
it returns a particular error variant when there is no matching
connection in the pool. This may be changed separately.

BREAKING CHANGE: The following APIs had their error type changed from
`Error` to `SendError`:

- `SendStream::send_user_msg`
- `SendStream::send`
- `SendStream::finish`
- `Endpoint::send_message`

Additionally, the `StreamWrite` and `MaxLengthExceeded` variants have
been removed from `Error`, and a `Send` variant has been added.
  • Loading branch information
Chris Connelly authored and connec committed Aug 27, 2021
1 parent 33e9b3b commit 73c6501
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 20 deletions.
12 changes: 6 additions & 6 deletions src/connections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::Endpoint;

use super::{
connection_pool::{ConnId, ConnectionPool, ConnectionRemover},
error::{ConnectionError, Error, Result},
error::{ConnectionError, Error, Result, SendError},
wire_msg::WireMsg,
};
use bytes::Bytes;
Expand Down Expand Up @@ -57,7 +57,7 @@ impl<I: ConnId> Connection<I> {

/// Send message to peer using a uni-directional stream.
/// Priority default is 0. Both lower and higher can be passed in.
pub(crate) async fn send_uni(&self, msg: Bytes, priority: i32) -> Result<()> {
pub(crate) async fn send_uni(&self, msg: Bytes, priority: i32) -> Result<(), SendError> {
let mut send_stream = self.handle_error(self.quic_conn.open_uni().await).await?;

// quinn returns `UnknownStream` error if the stream does not exist. We ignore it, on the
Expand Down Expand Up @@ -148,17 +148,17 @@ impl SendStream {
}

/// Send a message using the stream created by the initiator
pub async fn send_user_msg(&mut self, msg: Bytes) -> Result<()> {
pub async fn send_user_msg(&mut self, msg: Bytes) -> Result<(), SendError> {
send_msg(&mut self.quinn_send_stream, msg).await
}

/// Send a wire message
pub(crate) async fn send(&mut self, msg: WireMsg) -> Result<()> {
pub(crate) async fn send(&mut self, msg: WireMsg) -> Result<(), SendError> {
msg.write_to_stream(&mut self.quinn_send_stream).await
}

/// Gracefully finish current stream
pub async fn finish(mut self) -> Result<()> {
pub async fn finish(mut self) -> Result<(), SendError> {
self.quinn_send_stream.finish().await?;
Ok(())
}
Expand All @@ -176,7 +176,7 @@ async fn read_bytes(recv: &mut quinn::RecvStream) -> Result<WireMsg> {
}

// Helper to send bytes to peer using the provided stream.
async fn send_msg(mut send_stream: &mut quinn::SendStream, msg: Bytes) -> Result<()> {
async fn send_msg(mut send_stream: &mut quinn::SendStream, msg: Bytes) -> Result<(), SendError> {
let wire_msg = WireMsg::UserMsg(msg);
wire_msg.write_to_stream(&mut send_stream).await?;
Ok(())
Expand Down
9 changes: 7 additions & 2 deletions src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use super::{
listen_for_incoming_connections, listen_for_incoming_messages, Connection,
DisconnectionEvents, RecvStream, SendStream,
},
error::{ClientEndpointError, ConnectionError, Result},
error::{ClientEndpointError, ConnectionError, Result, SendError},
};
use bytes::Bytes;
use std::net::SocketAddr;
Expand Down Expand Up @@ -489,7 +489,12 @@ impl<I: ConnId> Endpoint<I> {
/// to the peer first. If this connection is broken or doesn't exist
/// a new connection is created and the message is sent.
/// Priority default is 0. Both lower and higher can be passed in.
pub async fn send_message(&self, msg: Bytes, dest: &SocketAddr, priority: i32) -> Result<()> {
pub async fn send_message(
&self,
msg: Bytes,
dest: &SocketAddr,
priority: i32,
) -> Result<(), SendError> {
let connection = self.get_or_connect_to(dest).await?;
self.retry(|| async { Ok(connection.send_uni(msg.clone(), priority).await?) })
.await?;
Expand Down
87 changes: 81 additions & 6 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,6 @@ pub enum Error {
/// The message type flag decoded in an incoming stream is invalid/unsupported.
#[error("Invalid message type flag found in message's header: {0}")]
InvalidMsgFlag(u8),
/// Error occurred when trying to write on a currently opened message stream.
#[error("Stream write error")]
StreamWrite(#[from] quinn::WriteError),
/// The expected amount of message bytes couldn't be read from the stream.
#[error("Failed to read expected number of message bytes: {0}")]
StreamRead(#[from] quinn::ReadExactError),
Expand All @@ -80,9 +77,6 @@ pub enum Error {
/// The type of message received is not the expected one.
#[error(transparent)]
UnexpectedMessageType(#[from] UnexpectedMessageType),
/// The message exceeds the maximum message length allowed.
#[error("Maximum data length exceeded, length: {0}")]
MaxLengthExceeded(usize),
/// Incorrect Public Address provided
#[error("Incorrect Public Address provided")]
IncorrectPublicAddress,
Expand All @@ -92,6 +86,9 @@ pub enum Error {
/// Couldn't resolve Public IP address
#[error("Unresolved Public IP address")]
UnresolvedPublicIp,
/// Failed to send message.
#[error("Failed to send message")]
Send(#[from] SendError),
}

impl From<quinn::ConnectionError> for Error {
Expand Down Expand Up @@ -311,3 +308,81 @@ impl From<WireMsg> for UnexpectedMessageType {
UnexpectedMessageType(msg)
}
}

/// Errors that can occur when sending messages.
#[derive(Debug, Error)]
pub enum SendError {
/// Failed to serialize message.
///
/// This likely indicates a bug in the library, since serializing to bytes should be infallible.
/// Limitations in the serde API mean we cannot verify this statically, and we don't want to
/// introduce potential panics.
#[error("Failed to serialize message")]
Serialization(#[source] SerializationError),

/// The serialized message is too long (max: 4 GiB).
#[error("The serialized message is too long ({0} bytes, max: 4 GiB)")]
TooLong(usize),

/// Connection was lost when trying to send a message.
#[error("Connection was lost when trying to send a message")]
ConnectionLost(#[from] ConnectionError),

/// Stream was lost when trying to send a message.
#[error("Stream was lost when trying to send a message")]
StreamLost(#[source] StreamError),
}

impl From<bincode::Error> for SendError {
fn from(error: bincode::Error) -> Self {
Self::Serialization(SerializationError(error))
}
}

impl From<quinn::ConnectionError> for SendError {
fn from(error: quinn::ConnectionError) -> Self {
Self::ConnectionLost(error.into())
}
}

impl From<quinn::WriteError> for SendError {
fn from(error: quinn::WriteError) -> Self {
match error {
quinn::WriteError::Stopped(code) => Self::StreamLost(StreamError::Stopped(code.into())),
quinn::WriteError::ConnectionClosed(error) => Self::ConnectionLost(error.into()),
quinn::WriteError::UnknownStream => Self::StreamLost(StreamError::Gone),
quinn::WriteError::ZeroRttRejected => Self::StreamLost(StreamError::Unsupported(
UnsupportedStreamOperation(error.into()),
)),
}
}
}

/// Failed to serialize message.
#[derive(Debug, Error)]
#[error(transparent)]
pub struct SerializationError(bincode::Error);

/// Errors that can occur when interacting with streams.
#[derive(Debug, Error)]
pub enum StreamError {
/// The peer abandoned the stream.
#[error("The peer abandoned the stream (error code: {0})")]
Stopped(u64),

/// The stream was already stopped, finished, or reset.
#[error("The stream was already stopped, finished, or reset")]
Gone,

/// An error was caused by an unsupported operation.
///
/// Additional stream errors can arise from the use of 0-RTT connections or unordered reads,
/// neither of which are supported by the library.
#[error("An error was caused by an unsupported operation")]
Unsupported(#[source] UnsupportedStreamOperation),
}

/// An error caused by an unsupported operation.
#[derive(Debug, Error)]
#[error(transparent)]
pub struct UnsupportedStreamOperation(Box<dyn std::error::Error + Send + Sync>);
5 changes: 3 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,9 @@ pub use connection_pool::ConnId;
pub use connections::{DisconnectionEvents, RecvStream, SendStream};
pub use endpoint::{Endpoint, IncomingConnections, IncomingMessages};
pub use error::{
ClientEndpointError, Close, ConnectionError, Error, InternalConfigError, Result,
TransportErrorCode, UnexpectedMessageType,
ClientEndpointError, Close, ConnectionError, Error, InternalConfigError, Result, SendError,
SerializationError, StreamError, TransportErrorCode, UnexpectedMessageType,
UnsupportedStreamOperation,
};

#[cfg(test)]
Expand Down
11 changes: 7 additions & 4 deletions src/wire_msg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
// Software.

use crate::{
error::{Error, Result},
error::{Error, Result, SendError},
utils,
};
use bytes::Bytes;
Expand Down Expand Up @@ -64,7 +64,10 @@ impl WireMsg {
}

// Helper to write WireMsg bytes to the provided stream.
pub(crate) async fn write_to_stream(&self, send_stream: &mut quinn::SendStream) -> Result<()> {
pub(crate) async fn write_to_stream(
&self,
send_stream: &mut quinn::SendStream,
) -> Result<(), SendError> {
// Let's generate the message bytes
let (msg_bytes, msg_flag) = match self {
WireMsg::UserMsg(ref m) => (m.clone(), USER_MSG_FLAG),
Expand Down Expand Up @@ -118,9 +121,9 @@ struct MsgHeader {
}

impl MsgHeader {
fn new(msg: &Bytes, usr_msg_flag: u8) -> Result<Self> {
fn new(msg: &Bytes, usr_msg_flag: u8) -> Result<Self, SendError> {
match u32::try_from(msg.len()) {
Err(_) => Err(Error::MaxLengthExceeded(msg.len())),
Err(_) => Err(SendError::TooLong(msg.len())),
Ok(data_len) => Ok(Self {
version: MSG_PROTOCOL_VERSION,
data_len,
Expand Down

0 comments on commit 73c6501

Please sign in to comment.