From 5608f27634439828eab22b53a2fb858b60d7249f Mon Sep 17 00:00:00 2001 From: Chris Connelly Date: Thu, 26 Aug 2021 15:46:19 +0100 Subject: [PATCH] refactor!: Remove `Error` This was no longer used in any public APIs. The one internal use was to unify error when reading from streams in `connections`. This has been replaced by a private enum for that specific purpose. BREAKING CHANGE: The `Error` type has been removed. --- src/connections.rs | 34 +++++++++++++++++++++------------- src/error.rs | 27 ++++++--------------------- src/lib.rs | 4 ++-- 3 files changed, 29 insertions(+), 36 deletions(-) diff --git a/src/connections.rs b/src/connections.rs index a495a744..9691acf9 100644 --- a/src/connections.rs +++ b/src/connections.rs @@ -11,7 +11,7 @@ use crate::Endpoint; use super::{ connection_pool::{ConnId, ConnectionPool, ConnectionRemover}, - error::{ConnectionError, Error, RecvError, SendError, SerializationError}, + error::{ConnectionError, RecvError, RpcError, SendError, SerializationError}, wire_msg::WireMsg, }; use bytes::Bytes; @@ -258,28 +258,36 @@ pub(super) fn listen_for_incoming_messages( }); } +// unify uni- and bi-stream errors +#[derive(Debug, thiserror::Error)] +#[error(transparent)] +enum StreamError { + Uni(#[from] RecvError), + Bi(#[from] RpcError), +} + // Read messages sent by peer in an unidirectional stream. async fn read_on_uni_streams( uni_streams: &mut quinn::IncomingUniStreams, peer_addr: SocketAddr, message_tx: Sender<(SocketAddr, Bytes)>, -) -> Result<(), Error> { +) -> Result<(), StreamError> { while let Some(result) = uni_streams.next().await { match result { Err(error @ quinn::ConnectionError::ConnectionClosed(_)) => { trace!("Connection closed by peer {:?}", peer_addr); - return Err(error.into()); + return Err(StreamError::Uni(error.into())); } Err(error @ quinn::ConnectionError::ApplicationClosed(_)) => { trace!("Connection closed by peer {:?}.", peer_addr); - return Err(error.into()); + return Err(StreamError::Uni(error.into())); } Err(err) => { warn!( "Failed to read incoming message on uni-stream for peer {:?} with: {:?}", peer_addr, err ); - return Err(err.into()); + return Err(StreamError::Uni(err.into())); } Ok(mut recv) => loop { match read_bytes(&mut recv).await { @@ -308,23 +316,23 @@ async fn read_on_bi_streams( peer_addr: SocketAddr, message_tx: Sender<(SocketAddr, Bytes)>, endpoint: &Endpoint, -) -> Result<(), Error> { +) -> Result<(), StreamError> { while let Some(result) = bi_streams.next().await { match result { Err(error @ quinn::ConnectionError::ConnectionClosed(_)) => { trace!("Connection closed by peer {:?}", peer_addr); - return Err(error.into()); + return Err(StreamError::Bi(error.into())); } Err(error @ quinn::ConnectionError::ApplicationClosed(_)) => { trace!("Connection closed by peer {:?}.", peer_addr); - return Err(error.into()); + return Err(StreamError::Bi(error.into())); } Err(err) => { warn!( "Failed to read incoming message on bi-stream for peer {:?} with: {:?}", peer_addr, err ); - return Err(Error::from(err)); + return Err(StreamError::Bi(err.into())); } Ok((mut send, mut recv)) => loop { match read_bytes(&mut recv).await { @@ -338,7 +346,7 @@ async fn read_on_bi_streams( peer_addr, error ); - return Err(error); + return Err(StreamError::Bi(error.into())); } } Ok(WireMsg::EndpointVerificationReq(address_sent)) => { @@ -352,7 +360,7 @@ async fn read_on_bi_streams( { warn!("Failed to handle Endpoint verification request for peer {:?} with: {:?}", peer_addr, error); - return Err(error); + return Err(StreamError::Bi(error)); } } Ok(msg) => { @@ -379,7 +387,7 @@ async fn read_on_bi_streams( async fn handle_endpoint_echo_req( peer_addr: SocketAddr, send_stream: &mut quinn::SendStream, -) -> Result<(), Error> { +) -> Result<(), SendError> { trace!("Received Echo Request from peer {:?}", peer_addr); let message = WireMsg::EndpointEchoResp(peer_addr); message.write_to_stream(send_stream).await?; @@ -392,7 +400,7 @@ async fn handle_endpoint_verification_req( addr_sent: SocketAddr, send_stream: &mut quinn::SendStream, endpoint: &Endpoint, -) -> Result<(), Error> { +) -> Result<(), RpcError> { trace!( "Received Endpoint verification request {:?} from {:?}", addr_sent, diff --git a/src/error.rs b/src/error.rs index 874d3c6a..5d696147 100644 --- a/src/error.rs +++ b/src/error.rs @@ -15,27 +15,6 @@ use bytes::Bytes; use std::{fmt, io, net::SocketAddr}; use thiserror::Error; -/// Error types returned by the qp2p public API. -#[derive(Debug, Error)] -#[non_exhaustive] -pub enum Error { - /// quinn connection error - #[error("Connection lost due to error: {0}")] - ConnectionError(#[from] ConnectionError), - /// Failed to send message. - #[error("Failed to send message")] - Send(#[from] SendError), - /// Failed to receive message. - #[error("Failed to receive message")] - Recv(#[from] RecvError), -} - -impl From for Error { - fn from(error: quinn::ConnectionError) -> Self { - Self::ConnectionError(error.into()) - } -} - /// Errors returned from [`Endpoint::new`](crate::Endpoint::new). #[derive(Debug, Error)] pub enum EndpointError { @@ -327,6 +306,12 @@ impl From for RpcError { } } +impl From for RpcError { + fn from(error: quinn::ConnectionError) -> Self { + Self::Send(error.into()) + } +} + impl From for RpcError { fn from(_: tokio::time::error::Elapsed) -> Self { Self::TimedOut diff --git a/src/lib.rs b/src/lib.rs index fd246a2d..fb32febf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -65,8 +65,8 @@ pub use endpoint::{Endpoint, IncomingConnections, IncomingMessages}; #[cfg(not(feature = "no-igd"))] pub use error::UpnpError; pub use error::{ - ClientEndpointError, Close, ConnectionError, EndpointError, Error, InternalConfigError, - RecvError, RpcError, SendError, SerializationError, StreamError, TransportErrorCode, + ClientEndpointError, Close, ConnectionError, EndpointError, InternalConfigError, RecvError, + RpcError, SendError, SerializationError, StreamError, TransportErrorCode, UnsupportedStreamOperation, };