Skip to content

Commit

Permalink
refactor!: Remove Error
Browse files Browse the repository at this point in the history
This was no longer used in any public APIs. The one internal use was to
unify error when reading from streams in `connections`. This has been
replaced by a private enum for that specific purpose.

BREAKING CHANGE: The `Error` type has been removed.
  • Loading branch information
Chris Connelly authored and connec committed Aug 27, 2021
1 parent c43855b commit 5608f27
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 36 deletions.
34 changes: 21 additions & 13 deletions src/connections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::Endpoint;

use super::{
connection_pool::{ConnId, ConnectionPool, ConnectionRemover},
error::{ConnectionError, Error, RecvError, SendError, SerializationError},
error::{ConnectionError, RecvError, RpcError, SendError, SerializationError},
wire_msg::WireMsg,
};
use bytes::Bytes;
Expand Down Expand Up @@ -258,28 +258,36 @@ pub(super) fn listen_for_incoming_messages<I: ConnId>(
});
}

// unify uni- and bi-stream errors
#[derive(Debug, thiserror::Error)]
#[error(transparent)]
enum StreamError {
Uni(#[from] RecvError),
Bi(#[from] RpcError),
}

// Read messages sent by peer in an unidirectional stream.
async fn read_on_uni_streams(
uni_streams: &mut quinn::IncomingUniStreams,
peer_addr: SocketAddr,
message_tx: Sender<(SocketAddr, Bytes)>,
) -> Result<(), Error> {
) -> Result<(), StreamError> {
while let Some(result) = uni_streams.next().await {
match result {
Err(error @ quinn::ConnectionError::ConnectionClosed(_)) => {
trace!("Connection closed by peer {:?}", peer_addr);
return Err(error.into());
return Err(StreamError::Uni(error.into()));
}
Err(error @ quinn::ConnectionError::ApplicationClosed(_)) => {
trace!("Connection closed by peer {:?}.", peer_addr);
return Err(error.into());
return Err(StreamError::Uni(error.into()));
}
Err(err) => {
warn!(
"Failed to read incoming message on uni-stream for peer {:?} with: {:?}",
peer_addr, err
);
return Err(err.into());
return Err(StreamError::Uni(err.into()));
}
Ok(mut recv) => loop {
match read_bytes(&mut recv).await {
Expand Down Expand Up @@ -308,23 +316,23 @@ async fn read_on_bi_streams<I: ConnId>(
peer_addr: SocketAddr,
message_tx: Sender<(SocketAddr, Bytes)>,
endpoint: &Endpoint<I>,
) -> Result<(), Error> {
) -> Result<(), StreamError> {
while let Some(result) = bi_streams.next().await {
match result {
Err(error @ quinn::ConnectionError::ConnectionClosed(_)) => {
trace!("Connection closed by peer {:?}", peer_addr);
return Err(error.into());
return Err(StreamError::Bi(error.into()));
}
Err(error @ quinn::ConnectionError::ApplicationClosed(_)) => {
trace!("Connection closed by peer {:?}.", peer_addr);
return Err(error.into());
return Err(StreamError::Bi(error.into()));
}
Err(err) => {
warn!(
"Failed to read incoming message on bi-stream for peer {:?} with: {:?}",
peer_addr, err
);
return Err(Error::from(err));
return Err(StreamError::Bi(err.into()));
}
Ok((mut send, mut recv)) => loop {
match read_bytes(&mut recv).await {
Expand All @@ -338,7 +346,7 @@ async fn read_on_bi_streams<I: ConnId>(
peer_addr, error
);

return Err(error);
return Err(StreamError::Bi(error.into()));
}
}
Ok(WireMsg::EndpointVerificationReq(address_sent)) => {
Expand All @@ -352,7 +360,7 @@ async fn read_on_bi_streams<I: ConnId>(
{
warn!("Failed to handle Endpoint verification request for peer {:?} with: {:?}", peer_addr, error);

return Err(error);
return Err(StreamError::Bi(error));
}
}
Ok(msg) => {
Expand All @@ -379,7 +387,7 @@ async fn read_on_bi_streams<I: ConnId>(
async fn handle_endpoint_echo_req(
peer_addr: SocketAddr,
send_stream: &mut quinn::SendStream,
) -> Result<(), Error> {
) -> Result<(), SendError> {
trace!("Received Echo Request from peer {:?}", peer_addr);
let message = WireMsg::EndpointEchoResp(peer_addr);
message.write_to_stream(send_stream).await?;
Expand All @@ -392,7 +400,7 @@ async fn handle_endpoint_verification_req<I: ConnId>(
addr_sent: SocketAddr,
send_stream: &mut quinn::SendStream,
endpoint: &Endpoint<I>,
) -> Result<(), Error> {
) -> Result<(), RpcError> {
trace!(
"Received Endpoint verification request {:?} from {:?}",
addr_sent,
Expand Down
27 changes: 6 additions & 21 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,6 @@ use bytes::Bytes;
use std::{fmt, io, net::SocketAddr};
use thiserror::Error;

/// Error types returned by the qp2p public API.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum Error {
/// quinn connection error
#[error("Connection lost due to error: {0}")]
ConnectionError(#[from] ConnectionError),
/// Failed to send message.
#[error("Failed to send message")]
Send(#[from] SendError),
/// Failed to receive message.
#[error("Failed to receive message")]
Recv(#[from] RecvError),
}

impl From<quinn::ConnectionError> for Error {
fn from(error: quinn::ConnectionError) -> Self {
Self::ConnectionError(error.into())
}
}

/// Errors returned from [`Endpoint::new`](crate::Endpoint::new).
#[derive(Debug, Error)]
pub enum EndpointError {
Expand Down Expand Up @@ -327,6 +306,12 @@ impl From<ConnectionError> for RpcError {
}
}

impl From<quinn::ConnectionError> for RpcError {
fn from(error: quinn::ConnectionError) -> Self {
Self::Send(error.into())
}
}

impl From<tokio::time::error::Elapsed> for RpcError {
fn from(_: tokio::time::error::Elapsed) -> Self {
Self::TimedOut
Expand Down
4 changes: 2 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ pub use endpoint::{Endpoint, IncomingConnections, IncomingMessages};
#[cfg(not(feature = "no-igd"))]
pub use error::UpnpError;
pub use error::{
ClientEndpointError, Close, ConnectionError, EndpointError, Error, InternalConfigError,
RecvError, RpcError, SendError, SerializationError, StreamError, TransportErrorCode,
ClientEndpointError, Close, ConnectionError, EndpointError, InternalConfigError, RecvError,
RpcError, SendError, SerializationError, StreamError, TransportErrorCode,
UnsupportedStreamOperation,
};

Expand Down

0 comments on commit 5608f27

Please sign in to comment.