Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core/src/transport: Add Transport::dial_as_listener #2363

Merged
merged 14 commits into from
Jan 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,24 @@

- Implement `Serialize` and `Deserialize` for `PeerId` (see [PR 2408])

- Allow overriding role when dialing. This option is needed for NAT and firewall
hole punching.

- Add `Transport::dial_as_listener`. As `Transport::dial` but
overrides the role of the local node on the connection . I.e. has the
local node act as a listener on the outgoing connection.

- Add `override_role` option to `DialOpts`.

See [PR 2363].

[PR 2339]: https://github.com/libp2p/rust-libp2p/pull/2339
[PR 2350]: https://github.com/libp2p/rust-libp2p/pull/2350
[PR 2352]: https://github.com/libp2p/rust-libp2p/pull/2352
[PR 2392]: https://github.com/libp2p/rust-libp2p/pull/2392
[PR 2404]: https://github.com/libp2p/rust-libp2p/pull/2404
[PR 2408]: https://github.com/libp2p/rust-libp2p/pull/2408
[PR 2363]: https://github.com/libp2p/rust-libp2p/pull/2363

# 0.30.1 [2021-11-16]

Expand Down
37 changes: 32 additions & 5 deletions core/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,10 @@ pub enum PendingPoint {
/// There is no single address associated with the Dialer of a pending
/// connection. Addresses are dialed in parallel. Only once the first dial
/// is successful is the address of the connection known.
Dialer,
Dialer {
/// Same as [`ConnectedPoint::Dialer`] `role_override`.
role_override: Endpoint,
},
/// The socket comes from a listener.
Listener {
/// Local connection address.
Expand All @@ -110,7 +113,7 @@ pub enum PendingPoint {
impl From<ConnectedPoint> for PendingPoint {
fn from(endpoint: ConnectedPoint) -> Self {
match endpoint {
ConnectedPoint::Dialer { .. } => PendingPoint::Dialer,
ConnectedPoint::Dialer { role_override, .. } => PendingPoint::Dialer { role_override },
ConnectedPoint::Listener {
local_addr,
send_back_addr,
Expand All @@ -129,6 +132,27 @@ pub enum ConnectedPoint {
Dialer {
/// Multiaddress that was successfully dialed.
address: Multiaddr,
/// Whether the role of the local node on the connection should be
/// overriden. I.e. whether the local node should act as a listener on
/// the outgoing connection.
///
/// This option is needed for NAT and firewall hole punching.
///
/// - [`Endpoint::Dialer`] represents the default non-overriding option.
///
/// - [`Endpoint::Listener`] represents the overriding option.
/// Realization depends on the transport protocol. E.g. in the case of
/// TCP, both endpoints dial each other, resulting in a _simultaneous
/// open_ TCP connection. On this new connection both endpoints assume
/// to be the dialer of the connection. This is problematic during the
/// connection upgrade process where an upgrade assumes one side to be
/// the listener. With the help of this option, both peers can
/// negotiate the roles (dialer and listener) for the new connection
/// ahead of time, through some external channel, e.g. the DCUtR
/// protocol, and thus have one peer dial the other and upgrade the
/// connection as a dialer and one peer dial the other and upgrade the
/// connection _as a listener_ overriding its role.
role_override: Endpoint,
},
/// We received the node.
Listener {
Expand Down Expand Up @@ -179,7 +203,10 @@ impl ConnectedPoint {
/// Returns true if the connection is relayed.
pub fn is_relayed(&self) -> bool {
match self {
ConnectedPoint::Dialer { address } => address,
ConnectedPoint::Dialer {
address,
role_override: _,
} => address,
ConnectedPoint::Listener { local_addr, .. } => local_addr,
}
.iter()
Expand All @@ -194,7 +221,7 @@ impl ConnectedPoint {
/// not be usable to establish new connections.
pub fn get_remote_address(&self) -> &Multiaddr {
match self {
ConnectedPoint::Dialer { address } => address,
ConnectedPoint::Dialer { address, .. } => address,
ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr,
}
}
Expand All @@ -204,7 +231,7 @@ impl ConnectedPoint {
/// For `Dialer`, this modifies `address`. For `Listener`, this modifies `send_back_addr`.
pub fn set_remote_address(&mut self, new_address: Multiaddr) {
match self {
ConnectedPoint::Dialer { address } => *address = new_address,
ConnectedPoint::Dialer { address, .. } => *address = new_address,
ConnectedPoint::Listener { send_back_addr, .. } => *send_back_addr = new_address,
}
}
Expand Down
14 changes: 14 additions & 0 deletions core/src/connection/listeners.rs
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,13 @@ mod tests {
panic!()
}

fn dial_as_listener(
self,
_: Multiaddr,
) -> Result<Self::Dial, transport::TransportError<Self::Error>> {
panic!()
}

fn address_translation(&self, _: &Multiaddr, _: &Multiaddr) -> Option<Multiaddr> {
None
}
Expand Down Expand Up @@ -542,6 +549,13 @@ mod tests {
panic!()
}

fn dial_as_listener(
self,
_: Multiaddr,
) -> Result<Self::Dial, transport::TransportError<Self::Error>> {
panic!()
}

fn address_translation(&self, _: &Multiaddr, _: &Multiaddr) -> Option<Multiaddr> {
None
}
Expand Down
34 changes: 21 additions & 13 deletions core/src/connection/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
use crate::{
connection::{
handler::{THandlerError, THandlerInEvent, THandlerOutEvent},
Connected, ConnectionError, ConnectionHandler, ConnectionId, ConnectionLimit, IncomingInfo,
IntoConnectionHandler, PendingConnectionError, PendingInboundConnectionError,
Connected, ConnectionError, ConnectionHandler, ConnectionId, ConnectionLimit, Endpoint,
IncomingInfo, IntoConnectionHandler, PendingConnectionError, PendingInboundConnectionError,
PendingOutboundConnectionError, PendingPoint, Substream,
},
muxing::StreamMuxer,
Expand Down Expand Up @@ -460,7 +460,7 @@ where
local_addr,
send_back_addr,
}),
PendingPoint::Dialer => None,
PendingPoint::Dialer { .. } => None,
})
}

Expand Down Expand Up @@ -535,6 +535,7 @@ where
addresses: impl Iterator<Item = Multiaddr> + Send + 'static,
peer: Option<PeerId>,
handler: THandler,
role_override: Endpoint,
dial_concurrency_factor_override: Option<NonZeroU8>,
) -> Result<ConnectionId, DialError<THandler>>
where
Expand All @@ -550,6 +551,7 @@ where
peer,
addresses,
dial_concurrency_factor_override.unwrap_or(self.dial_concurrency_factor),
role_override,
);

let connection_id = self.next_connection_id();
Expand All @@ -566,13 +568,15 @@ where
.boxed(),
);

self.counters.inc_pending(&PendingPoint::Dialer);
let endpoint = PendingPoint::Dialer { role_override };

self.counters.inc_pending(&endpoint);
self.pending.insert(
connection_id,
PendingConnectionInfo {
peer_id: peer,
handler,
endpoint: PendingPoint::Dialer,
endpoint: endpoint,
_drop_notifier: drop_notifier,
},
);
Expand Down Expand Up @@ -745,9 +749,13 @@ where
self.counters.dec_pending(&endpoint);

let (endpoint, concurrent_dial_errors) = match (endpoint, outgoing) {
(PendingPoint::Dialer, Some((address, errors))) => {
(ConnectedPoint::Dialer { address }, Some(errors))
}
(PendingPoint::Dialer { role_override }, Some((address, errors))) => (
ConnectedPoint::Dialer {
address,
role_override,
},
Some(errors),
),
(
PendingPoint::Listener {
local_addr,
Expand All @@ -761,7 +769,7 @@ where
},
None,
),
(PendingPoint::Dialer, None) => unreachable!(
(PendingPoint::Dialer { .. }, None) => unreachable!(
"Established incoming connection via pending outgoing connection."
),
(PendingPoint::Listener { .. }, Some(_)) => unreachable!(
Expand Down Expand Up @@ -910,7 +918,7 @@ where
self.counters.dec_pending(&endpoint);

match (endpoint, error) {
(PendingPoint::Dialer, Either::Left(error)) => {
(PendingPoint::Dialer { .. }, Either::Left(error)) => {
return Poll::Ready(PoolEvent::PendingOutboundConnectionError {
id,
error,
Expand All @@ -933,7 +941,7 @@ where
local_addr,
});
}
(PendingPoint::Dialer, Either::Right(_)) => {
(PendingPoint::Dialer { .. }, Either::Right(_)) => {
unreachable!("Inbound error for outbound connection.")
}
(PendingPoint::Listener { .. }, Either::Left(_)) => {
Expand Down Expand Up @@ -1176,7 +1184,7 @@ impl ConnectionCounters {

fn inc_pending(&mut self, endpoint: &PendingPoint) {
match endpoint {
PendingPoint::Dialer => {
PendingPoint::Dialer { .. } => {
self.pending_outgoing += 1;
}
PendingPoint::Listener { .. } => {
Expand All @@ -1191,7 +1199,7 @@ impl ConnectionCounters {

fn dec_pending(&mut self, endpoint: &PendingPoint) {
match endpoint {
PendingPoint::Dialer => {
PendingPoint::Dialer { .. } => {
self.pending_outgoing -= 1;
}
PendingPoint::Listener { .. } => {
Expand Down
22 changes: 14 additions & 8 deletions core/src/connection/pool/concurrent_dial.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

pub use crate::connection::{ConnectionCounters, ConnectionLimits};

use crate::{
connection::Endpoint,
transport::{Transport, TransportError},
Multiaddr, PeerId,
};
Expand Down Expand Up @@ -63,14 +62,21 @@ where
peer: Option<PeerId>,
addresses: impl Iterator<Item = Multiaddr> + Send + 'static,
concurrency_factor: NonZeroU8,
role_override: Endpoint,
) -> Self {
let mut pending_dials = addresses.map(move |address| match p2p_addr(peer, address) {
Ok(address) => match transport.clone().dial(address.clone()) {
Ok(fut) => fut
.map(|r| (address, r.map_err(|e| TransportError::Other(e))))
.boxed(),
Err(err) => futures::future::ready((address, Err(err))).boxed(),
},
Ok(address) => {
let dial = match role_override {
Endpoint::Dialer => transport.clone().dial(address.clone()),
Endpoint::Listener => transport.clone().dial_as_listener(address.clone()),
};
match dial {
Ok(fut) => fut
.map(|r| (address, r.map_err(|e| TransportError::Other(e))))
.boxed(),
Err(err) => futures::future::ready((address, Err(err))).boxed(),
}
}
Err(address) => futures::future::ready((
address.clone(),
Err(TransportError::MultiaddrNotSupported(address)),
Expand Down
19 changes: 19 additions & 0 deletions core/src/either.rs
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,25 @@ where
}
}

fn dial_as_listener(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>>
where
Self: Sized,
{
use TransportError::*;
match self {
EitherTransport::Left(a) => match a.dial_as_listener(addr) {
Ok(connec) => Ok(EitherFuture::First(connec)),
Err(MultiaddrNotSupported(addr)) => Err(MultiaddrNotSupported(addr)),
Err(Other(err)) => Err(Other(EitherError::A(err))),
},
EitherTransport::Right(b) => match b.dial_as_listener(addr) {
Ok(connec) => Ok(EitherFuture::Second(connec)),
Err(MultiaddrNotSupported(addr)) => Err(MultiaddrNotSupported(addr)),
Err(Other(err)) => Err(Other(EitherError::B(err))),
},
}
}

fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
match self {
EitherTransport::Left(a) => a.address_translation(server, observed),
Expand Down
Loading