Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set SO_REUSEPORT and remove port translation. #1667

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions core/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ impl Endpoint {
pub enum ConnectedPoint {
/// We dialed the node.
Dialer {
/// Local connection address.
local_addr: Option<Multiaddr>,
/// Multiaddress that was successfully dialed.
address: Multiaddr,
},
Expand Down Expand Up @@ -138,7 +140,7 @@ impl ConnectedPoint {
/// For `Dialer`, this modifies `address`. For `Listener`, this modifies `send_back_addr`.
pub fn set_remote_address(&mut self, new_address: Multiaddr) {
match self {
ConnectedPoint::Dialer { address } => *address = new_address,
ConnectedPoint::Dialer { address, .. } => *address = new_address,
ConnectedPoint::Listener { send_back_addr, .. } => *send_back_addr = new_address,
}
}
Expand Down Expand Up @@ -322,6 +324,7 @@ impl<'a> IncomingInfo<'a> {
/// Borrowed information about an outgoing connection currently being negotiated.
#[derive(Debug, Copy, Clone)]
pub struct OutgoingInfo<'a, TPeerId> {
pub local_addr: Option<&'a Multiaddr>,
pub address: &'a Multiaddr,
pub peer_id: Option<&'a TPeerId>,
}
Expand All @@ -330,7 +333,8 @@ impl<'a, TPeerId> OutgoingInfo<'a, TPeerId> {
/// Builds a `ConnectedPoint` corresponding to the outgoing connection.
pub fn to_connected_point(&self) -> ConnectedPoint {
ConnectedPoint::Dialer {
address: self.address.clone()
local_addr: self.local_addr.cloned(),
address: self.address.clone(),
}
}
}
Expand Down
27 changes: 24 additions & 3 deletions core/src/connection/listeners.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
//! Manage listening on multiple multiaddresses at once.

use crate::{Multiaddr, Transport, transport::{TransportError, ListenerEvent}};
use crate::multiaddr::Protocol;
use futures::{prelude::*, task::Context, task::Poll};
use log::debug;
use smallvec::SmallVec;
Expand Down Expand Up @@ -229,6 +230,26 @@ where
self.listeners.iter().flat_map(|l| l.addresses.iter())
}

/// Returns a listener suitable for port reuse if there is one.
pub fn listener_for_port_reuse(&self, address: &Multiaddr) -> Option<&Multiaddr> {
let ip4 = match address.iter().next() {
Some(Protocol::Ip4(addr)) if !addr.is_loopback() => true,
Some(Protocol::Dns4(_)) => true,
Some(Protocol::Ip6(addr)) if !addr.is_loopback() => false,
Some(Protocol::Dns6(_)) => false,
_ => return None,
};
self.listen_addrs().find(|addr| {
match addr.iter().next() {
Some(Protocol::Ip4(addr)) => !addr.is_loopback() && ip4,
Some(Protocol::Dns4(_)) => ip4,
Some(Protocol::Ip6(addr)) => !addr.is_loopback() && !ip4,
Some(Protocol::Dns6(_)) => !ip4,
_ => false,
}
})
}

/// Provides an API similar to `Stream`, except that it cannot end.
pub fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<ListenersEvent<TTrans>> {
// We remove each element from `listeners` one by one and add them back.
Expand Down Expand Up @@ -394,7 +415,7 @@ mod tests {

let address2 = address.clone();
async_std::task::spawn(async move {
mem_transport.dial(address2).unwrap().await.unwrap();
mem_transport.dial(None, address2).unwrap().await.unwrap();
});

match listeners.next().await.unwrap() {
Expand Down Expand Up @@ -427,7 +448,7 @@ mod tests {
})))
}

fn dial(self, _: Multiaddr) -> Result<Self::Dial, transport::TransportError<Self::Error>> {
fn dial(self, _: Option<Multiaddr>, _: Multiaddr) -> Result<Self::Dial, transport::TransportError<Self::Error>> {
panic!()
}
}
Expand Down Expand Up @@ -465,7 +486,7 @@ mod tests {
})))
}

fn dial(self, _: Multiaddr) -> Result<Self::Dial, transport::TransportError<Self::Error>> {
fn dial(self, _: Option<Multiaddr>, _: Multiaddr) -> Result<Self::Dial, transport::TransportError<Self::Error>> {
panic!()
}
}
Expand Down
4 changes: 2 additions & 2 deletions core/src/connection/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -527,8 +527,8 @@ where
.filter_map(|(_, ref endpoint, ref peer_id)| {
match endpoint {
ConnectedPoint::Listener { .. } => None,
ConnectedPoint::Dialer { address } =>
Some(OutgoingInfo { address, peer_id: peer_id.as_ref() }),
ConnectedPoint::Dialer { local_addr, address } =>
Some(OutgoingInfo { local_addr: local_addr.as_ref(), address, peer_id: peer_id.as_ref() }),
}
})
}
Expand Down
6 changes: 3 additions & 3 deletions core/src/either.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,15 +462,15 @@ where
}
}

fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
fn dial(self, local_addr: Option<Multiaddr>, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
use TransportError::*;
match self {
EitherTransport::Left(a) => match a.dial(addr) {
EitherTransport::Left(a) => match a.dial(local_addr, addr) {
Ok(connec) => Ok(EitherFuture::First(connec)),
Err(MultiaddrNotSupported(addr)) => Err(MultiaddrNotSupported(addr)),
Err(Other(err)) => Err(Other(EitherError::A(err))),
},
EitherTransport::Right(b) => match b.dial(addr) {
EitherTransport::Right(b) => match b.dial(local_addr, addr) {
Ok(connec) => Ok(EitherFuture::Second(connec)),
Err(MultiaddrNotSupported(addr)) => Err(MultiaddrNotSupported(addr)),
Err(Other(err)) => Err(Other(EitherError::B(err))),
Expand Down
49 changes: 17 additions & 32 deletions core/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use crate::{
Executor,
Multiaddr,
PeerId,
address_translation,
connection::{
ConnectionId,
ConnectionLimit,
Expand Down Expand Up @@ -183,31 +182,6 @@ where
self.listeners.listen_addrs()
}

/// Call this function in order to know which address remotes should dial to
/// access your local node.
///
/// When receiving an observed address on a tcp connection that we initiated, the observed
/// address contains our tcp dial port, not our tcp listen port. We know which port we are
/// listening on, thereby we can replace the port within the observed address.
///
/// When receiving an observed address on a tcp connection that we did **not** initiated, the
/// observed address should contain our listening port. In case it differs from our listening
/// port there might be a proxy along the path.
///
/// # Arguments
///
/// * `observed_addr` - should be an address a remote observes you as, which can be obtained for
/// example with the identify protocol.
///
pub fn address_translation<'a>(&'a self, observed_addr: &'a Multiaddr)
-> impl Iterator<Item = Multiaddr> + 'a
where
TMuxer: 'a,
THandler: 'a,
{
self.listen_addrs().flat_map(move |server| address_translation(server, observed_addr))
}

/// Returns the peer id of the local node.
pub fn local_peer_id(&self) -> &TPeerId {
&self.local_peer_id
Expand All @@ -231,8 +205,9 @@ where
TConnInfo: Send + 'static,
TPeerId: Send + 'static,
{
let info = OutgoingInfo { address, peer_id: None };
match self.transport().clone().dial(address.clone()) {
let local_addr = self.listeners.listener_for_port_reuse(address);
let info = OutgoingInfo { local_addr, address, peer_id: None };
match self.transport().clone().dial(local_addr.cloned(), address.clone()) {
Ok(f) => {
let f = f.map_err(|err| PendingConnectionError::Transport(TransportError::Other(err)));
self.pool.add_outgoing(f, handler, info)
Expand Down Expand Up @@ -453,6 +428,7 @@ where
struct DialingOpts<TPeerId, THandler> {
peer: TPeerId,
handler: THandler,
local_addr: Option<Multiaddr>,
address: Multiaddr,
remaining: Vec<Multiaddr>,
}
Expand Down Expand Up @@ -484,15 +460,18 @@ where
TPeerId: Eq + Hash + Send + Clone + 'static,
TConnInfo: ConnectionInfo<PeerId = TPeerId> + Send + 'static,
{
let result = match transport.dial(opts.address.clone()) {
let info = OutgoingInfo {
local_addr: opts.local_addr.as_ref(),
address: &opts.address,
peer_id: Some(&opts.peer),
};
let result = match transport.dial(opts.local_addr.clone(), opts.address.clone()) {
Ok(fut) => {
let fut = fut.map_err(|e| PendingConnectionError::Transport(TransportError::Other(e)));
let info = OutgoingInfo { address: &opts.address, peer_id: Some(&opts.peer) };
pool.add_outgoing(fut, opts.handler, info)
},
Err(err) => {
let fut = future::err(PendingConnectionError::Transport(err));
let info = OutgoingInfo { address: &opts.address, peer_id: Some(&opts.peer) };
pool.add_outgoing(fut, opts.handler, info)
},
};
Expand Down Expand Up @@ -551,9 +530,15 @@ where
if num_remain > 0 {
if let Some(handler) = handler {
let next_attempt = attempt.remaining.remove(0);
let local_addr = if let ConnectedPoint::Dialer { local_addr, .. } = &endpoint {
local_addr.clone()
} else {
unreachable!()
};
let opts = DialingOpts {
peer: peer_id.clone(),
handler,
local_addr,
address: next_attempt,
remaining: attempt.remaining
};
Expand All @@ -577,7 +562,7 @@ where
} else {
// A pending incoming connection or outgoing connection to an unknown peer failed.
match endpoint {
ConnectedPoint::Dialer { address } =>
ConnectedPoint::Dialer { address, .. } =>
(None, NetworkEvent::UnknownPeerDialError {
multiaddr: address,
error,
Expand Down
5 changes: 4 additions & 1 deletion core/src/network/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,9 +232,12 @@ where
Peer::Local => return Err(ConnectionLimit { current: 0, limit: 0 })
};

let local_addr = network.listeners.listener_for_port_reuse(&address).cloned();

let id = network.dial_peer(DialingOpts {
peer: peer_id.clone(),
handler,
local_addr,
address,
remaining: remaining.into_iter().collect(),
})?;
Expand Down Expand Up @@ -629,7 +632,7 @@ impl<'a, TInEvent, TConnInfo, TPeerId>
/// Returns the remote address of the current connection attempt.
pub fn address(&self) -> &Multiaddr {
match self.inner.endpoint() {
ConnectedPoint::Dialer { address } => address,
ConnectedPoint::Dialer { address, .. } => address,
ConnectedPoint::Listener { .. } => unreachable!("by definition of a `DialingAttempt`.")
}
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ pub trait Transport {
///
/// If [`TransportError::MultiaddrNotSupported`] is returned, it may be desirable to
/// try an alternative [`Transport`], if available.
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>>
fn dial(self, local_addr: Option<Multiaddr>, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>>
where
Self: Sized;

Expand Down
7 changes: 4 additions & 3 deletions core/src/transport/and_then.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,12 @@ where
Ok(stream)
}

fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
let dialed_fut = self.transport.dial(addr.clone()).map_err(|err| err.map(EitherError::A))?;
fn dial(self, local_addr: Option<Multiaddr>, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
let dialed_fut = self.transport.dial(local_addr.clone(), addr.clone())
.map_err(|err| err.map(EitherError::A))?;
let future = AndThenFuture {
inner: Either::Left(Box::pin(dialed_fut)),
args: Some((self.fun, ConnectedPoint::Dialer { address: addr })),
args: Some((self.fun, ConnectedPoint::Dialer { local_addr, address: addr })),
marker: PhantomPinned,
};
Ok(future)
Expand Down
10 changes: 5 additions & 5 deletions core/src/transport/boxed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub type ListenerUpgrade<O, E> = Pin<Box<dyn Future<Output = Result<O, E>> + Sen

trait Abstract<O, E> {
fn listen_on(&self, addr: Multiaddr) -> Result<Listener<O, E>, TransportError<E>>;
fn dial(&self, addr: Multiaddr) -> Result<Dial<O, E>, TransportError<E>>;
fn dial(&self, local_addr: Option<Multiaddr>, addr: Multiaddr) -> Result<Dial<O, E>, TransportError<E>>;
}

impl<T, O, E> Abstract<O, E> for T
Expand All @@ -62,8 +62,8 @@ where
Ok(Box::pin(fut))
}

fn dial(&self, addr: Multiaddr) -> Result<Dial<O, E>, TransportError<E>> {
let fut = Transport::dial(self.clone(), addr)?;
fn dial(&self, local_addr: Option<Multiaddr>, addr: Multiaddr) -> Result<Dial<O, E>, TransportError<E>> {
let fut = Transport::dial(self.clone(), local_addr, addr)?;
Ok(Box::pin(fut) as Dial<_, _>)
}
}
Expand Down Expand Up @@ -100,7 +100,7 @@ where E: error::Error,
self.inner.listen_on(addr)
}

fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
self.inner.dial(addr)
fn dial(self, local_addr: Option<Multiaddr>, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
self.inner.dial(local_addr, addr)
}
}
8 changes: 4 additions & 4 deletions core/src/transport/choice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,14 @@ where
Err(TransportError::MultiaddrNotSupported(addr))
}

fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
let addr = match self.0.dial(addr) {
fn dial(self, local_addr: Option<Multiaddr>, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
match self.0.dial(local_addr.clone(), addr.clone()) {
Ok(connec) => return Ok(EitherFuture::First(connec)),
Err(TransportError::MultiaddrNotSupported(addr)) => addr,
Err(TransportError::MultiaddrNotSupported(_)) => {},
Err(TransportError::Other(err)) => return Err(TransportError::Other(EitherError::A(err))),
};

let addr = match self.1.dial(addr) {
let addr = match self.1.dial(local_addr, addr) {
Ok(connec) => return Ok(EitherFuture::Second(connec)),
Err(TransportError::MultiaddrNotSupported(addr)) => addr,
Err(TransportError::Other(err)) => return Err(TransportError::Other(EitherError::B(err))),
Expand Down
2 changes: 1 addition & 1 deletion core/src/transport/dummy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ impl<TOut> Transport for DummyTransport<TOut> {
Err(TransportError::MultiaddrNotSupported(addr))
}

fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
fn dial(self, _local_addr: Option<Multiaddr>, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
Err(TransportError::MultiaddrNotSupported(addr))
}
}
Expand Down
6 changes: 3 additions & 3 deletions core/src/transport/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ where
Ok(MapStream { stream, fun: self.fun })
}

fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
let future = self.transport.dial(addr.clone())?;
let p = ConnectedPoint::Dialer { address: addr };
fn dial(self, local_addr: Option<Multiaddr>, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
let future = self.transport.dial(local_addr.clone(), addr.clone())?;
let p = ConnectedPoint::Dialer { local_addr, address: addr };
Ok(MapFuture { inner: future, args: Some((self.fun, p)) })
}
}
Expand Down
4 changes: 2 additions & 2 deletions core/src/transport/map_err.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ where
}
}

fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
fn dial(self, local_addr: Option<Multiaddr>, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
let map = self.map;
match self.transport.dial(addr) {
match self.transport.dial(local_addr, addr) {
Ok(future) => Ok(MapErrDial { inner: future, map: Some(map) }),
Err(err) => Err(err.map(map)),
}
Expand Down
8 changes: 4 additions & 4 deletions core/src/transport/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ impl Transport for MemoryTransport {
Ok(listener)
}

fn dial(self, addr: Multiaddr) -> Result<DialFuture, TransportError<Self::Error>> {
fn dial(self, _local_addr: Option<Multiaddr>, addr: Multiaddr) -> Result<DialFuture, TransportError<Self::Error>> {
let port = if let Ok(port) = parse_memory_addr(&addr) {
if let Some(port) = NonZeroU64::new(port) {
port
Expand Down Expand Up @@ -306,9 +306,9 @@ mod tests {
#[test]
fn port_not_in_use() {
let transport = MemoryTransport::default();
assert!(transport.dial("/memory/810172461024613".parse().unwrap()).is_err());
assert!(transport.dial(None, "/memory/810172461024613".parse().unwrap()).is_err());
let _listener = transport.listen_on("/memory/810172461024613".parse().unwrap()).unwrap();
assert!(transport.dial("/memory/810172461024613".parse().unwrap()).is_ok());
assert!(transport.dial(None, "/memory/810172461024613".parse().unwrap()).is_ok());
}

#[test]
Expand Down Expand Up @@ -342,7 +342,7 @@ mod tests {

let t2 = MemoryTransport::default();
let dialer = async move {
let mut socket = t2.dial(cloned_t1_addr).unwrap().await.unwrap();
let mut socket = t2.dial(None, cloned_t1_addr).unwrap().await.unwrap();
socket.write_all(&msg).await.unwrap();
};

Expand Down
Loading