Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[tcp] Port-reuse, async-io, if-watch #1887

Merged
merged 19 commits into from
Jan 12, 2021
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 3 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ default = [
"pnet",
"request-response",
"secp256k1",
"tcp-async-std",
"tcp",
"uds",
"wasm-ext",
"websocket",
Expand All @@ -44,8 +44,7 @@ ping = ["libp2p-ping"]
plaintext = ["libp2p-plaintext"]
pnet = ["libp2p-pnet"]
request-response = ["libp2p-request-response"]
tcp-async-std = ["libp2p-tcp", "libp2p-tcp/async-std"]
tcp-tokio = ["libp2p-tcp", "libp2p-tcp/tokio"]
tcp = ["libp2p-tcp"]
uds = ["libp2p-uds"]
wasm-ext = ["libp2p-wasm-ext"]
wasm-ext-websocket = ["wasm-ext", "libp2p-wasm-ext/websocket"]
Expand Down Expand Up @@ -91,7 +90,7 @@ libp2p-tcp = { version = "0.25.1", path = "transports/tcp", optional = true }
libp2p-websocket = { version = "0.26.1", path = "transports/websocket", optional = true }

[dev-dependencies]
async-std = "1.6.2"
async-std = { version = "1.6.2", features = ["attributes"] }
env_logger = "0.8.1"
tokio = { version = "0.3", features = ["io-util", "io-std", "stream", "macros", "rt", "rt-multi-thread"] }

Expand Down Expand Up @@ -121,7 +120,3 @@ members = [
"transports/websocket",
"transports/wasm-ext"
]

[[example]]
name = "chat-tokio"
required-features = ["tcp-tokio", "mdns"]
4 changes: 2 additions & 2 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ zeroize = "1"
ring = { version = "0.16.9", features = ["alloc", "std"], default-features = false }

[dev-dependencies]
async-std = "1.6.2"
async-std = { version = "1.6.2", features = ["attributes"] }
criterion = "0.3"
libp2p-mplex = { path = "../muxers/mplex" }
libp2p-noise = { path = "../protocols/noise" }
libp2p-tcp = { path = "../transports/tcp", features = ["async-std"] }
libp2p-tcp = { path = "../transports/tcp" }
multihash = { version = "0.13", default-features = false, features = ["arb"] }
quickcheck = "0.9.0"
wasm-timer = "0.2"
Expand Down
4 changes: 4 additions & 0 deletions core/src/connection/listeners.rs
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,8 @@ mod tests {
fn dial(self, _: Multiaddr) -> Result<Self::Dial, transport::TransportError<Self::Error>> {
panic!()
}

fn address_translation(&self, _: &Multiaddr, _: &Multiaddr) -> Option<Multiaddr> { None }
}

async_std::task::block_on(async move {
Expand Down Expand Up @@ -466,6 +468,8 @@ mod tests {
fn dial(self, _: Multiaddr) -> Result<Self::Dial, transport::TransportError<Self::Error>> {
panic!()
}

fn address_translation(&self, _: &Multiaddr, _: &Multiaddr) -> Option<Multiaddr> { None }
}

async_std::task::block_on(async move {
Expand Down
7 changes: 7 additions & 0 deletions core/src/either.rs
Original file line number Diff line number Diff line change
Expand Up @@ -477,4 +477,11 @@ where
},
}
}

fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
match self {
EitherTransport::Left(a) => a.address_translation(server, observed),
EitherTransport::Right(b) => b.address_translation(server, observed),
}
}
}
28 changes: 12 additions & 16 deletions core/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ use crate::{
Executor,
Multiaddr,
PeerId,
address_translation,
connection::{
ConnectionId,
ConnectionLimit,
Expand Down Expand Up @@ -176,30 +175,27 @@ where
self.listeners.listen_addrs()
}

/// Call this function in order to know which address remotes should dial to
/// access your local node.
/// Maps the given `observed_addr`, representing an address of the local
/// node observed by a remote peer, onto the locally known listen addresses
/// to yield one or more addresses of the local node that may be publicly
/// reachable.
///
/// When receiving an observed address on a tcp connection that we initiated, the observed
/// address contains our tcp dial port, not our tcp listen port. We know which port we are
/// listening on, thereby we can replace the port within the observed address.
///
/// When receiving an observed address on a tcp connection that we did **not** initiated, the
/// observed address should contain our listening port. In case it differs from our listening
/// port there might be a proxy along the path.
///
/// # Arguments
///
/// * `observed_addr` - should be an address a remote observes you as, which can be obtained for
/// example with the identify protocol.
/// I.e. this method incorporates the view of other peers into the listen
/// addresses seen by the local node to account for possible IP and port
/// mappings performed by intermediate network devices in an effort to
/// obtain addresses for the local peer that are also reachable for peers
/// other than the peer who reported the `observed_addr`.
///
/// The translation is transport-specific. See [`Transport::address_translation`].
pub fn address_translation<'a>(&'a self, observed_addr: &'a Multiaddr)
-> impl Iterator<Item = Multiaddr> + 'a
where
TMuxer: 'a,
THandler: 'a,
{
let transport = self.listeners.transport();
let mut addrs: Vec<_> = self.listen_addrs()
.filter_map(move |server| address_translation(server, observed_addr))
.filter_map(move |server| transport.address_translation(server, observed_addr))
.collect();

// remove duplicates
Expand Down
5 changes: 5 additions & 0 deletions core/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,11 @@ pub trait Transport {
where
Self: Sized;

/// Performs a transport-specific mapping of an address `observed` by
/// a remote onto a local `listen` address to yield an address for
/// the local node that may be reachable for other peers.
fn address_translation(&self, listen: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr>;

/// Boxes the transport, including custom transport errors.
fn boxed(self) -> boxed::Boxed<Self::Output>
where
Expand Down
4 changes: 4 additions & 0 deletions core/src/transport/and_then.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ where
};
Ok(future)
}

fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
self.transport.address_translation(server, observed)
}
}

/// Custom `Stream` to avoid boxing.
Expand Down
9 changes: 9 additions & 0 deletions core/src/transport/boxed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type ListenerUpgrade<O> = Pin<Box<dyn Future<Output = io::Result<O>> + Send>>;
trait Abstract<O> {
fn listen_on(&self, addr: Multiaddr) -> Result<Listener<O>, TransportError<io::Error>>;
fn dial(&self, addr: Multiaddr) -> Result<Dial<O>, TransportError<io::Error>>;
fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr>;
}

impl<T, O> Abstract<O> for T
Expand Down Expand Up @@ -78,6 +79,10 @@ where
.map_err(|e| e.map(box_err))?;
Ok(Box::pin(fut) as Dial<_>)
}

fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
Transport::address_translation(self, server, observed)
}
}

impl<O> fmt::Debug for Boxed<O> {
Expand Down Expand Up @@ -108,6 +113,10 @@ impl<O> Transport for Boxed<O> {
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
self.inner.dial(addr)
}

fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
self.inner.address_translation(server, observed)
}
}

fn box_err<E: Error + Send + Sync + 'static>(e: E) -> io::Error {
Expand Down
8 changes: 8 additions & 0 deletions core/src/transport/choice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,12 @@ where

Err(TransportError::MultiaddrNotSupported(addr))
}

fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
if let Some(addr) = self.0.address_translation(server, observed) {
Some(addr)
} else {
self.1.address_translation(server, observed)
}
}
}
4 changes: 4 additions & 0 deletions core/src/transport/dummy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ impl<TOut> Transport for DummyTransport<TOut> {
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
Err(TransportError::MultiaddrNotSupported(addr))
}

fn address_translation(&self, _server: &Multiaddr, _observed: &Multiaddr) -> Option<Multiaddr> {
None
}
}

/// Implementation of `AsyncRead` and `AsyncWrite`. Not meant to be instanciated.
Expand Down
4 changes: 4 additions & 0 deletions core/src/transport/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ where
let p = ConnectedPoint::Dialer { address: addr };
Ok(MapFuture { inner: future, args: Some((self.fun, p)) })
}

fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
self.transport.address_translation(server, observed)
}
}

/// Custom `Stream` implementation to avoid boxing.
Expand Down
4 changes: 4 additions & 0 deletions core/src/transport/map_err.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ where
Err(err) => Err(err.map(map)),
}
}

fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
self.transport.address_translation(server, observed)
}
}

/// Listening stream for `MapErr`.
Expand Down
4 changes: 4 additions & 0 deletions core/src/transport/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,10 @@ impl Transport for MemoryTransport {

DialFuture::new(port).ok_or(TransportError::Other(MemoryTransportError::Unreachable))
}

fn address_translation(&self, _server: &Multiaddr, _observed: &Multiaddr) -> Option<Multiaddr> {
None
}
}

/// Error that can be produced from the `MemoryTransport`.
Expand Down
8 changes: 8 additions & 0 deletions core/src/transport/optional.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,12 @@ where
Err(TransportError::MultiaddrNotSupported(addr))
}
}

fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
if let Some(inner) = &self.0 {
inner.address_translation(server, observed)
} else {
None
}
}
}
4 changes: 4 additions & 0 deletions core/src/transport/timeout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ where
timer: Delay::new(self.outgoing_timeout),
})
}

fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
self.inner.address_translation(server, observed)
}
}

// TODO: can be removed and replaced with an `impl Stream` once impl Trait is fully stable
Expand Down
8 changes: 8 additions & 0 deletions core/src/transport/upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,10 @@ where
fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> {
self.0.listen_on(addr)
}

fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
self.0.address_translation(server, observed)
}
}

/// An inbound or outbound upgrade.
Expand Down Expand Up @@ -383,6 +387,10 @@ where
upgrade: self.upgrade
})
}

fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
self.inner.address_translation(server, observed)
}
}

/// Errors produced by a transport upgrade.
Expand Down
26 changes: 14 additions & 12 deletions core/tests/network_dial_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@ fn deny_incoming_connec() {
swarm1.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap();

let address = async_std::task::block_on(future::poll_fn(|cx| {
if let Poll::Ready(NetworkEvent::NewListenerAddress { listen_addr, .. }) = swarm1.poll(cx) {
Poll::Ready(listen_addr)
} else {
panic!("Was expecting the listen address to be reported")
match swarm1.poll(cx) {
Poll::Ready(NetworkEvent::NewListenerAddress { listen_addr, .. }) => {
Poll::Ready(listen_addr)
}
Poll::Pending => Poll::Pending,
_ => panic!("Was expecting the listen address to be reported"),
}
}));

Expand Down Expand Up @@ -95,15 +97,15 @@ fn dial_self() {
let mut swarm = test_network(NetworkConfig::default());
swarm.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap();

let (local_address, mut swarm) = async_std::task::block_on(
future::lazy(move |cx| {
if let Poll::Ready(NetworkEvent::NewListenerAddress { listen_addr, .. }) = swarm.poll(cx) {
Ok::<_, void::Void>((listen_addr, swarm))
} else {
panic!("Was expecting the listen address to be reported")
let local_address = async_std::task::block_on(future::poll_fn(|cx| {
match swarm.poll(cx) {
Poll::Ready(NetworkEvent::NewListenerAddress { listen_addr, .. }) => {
Poll::Ready(listen_addr)
}
}))
.unwrap();
Poll::Pending => Poll::Pending,
_ => panic!("Was expecting the listen address to be reported"),
}
}));

swarm.dial(&local_address, TestHandler()).unwrap();

Expand Down
Loading