Skip to content

Commit

Permalink
refactor!: Merge 'connect' and 'close' errors with ConnectionError
Browse files Browse the repository at this point in the history
This introduces new variants to the `ConnectionError` enum to cover
failures that may be encountered before establishing a connection
(`quinn::ConnectError`) as well as closures (`Close`).

This is useful for a couple of reasons:

- Many of our APIs will transparently try to create connections before
  using them. Even when reducing the possible variants from these APIs,
  we would still need to account for these three possibilities
  (pre-connect error, connection error, and connection close).
- It removes `quinn::ConnectError` from our public API, and better
  represents errors that should be impossible as bugs.
- It replaces the crate-private `ConnectFailure` and simplifies some of
  the error conversion logic.

BREAKING CHANGE: The `Error::Connect` and `Error::ConnectionClosed`
variants have been removed. Several additional variants have been added
to `ConnectionError`.
  • Loading branch information
Chris Connelly authored and connec committed Aug 27, 2021
1 parent 07b2432 commit 9aae2e3
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 125 deletions.
4 changes: 2 additions & 2 deletions src/connection_deduplicator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@
// specific language governing permissions and limitations relating to use of the SAFE Network
// Software.

use crate::error::ConnectFailed;
use crate::ConnectionError;
use std::sync::Arc;
use std::{
collections::{hash_map::Entry, HashMap},
net::SocketAddr,
};
use tokio::sync::{broadcast, Mutex};

type Result = std::result::Result<(), ConnectFailed>;
type Result = std::result::Result<(), ConnectionError>;

// Deduplicate multiple concurrent connect attempts to the same peer - they all will yield the
// same `Connection` instead of opening a separate connection each.
Expand Down
18 changes: 9 additions & 9 deletions src/connections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,20 +260,20 @@ async fn read_on_uni_streams(
) -> Result<()> {
while let Some(result) = uni_streams.next().await {
match result {
Err(quinn::ConnectionError::ConnectionClosed(close)) => {
Err(error @ quinn::ConnectionError::ConnectionClosed(_)) => {
trace!("Connection closed by peer {:?}", peer_addr);
return Err(Error::ConnectionClosed(close.into()));
return Err(error.into());
}
Err(quinn::ConnectionError::ApplicationClosed(close)) => {
Err(error @ quinn::ConnectionError::ApplicationClosed(_)) => {
trace!("Connection closed by peer {:?}.", peer_addr);
return Err(Error::ConnectionClosed(close.into()));
return Err(error.into());
}
Err(err) => {
warn!(
"Failed to read incoming message on uni-stream for peer {:?} with: {:?}",
peer_addr, err
);
return Err(Error::from(err));
return Err(err.into());
}
Ok(mut recv) => loop {
match read_bytes(&mut recv).await {
Expand Down Expand Up @@ -309,13 +309,13 @@ async fn read_on_bi_streams<I: ConnId>(
) -> Result<()> {
while let Some(result) = bi_streams.next().await {
match result {
Err(quinn::ConnectionError::ConnectionClosed(close)) => {
Err(error @ quinn::ConnectionError::ConnectionClosed(_)) => {
trace!("Connection closed by peer {:?}", peer_addr);
return Err(Error::ConnectionClosed(close.into()));
return Err(error.into());
}
Err(quinn::ConnectionError::ApplicationClosed(close)) => {
Err(error @ quinn::ConnectionError::ApplicationClosed(_)) => {
trace!("Connection closed by peer {:?}.", peer_addr);
return Err(Error::ConnectionClosed(close.into()));
return Err(error.into());
}
Err(err) => {
warn!(
Expand Down
11 changes: 7 additions & 4 deletions src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use super::{
listen_for_incoming_connections, listen_for_incoming_messages, Connection,
DisconnectionEvents, RecvStream, SendStream,
},
error::{ClientEndpointError, Result},
error::{ClientEndpointError, ConnectionError, Result},
};
use bytes::Bytes;
use std::net::SocketAddr;
Expand Down Expand Up @@ -434,7 +434,10 @@ impl<I: ConnId> Endpoint<I> {
}

/// Attempt a connection to a node_addr using exponential backoff
async fn attempt_connection(&self, node_addr: &SocketAddr) -> Result<quinn::NewConnection> {
async fn attempt_connection(
&self,
node_addr: &SocketAddr,
) -> Result<quinn::NewConnection, ConnectionError> {
self.retry(|| async {
trace!("Attempting to connect to {:?}", node_addr);
let connecting = match self.quic_endpoint.connect_with(
Expand All @@ -445,7 +448,7 @@ impl<I: ConnId> Endpoint<I> {
Ok(conn) => Ok(conn),
Err(error) => {
warn!("Connection attempt failed due to {:?}", error);
Err(Error::from(error))
Err(ConnectionError::from(error))
}
}?;

Expand All @@ -460,7 +463,7 @@ impl<I: ConnId> Endpoint<I> {
.complete(node_addr, Err(error.clone().into()))
.await;

Err(Error::from(error))
Err(ConnectionError::from(error))
}
}?;

Expand Down
211 changes: 102 additions & 109 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
use super::wire_msg::WireMsg;
use crate::config::ConfigError;
use bytes::Bytes;
use std::{fmt, io};
use std::{fmt, io, net::SocketAddr};
use thiserror::Error;

/// Result used by `QuicP2p`.
Expand All @@ -20,12 +20,9 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum Error {
/// quinn connection closed
#[error("Connection was closed by {0}")]
ConnectionClosed(Close),
/// quinn connection error
#[error("Connection lost due to error: {0}")]
ConnectionError(#[source] ConnectionError),
ConnectionError(#[from] ConnectionError),
/// Error occurred when attempting to connect to any
/// of the peers provided as a list of contacts.
#[error("Network bootstrap failed")]
Expand All @@ -36,9 +33,6 @@ pub enum Error {
/// I/O failure when attempting to access a local resource.
#[error("I/O Error")]
Io(#[from] io::Error),
/// Failure encountered when establishing a connection with another peer.
#[error("Establishing connection")]
Connect(#[from] quinn::ConnectError),
/// Failed to create a new endpoint.
#[error("Creating endpoint")]
Endpoint(#[from] quinn::EndpointError),
Expand Down Expand Up @@ -105,20 +99,7 @@ pub enum Error {

impl From<quinn::ConnectionError> for Error {
fn from(error: quinn::ConnectionError) -> Self {
match error.into() {
ConnectionCloseOrError::Close(close) => Self::ConnectionClosed(close),
ConnectionCloseOrError::Error(error) => Self::ConnectionError(error),
}
}
}

impl From<ConnectFailed> for Error {
fn from(error: ConnectFailed) -> Self {
match error {
ConnectFailed::Connect(error) => Self::Connect(error),
ConnectFailed::Connection(error) => Self::ConnectionError(error),
ConnectFailed::Close(close) => Self::ConnectionClosed(close),
}
Self::ConnectionError(error.into())
}
}

Expand All @@ -142,6 +123,105 @@ impl From<quinn::EndpointError> for ClientEndpointError {
}
}

/// Errors that can cause connection loss.
// This is a copy of `quinn::ConnectionError` without the `*Closed` variants, since we want to
// separate them in our interface.
#[derive(Clone, Debug, Error)]
pub enum ConnectionError {
/// The endpoint has been stopped.
#[error("The endpoint has been stopped")]
Stopped,

/// The number of active connections on the local endpoint is at the limit.
///
/// This limit is imposed by the underlying connection ID generator, which is not currently
/// configurable.
// NOTE: We could make this configurable by exposing a way to set
// quinn_proto::RandomConnectionIdGenerator cid_len
#[error("The number of active connections on the local endpoint is at the limit")]
TooManyConnections,

/// Invalid remote address.
///
/// Examples include attempting to connect to port `0` or using an inappropriate address family.
#[error("Invalid remote address: {0}")]
InvalidAddress(SocketAddr),

/// Internal configuration error.
///
/// This should not occur (if it does, there's a bug!), but it covers possible misconfigurations
/// of the underlying transport library.
#[error("BUG: internal configuration error")]
InternalConfigError(#[source] InternalConfigError),

/// The peer doesn't implement the supported version.
#[error("{}", quinn::ConnectionError::VersionMismatch)]
VersionMismatch,

/// The peer violated the QUIC specification as understood by this implementation.
#[error("{0}")]
TransportError(#[source] quinn_proto::TransportError),

/// The peer is unable to continue processing this connection, usually due to having restarted.
#[error("{}", quinn::ConnectionError::Reset)]
Reset,

/// Communication with the peer has lapsed for longer than the negotiated idle timeout.
#[error("{}", quinn::ConnectionError::TimedOut)]
TimedOut,

/// The connection was closed.
#[error("The connection was closed by {0}")]
Closed(Close),
}

impl From<quinn::ConnectError> for ConnectionError {
fn from(error: quinn::ConnectError) -> Self {
match error {
quinn::ConnectError::EndpointStopping => Self::Stopped,
quinn::ConnectError::TooManyConnections => Self::TooManyConnections,
quinn::ConnectError::InvalidRemoteAddress(addr) => Self::InvalidAddress(addr),
quinn::ConnectError::InvalidDnsName(_) => {
// We currently use a hard-coded domain name, so if it's invalid we have a library
// breaking bug. We want to avoid panics though, so we propagate this as an opaque
// error.
Self::InternalConfigError(InternalConfigError(error))
}
quinn::ConnectError::Config(_) => {
// This is logically impossible, but has not been removed from the quinn API
// (PR: https://github.com/quinn-rs/quinn/pull/1181). As above, we don't want to
// allow any panics, so we propagate it as an opaque internal error.
Self::InternalConfigError(InternalConfigError(error))
}
}
}
}

impl From<quinn::ConnectionError> for ConnectionError {
fn from(error: quinn::ConnectionError) -> Self {
match error {
quinn::ConnectionError::LocallyClosed => Self::Closed(Close::Local),
quinn::ConnectionError::ApplicationClosed(close) => Self::Closed(Close::Application {
error_code: close.error_code.into_inner(),
reason: close.reason,
}),
quinn::ConnectionError::ConnectionClosed(close) => Self::Closed(Close::Transport {
error_code: TransportErrorCode(close.error_code),
reason: close.reason,
}),
quinn::ConnectionError::VersionMismatch => Self::VersionMismatch,
quinn::ConnectionError::TransportError(error) => Self::TransportError(error),
quinn::ConnectionError::Reset => Self::Reset,
quinn::ConnectionError::TimedOut => Self::TimedOut,
}
}
}

/// An internal configuration error encountered by [`Endpoint`](crate::Endpoint) connect methods.
#[derive(Clone, Debug, Error)]
#[error(transparent)]
pub struct InternalConfigError(quinn::ConnectError);

/// The reason a connection was closed.
#[derive(Clone, Debug)]
pub enum Close {
Expand Down Expand Up @@ -206,28 +286,6 @@ impl From<quinn::ConnectionClose> for Close {
}
}

/// Errors that can cause connection loss.
// This is a copy of `quinn::ConnectionError` without the `*Closed` variants, since we want to
// separate them in our interface.
#[derive(Clone, Debug, Error)]
pub enum ConnectionError {
/// The peer doesn't implement the supported version.
#[error("{}", quinn::ConnectionError::VersionMismatch)]
VersionMismatch,

/// The peer violated the QUIC specification as understood by this implementation.
#[error("{0}")]
TransportError(#[source] quinn_proto::TransportError),

/// The peer is unable to continue processing this connection, usually due to having restarted.
#[error("{}", quinn::ConnectionError::Reset)]
Reset,

/// Communication with the peer has lapsed for longer than the negotiated idle timeout.
#[error("{}", quinn::ConnectionError::TimedOut)]
TimedOut,
}

/// An opaque error code indicating a transport failure.
///
/// This can be turned to a string via its `Debug` and `Display` impls, but is otherwise opaque.
Expand All @@ -245,68 +303,3 @@ impl fmt::Display for TransportErrorCode {
write!(f, "{}", self.0)
}
}

/// Errors preventing a `Connection` from being created.
///
/// The `quinn` API means that, before we can get a handle to a `Connection`, we have to handle both
/// `ConnectError` (indicating a misconfiguration, such that quinn didn't even try to open a
/// connection) and `ConnectionError` (indicating an I/O or protocol error when trying to open a
/// connection).
///
/// In our public API, we also want to separate quinn's `ConnectionError` into our own
/// `ConnectionError`/`Close`. This enum is intended to facilitate this. Note that it does not
/// `impl Error` and is not meant to be public.
#[derive(Clone, Debug)]
pub(crate) enum ConnectFailed {
Connect(quinn::ConnectError),
Connection(ConnectionError),
Close(Close),
}

impl From<quinn::ConnectError> for ConnectFailed {
fn from(error: quinn::ConnectError) -> Self {
Self::Connect(error)
}
}

impl From<quinn::ConnectionError> for ConnectFailed {
fn from(error: quinn::ConnectionError) -> Self {
match error.into() {
ConnectionCloseOrError::Close(close) => Self::Close(close),
ConnectionCloseOrError::Error(error) => Self::Connection(error),
}
}
}

/// Errors that can interrupt a connection.
///
/// We want to separate "close" and "error" in our API, to enable more robust error handling. This
/// is a helper enum for separating `quinn::ConnectionError` into those parts.
enum ConnectionCloseOrError {
Close(Close),
Error(ConnectionError),
}

impl From<quinn::ConnectionError> for ConnectionCloseOrError {
fn from(error: quinn::ConnectionError) -> Self {
match error {
quinn::ConnectionError::LocallyClosed => Self::Close(Close::Local),
quinn::ConnectionError::ApplicationClosed(close) => Self::Close(Close::Application {
error_code: close.error_code.into_inner(),
reason: close.reason,
}),
quinn::ConnectionError::ConnectionClosed(close) => Self::Close(Close::Transport {
error_code: TransportErrorCode(close.error_code),
reason: close.reason,
}),
quinn::ConnectionError::VersionMismatch => {
Self::Error(ConnectionError::VersionMismatch)
}
quinn::ConnectionError::TransportError(error) => {
Self::Error(ConnectionError::TransportError(error))
}
quinn::ConnectionError::Reset => Self::Error(ConnectionError::Reset),
quinn::ConnectionError::TimedOut => Self::Error(ConnectionError::Reset),
}
}
}
5 changes: 4 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,10 @@ pub use config::{Config, ConfigError};
pub use connection_pool::ConnId;
pub use connections::{DisconnectionEvents, RecvStream, SendStream};
pub use endpoint::{Endpoint, IncomingConnections, IncomingMessages};
pub use error::{ClientEndpointError, Close, ConnectionError, Error, Result, TransportErrorCode};
pub use error::{
ClientEndpointError, Close, ConnectionError, Error, InternalConfigError, Result,
TransportErrorCode,
};

#[cfg(test)]
mod tests;

0 comments on commit 9aae2e3

Please sign in to comment.