Skip to content

Commit

Permalink
refactor!: Separate connection close from errors
Browse files Browse the repository at this point in the history
This rationalises how connection closes and errors are represented by
the `Error` type. The `QuinnConnectionClosed` variant is gone, in favour
of a `ConnectionClosed` variant that carries information about who
closed the connection and why. The `Connection` variant has been renamed
to `ConnectionError` and contains a subset of `quinn::ConnectionError`'s
variants, excluding `*Closed` errors.

It's expected that this will make it easier and more reliable to respond
to 'normal' connection closures, vs. connection errors.

BREAKING CHANGE: The `Error::QuinnConnectionClosed` variant has been
renamed `Error::ConnectionClosed` and now contains closure information.
The `Error::Connection` variant has been renamed
`Error::ConnectionError`, and now contains a `ConnectionError`.
`quinn::ConnectionError` is no longer re-exported.
  • Loading branch information
Chris Connelly authored and connec committed Aug 11, 2021
1 parent f03ff41 commit 4d3d4eb
Show file tree
Hide file tree
Showing 6 changed files with 151 additions and 22 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ edition = "2018"
base64 = "~0.12.2"
bincode = "1.2.1"
futures = "~0.3.8"
quinn-proto = "0.7.3"
rcgen = "~0.8.4"
serde_json = "1.0.59"
structopt = "~0.3.15"
Expand Down
2 changes: 1 addition & 1 deletion src/connection_deduplicator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl From<Error> for crate::error::Error {
fn from(src: Error) -> Self {
match src {
Error::Connect(source) => Self::Connect(source),
Error::Connection(source) => Self::Connection(source),
Error::Connection(source) => source.into(),
}
}
}
18 changes: 12 additions & 6 deletions src/connections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,10 +233,13 @@ async fn read_on_uni_streams(
) -> Result<()> {
while let Some(result) = uni_streams.next().await {
match result {
Err(quinn::ConnectionError::ConnectionClosed(_))
| Err(quinn::ConnectionError::ApplicationClosed(_)) => {
Err(quinn::ConnectionError::ConnectionClosed(close)) => {
trace!("Connection closed by peer {:?}", peer_addr);
return Err(Error::ConnectionClosed(close.into()));
}
Err(quinn::ConnectionError::ApplicationClosed(close)) => {
trace!("Connection closed by peer {:?}.", peer_addr);
return Err(Error::QuinnConnectionClosed);
return Err(Error::ConnectionClosed(close.into()));
}
Err(err) => {
warn!(
Expand Down Expand Up @@ -279,10 +282,13 @@ async fn read_on_bi_streams<I: ConnId>(
) -> Result<()> {
while let Some(result) = bi_streams.next().await {
match result {
Err(quinn::ConnectionError::ConnectionClosed(_))
| Err(quinn::ConnectionError::ApplicationClosed(_)) => {
Err(quinn::ConnectionError::ConnectionClosed(close)) => {
trace!("Connection closed by peer {:?}", peer_addr);
return Err(Error::ConnectionClosed(close.into()));
}
Err(quinn::ConnectionError::ApplicationClosed(close)) => {
trace!("Connection closed by peer {:?}.", peer_addr);
return Err(Error::QuinnConnectionClosed);
return Err(Error::ConnectionClosed(close.into()));
}
Err(err) => {
warn!(
Expand Down
8 changes: 1 addition & 7 deletions src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -405,13 +405,7 @@ impl<I: ConnId> Endpoint<I> {
pub async fn is_reachable(&self, peer_addr: &SocketAddr) -> Result<()> {
trace!("Checking is reachable");
let new_connection = self.create_new_connection(peer_addr).await?;
let (mut send_stream, mut recv_stream) = match new_connection.connection.open_bi().await {
Ok(cool) => cool,
Err(error) => {
error!("Reachablity check errored with: {:?}", error);
return Err(Error::QuinnConnectionClosed);
}
};
let (mut send_stream, mut recv_stream) = new_connection.connection.open_bi().await?;

let message = WireMsg::EndpointEchoReq;
message.write_to_stream(&mut send_stream).await?;
Expand Down
141 changes: 135 additions & 6 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
// Software.

use super::wire_msg::WireMsg;
use std::io;
use bytes::Bytes;
use std::{fmt, io};
use thiserror::Error;

/// Result used by `QuicP2p`.
Expand All @@ -19,8 +20,11 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
#[non_exhaustive]
pub enum Error {
/// quinn connection closed
#[error("Connection was closed by underlying quinn library")]
QuinnConnectionClosed,
#[error("Connection was closed by {0}")]
ConnectionClosed(Close),
/// quinn connection error
#[error("Connection lost due to error: {0}")]
ConnectionError(#[source] ConnectionError),
/// tokio channel error when trying to send disconnect notificaiton
#[error("Could not send disconnect notification")]
DisconnectionNotification,
Expand All @@ -43,9 +47,6 @@ pub enum Error {
/// Failure encountered when establishing a connection with another peer.
#[error("Establishing connection")]
Connect(#[from] quinn::ConnectError),
/// An existing connection with another peer has been lost.
#[error("Quinn Connection lost")]
Connection(#[from] quinn::ConnectionError),
/// Failed to create a new endpoint.
#[error("Creating endpoint")]
Endpoint(#[from] quinn::EndpointError),
Expand Down Expand Up @@ -134,3 +135,131 @@ pub enum Error {
#[error("Couldn't generate connection ID")]
ConnectionIdGeneration(String),
}

impl From<quinn::ConnectionError> for Error {
fn from(error: quinn::ConnectionError) -> Self {
let error = match error {
quinn::ConnectionError::LocallyClosed => return Self::ConnectionClosed(Close::Local),
quinn::ConnectionError::ApplicationClosed(close) => {
return Self::ConnectionClosed(Close::Application {
error_code: close.error_code.into_inner(),
reason: close.reason,
})
}
quinn::ConnectionError::ConnectionClosed(close) => {
return Self::ConnectionClosed(Close::Transport {
error_code: TransportErrorCode(close.error_code),
reason: close.reason,
})
}
quinn::ConnectionError::VersionMismatch => ConnectionError::VersionMismatch,
quinn::ConnectionError::TransportError(error) => ConnectionError::TransportError(error),
quinn::ConnectionError::Reset => ConnectionError::Reset,
quinn::ConnectionError::TimedOut => ConnectionError::Reset,
};
Self::ConnectionError(error)
}
}

/// The reason a connection was closed.
#[derive(Debug)]
pub enum Close {
/// This application closed the connection.
Local,

/// The remote application closed the connection.
Application {
/// The error code supplied by the application.
error_code: u64,

/// The reason supplied by the application.
reason: Bytes,
},

/// The transport layer closed the connection.
///
/// This would indicate an abrupt disconnection or a protocol violation.
Transport {
/// The error code supplied by the QUIC library.
error_code: TransportErrorCode,

/// The reason supplied by the QUIC library.
reason: Bytes,
},
}

impl fmt::Display for Close {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let (closed_by, error_code, reason): (_, &dyn fmt::Display, _) = match self {
Self::Local => return write!(f, "us"),
Self::Application { error_code, reason } => {
("application", error_code, String::from_utf8_lossy(reason))
}
Self::Transport { error_code, reason } => {
("transport", error_code, String::from_utf8_lossy(reason))
}
};
write!(
f,
"{} (error code: {}, reason: {})",
closed_by, error_code, reason
)
}
}

impl From<quinn::ApplicationClose> for Close {
fn from(close: quinn::ApplicationClose) -> Self {
Self::Application {
error_code: close.error_code.into_inner(),
reason: close.reason,
}
}
}

impl From<quinn::ConnectionClose> for Close {
fn from(close: quinn::ConnectionClose) -> Self {
Self::Transport {
error_code: TransportErrorCode(close.error_code),
reason: close.reason,
}
}
}

/// An opaque error code indicating a transport failure.
///
/// This can be turned to a string via its `Debug` and `Display` impls, but is otherwise opaque.
pub struct TransportErrorCode(quinn_proto::TransportErrorCode);

impl fmt::Debug for TransportErrorCode {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{:?}", self.0)
}
}

impl fmt::Display for TransportErrorCode {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", self.0)
}
}

/// Errors that can cause connection loss.
// This is a copy of `quinn::ConnectionError` without the `*Closed` variants, since we want to
// separate them in our interface.
#[derive(Debug, Error)]
pub enum ConnectionError {
/// The peer doesn't implement the supported version.
#[error("{}", quinn::ConnectionError::VersionMismatch)]
VersionMismatch,

/// The peer violated the QUIC specification as understood by this implementation.
#[error("{0}")]
TransportError(#[source] quinn_proto::TransportError),

/// The peer is unable to continue processing this connection, usually due to having restarted.
#[error("{}", quinn::ConnectionError::Reset)]
Reset,

/// Communication with the peer has lapsed for longer than the negotiated idle timeout.
#[error("{}", quinn::ConnectionError::TimedOut)]
TimedOut,
}
3 changes: 1 addition & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,7 @@ pub use config::Config;
pub use connection_pool::ConnId;
pub use connections::{DisconnectionEvents, RecvStream, SendStream};
pub use endpoint::{Endpoint, IncomingConnections, IncomingMessages};
pub use error::{Error, Result};
pub use quinn::ConnectionError;
pub use error::{Close, ConnectionError, Error, Result};

#[cfg(test)]
mod tests;

0 comments on commit 4d3d4eb

Please sign in to comment.