Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core/src/transport: Add Transport::dial_as_listener #2363

Merged
merged 14 commits into from
Jan 17, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 44 additions & 4 deletions core/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,10 @@ pub enum PendingPoint {
/// There is no single address associated with the Dialer of a pending
/// connection. Addresses are dialed in parallel. Only once the first dial
/// is successful is the address of the connection known.
Dialer,
Dialer {
/// Whether the connection is dialed _as a listener_.
as_listener: DialAsListener,
},
/// The socket comes from a listener.
Listener {
/// Local connection address.
Expand All @@ -109,7 +112,7 @@ pub enum PendingPoint {
impl From<ConnectedPoint> for PendingPoint {
fn from(endpoint: ConnectedPoint) -> Self {
match endpoint {
ConnectedPoint::Dialer { .. } => PendingPoint::Dialer,
ConnectedPoint::Dialer { as_listener, .. } => PendingPoint::Dialer { as_listener },
ConnectedPoint::Listener {
local_addr,
send_back_addr,
Expand All @@ -128,6 +131,8 @@ pub enum ConnectedPoint {
Dialer {
/// Multiaddress that was successfully dialed.
address: Multiaddr,
/// Whether the connection is dialed _as a listener_.
as_listener: DialAsListener,
},
/// We received the node.
Listener {
Expand Down Expand Up @@ -183,7 +188,7 @@ impl ConnectedPoint {
/// not be usable to establish new connections.
pub fn get_remote_address(&self) -> &Multiaddr {
match self {
ConnectedPoint::Dialer { address } => address,
ConnectedPoint::Dialer { address, .. } => address,
ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr,
}
}
Expand All @@ -193,12 +198,47 @@ impl ConnectedPoint {
/// For `Dialer`, this modifies `address`. For `Listener`, this modifies `send_back_addr`.
pub fn set_remote_address(&mut self, new_address: Multiaddr) {
match self {
ConnectedPoint::Dialer { address } => *address = new_address,
ConnectedPoint::Dialer { address, .. } => *address = new_address,
ConnectedPoint::Listener { send_back_addr, .. } => *send_back_addr = new_address,
}
}
}

/// Whether a connection is dialed _as a listener_.
///
/// This option is needed for NAT and firewall hole punching.
///
/// The concrete realization of this option depends on the transport
/// protocol. E.g. in the case of TCP, both endpoints dial each other,
/// resulting in a _simultaneous open_ TCP connection. On this new
/// connection both endpoints assume to be the dialer of the connection.
/// This is problematic during the connection upgrade process where an
/// upgrade assumes one side to be the listener. With the help of this
/// option, both peers can negotiate the roles (dialer and listener) for
/// the new connection ahead of time, through some external channel, and
/// thus have one peer dial the other as a dialer and one peer dial the
/// other _as a listener_.
#[derive(Debug, Default, Copy, Clone, Hash, Eq, PartialEq)]
pub struct DialAsListener(bool);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I introduced this newtype instead of passing a bool through the various layers.

I like the fact that it (a) increases type safety and (b) carries its documentation. Unfortunately it is not particularly intuitive, nor idiomatic (never seen a newtype around a bool) nor convenient (e.g. see if as_listener == false).

@thomaseizinger you usually have good ideas when it comes to idiomatic Rust. Any alternatives that come to your mind?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think of:

    Dialer {
        /// An optional override if the desired role for the upgrade process has already been negotiated out of band.
		////
		/// If this is set, the endpoint resulting from a successful dial should assume the given role instead of inferring it from the underlying transport.
        role_override: Option<Endpoint>,
    },

We could be even more descriptive and do:

enum UpgradeRole {
	InferredFromTransport,
	NegotiatedOutOfBand {
		role: Endpoint
	}
}
// -------------
Dialer {
    /// The strategy, how the role within the upgrade process should be determined.
    upgrade_role: UpgradeRole,
},

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

    Dialer {
        /// An optional override if the desired role for the upgrade process has already been negotiated out of band.
		////
		/// If this is set, the endpoint resulting from a successful dial should assume the given role instead of inferring it from the underlying transport.
        role_override: Option<Endpoint>,
    },

This is a good idea, especially as it does not introduce a new type. Though it does allow for duplication of information and thus potentially confusion? PendingPoint::Dialer { role_override: Some(Endpoint::Dialer) } is the same as PendingPoint::Dialer { role_override: None }.

Copy link
Member Author

@mxinden mxinden Nov 30, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

enum UpgradeRole {
	InferredFromTransport,
	NegotiatedOutOfBand {
		role: Endpoint
	}
}

Note that this whole out-of-band role negotiation does not only concern upgrades, but some transport protocols as well. The TCP transport implementation does not require this information as it will open a simultaneous open TCP connection either way. But QUIC does. In the case of QUIC the listener sends a random-garbage packet instead of a connection-initiation packet.

  • For a QUIC address:
    • Upon receiving the Sync, A immediately dials the address to B.
    • Upon expiry of the timer, B starts to send UDP packets filled with
      random bytes to A's address. Packets should be sent repeatedly in
      random intervals between 10 and 200 ms.
    • This will result in a QUIC connection where A is the client and B is
      the server.

https://github.com/libp2p/specs/blob/master/relay/DCUtR.md#the-protocol

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Back to your suggestion of role_override, how would the signatures of Network::dial and Peer::dial look like?

    pub fn dial(
        &mut self,
        address: &Multiaddr,
        handler: THandler,
        role_override: Option<Endpoint>,
    ) -> Result<ConnectionId, DialError<THandler>>

I find this descriptive but again confusing due to the duplication of information where Some(Endpoint::Dialer) is the same as None.


impl From<bool> for DialAsListener {
fn from(b: bool) -> Self {
Self(b)
}
}

impl From<DialAsListener> for bool {
fn from(b: DialAsListener) -> Self {
b.0
}
}

impl PartialEq<bool> for DialAsListener {
fn eq(&self, other: &bool) -> bool {
self.0 == *other
}
}

/// Information about a successfully established connection.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Connected {
Expand Down
43 changes: 28 additions & 15 deletions core/src/connection/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
use crate::{
connection::{
handler::{THandlerError, THandlerInEvent, THandlerOutEvent},
Connected, ConnectionError, ConnectionHandler, ConnectionId, ConnectionLimit, IncomingInfo,
IntoConnectionHandler, PendingConnectionError, PendingInboundConnectionError,
PendingOutboundConnectionError, PendingPoint, Substream,
Connected, ConnectionError, ConnectionHandler, ConnectionId, ConnectionLimit,
DialAsListener, IncomingInfo, IntoConnectionHandler, PendingConnectionError,
PendingInboundConnectionError, PendingOutboundConnectionError, PendingPoint, Substream,
},
muxing::StreamMuxer,
network::DialError,
Expand Down Expand Up @@ -460,7 +460,7 @@ where
local_addr,
send_back_addr,
}),
PendingPoint::Dialer => None,
PendingPoint::Dialer { .. } => None,
})
}

Expand Down Expand Up @@ -535,6 +535,7 @@ where
addresses: impl Iterator<Item = Multiaddr> + Send + 'static,
peer: Option<PeerId>,
handler: THandler,
as_listener: DialAsListener,
) -> Result<ConnectionId, DialError<THandler>>
where
TTrans: Clone + Send,
Expand All @@ -544,7 +545,13 @@ where
return Err(DialError::ConnectionLimit { limit, handler });
};

let dial = ConcurrentDial::new(transport, peer, addresses, self.dial_concurrency_factor);
let dial = ConcurrentDial::new(
transport,
peer,
addresses,
self.dial_concurrency_factor,
as_listener,
);

let connection_id = self.next_connection_id();

Expand All @@ -560,13 +567,15 @@ where
.boxed(),
);

self.counters.inc_pending(&PendingPoint::Dialer);
let endpoint = PendingPoint::Dialer { as_listener };

self.counters.inc_pending(&endpoint);
self.pending.insert(
connection_id,
PendingConnectionInfo {
peer_id: peer,
handler,
endpoint: PendingPoint::Dialer,
endpoint: endpoint,
_drop_notifier: drop_notifier,
},
);
Expand Down Expand Up @@ -739,9 +748,13 @@ where
self.counters.dec_pending(&endpoint);

let (endpoint, concurrent_dial_errors) = match (endpoint, outgoing) {
(PendingPoint::Dialer, Some((address, errors))) => {
(ConnectedPoint::Dialer { address }, Some(errors))
}
(PendingPoint::Dialer { as_listener }, Some((address, errors))) => (
ConnectedPoint::Dialer {
address,
as_listener,
},
Some(errors),
),
(
PendingPoint::Listener {
local_addr,
Expand All @@ -755,7 +768,7 @@ where
},
None,
),
(PendingPoint::Dialer, None) => unreachable!(
(PendingPoint::Dialer { .. }, None) => unreachable!(
"Established incoming connection via pending outgoing connection."
),
(PendingPoint::Listener { .. }, Some(_)) => unreachable!(
Expand Down Expand Up @@ -904,7 +917,7 @@ where
self.counters.dec_pending(&endpoint);

match (endpoint, error) {
(PendingPoint::Dialer, Either::Left(error)) => {
(PendingPoint::Dialer { .. }, Either::Left(error)) => {
return Poll::Ready(PoolEvent::PendingOutboundConnectionError {
id,
error,
Expand All @@ -927,7 +940,7 @@ where
local_addr,
});
}
(PendingPoint::Dialer, Either::Right(_)) => {
(PendingPoint::Dialer { .. }, Either::Right(_)) => {
unreachable!("Inbound error for outbound connection.")
}
(PendingPoint::Listener { .. }, Either::Left(_)) => {
Expand Down Expand Up @@ -1170,7 +1183,7 @@ impl ConnectionCounters {

fn inc_pending(&mut self, endpoint: &PendingPoint) {
match endpoint {
PendingPoint::Dialer => {
PendingPoint::Dialer { .. } => {
self.pending_outgoing += 1;
}
PendingPoint::Listener { .. } => {
Expand All @@ -1185,7 +1198,7 @@ impl ConnectionCounters {

fn dec_pending(&mut self, endpoint: &PendingPoint) {
match endpoint {
PendingPoint::Dialer => {
PendingPoint::Dialer { .. } => {
self.pending_outgoing -= 1;
}
PendingPoint::Listener { .. } => {
Expand Down
6 changes: 3 additions & 3 deletions core/src/connection/pool/concurrent_dial.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

pub use crate::connection::{ConnectionCounters, ConnectionLimits};

use crate::{
connection::DialAsListener,
transport::{Transport, TransportError},
Multiaddr, PeerId,
};
Expand Down Expand Up @@ -63,9 +62,10 @@ where
peer: Option<PeerId>,
addresses: impl Iterator<Item = Multiaddr> + Send + 'static,
concurrency_factor: NonZeroU8,
as_listener: DialAsListener,
) -> Self {
let mut pending_dials = addresses.map(move |address| match p2p_addr(peer, address) {
Ok(address) => match transport.clone().dial(address.clone()) {
Ok(address) => match transport.clone().dial(address.clone(), as_listener) {
Ok(fut) => fut
.map(|r| (address, r.map_err(|e| TransportError::Other(e))))
.boxed(),
Expand Down
11 changes: 8 additions & 3 deletions core/src/either.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
// DEALINGS IN THE SOFTWARE.

use crate::{
connection::DialAsListener,
muxing::{StreamMuxer, StreamMuxerEvent},
transport::{ListenerEvent, Transport, TransportError},
Multiaddr, ProtocolName,
Expand Down Expand Up @@ -513,15 +514,19 @@ where
}
}

fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
fn dial(
self,
addr: Multiaddr,
as_listener: DialAsListener,
) -> Result<Self::Dial, TransportError<Self::Error>> {
use TransportError::*;
match self {
EitherTransport::Left(a) => match a.dial(addr) {
EitherTransport::Left(a) => match a.dial(addr, as_listener) {
Ok(connec) => Ok(EitherFuture::First(connec)),
Err(MultiaddrNotSupported(addr)) => Err(MultiaddrNotSupported(addr)),
Err(Other(err)) => Err(Other(EitherError::A(err))),
},
EitherTransport::Right(b) => match b.dial(addr) {
EitherTransport::Right(b) => match b.dial(addr, as_listener) {
Ok(connec) => Ok(EitherFuture::Second(connec)),
Err(MultiaddrNotSupported(addr)) => Err(MultiaddrNotSupported(addr)),
Err(Other(err)) => Err(Other(EitherError::B(err))),
Expand Down
11 changes: 8 additions & 3 deletions core/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
mod event;
pub mod peer;

pub use crate::connection::{ConnectionCounters, ConnectionLimits};
pub use crate::connection::{ConnectionCounters, ConnectionLimits, DialAsListener};
pub use event::{IncomingConnection, NetworkEvent};
pub use peer::Peer;

Expand Down Expand Up @@ -95,7 +95,7 @@ where
self.pool
.iter_pending_info()
.filter(move |(_, endpoint, peer_id)| {
matches!(endpoint, PendingPoint::Dialer) && peer_id.as_ref() == Some(&peer)
matches!(endpoint, PendingPoint::Dialer { .. }) && peer_id.as_ref() == Some(&peer)
})
.map(|(connection_id, _, _)| connection_id)
}
Expand Down Expand Up @@ -195,6 +195,7 @@ where
&mut self,
address: &Multiaddr,
handler: THandler,
as_listener: DialAsListener,
) -> Result<ConnectionId, DialError<THandler>>
where
TTrans: Transport + Send,
Expand All @@ -213,6 +214,7 @@ where
peer,
addresses: std::iter::once(address.clone()),
handler,
as_listener,
});
}
}
Expand All @@ -222,6 +224,7 @@ where
std::iter::once(address.clone()),
None,
handler,
as_listener,
)
}

Expand All @@ -242,6 +245,7 @@ where
opts.addresses,
Some(opts.peer),
opts.handler,
opts.as_listener,
)?;

Ok(id)
Expand Down Expand Up @@ -279,7 +283,7 @@ where
pub fn dialing_peers(&self) -> impl Iterator<Item = &PeerId> {
self.pool
.iter_pending_info()
.filter(|(_, endpoint, _)| matches!(endpoint, PendingPoint::Dialer))
.filter(|(_, endpoint, _)| matches!(endpoint, PendingPoint::Dialer { .. }))
.filter_map(|(_, _, peer)| peer.as_ref())
}

Expand Down Expand Up @@ -469,6 +473,7 @@ struct DialingOpts<THandler, I> {
peer: PeerId,
handler: THandler,
addresses: I,
as_listener: DialAsListener,
}

/// Information about the network obtained by [`Network::info()`].
Expand Down
11 changes: 5 additions & 6 deletions core/src/network/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
use super::{DialError, DialingOpts, Network};
use crate::{
connection::{
handler::THandlerInEvent, pool::Pool, ConnectionHandler, ConnectionId,
handler::THandlerInEvent, pool::Pool, ConnectionHandler, ConnectionId, DialAsListener,
EstablishedConnection, EstablishedConnectionIter, IntoConnectionHandler, PendingConnection,
},
Multiaddr, PeerId, Transport,
Expand Down Expand Up @@ -151,15 +151,13 @@ where

/// Initiates a new dialing attempt to this peer using the given addresses.
///
/// The connection ID of the first connection attempt, i.e. to `address`,
/// is returned, together with a [`DialingPeer`] for further use. The
/// `remaining` addresses are tried in order in subsequent connection
/// attempts in the context of the same dialing attempt, if the connection
/// attempt to the first address fails.
/// The [`ConnectionId`] of the connection attempt is returned together with
/// a [`DialingPeer`] for further use.
pub fn dial<I>(
self,
addresses: I,
handler: THandler,
as_listener: DialAsListener,
) -> Result<(ConnectionId, DialingPeer<'a, TTrans, THandler>), DialError<THandler>>
where
I: IntoIterator<Item = Multiaddr>,
Expand All @@ -176,6 +174,7 @@ where
peer: peer_id,
handler,
addresses: addresses.into_iter(),
as_listener,
})?;

Ok((id, DialingPeer { network, peer_id }))
Expand Down
9 changes: 7 additions & 2 deletions core/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
//! any desired protocols. The rest of the module defines combinators for
//! modifying a transport through composition with other transports or protocol upgrades.

use crate::ConnectedPoint;
use crate::connection::{ConnectedPoint, DialAsListener};
use futures::prelude::*;
use multiaddr::Multiaddr;
use std::{error::Error, fmt};
Expand Down Expand Up @@ -126,7 +126,12 @@ pub trait Transport {
///
/// If [`TransportError::MultiaddrNotSupported`] is returned, it may be desirable to
/// try an alternative [`Transport`], if available.
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>>
fn dial(
self,
addr: Multiaddr,
// TODO: In case the transport doesn't support as_listener, should we return an error at runtime?
as_listener: DialAsListener,
) -> Result<Self::Dial, TransportError<Self::Error>>
where
Self: Sized;

Expand Down
Loading