Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework the transport upgrade API. #1240

Merged
merged 5 commits into from
Sep 10, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ pub use identity::PublicKey;
pub use transport::Transport;
pub use translation::address_translation;
pub use upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, UpgradeError, ProtocolName};
pub use nodes::ConnectionInfo;

#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum Endpoint {
Expand Down
12 changes: 3 additions & 9 deletions core/src/nodes/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -782,9 +782,8 @@ where
TTrans: Transport<Output = (TConnInfo, TMuxer)>,
TTrans::Error: Send + 'static,
TTrans::Dial: Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer: Send + Sync + 'static,
TMuxer::OutboundSubstream: Send,
TMuxer::Substream: Send,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
TConnInfo: Send + 'static,
Expand Down Expand Up @@ -937,12 +936,10 @@ where
TTrans: Transport<Output = (TConnInfo, TMuxer)>,
TTrans::Dial: Send + 'static,
TTrans::Error: Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer: Send + Sync + 'static,
TMuxer::OutboundSubstream: Send,
TMuxer::Substream: Send,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
TConnInfo: Send + 'static,
TPeerId: Send + 'static,
{
let reach_id = match self.transport().clone().dial(first.clone()) {
Expand Down Expand Up @@ -985,14 +982,12 @@ where
TTrans::Error: Send + 'static,
TTrans::Dial: Send + 'static,
TTrans::ListenerUpgrade: Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer: Send + Sync + 'static,
TMuxer::OutboundSubstream: Send,
TMuxer::Substream: Send,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
THandler: IntoNodeHandler<(TConnInfo, ConnectedPoint)> + Send + 'static,
THandler::Handler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
<THandler::Handler as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary
THandlerErr: error::Error + Send + 'static,
TConnInfo: Clone,
TPeerId: AsRef<[u8]> + Send + 'static,
Expand Down Expand Up @@ -1151,7 +1146,6 @@ where
TTrans: Transport<Output = (TConnInfo, TMuxer)> + Clone,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send,
TMuxer::Substream: Send,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
TConnInfo: ConnectionInfo<PeerId = TPeerId> + Clone + Send + 'static,
Expand Down
99 changes: 45 additions & 54 deletions core/src/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,11 @@
//! any desired protocols. The rest of the module defines combinators for
//! modifying a transport through composition with other transports or protocol upgrades.

use crate::{InboundUpgrade, OutboundUpgrade, ConnectedPoint};
use crate::ConnectedPoint;
use futures::prelude::*;
use multiaddr::Multiaddr;
use std::{error, fmt};
use std::{error::Error, fmt};
use std::time::Duration;
use tokio_io::{AsyncRead, AsyncWrite};

pub mod and_then;
pub mod boxed;
Expand Down Expand Up @@ -69,11 +68,7 @@ pub use self::upgrade::Upgrade;
///
/// Additional protocols can be layered on top of the connections established
/// by a [`Transport`] through an upgrade mechanism that is initiated via
/// [`with_upgrade`](Transport::with_upgrade) and optionally followed by further upgrades
/// through chaining calls to [`with_upgrade`](Transport::with_upgrade) and
/// [`and_then`](Transport::and_then). Thereby every upgrade yields a new [`Transport`]
/// whose connection setup incorporates all earlier upgrades followed by the new upgrade,
/// i.e. the order of the upgrades is significant.
/// [`upgrade`](Transport::upgrade).
///
/// > **Note**: The methods of this trait use `self` and not `&self` or `&mut self`. In other
/// > words, listening or dialing consumes the transport object. This has been designed
Expand All @@ -88,7 +83,7 @@ pub trait Transport {
type Output;

/// An error that occurred during connection setup.
type Error: error::Error;
type Error: Error;

/// A stream of [`Output`](Transport::Output)s for inbound connections.
///
Expand Down Expand Up @@ -127,7 +122,7 @@ pub trait Transport {
where
Self: Sized;

/// Turns this `Transport` into an abstract boxed transport.
/// Turns the transport into an abstract boxed (i.e. heap-allocated) transport.
fn boxed(self) -> boxed::Boxed<Self::Output, Self::Error>
where Self: Sized + Clone + Send + Sync + 'static,
Self::Dial: Send + 'static,
Expand All @@ -138,93 +133,89 @@ pub trait Transport {
}

/// Applies a function on the connections created by the transport.
fn map<F, O>(self, map: F) -> map::Map<Self, F>
fn map<F, O>(self, f: F) -> map::Map<Self, F>
where
Self: Sized,
F: FnOnce(Self::Output, ConnectedPoint) -> O + Clone
{
map::Map::new(self, map)
map::Map::new(self, f)
}

/// Applies a function on the errors generated by the futures of the transport.
fn map_err<F, TNewErr>(self, map_err: F) -> map_err::MapErr<Self, F>
fn map_err<F, E>(self, f: F) -> map_err::MapErr<Self, F>
where
Self: Sized,
F: FnOnce(Self::Error) -> TNewErr + Clone
F: FnOnce(Self::Error) -> E + Clone
{
map_err::MapErr::new(self, map_err)
map_err::MapErr::new(self, f)
}

/// Builds a new transport that falls back to another transport when
/// encountering errors on dialing or listening for connections.
/// Adds a fallback transport that is used when encountering errors
/// while establishing inbound or outbound connections.
///
/// The returned transport will act like `self`, except that if `listen_on` or `dial`
/// return an error then `other` will be tried.
fn or_transport<T>(self, other: T) -> OrTransport<Self, T>
fn or_transport<U>(self, other: U) -> OrTransport<Self, U>
where
Self: Sized,
U: Transport,
<U as Transport>::Error: 'static
{
OrTransport::new(self, other)
}

/// Wraps this transport inside an [`Upgrade`].
///
/// Whenever an inbound or outbound connection is established by this
/// transport, the upgrade is applied on the current state of the
/// connection (which may have already gone through previous upgrades)
/// as an [`upgrade::InboundUpgrade`] or [`upgrade::OutboundUpgrade`],
/// respectively.
fn with_upgrade<U, O, E>(self, upgrade: U) -> Upgrade<Self, U>
where
Self: Sized,
Self::Output: AsyncRead + AsyncWrite,
U: InboundUpgrade<Self::Output, Output = O, Error = E>,
U: OutboundUpgrade<Self::Output, Output = O, Error = E>
{
Upgrade::new(self, upgrade)
}

/// Applies a function producing an asynchronous result to every connection
/// created by this transport.
///
/// This function can be used for ad-hoc protocol upgrades on a transport or
/// for processing or adapting the output of an earlier upgrade before
/// applying the next upgrade.
fn and_then<C, F, O>(self, upgrade: C) -> and_then::AndThen<Self, C>
/// This function can be used for ad-hoc protocol upgrades or
/// for processing or adapting the output for following configurations.
///
/// For the high-level transport upgrade procedure, see [`Transport::upgrade`].
fn and_then<C, F, O>(self, f: C) -> and_then::AndThen<Self, C>
where
Self: Sized,
C: FnOnce(Self::Output, ConnectedPoint) -> F + Clone,
F: IntoFuture<Item = O>
F: IntoFuture<Item = O>,
<F as IntoFuture>::Error: Error + 'static
{
and_then::AndThen::new(self, upgrade)
and_then::AndThen::new(self, f)
}

/// Adds a timeout to the connection setup (including upgrades) for all inbound
/// and outbound connection attempts.
fn with_timeout(self, timeout: Duration) -> timeout::TransportTimeout<Self>
/// Adds a timeout to the connection setup (including upgrades) for all
/// inbound and outbound connections established through the transport.
fn timeout(self, timeout: Duration) -> timeout::TransportTimeout<Self>
where
Self: Sized,
Self: Sized
{
timeout::TransportTimeout::new(self, timeout)
}

/// Adds a timeout to the connection setup (including upgrades) for all outbound
/// connection attempts.
fn with_outbound_timeout(self, timeout: Duration) -> timeout::TransportTimeout<Self>
/// connections established through the transport.
fn outbound_timeout(self, timeout: Duration) -> timeout::TransportTimeout<Self>
where
Self: Sized,
Self: Sized
{
timeout::TransportTimeout::with_outgoing_timeout(self, timeout)
}

/// Adds a timeout to the connection setup (including upgrades) for all inbound
/// connection attempts.
fn with_inbound_timeout(self, timeout: Duration) -> timeout::TransportTimeout<Self>
/// connections established through the transport.
fn inbound_timeout(self, timeout: Duration) -> timeout::TransportTimeout<Self>
where
Self: Sized,
Self: Sized
{
timeout::TransportTimeout::with_ingoing_timeout(self, timeout)
}

/// Begins a series of protocol upgrades via an [`upgrade::Builder`].
fn upgrade(self) -> upgrade::Builder<Self>
where
Self: Sized,
Self::Error: 'static
{
upgrade::Builder::new(self)
}
}

/// Event produced by [`Transport::Listener`]s.
Expand Down Expand Up @@ -362,10 +353,10 @@ where TErr: fmt::Display,
}
}

impl<TErr> error::Error for TransportError<TErr>
where TErr: error::Error + 'static,
impl<TErr> Error for TransportError<TErr>
where TErr: Error + 'static,
{
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
fn source(&self) -> Option<&(dyn Error + 'static)> {
match self {
TransportError::MultiaddrNotSupported(_) => None,
TransportError::Other(err) => Some(err),
Expand Down
Loading