diff --git a/CHANGELOG.md b/CHANGELOG.md index aad50b260e5..ccd645c93ed 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,9 +25,10 @@ # Version 0.34.0 [unreleased] -- Update `libp2p-gossipsub`, `libp2p-kad` and `libp2p-request-response`. +- Update `libp2p-core` and all dependent crates. -- Update dependencies. +- The `tcp-async-std` feature is now `tcp-async-io`, still + enabled by default. # Version 0.33.0 [2020-12-17] diff --git a/Cargo.toml b/Cargo.toml index 7020fafb4ef..bfa51880762 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,7 +25,7 @@ default = [ "pnet", "request-response", "secp256k1", - "tcp-async-std", + "tcp-async-io", "uds", "wasm-ext", "websocket", @@ -44,7 +44,7 @@ ping = ["libp2p-ping"] plaintext = ["libp2p-plaintext"] pnet = ["libp2p-pnet"] request-response = ["libp2p-request-response"] -tcp-async-std = ["libp2p-tcp", "libp2p-tcp/async-std"] +tcp-async-io = ["libp2p-tcp", "libp2p-tcp/async-io"] tcp-tokio = ["libp2p-tcp", "libp2p-tcp/tokio"] uds = ["libp2p-uds"] wasm-ext = ["libp2p-wasm-ext"] @@ -91,7 +91,7 @@ libp2p-tcp = { version = "0.27.0", path = "transports/tcp", optional = true } libp2p-websocket = { version = "0.28.0", path = "transports/websocket", optional = true } [dev-dependencies] -async-std = "1.6.2" +async-std = { version = "1.6.2", features = ["attributes"] } env_logger = "0.8.1" tokio = { version = "0.3", features = ["io-util", "io-std", "stream", "macros", "rt", "rt-multi-thread"] } diff --git a/core/CHANGELOG.md b/core/CHANGELOG.md index 8b090996531..5c066ce7c93 100644 --- a/core/CHANGELOG.md +++ b/core/CHANGELOG.md @@ -1,5 +1,9 @@ # 0.27.0 [unreleased] +- (Re)add `Transport::address_translation` to permit transport-specific + translations of observed addresses onto listening addresses. + [PR 1887](https://github.com/libp2p/rust-libp2p/pull/1887) + - Update dependencies. # 0.26.0 [2020-12-17] diff --git a/core/Cargo.toml b/core/Cargo.toml index 81dd1830e56..ed688d4c56d 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -39,11 +39,11 @@ zeroize = "1" ring = { version = "0.16.9", features = ["alloc", "std"], default-features = false } [dev-dependencies] -async-std = "1.6.2" +async-std = { version = "1.6.2", features = ["attributes"] } criterion = "0.3" libp2p-mplex = { path = "../muxers/mplex" } libp2p-noise = { path = "../protocols/noise" } -libp2p-tcp = { path = "../transports/tcp", features = ["async-std"] } +libp2p-tcp = { path = "../transports/tcp" } multihash = { version = "0.13", default-features = false, features = ["arb"] } quickcheck = "0.9.0" wasm-timer = "0.2" diff --git a/core/src/connection/listeners.rs b/core/src/connection/listeners.rs index 962c14a7b0d..02982d87393 100644 --- a/core/src/connection/listeners.rs +++ b/core/src/connection/listeners.rs @@ -428,6 +428,8 @@ mod tests { fn dial(self, _: Multiaddr) -> Result> { panic!() } + + fn address_translation(&self, _: &Multiaddr, _: &Multiaddr) -> Option { None } } async_std::task::block_on(async move { @@ -466,6 +468,8 @@ mod tests { fn dial(self, _: Multiaddr) -> Result> { panic!() } + + fn address_translation(&self, _: &Multiaddr, _: &Multiaddr) -> Option { None } } async_std::task::block_on(async move { diff --git a/core/src/either.rs b/core/src/either.rs index 48257fc6914..4d991936121 100644 --- a/core/src/either.rs +++ b/core/src/either.rs @@ -477,4 +477,11 @@ where }, } } + + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { + match self { + EitherTransport::Left(a) => a.address_translation(server, observed), + EitherTransport::Right(b) => b.address_translation(server, observed), + } + } } diff --git a/core/src/network.rs b/core/src/network.rs index 5819ba26be0..5fd2189907d 100644 --- a/core/src/network.rs +++ b/core/src/network.rs @@ -30,7 +30,6 @@ use crate::{ Executor, Multiaddr, PeerId, - address_translation, connection::{ ConnectionId, ConnectionLimit, @@ -176,30 +175,27 @@ where self.listeners.listen_addrs() } - /// Call this function in order to know which address remotes should dial to - /// access your local node. + /// Maps the given `observed_addr`, representing an address of the local + /// node observed by a remote peer, onto the locally known listen addresses + /// to yield one or more addresses of the local node that may be publicly + /// reachable. /// - /// When receiving an observed address on a tcp connection that we initiated, the observed - /// address contains our tcp dial port, not our tcp listen port. We know which port we are - /// listening on, thereby we can replace the port within the observed address. - /// - /// When receiving an observed address on a tcp connection that we did **not** initiated, the - /// observed address should contain our listening port. In case it differs from our listening - /// port there might be a proxy along the path. - /// - /// # Arguments - /// - /// * `observed_addr` - should be an address a remote observes you as, which can be obtained for - /// example with the identify protocol. + /// I.e. this method incorporates the view of other peers into the listen + /// addresses seen by the local node to account for possible IP and port + /// mappings performed by intermediate network devices in an effort to + /// obtain addresses for the local peer that are also reachable for peers + /// other than the peer who reported the `observed_addr`. /// + /// The translation is transport-specific. See [`Transport::address_translation`]. pub fn address_translation<'a>(&'a self, observed_addr: &'a Multiaddr) -> impl Iterator + 'a where TMuxer: 'a, THandler: 'a, { + let transport = self.listeners.transport(); let mut addrs: Vec<_> = self.listen_addrs() - .filter_map(move |server| address_translation(server, observed_addr)) + .filter_map(move |server| transport.address_translation(server, observed_addr)) .collect(); // remove duplicates diff --git a/core/src/transport.rs b/core/src/transport.rs index 50499ec1b82..f6e70c44628 100644 --- a/core/src/transport.rs +++ b/core/src/transport.rs @@ -128,6 +128,11 @@ pub trait Transport { where Self: Sized; + /// Performs a transport-specific mapping of an address `observed` by + /// a remote onto a local `listen` address to yield an address for + /// the local node that may be reachable for other peers. + fn address_translation(&self, listen: &Multiaddr, observed: &Multiaddr) -> Option; + /// Boxes the transport, including custom transport errors. fn boxed(self) -> boxed::Boxed where diff --git a/core/src/transport/and_then.rs b/core/src/transport/and_then.rs index ba7513283c6..22018729a07 100644 --- a/core/src/transport/and_then.rs +++ b/core/src/transport/and_then.rs @@ -69,6 +69,10 @@ where }; Ok(future) } + + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { + self.transport.address_translation(server, observed) + } } /// Custom `Stream` to avoid boxing. diff --git a/core/src/transport/boxed.rs b/core/src/transport/boxed.rs index 7f2e721e81f..5322b517dbe 100644 --- a/core/src/transport/boxed.rs +++ b/core/src/transport/boxed.rs @@ -51,6 +51,7 @@ type ListenerUpgrade = Pin> + Send>>; trait Abstract { fn listen_on(&self, addr: Multiaddr) -> Result, TransportError>; fn dial(&self, addr: Multiaddr) -> Result, TransportError>; + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option; } impl Abstract for T @@ -78,6 +79,10 @@ where .map_err(|e| e.map(box_err))?; Ok(Box::pin(fut) as Dial<_>) } + + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { + Transport::address_translation(self, server, observed) + } } impl fmt::Debug for Boxed { @@ -108,6 +113,10 @@ impl Transport for Boxed { fn dial(self, addr: Multiaddr) -> Result> { self.inner.dial(addr) } + + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { + self.inner.address_translation(server, observed) + } } fn box_err(e: E) -> io::Error { diff --git a/core/src/transport/choice.rs b/core/src/transport/choice.rs index c6593912761..3488b06884d 100644 --- a/core/src/transport/choice.rs +++ b/core/src/transport/choice.rs @@ -74,4 +74,12 @@ where Err(TransportError::MultiaddrNotSupported(addr)) } + + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { + if let Some(addr) = self.0.address_translation(server, observed) { + Some(addr) + } else { + self.1.address_translation(server, observed) + } + } } diff --git a/core/src/transport/dummy.rs b/core/src/transport/dummy.rs index 0f9ee6725da..5839a6a5928 100644 --- a/core/src/transport/dummy.rs +++ b/core/src/transport/dummy.rs @@ -67,6 +67,10 @@ impl Transport for DummyTransport { fn dial(self, addr: Multiaddr) -> Result> { Err(TransportError::MultiaddrNotSupported(addr)) } + + fn address_translation(&self, _server: &Multiaddr, _observed: &Multiaddr) -> Option { + None + } } /// Implementation of `AsyncRead` and `AsyncWrite`. Not meant to be instanciated. diff --git a/core/src/transport/map.rs b/core/src/transport/map.rs index f9fb2cf7d49..0305af6626d 100644 --- a/core/src/transport/map.rs +++ b/core/src/transport/map.rs @@ -57,6 +57,10 @@ where let p = ConnectedPoint::Dialer { address: addr }; Ok(MapFuture { inner: future, args: Some((self.fun, p)) }) } + + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { + self.transport.address_translation(server, observed) + } } /// Custom `Stream` implementation to avoid boxing. diff --git a/core/src/transport/map_err.rs b/core/src/transport/map_err.rs index 90e65eb29d8..c0be6485204 100644 --- a/core/src/transport/map_err.rs +++ b/core/src/transport/map_err.rs @@ -64,6 +64,10 @@ where Err(err) => Err(err.map(map)), } } + + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { + self.transport.address_translation(server, observed) + } } /// Listening stream for `MapErr`. diff --git a/core/src/transport/memory.rs b/core/src/transport/memory.rs index e7d306302a8..366abd4e9c8 100644 --- a/core/src/transport/memory.rs +++ b/core/src/transport/memory.rs @@ -191,6 +191,10 @@ impl Transport for MemoryTransport { DialFuture::new(port).ok_or(TransportError::Other(MemoryTransportError::Unreachable)) } + + fn address_translation(&self, _server: &Multiaddr, _observed: &Multiaddr) -> Option { + None + } } /// Error that can be produced from the `MemoryTransport`. diff --git a/core/src/transport/optional.rs b/core/src/transport/optional.rs index 283b50d71f7..2b29773ee22 100644 --- a/core/src/transport/optional.rs +++ b/core/src/transport/optional.rs @@ -74,4 +74,12 @@ where Err(TransportError::MultiaddrNotSupported(addr)) } } + + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { + if let Some(inner) = &self.0 { + inner.address_translation(server, observed) + } else { + None + } + } } diff --git a/core/src/transport/timeout.rs b/core/src/transport/timeout.rs index dc29af81c50..d55d007df08 100644 --- a/core/src/transport/timeout.rs +++ b/core/src/transport/timeout.rs @@ -101,6 +101,10 @@ where timer: Delay::new(self.outgoing_timeout), }) } + + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { + self.inner.address_translation(server, observed) + } } // TODO: can be removed and replaced with an `impl Stream` once impl Trait is fully stable diff --git a/core/src/transport/upgrade.rs b/core/src/transport/upgrade.rs index 4304314b91f..b2cb7b46804 100644 --- a/core/src/transport/upgrade.rs +++ b/core/src/transport/upgrade.rs @@ -334,6 +334,10 @@ where fn listen_on(self, addr: Multiaddr) -> Result> { self.0.listen_on(addr) } + + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { + self.0.address_translation(server, observed) + } } /// An inbound or outbound upgrade. @@ -383,6 +387,10 @@ where upgrade: self.upgrade }) } + + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { + self.inner.address_translation(server, observed) + } } /// Errors produced by a transport upgrade. diff --git a/core/tests/network_dial_error.rs b/core/tests/network_dial_error.rs index 13e532d7016..6597240ccd6 100644 --- a/core/tests/network_dial_error.rs +++ b/core/tests/network_dial_error.rs @@ -41,10 +41,12 @@ fn deny_incoming_connec() { swarm1.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap(); let address = async_std::task::block_on(future::poll_fn(|cx| { - if let Poll::Ready(NetworkEvent::NewListenerAddress { listen_addr, .. }) = swarm1.poll(cx) { - Poll::Ready(listen_addr) - } else { - panic!("Was expecting the listen address to be reported") + match swarm1.poll(cx) { + Poll::Ready(NetworkEvent::NewListenerAddress { listen_addr, .. }) => { + Poll::Ready(listen_addr) + } + Poll::Pending => Poll::Pending, + _ => panic!("Was expecting the listen address to be reported"), } })); @@ -95,15 +97,15 @@ fn dial_self() { let mut swarm = test_network(NetworkConfig::default()); swarm.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap(); - let (local_address, mut swarm) = async_std::task::block_on( - future::lazy(move |cx| { - if let Poll::Ready(NetworkEvent::NewListenerAddress { listen_addr, .. }) = swarm.poll(cx) { - Ok::<_, void::Void>((listen_addr, swarm)) - } else { - panic!("Was expecting the listen address to be reported") + let local_address = async_std::task::block_on(future::poll_fn(|cx| { + match swarm.poll(cx) { + Poll::Ready(NetworkEvent::NewListenerAddress { listen_addr, .. }) => { + Poll::Ready(listen_addr) } - })) - .unwrap(); + Poll::Pending => Poll::Pending, + _ => panic!("Was expecting the listen address to be reported"), + } + })); swarm.dial(&local_address, TestHandler()).unwrap(); diff --git a/examples/chat-tokio.rs b/examples/chat-tokio.rs index 15577ac6dfc..a4c7452ae82 100644 --- a/examples/chat-tokio.rs +++ b/examples/chat-tokio.rs @@ -36,7 +36,6 @@ //! --features="floodsub mplex noise tcp-tokio mdns-tokio" //! ``` -use futures::prelude::*; use libp2p::{ Multiaddr, NetworkBehaviour, @@ -154,10 +153,15 @@ async fn main() -> Result<(), Box> { loop { let to_publish = { tokio::select! { - line = stdin.try_next() => Some((floodsub_topic.clone(), line?.expect("Stdin closed"))), + line = stdin.next_line() => { + let line = line?.expect("stdin closed"); + Some((floodsub_topic.clone(), line)) + } event = swarm.next() => { - println!("New Event: {:?}", event); - None + // All events are handled by the `NetworkBehaviourEventProcess`es. + // I.e. the `swarm.next()` future drives the `Swarm` without ever + // terminating. + panic!("Unexpected event: {:?}", event); } } }; @@ -171,4 +175,4 @@ async fn main() -> Result<(), Box> { } } } -} +} \ No newline at end of file diff --git a/muxers/mplex/Cargo.toml b/muxers/mplex/Cargo.toml index 4fd8565c48d..0ad50e0d138 100644 --- a/muxers/mplex/Cargo.toml +++ b/muxers/mplex/Cargo.toml @@ -26,7 +26,7 @@ async-std = "1.7.0" criterion = "0.3" env_logger = "0.8" futures = "0.3" -libp2p-tcp = { path = "../../transports/tcp", features = ["async-std"] } +libp2p-tcp = { path = "../../transports/tcp" } libp2p-plaintext = { path = "../../protocols/plaintext" } quickcheck = "0.9" rand = "0.7" diff --git a/protocols/deflate/Cargo.toml b/protocols/deflate/Cargo.toml index 9dfee35c413..6de83b8bf89 100644 --- a/protocols/deflate/Cargo.toml +++ b/protocols/deflate/Cargo.toml @@ -16,6 +16,6 @@ flate2 = "1.0" [dev-dependencies] async-std = "1.6.2" -libp2p-tcp = { path = "../../transports/tcp", features = ["async-std"] } +libp2p-tcp = { path = "../../transports/tcp" } quickcheck = "0.9" rand = "0.7" diff --git a/protocols/identify/Cargo.toml b/protocols/identify/Cargo.toml index 3be6c2f951e..4d21ba57f3a 100644 --- a/protocols/identify/Cargo.toml +++ b/protocols/identify/Cargo.toml @@ -22,8 +22,7 @@ wasm-timer = "0.2" async-std = "1.6.2" libp2p-mplex = { path = "../../muxers/mplex" } libp2p-noise = { path = "../../protocols/noise" } -libp2p-tcp = { path = "../../transports/tcp", features = ["async-std"] } +libp2p-tcp = { path = "../../transports/tcp" } [build-dependencies] prost-build = "0.6" - diff --git a/protocols/noise/Cargo.toml b/protocols/noise/Cargo.toml index 13f6555fa32..7b6309b16a9 100644 --- a/protocols/noise/Cargo.toml +++ b/protocols/noise/Cargo.toml @@ -28,11 +28,11 @@ snow = { version = "0.7.1", features = ["ring-resolver"], default-features = fal snow = { version = "0.7.1", features = ["default-resolver"], default-features = false } [dev-dependencies] +async-io = "1.2.0" env_logger = "0.8.1" -libp2p-tcp = { path = "../../transports/tcp", features = ["async-std"] } +libp2p-tcp = { path = "../../transports/tcp" } quickcheck = "0.9.0" sodiumoxide = "0.2.5" [build-dependencies] prost-build = "0.6" - diff --git a/protocols/noise/tests/smoke.rs b/protocols/noise/tests/smoke.rs index 744d447a247..4a4c81b5eb8 100644 --- a/protocols/noise/tests/smoke.rs +++ b/protocols/noise/tests/smoke.rs @@ -18,15 +18,16 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +use async_io::Async; use futures::{future::{self, Either}, prelude::*}; use libp2p_core::identity; use libp2p_core::upgrade::{self, Negotiated, apply_inbound, apply_outbound}; use libp2p_core::transport::{Transport, ListenerEvent}; use libp2p_noise::{Keypair, X25519, X25519Spec, NoiseConfig, RemoteIdentity, NoiseError, NoiseOutput}; -use libp2p_tcp::{TcpConfig, TcpTransStream}; +use libp2p_tcp::TcpConfig; use log::info; use quickcheck::QuickCheck; -use std::{convert::TryInto, io}; +use std::{convert::TryInto, io, net::TcpStream}; #[allow(dead_code)] fn core_upgrade_compat() { @@ -175,7 +176,7 @@ fn ik_xx() { QuickCheck::new().max_tests(30).quickcheck(prop as fn(Vec) -> bool) } -type Output = (RemoteIdentity, NoiseOutput>); +type Output = (RemoteIdentity, NoiseOutput>>); fn run(server_transport: T, client_transport: U, messages: I) where diff --git a/protocols/ping/Cargo.toml b/protocols/ping/Cargo.toml index 17b0ec67f14..c9ea8b66416 100644 --- a/protocols/ping/Cargo.toml +++ b/protocols/ping/Cargo.toml @@ -20,7 +20,7 @@ wasm-timer = "0.2" [dev-dependencies] async-std = "1.6.2" -libp2p-tcp = { path = "../../transports/tcp", features = ["async-std"] } +libp2p-tcp = { path = "../../transports/tcp" } libp2p-noise = { path = "../../protocols/noise" } libp2p-yamux = { path = "../../muxers/yamux" } libp2p-mplex = { path = "../../muxers/mplex" } diff --git a/protocols/ping/tests/ping.rs b/protocols/ping/tests/ping.rs index 52056be4801..556f27ed21f 100644 --- a/protocols/ping/tests/ping.rs +++ b/protocols/ping/tests/ping.rs @@ -62,21 +62,16 @@ fn ping_pong() { let mut count2 = count.get(); let peer1 = async move { - while let Some(_) = swarm1.next().now_or_never() {} - - for l in Swarm::listeners(&swarm1) { - tx.send(l.clone()).await.unwrap(); - } - loop { - match swarm1.next().await { - PingEvent { peer, result: Ok(PingSuccess::Ping { rtt }) } => { + match swarm1.next_event().await { + SwarmEvent::NewListenAddr(listener) => tx.send(listener).await.unwrap(), + SwarmEvent::Behaviour(PingEvent { peer, result: Ok(PingSuccess::Ping { rtt }) }) => { count1 -= 1; if count1 == 0 { return (pid1.clone(), peer, rtt) } }, - PingEvent { result: Err(e), .. } => panic!("Ping failure: {:?}", e), + SwarmEvent::Behaviour(PingEvent { result: Err(e), .. }) => panic!("Ping failure: {:?}", e), _ => {} } } @@ -132,16 +127,11 @@ fn max_failures() { Swarm::listen_on(&mut swarm1, addr).unwrap(); let peer1 = async move { - while let Some(_) = swarm1.next().now_or_never() {} - - for l in Swarm::listeners(&swarm1) { - tx.send(l.clone()).await.unwrap(); - } - let mut count1: u8 = 0; loop { match swarm1.next_event().await { + SwarmEvent::NewListenAddr(listener) => tx.send(listener).await.unwrap(), SwarmEvent::Behaviour(PingEvent { result: Ok(PingSuccess::Ping { .. }), .. }) => { diff --git a/protocols/request-response/Cargo.toml b/protocols/request-response/Cargo.toml index aba58731897..0595c39b4a9 100644 --- a/protocols/request-response/Cargo.toml +++ b/protocols/request-response/Cargo.toml @@ -26,6 +26,6 @@ wasm-timer = "0.2" [dev-dependencies] async-std = "1.6.2" libp2p-noise = { path = "../noise" } -libp2p-tcp = { path = "../../transports/tcp", features = ["async-std"] } +libp2p-tcp = { path = "../../transports/tcp" } libp2p-yamux = { path = "../../muxers/yamux" } rand = "0.7" diff --git a/protocols/request-response/tests/ping.rs b/protocols/request-response/tests/ping.rs index 8f02975615e..d1da0c064cf 100644 --- a/protocols/request-response/tests/ping.rs +++ b/protocols/request-response/tests/ping.rs @@ -31,7 +31,7 @@ use libp2p_core::{ }; use libp2p_noise::{NoiseConfig, X25519Spec, Keypair}; use libp2p_request_response::*; -use libp2p_swarm::Swarm; +use libp2p_swarm::{Swarm, SwarmEvent}; use libp2p_tcp::TcpConfig; use futures::{prelude::*, channel::mpsc, executor::LocalPool, task::SpawnExt}; use rand::{self, Rng}; @@ -64,27 +64,24 @@ fn ping_protocol() { let expected_pong = pong.clone(); let peer1 = async move { - while let Some(_) = swarm1.next().now_or_never() {} - - let l = Swarm::listeners(&swarm1).next().unwrap(); - tx.send(l.clone()).await.unwrap(); - loop { - match swarm1.next().await { - RequestResponseEvent::Message { + match swarm1.next_event().await { + SwarmEvent::NewListenAddr(addr) => tx.send(addr).await.unwrap(), + SwarmEvent::Behaviour(RequestResponseEvent::Message { peer, message: RequestResponseMessage::Request { request, channel, .. } - } => { + }) => { assert_eq!(&request, &expected_ping); assert_eq!(&peer, &peer2_id); swarm1.send_response(channel, pong.clone()).unwrap(); }, - RequestResponseEvent::ResponseSent { + SwarmEvent::Behaviour(RequestResponseEvent::ResponseSent { peer, .. - } => { + }) => { assert_eq!(&peer, &peer2_id); } - e => panic!("Peer1: Unexpected event: {:?}", e) + SwarmEvent::Behaviour(e) => panic!("Peer1: Unexpected event: {:?}", e), + _ => {} } } }; @@ -205,26 +202,24 @@ fn ping_protocol_throttled() { swarm2.set_receive_limit(NonZeroU16::new(limit2).unwrap()); let peer1 = async move { - while let Some(_) = swarm1.next().now_or_never() {} - - let l = Swarm::listeners(&swarm1).next().unwrap(); - tx.send(l.clone()).await.unwrap(); for i in 1 .. { - match swarm1.next().await { - throttled::Event::Event(RequestResponseEvent::Message { + match swarm1.next_event().await { + SwarmEvent::NewListenAddr(addr) => tx.send(addr).await.unwrap(), + SwarmEvent::Behaviour(throttled::Event::Event(RequestResponseEvent::Message { peer, message: RequestResponseMessage::Request { request, channel, .. }, - }) => { + })) => { assert_eq!(&request, &expected_ping); assert_eq!(&peer, &peer2_id); swarm1.send_response(channel, pong.clone()).unwrap(); }, - throttled::Event::Event(RequestResponseEvent::ResponseSent { + SwarmEvent::Behaviour(throttled::Event::Event(RequestResponseEvent::ResponseSent { peer, .. - }) => { + })) => { assert_eq!(&peer, &peer2_id); } - e => panic!("Peer1: Unexpected event: {:?}", e) + SwarmEvent::Behaviour(e) => panic!("Peer1: Unexpected event: {:?}", e), + _ => {} } if i % 31 == 0 { let lim = rand::thread_rng().gen_range(1, 17); diff --git a/protocols/secio/Cargo.toml b/protocols/secio/Cargo.toml index d520b26c762..bc5fcd5fd92 100644 --- a/protocols/secio/Cargo.toml +++ b/protocols/secio/Cargo.toml @@ -52,4 +52,4 @@ aes-all = ["aesni"] async-std = "1.6.2" criterion = "0.3" libp2p-mplex = { path = "../../muxers/mplex" } -libp2p-tcp = { path = "../../transports/tcp", features = ["async-std"] } +libp2p-tcp = { path = "../../transports/tcp" } diff --git a/src/bandwidth.rs b/src/bandwidth.rs index 705164b52b4..87b66653cfc 100644 --- a/src/bandwidth.rs +++ b/src/bandwidth.rs @@ -74,6 +74,10 @@ where .dial(addr) .map(move |fut| BandwidthFuture { inner: fut, sinks }) } + + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { + self.inner.address_translation(server, observed) + } } /// Wraps around a `Stream` that produces connections. Wraps each connection around a bandwidth diff --git a/src/lib.rs b/src/lib.rs index 514c352d887..74c5e91b0f3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -85,7 +85,7 @@ //! Example ([`noise`] + [`yamux`] Protocol Upgrade): //! //! ```rust -//! # #[cfg(all(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")), feature = "tcp-async-std", feature = "noise", feature = "yamux"))] { +//! # #[cfg(all(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")), feature = "noise", feature = "yamux"))] { //! use libp2p::{Transport, core::upgrade, tcp::TcpConfig, noise, identity::Keypair, yamux}; //! let tcp = TcpConfig::new(); //! let id_keys = Keypair::generate_ed25519(); @@ -215,8 +215,8 @@ pub use libp2p_ping as ping; pub use libp2p_plaintext as plaintext; #[doc(inline)] pub use libp2p_swarm as swarm; -#[cfg(any(feature = "tcp-async-std", feature = "tcp-tokio"))] -#[cfg_attr(docsrs, doc(cfg(any(feature = "tcp-async-std", feature = "tcp-tokio"))))] +#[cfg(any(feature = "tcp-async-io", feature = "tcp-tokio"))] +#[cfg_attr(docsrs, doc(cfg(any(feature = "tcp-async-io", feature = "tcp-tokio"))))] #[cfg(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")))] #[doc(inline)] pub use libp2p_tcp as tcp; @@ -268,8 +268,8 @@ pub use self::transport_ext::TransportExt; /// /// > **Note**: This `Transport` is not suitable for production usage, as its implementation /// > reserves the right to support additional protocols or remove deprecated protocols. -#[cfg(all(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")), any(feature = "tcp-async-std", feature = "tcp-tokio"), feature = "websocket", feature = "noise", feature = "mplex", feature = "yamux"))] -#[cfg_attr(docsrs, doc(cfg(all(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")), any(feature = "tcp-async-std", feature = "tcp-tokio"), feature = "websocket", feature = "noise", feature = "mplex", feature = "yamux"))))] +#[cfg(all(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")), any(feature = "tcp-async-io", feature = "tcp-tokio"), feature = "websocket", feature = "noise", feature = "mplex", feature = "yamux"))] +#[cfg_attr(docsrs, doc(cfg(all(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")), any(feature = "tcp-async-io", feature = "tcp-tokio"), feature = "websocket", feature = "noise", feature = "mplex", feature = "yamux"))))] pub fn build_development_transport(keypair: identity::Keypair) -> std::io::Result> { @@ -280,13 +280,13 @@ pub fn build_development_transport(keypair: identity::Keypair) /// /// The implementation supports TCP/IP, WebSockets over TCP/IP, noise as the encryption layer, /// and mplex or yamux as the multiplexing layer. -#[cfg(all(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")), any(feature = "tcp-async-std", feature = "tcp-tokio"), feature = "websocket", feature = "noise", feature = "mplex", feature = "yamux"))] -#[cfg_attr(docsrs, doc(cfg(all(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")), any(feature = "tcp-async-std", feature = "tcp-tokio"), feature = "websocket", feature = "noise", feature = "mplex", feature = "yamux"))))] +#[cfg(all(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")), any(feature = "tcp-async-io", feature = "tcp-tokio"), feature = "websocket", feature = "noise", feature = "mplex", feature = "yamux"))] +#[cfg_attr(docsrs, doc(cfg(all(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")), any(feature = "tcp-async-io", feature = "tcp-tokio"), feature = "websocket", feature = "noise", feature = "mplex", feature = "yamux"))))] pub fn build_tcp_ws_noise_mplex_yamux(keypair: identity::Keypair) -> std::io::Result> { let transport = { - #[cfg(feature = "tcp-async-std")] + #[cfg(feature = "tcp-async-io")] let tcp = tcp::TcpConfig::new().nodelay(true); #[cfg(feature = "tcp-tokio")] let tcp = tcp::TokioTcpConfig::new().nodelay(true); @@ -311,13 +311,13 @@ pub fn build_tcp_ws_noise_mplex_yamux(keypair: identity::Keypair) /// /// The implementation supports TCP/IP, WebSockets over TCP/IP, noise as the encryption layer, /// and mplex or yamux as the multiplexing layer. -#[cfg(all(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")), any(feature = "tcp-async-std", feature = "tcp-tokio"), feature = "websocket", feature = "noise", feature = "mplex", feature = "yamux", feature = "pnet"))] -#[cfg_attr(docsrs, doc(cfg(all(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")), any(feature = "tcp-async-std", feature = "tcp-tokio"), feature = "websocket", feature = "noise", feature = "mplex", feature = "yamux", feature = "pnet"))))] +#[cfg(all(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")), any(feature = "tcp-async-io", feature = "tcp-tokio"), feature = "websocket", feature = "noise", feature = "mplex", feature = "yamux", feature = "pnet"))] +#[cfg_attr(docsrs, doc(cfg(all(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")), any(feature = "tcp-async-io", feature = "tcp-tokio"), feature = "websocket", feature = "noise", feature = "mplex", feature = "yamux", feature = "pnet"))))] pub fn build_tcp_ws_pnet_noise_mplex_yamux(keypair: identity::Keypair, psk: PreSharedKey) -> std::io::Result> { let transport = { - #[cfg(feature = "tcp-async-std")] + #[cfg(feature = "tcp-async-io")] let tcp = tcp::TcpConfig::new().nodelay(true); #[cfg(feature = "tcp-tokio")] let tcp = tcp::TokioTcpConfig::new().nodelay(true); diff --git a/transports/dns/src/lib.rs b/transports/dns/src/lib.rs index b9bd3763e4a..beba6778689 100644 --- a/transports/dns/src/lib.rs +++ b/transports/dns/src/lib.rs @@ -202,6 +202,10 @@ where Ok(future.boxed().right_future()) } + + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { + self.inner.address_translation(server, observed) + } } /// Error that can be generated by the DNS layer. @@ -289,6 +293,10 @@ mod tests { }; Ok(Box::pin(future::ready(Ok(())))) } + + fn address_translation(&self, _: &Multiaddr, _: &Multiaddr) -> Option { + None + } } futures::executor::block_on(async move { diff --git a/transports/tcp/CHANGELOG.md b/transports/tcp/CHANGELOG.md index 63a3cdab484..62c0b6423dd 100644 --- a/transports/tcp/CHANGELOG.md +++ b/transports/tcp/CHANGELOG.md @@ -1,5 +1,12 @@ # 0.27.0 [unreleased] +- Add support for port reuse and (re)add transport-specific + address translation. Thereby use only `async-io` instead of + `async-std`, renaming the feature accordingly. `async-io` + is a default feature, with an additional `tokio` feature + as before. + [PR 1887](https://github.com/libp2p/rust-libp2p/pull/1887) + - Update dependencies. # 0.26.0 [2020-12-17] diff --git a/transports/tcp/Cargo.toml b/transports/tcp/Cargo.toml index 78d1dcb0664..aa968a29c41 100644 --- a/transports/tcp/Cargo.toml +++ b/transports/tcp/Cargo.toml @@ -10,16 +10,24 @@ keywords = ["peer-to-peer", "libp2p", "networking"] categories = ["network-programming", "asynchronous"] [dependencies] -async-std = { version = "1.6.5", optional = true } -futures = "0.3.1" +async-io-crate = { package = "async-io", version = "1.2.0", optional = true } +futures = "0.3.8" futures-timer = "3.0" -if-addrs = "0.6.4" +if-watch = { version = "0.1.4", optional = true } +if-addrs = { version = "0.6.4", optional = true } ipnet = "2.0.0" +libc = "0.2.80" libp2p-core = { version = "0.27.0", path = "../../core" } -log = "0.4.1" -socket2 = { version = "0.3.12" } -tokio = { version = "0.3", default-features = false, features = ["net"], optional = true } +log = "0.4.11" +socket2 = { version = "0.3.17", features = ["reuseport"] } +tokio-crate = { package = "tokio", version = "0.3", default-features = false, features = ["net"], optional = true } -[dev-dependencies] -libp2p-tcp = { path = ".", features = ["async-std"] } +[features] +default = ["async-io"] +tokio = ["tokio-crate", "if-addrs"] +async-io = ["async-io-crate", "if-watch"] +[dev-dependencies] +async-std = { version = "1.6.5", features = ["attributes"] } +tokio-crate = { package = "tokio", version = "0.3", default-features = false, features = ["net", "rt"] } +env_logger = "0.8.2" diff --git a/transports/tcp/src/lib.rs b/transports/tcp/src/lib.rs index b08062fe114..df8256b47ff 100644 --- a/transports/tcp/src/lib.rs +++ b/transports/tcp/src/lib.rs @@ -22,362 +22,638 @@ //! //! # Usage //! -//! This crate provides two structs, `TcpConfig` and `TokioTcpConfig`, depending on which -//! features are enabled. -//! -//! Both the `TcpConfig` and `TokioTcpConfig` structs implement the `Transport` trait of the -//! `core` library. See the documentation of `core` and of libp2p in general to learn how to -//! use the `Transport` trait. +//! This crate provides a `TcpConfig` and `TokioTcpConfig`, depending on +//! the enabled features, which implement the `Transport` trait for use as a +//! transport with `libp2p-core` or `libp2p-swarm`. + +mod provider; + +#[cfg(feature = "async-io")] +pub use provider::async_io; -use futures::{future::{self, Ready}, prelude::*}; +/// The type of a [`GenTcpConfig`] using the `async-io` implementation. +#[cfg(feature = "async-io")] +pub type TcpConfig = GenTcpConfig; + +#[cfg(feature = "tokio")] +pub use provider::tokio; + +/// The type of a [`GenTcpConfig`] using the `tokio` implementation. +#[cfg(feature = "tokio")] +pub type TokioTcpConfig = GenTcpConfig; + +use futures::{ + future::{self, BoxFuture, Ready}, + prelude::*, + ready, +}; use futures_timer::Delay; -use if_addrs::{IfAddr, get_if_addrs}; -use ipnet::{IpNet, Ipv4Net, Ipv6Net}; use libp2p_core::{ - Transport, - multiaddr::{Protocol, Multiaddr}, - transport::{ListenerEvent, TransportError} + address_translation, + multiaddr::{Multiaddr, Protocol}, + transport::{ListenerEvent, Transport, TransportError}, }; -use log::{debug, trace}; -use socket2::{Socket, Domain, Type}; +use socket2::{Domain, Socket, Type}; use std::{ - collections::VecDeque, - convert::TryFrom, + collections::HashSet, io, - iter::{self, FromIterator}, - net::{IpAddr, SocketAddr}, + net::{SocketAddr, IpAddr, TcpListener}, pin::Pin, + sync::{Arc, RwLock}, task::{Context, Poll}, - time::Duration + time::Duration, }; -macro_rules! codegen { - ($feature_name:expr, $tcp_config:ident, $tcp_trans_stream:ident, $tcp_listen_stream:ident, $apply_config:ident, $tcp_stream:ty, $tcp_listener:ty) => { +use provider::{Provider, IfEvent}; -/// Represents the configuration for a TCP/IP transport capability for libp2p. +/// The configuration for a TCP/IP transport capability for libp2p. /// -/// The TCP sockets created by libp2p will need to be progressed by running the futures and streams -/// obtained by libp2p through the tokio reactor. -#[cfg_attr(docsrs, doc(cfg(feature = $feature_name)))] -#[derive(Debug, Clone, Default)] -pub struct $tcp_config { - /// How long a listener should sleep after receiving an error, before trying again. - sleep_on_error: Duration, +/// A [`GenTcpConfig`] implements the [`Transport`] interface and thus +/// is consumed on [`Transport::listen_on`] and [`Transport::dial`]. +/// However, the config can be cheaply cloned to perform multiple such +/// operations with the same config. +#[derive(Clone, Debug)] +pub struct GenTcpConfig { + /// The type of the I/O provider. + _impl: std::marker::PhantomData, /// TTL to set for opened sockets, or `None` to keep default. ttl: Option, /// `TCP_NODELAY` to set for opened sockets, or `None` to keep default. nodelay: Option, + /// Size of the listen backlog for listen sockets. + backlog: u32, + /// The configuration of port reuse when dialing. + port_reuse: PortReuse, } -impl $tcp_config { - /// Creates a new configuration object for TCP/IP. - pub fn new() -> $tcp_config { - $tcp_config { - sleep_on_error: Duration::from_millis(100), +type Port = u16; + +/// The configuration for port reuse of listening sockets. +#[derive(Debug, Clone)] +enum PortReuse { + /// Port reuse is disabled, i.e. ephemeral local ports are + /// used for outgoing TCP connections. + Disabled, + /// Port reuse when dialing is enabled, i.e. the local + /// address and port that a new socket for an outgoing + /// connection is bound to are chosen from an existing + /// listening socket, if available. + Enabled { + /// The addresses and ports of the listening sockets + /// registered as eligible for port reuse when dialing. + listen_addrs: Arc>> + }, +} + +impl PortReuse { + /// Registers a socket address for port reuse. + /// + /// Has no effect if port reuse is disabled. + fn register(&mut self, ip: IpAddr, port: Port) { + if let PortReuse::Enabled { listen_addrs } = self { + log::trace!("Registering for port reuse: {}:{}", ip, port); + listen_addrs + .write() + .expect("`register()` and `unregister()` never panic while holding the lock") + .insert((ip, port)); + } + } + + /// Unregisters a socket address for port reuse. + /// + /// Has no effect if port reuse is disabled. + fn unregister(&mut self, ip: IpAddr, port: Port) { + if let PortReuse::Enabled { listen_addrs } = self { + log::trace!("Unregistering for port reuse: {}:{}", ip, port); + listen_addrs + .write() + .expect("`register()` and `unregister()` never panic while holding the lock") + .remove(&(ip, port)); + } + } + + /// Selects a listening socket address suitable for use + /// as the local socket address when dialing. + /// + /// If multiple listening sockets are registered for port + /// reuse, one is chosen whose IP protocol version and + /// loopback status is the same as that of `remote_ip`. + /// + /// Returns `None` if port reuse is disabled or no suitable + /// listening socket address is found. + fn local_dial_addr(&self, remote_ip: &IpAddr) -> Option { + if let PortReuse::Enabled { listen_addrs } = self { + for (ip, port) in listen_addrs + .read() + .expect("`register()` and `unregister()` never panic while holding the lock") + .iter() + { + if ip.is_ipv4() == remote_ip.is_ipv4() + && ip.is_loopback() == remote_ip.is_loopback() + { + return Some(SocketAddr::new(*ip, *port)) + } + } + } + + None + } +} + +impl GenTcpConfig +where + T: Provider + Send, +{ + /// Creates a new configuration for a TCP/IP transport: + /// + /// * Nagle's algorithm, i.e. `TCP_NODELAY`, is _enabled_. + /// See [`GenTcpConfig::nodelay`]. + /// * Reuse of listening ports is _disabled_. + /// See [`GenTcpConfig::port_reuse`]. + /// * No custom `IP_TTL` is set. The default of the OS TCP stack applies. + /// See [`GenTcpConfig::ttl`]. + /// * The size of the listen backlog for new listening sockets is `1024`. + /// See [`GenTcpConfig::listen_backlog`]. + pub fn new() -> Self { + Self { ttl: None, nodelay: None, + backlog: 1024, + port_reuse: PortReuse::Disabled, + _impl: std::marker::PhantomData, } } - /// Sets the TTL to set for opened sockets. + /// Configures the `IP_TTL` option for new sockets. pub fn ttl(mut self, value: u32) -> Self { self.ttl = Some(value); self } - /// Sets the `TCP_NODELAY` to set for opened sockets. + /// Configures the `TCP_NODELAY` option for new sockets. pub fn nodelay(mut self, value: bool) -> Self { self.nodelay = Some(value); self } -} -impl Transport for $tcp_config { - type Output = $tcp_trans_stream; - type Error = io::Error; - type Listener = Pin, Self::Error>> + Send>>; - type ListenerUpgrade = Ready>; - type Dial = Pin> + Send>>; - - fn listen_on(self, addr: Multiaddr) -> Result> { - let socket_addr = - if let Ok(sa) = multiaddr_to_socketaddr(&addr) { - sa - } else { - return Err(TransportError::MultiaddrNotSupported(addr)) - }; + /// Configures the listen backlog for new listen sockets. + pub fn listen_backlog(mut self, backlog: u32) -> Self { + self.backlog = backlog; + self + } - async fn do_listen(cfg: $tcp_config, socket_addr: SocketAddr) - -> Result>, io::Error>, io::Error>>, io::Error> - { - let socket = if socket_addr.is_ipv4() { - Socket::new(Domain::ipv4(), Type::stream(), Some(socket2::Protocol::tcp()))? - } else { - let s = Socket::new(Domain::ipv6(), Type::stream(), Some(socket2::Protocol::tcp()))?; - s.set_only_v6(true)?; - s - }; - if cfg!(target_family = "unix") { - socket.set_reuse_address(true)?; + /// Configures port reuse for local sockets, which implies + /// reuse of listening ports for outgoing connections to + /// enhance NAT traversal capabilities. + /// + /// Please refer to e.g. [RFC 4787](https://tools.ietf.org/html/rfc4787) + /// section 4 and 5 for some of the NAT terminology used here. + /// + /// There are two main use-cases for port reuse among local + /// sockets: + /// + /// 1. Creating multiple listening sockets for the same address + /// and port to allow accepting connections on multiple threads + /// without having to synchronise access to a single listen socket. + /// + /// 2. Creating outgoing connections whose local socket is bound to + /// the same address and port as a listening socket. In the rare + /// case of simple NATs with both endpoint-independent mapping and + /// endpoint-independent filtering, this can on its own already + /// permit NAT traversal by other nodes sharing the observed + /// external address of the local node. For the common case of + /// NATs with address-dependent or address and port-dependent + /// filtering, port reuse for outgoing connections can facilitate + /// further TCP hole punching techniques for NATs that perform + /// endpoint-independent mapping. Port reuse cannot facilitate + /// NAT traversal in the presence of "symmetric" NATs that employ + /// both address/port-dependent mapping and filtering, unless + /// there is some means of port prediction. + /// + /// Both use-cases are enabled when port reuse is enabled, with port reuse + /// for outgoing connections (`2.` above) always being implied. + /// + /// > **Note**: Due to the identification of a TCP socket by a 4-tuple + /// > of source IP address, source port, destination IP address and + /// > destination port, with port reuse enabled there can be only + /// > a single outgoing connection to a particular address and port + /// > of a peer per local listening socket address. + /// + /// If enabled, the returned `GenTcpConfig` and all of its `Clone`s + /// keep track of the listen socket addresses as they are reported + /// by polling [`TcpListenStream`]s obtained from [`GenTcpConfig::listen_on()`]. + /// + /// In contrast, two `GenTcpConfig`s constructed separately via [`GenTcpConfig::new()`] + /// maintain these addresses independently. It is thus possible to listen on + /// multiple addresses, enabling port reuse for each, knowing exactly which + /// listen address is reused when dialing with a specific `GenTcpConfig`, as in + /// the following example: + /// + /// ```no_run + /// # use libp2p_core::transport::ListenerEvent; + /// # use libp2p_core::{Multiaddr, Transport}; + /// # use futures::stream::StreamExt; + /// #[cfg(feature = "async-io")] + /// #[async_std::main] + /// async fn main() -> std::io::Result<()> { + /// use libp2p_tcp::TcpConfig; + /// + /// let listen_addr1: Multiaddr = "/ip4/127.0.0.1/tcp/9001".parse().unwrap(); + /// let listen_addr2: Multiaddr = "/ip4/127.0.0.1/tcp/9002".parse().unwrap(); + /// + /// let tcp1 = TcpConfig::new().port_reuse(true); + /// let mut listener1 = tcp1.clone().listen_on(listen_addr1.clone()).expect("listener"); + /// match listener1.next().await.expect("event")? { + /// ListenerEvent::NewAddress(listen_addr) => { + /// println!("Listening on {:?}", listen_addr); + /// let mut stream = tcp1.dial(listen_addr2.clone()).unwrap().await?; + /// // `stream` has `listen_addr1` as its local socket address. + /// } + /// _ => {} + /// } + /// + /// let tcp2 = TcpConfig::new().port_reuse(true); + /// let mut listener2 = tcp2.clone().listen_on(listen_addr2).expect("listener"); + /// match listener2.next().await.expect("event")? { + /// ListenerEvent::NewAddress(listen_addr) => { + /// println!("Listening on {:?}", listen_addr); + /// let mut socket = tcp2.dial(listen_addr1).unwrap().await?; + /// // `stream` has `listen_addr2` as its local socket address. + /// } + /// _ => {} + /// } + /// Ok(()) + /// } + /// ``` + /// + /// If a single `GenTcpConfig` is used and cloned for the creation of multiple + /// listening sockets or a wildcard listen socket address is used to listen + /// on any interface, there can be multiple such addresses registered for + /// port reuse. In this case, one is chosen whose IP protocol version and + /// loopback status is the same as that of the remote address. Consequently, for + /// maximum control of the local listening addresses and ports that are used + /// for outgoing connections, a new `GenTcpConfig` should be created for each + /// listening socket, avoiding the use of wildcard addresses which bind a + /// socket to all network interfaces. + /// + /// When this option is enabled on a unix system, the socket + /// option `SO_REUSEPORT` is set, if available, to permit + /// reuse of listening ports for multiple sockets. + pub fn port_reuse(mut self, port_reuse: bool) -> Self { + self.port_reuse = if port_reuse { + PortReuse::Enabled { + listen_addrs: Arc::new(RwLock::new(HashSet::new())) } - socket.bind(&socket_addr.into())?; - socket.listen(1024)?; // we may want to make this configurable - - // Note: Tokio's TcpListener::from_std, which the TcpListener's TryFrom implementation - // uses, does not set the socket into non-blocking mode. - #[cfg(feature = "tokio")] - socket.set_nonblocking(true); - - let listener = <$tcp_listener>::try_from(socket.into_tcp_listener()) - .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; - - let local_addr = listener.local_addr()?; - let port = local_addr.port(); - - // Determine all our listen addresses which is either a single local IP address - // or (if a wildcard IP address was used) the addresses of all our interfaces, - // as reported by `get_if_addrs`. - let addrs = - if socket_addr.ip().is_unspecified() { - let addrs = host_addresses(port)?; - debug!("Listening on {:?}", addrs.iter().map(|(_, _, ma)| ma).collect::>()); - Addresses::Many(addrs) - } else { - let ma = ip_to_multiaddr(local_addr.ip(), port); - debug!("Listening on {:?}", ma); - Addresses::One(ma) - }; - - // Generate `NewAddress` events for each new `Multiaddr`. - let pending = match addrs { - Addresses::One(ref ma) => { - let event = ListenerEvent::NewAddress(ma.clone()); - let mut list = VecDeque::new(); - list.push_back(Ok(event)); - list - } - Addresses::Many(ref aa) => { - aa.iter() - .map(|(_, _, ma)| ma) - .cloned() - .map(ListenerEvent::NewAddress) - .map(Result::Ok) - .collect::>() - } - }; + } else { + PortReuse::Disabled + }; - let listen_stream = $tcp_listen_stream { - stream: listener, - pause: None, - pause_duration: cfg.sleep_on_error, - port, - addrs, - pending, - config: cfg - }; + self + } - Ok(stream::unfold(listen_stream, |s| s.next().map(Some))) + fn create_socket(&self, socket_addr: &SocketAddr) -> io::Result { + let domain = if socket_addr.is_ipv4() { + Domain::ipv4() + } else { + Domain::ipv6() + }; + let socket = Socket::new(domain, Type::stream(), Some(socket2::Protocol::tcp()))?; + if socket_addr.is_ipv6() { + socket.set_only_v6(true)?; } - - Ok(Box::pin(do_listen(self, socket_addr).try_flatten_stream())) + if let Some(ttl) = self.ttl { + socket.set_ttl(ttl)?; + } + if let Some(nodelay) = self.nodelay { + socket.set_nodelay(nodelay)?; + } + socket.set_reuse_address(true)?; + #[cfg(unix)] + if let PortReuse::Enabled { .. } = &self.port_reuse { + socket.set_reuse_port(true)?; + } + Ok(socket) } - fn dial(self, addr: Multiaddr) -> Result> { - let socket_addr = - if let Ok(socket_addr) = multiaddr_to_socketaddr(&addr) { - if socket_addr.port() == 0 || socket_addr.ip().is_unspecified() { - debug!("Instantly refusing dialing {}, as it is invalid", addr); - return Err(TransportError::Other(io::ErrorKind::ConnectionRefused.into())) - } - socket_addr - } else { - return Err(TransportError::MultiaddrNotSupported(addr)) - }; + fn do_listen(self, socket_addr: SocketAddr) -> io::Result> { + let socket = self.create_socket(&socket_addr)?; + socket.bind(&socket_addr.into())?; + socket.listen(self.backlog as _)?; + socket.set_nonblocking(true)?; + TcpListenStream::::new(socket.into_tcp_listener(), self.port_reuse) + } - debug!("Dialing {}", addr); + async fn do_dial(self, socket_addr: SocketAddr) -> Result { + let socket = self.create_socket(&socket_addr)?; - async fn do_dial(cfg: $tcp_config, socket_addr: SocketAddr) -> Result<$tcp_trans_stream, io::Error> { - let stream = <$tcp_stream>::connect(&socket_addr).await?; - $apply_config(&cfg, &stream)?; - Ok($tcp_trans_stream { inner: stream }) + if let Some(addr) = self.port_reuse.local_dial_addr(&socket_addr.ip()) { + log::trace!("Binding dial socket to listen socket {}", addr); + socket.bind(&addr.into())?; } - Ok(Box::pin(do_dial(self, socket_addr))) + socket.set_nonblocking(true)?; + + match socket.connect(&socket_addr.into()) { + Ok(()) => {} + Err(err) if err.raw_os_error() == Some(libc::EINPROGRESS) => {} + Err(err) if err.kind() == io::ErrorKind::WouldBlock => {} + Err(err) => return Err(err), + }; + + let stream = T::new_stream(socket.into_tcp_stream()).await?; + Ok(stream) } } -/// Stream that listens on an TCP/IP address. -#[cfg_attr(docsrs, doc(cfg(feature = $feature_name)))] -pub struct $tcp_listen_stream { - /// The incoming connections. - stream: $tcp_listener, - /// The current pause if any. - pause: Option, - /// How long to pause after an error. - pause_duration: Duration, - /// The port which we use as our listen port in listener event addresses. - port: u16, - /// The set of known addresses. - addrs: Addresses, - /// Temporary buffer of listener events. - pending: Buffer<$tcp_trans_stream>, - /// Original configuration. - config: $tcp_config -} +impl Transport for GenTcpConfig +where + T: Provider + Send + 'static, + T::Listener: Unpin, + T::IfWatcher: Unpin, + T::Stream: Unpin, +{ + type Output = T::Stream; + type Error = io::Error; + type Dial = Pin> + Send>>; + type Listener = TcpListenStream; + type ListenerUpgrade = Ready>; -impl $tcp_listen_stream { - /// Takes ownership of the listener, and returns the next incoming event and the listener. - async fn next(mut self) -> (Result>, io::Error>, io::Error>, Self) { - loop { - if let Some(event) = self.pending.pop_front() { - return (event, self); - } + fn listen_on(self, addr: Multiaddr) -> Result> { + let socket_addr = if let Ok(sa) = multiaddr_to_socketaddr(&addr) { + sa + } else { + return Err(TransportError::MultiaddrNotSupported(addr)); + }; + log::debug!("listening on {}", socket_addr); + self.do_listen(socket_addr) + .map_err(|e| TransportError::Other(e)) + } - if let Some(pause) = self.pause.take() { - let _ = pause.await; + fn dial(self, addr: Multiaddr) -> Result> { + let socket_addr = if let Ok(socket_addr) = multiaddr_to_socketaddr(&addr) { + if socket_addr.port() == 0 || socket_addr.ip().is_unspecified() { + return Err(TransportError::MultiaddrNotSupported(addr)); } + socket_addr + } else { + return Err(TransportError::MultiaddrNotSupported(addr)); + }; + log::debug!("dialing {}", socket_addr); + Ok(Box::pin(self.do_dial(socket_addr))) + } - // TODO: do we get the peer_addr at the same time? - let (sock, _) = match self.stream.accept().await { - Ok(s) => s, - Err(e) => { - debug!("error accepting incoming connection: {}", e); - self.pause = Some(Delay::new(self.pause_duration)); - return (Ok(ListenerEvent::Error(e)), self); - } - }; - - let sock_addr = match sock.peer_addr() { - Ok(addr) => addr, - Err(err) => { - debug!("Failed to get peer address: {:?}", err); - continue - } - }; - - let local_addr = match sock.local_addr() { - Ok(sock_addr) => { - if let Addresses::Many(ref mut addrs) = self.addrs { - if let Err(err) = check_for_interface_changes(&sock_addr, self.port, addrs, &mut self.pending) { - return (Ok(ListenerEvent::Error(err)), self); - } - } - ip_to_multiaddr(sock_addr.ip(), sock_addr.port()) - } - Err(err) => { - debug!("Failed to get local address of incoming socket: {:?}", err); - continue - } - }; - - let remote_addr = ip_to_multiaddr(sock_addr.ip(), sock_addr.port()); - - match $apply_config(&self.config, &sock) { - Ok(()) => { - trace!("Incoming connection from {} at {}", remote_addr, local_addr); - self.pending.push_back(Ok(ListenerEvent::Upgrade { - upgrade: future::ok($tcp_trans_stream { inner: sock }), - local_addr, - remote_addr - })) - } - Err(err) => { - debug!("Error upgrading incoming connection from {}: {:?}", remote_addr, err); - self.pending.push_back(Ok(ListenerEvent::Upgrade { - upgrade: future::err(err), - local_addr, - remote_addr - })) - } - } + /// When port reuse is disabled and hence ephemeral local ports are + /// used for outgoing connections, the returned address is the + /// `observed` address with the port replaced by the port of the + /// `listen` address. + /// + /// If port reuse is enabled, `Some(observed)` is returned, as there + /// is a chance that the `observed` address _and_ port are reachable + /// for other peers if there is a NAT in the way that does endpoint- + /// independent filtering. Furthermore, even if that is not the case + /// and TCP hole punching techniques must be used for NAT traversal, + /// the `observed` address is still the one that a remote should connect + /// to for the purpose of the hole punching procedure, as it represents + /// the mapped IP and port of the NAT device in front of the local + /// node. + /// + /// `None` is returned if one of the given addresses is not a TCP/IP + /// address. + fn address_translation(&self, listen: &Multiaddr, observed: &Multiaddr) -> Option { + match &self.port_reuse { + PortReuse::Disabled => address_translation(listen, observed), + PortReuse::Enabled { .. } => Some(observed.clone()), } } } -/// Wraps around a `TcpStream` and adds logging for important events. -#[cfg_attr(docsrs, doc(cfg(feature = $feature_name)))] -#[derive(Debug)] -pub struct $tcp_trans_stream { - inner: $tcp_stream, -} +type TcpListenerEvent = ListenerEvent>, io::Error>; -impl Drop for $tcp_trans_stream { - fn drop(&mut self) { - if let Ok(addr) = self.inner.peer_addr() { - debug!("Dropped TCP connection to {:?}", addr); - } else { - debug!("Dropped TCP connection to undeterminate peer"); - } - } +enum IfWatch { + Pending(BoxFuture<'static, io::Result>), + Ready(TIfWatcher), } -/// Applies the socket configuration parameters to a socket. -fn $apply_config(config: &$tcp_config, socket: &$tcp_stream) -> Result<(), io::Error> { - if let Some(ttl) = config.ttl { - socket.set_ttl(ttl)?; +/// The listening addresses of a [`TcpListenStream`]. +enum InAddr { + /// The stream accepts connections on a single interface. + One { + addr: IpAddr, + out: Option + }, + /// The stream accepts connections on all interfaces. + Any { + addrs: HashSet, + if_watch: IfWatch, } - - if let Some(nodelay) = config.nodelay { - socket.set_nodelay(nodelay)?; - } - - Ok(()) } -}; +/// A stream of incoming connections on one or more interfaces. +pub struct TcpListenStream +where + T: Provider +{ + /// The socket address that the listening socket is bound to, + /// which may be a "wildcard address" like `INADDR_ANY` or `IN6ADDR_ANY` + /// when listening on all interfaces for IPv4 respectively IPv6 connections. + listen_addr: SocketAddr, + /// The async listening socket for incoming connections. + listener: T::Listener, + /// The IP addresses of network interfaces on which the listening socket + /// is accepting connections. + /// + /// If the listen socket listens on all interfaces, these may change over + /// time as interfaces become available or unavailable. + in_addr: InAddr, + /// The port reuse configuration for outgoing connections. + /// + /// If enabled, all IP addresses on which this listening stream + /// is accepting connections (`in_addr`) are registered for reuse + /// as local addresses for the sockets of outgoing connections. They are + /// unregistered when the stream encounters an error or is dropped. + port_reuse: PortReuse, + /// How long to sleep after a (non-fatal) error while trying + /// to accept a new connection. + sleep_on_error: Duration, + /// The current pause, if any. + pause: Option, } -#[cfg(feature = "async-std")] -codegen!("async-std", TcpConfig, TcpTransStream, TcpListenStream, apply_config_async_std, async_std::net::TcpStream, async_std::net::TcpListener); - -#[cfg(feature = "tokio")] -codegen!("tokio", TokioTcpConfig, TokioTcpTransStream, TokioTcpListenStream, apply_config_tokio, tokio::net::TcpStream, tokio::net::TcpListener); - -#[cfg(feature = "async-std")] -impl AsyncRead for TcpTransStream { - fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { - AsyncRead::poll_read(Pin::new(&mut self.inner), cx, buf) - } -} +impl TcpListenStream +where + T: Provider +{ + /// Constructs a `TcpListenStream` for incoming connections around + /// the given `TcpListener`. + fn new(listener: TcpListener, port_reuse: PortReuse) -> io::Result { + let listen_addr = listener.local_addr()?; + + let in_addr = if match &listen_addr { + SocketAddr::V4(a) => a.ip().is_unspecified(), + SocketAddr::V6(a) => a.ip().is_unspecified(), + } { + // The `addrs` are populated via `if_watch` when the + // `TcpListenStream` is polled. + InAddr::Any { + addrs: HashSet::new(), + if_watch: IfWatch::Pending(T::if_watcher()), + } + } else { + InAddr::One { + out: Some(ip_to_multiaddr(listen_addr.ip(), listen_addr.port())), + addr: listen_addr.ip(), + } + }; -#[cfg(feature = "async-std")] -impl AsyncWrite for TcpTransStream { - fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { - AsyncWrite::poll_write(Pin::new(&mut self.inner), cx, buf) - } + let listener = T::new_listener(listener)?; - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - AsyncWrite::poll_flush(Pin::new(&mut self.inner), cx) + Ok(TcpListenStream { + port_reuse, + listener, + listen_addr, + in_addr, + pause: None, + sleep_on_error: Duration::from_millis(100), + }) } - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - AsyncWrite::poll_close(Pin::new(&mut self.inner), cx) + /// Disables port reuse for any listen address of this stream. + /// + /// This is done when the `TcpListenStream` encounters a fatal + /// error (for the stream) or is dropped. + /// + /// Has no effect if port reuse is disabled. + fn disable_port_reuse(&mut self) { + match &self.in_addr { + InAddr::One { addr, .. } => { + self.port_reuse.unregister(*addr, self.listen_addr.port()); + }, + InAddr::Any { addrs, .. } => { + for addr in addrs { + self.port_reuse.unregister(*addr, self.listen_addr.port()); + } + } + } } } -#[cfg(feature = "tokio")] -impl AsyncRead for TokioTcpTransStream { - fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll> { - // Adapted from - // https://github.com/tokio-rs/tokio/blob/6d99e1c7dec4c6a37c4c7bf2801bc82cc210351d/tokio-util/src/compat.rs#L126. - let mut read_buf = tokio::io::ReadBuf::new(buf); - futures::ready!(tokio::io::AsyncRead::poll_read(Pin::new(&mut self.inner), cx, &mut read_buf))?; - Poll::Ready(Ok(read_buf.filled().len())) +impl Drop for TcpListenStream +where + T: Provider +{ + fn drop(&mut self) { + self.disable_port_reuse(); } } -#[cfg(feature = "tokio")] -impl AsyncWrite for TokioTcpTransStream { - fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll> { - tokio::io::AsyncWrite::poll_write(Pin::new(&mut self.inner), cx, buf) - } +impl Stream for TcpListenStream +where + T: Provider, + T::Listener: Unpin, + T::Stream: Unpin, + T::IfWatcher: Unpin, +{ + type Item = Result, io::Error>; - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - tokio::io::AsyncWrite::poll_flush(Pin::new(&mut self.inner), cx) - } + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let me = Pin::into_inner(self); + + loop { + match &mut me.in_addr { + InAddr::Any { if_watch, addrs } => match if_watch { + // If we listen on all interfaces, wait for `if-watch` to be ready. + IfWatch::Pending(f) => match ready!(Pin::new(f).poll(cx)) { + Ok(w) => { + *if_watch = IfWatch::Ready(w); + continue + } + Err(err) => { + log::debug! { + "Failed to begin observing interfaces: {:?}. Scheduling retry.", + err + }; + *if_watch = IfWatch::Pending(T::if_watcher()); + me.pause = Some(Delay::new(me.sleep_on_error)); + return Poll::Ready(Some(Ok(ListenerEvent::Error(err)))); + } + }, + // Consume all events for up/down interface changes. + IfWatch::Ready(watch) => while let Poll::Ready(ev) = T::poll_interfaces(watch, cx) { + match ev { + Ok(IfEvent::Up(inet)) => { + let ip = inet.addr(); + if me.listen_addr.is_ipv4() == ip.is_ipv4() { + if addrs.insert(ip) { + let ma = ip_to_multiaddr(ip, me.listen_addr.port()); + log::debug!("New listen address: {}", ma); + me.port_reuse.register(ip, me.listen_addr.port()); + return Poll::Ready(Some(Ok(ListenerEvent::NewAddress(ma)))); + } + } + } + Ok(IfEvent::Down(inet)) => { + let ip = inet.addr(); + if me.listen_addr.is_ipv4() == ip.is_ipv4() { + if addrs.remove(&ip) { + let ma = ip_to_multiaddr(ip, me.listen_addr.port()); + log::debug!("Expired listen address: {}", ma); + me.port_reuse.unregister(ip, me.listen_addr.port()); + return Poll::Ready(Some(Ok(ListenerEvent::AddressExpired(ma)))); + } + } + } + Err(err) => { + log::debug! { + "Failure polling interfaces: {:?}. Scheduling retry.", + err + }; + me.pause = Some(Delay::new(me.sleep_on_error)); + return Poll::Ready(Some(Ok(ListenerEvent::Error(err)))); + } + } + }, + }, + // If the listener is bound to a single interface, make sure the + // address is registered for port reuse and reported once. + InAddr::One { addr, out } => if let Some(multiaddr) = out.take() { + me.port_reuse.register(*addr, me.listen_addr.port()); + return Poll::Ready(Some(Ok(ListenerEvent::NewAddress(multiaddr)))) + } + } - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - tokio::io::AsyncWrite::poll_shutdown(Pin::new(&mut self.inner), cx) + if let Some(mut pause) = me.pause.take() { + match Pin::new(&mut pause).poll(cx) { + Poll::Ready(_) => {} + Poll::Pending => { + me.pause = Some(pause); + return Poll::Pending; + } + } + } + + // Take the pending connection from the backlog. + let incoming = match T::poll_accept(&mut me.listener, cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(Ok(incoming)) => incoming, + Poll::Ready(Err(e)) => { + // These errors are non-fatal for the listener stream. + log::error!("error accepting incoming connection: {}", e); + me.pause = Some(Delay::new(me.sleep_on_error)); + return Poll::Ready(Some(Ok(ListenerEvent::Error(e)))); + } + }; + + let local_addr = ip_to_multiaddr(incoming.local_addr.ip(), incoming.local_addr.port()); + let remote_addr = ip_to_multiaddr(incoming.remote_addr.ip(), incoming.remote_addr.port()); + + log::debug!("Incoming connection from {} at {}", remote_addr, local_addr); + + return Poll::Ready(Some(Ok(ListenerEvent::Upgrade { + upgrade: future::ok(incoming.stream), + local_addr, + remote_addr, + }))); + } } } @@ -400,161 +676,19 @@ fn multiaddr_to_socketaddr(addr: &Multiaddr) -> Result { // Create a [`Multiaddr`] from the given IP address and port number. fn ip_to_multiaddr(ip: IpAddr, port: u16) -> Multiaddr { - let proto = match ip { - IpAddr::V4(ip) => Protocol::Ip4(ip), - IpAddr::V6(ip) => Protocol::Ip6(ip) - }; - let it = iter::once(proto).chain(iter::once(Protocol::Tcp(port))); - Multiaddr::from_iter(it) -} - -// Collect all local host addresses and use the provided port number as listen port. -fn host_addresses(port: u16) -> io::Result> { - let mut addrs = Vec::new(); - for iface in get_if_addrs()? { - let ip = iface.ip(); - let ma = ip_to_multiaddr(ip, port); - let ipn = match iface.addr { - IfAddr::V4(ip4) => { - let prefix_len = (!u32::from_be_bytes(ip4.netmask.octets())).leading_zeros(); - let ipnet = Ipv4Net::new(ip4.ip, prefix_len as u8) - .expect("prefix_len is the number of bits in a u32, so can not exceed 32"); - IpNet::V4(ipnet) - } - IfAddr::V6(ip6) => { - let prefix_len = (!u128::from_be_bytes(ip6.netmask.octets())).leading_zeros(); - let ipnet = Ipv6Net::new(ip6.ip, prefix_len as u8) - .expect("prefix_len is the number of bits in a u128, so can not exceed 128"); - IpNet::V6(ipnet) - } - }; - addrs.push((ip, ipn, ma)) - } - Ok(addrs) -} - -/// Listen address information. -#[derive(Debug)] -enum Addresses { - /// A specific address is used to listen. - One(Multiaddr), - /// A set of addresses is used to listen. - Many(Vec<(IpAddr, IpNet, Multiaddr)>) -} - -type Buffer = VecDeque>, io::Error>, io::Error>>; - -// If we listen on all interfaces, find out to which interface the given -// socket address belongs. In case we think the address is new, check -// all host interfaces again and report new and expired listen addresses. -fn check_for_interface_changes( - socket_addr: &SocketAddr, - listen_port: u16, - listen_addrs: &mut Vec<(IpAddr, IpNet, Multiaddr)>, - pending: &mut Buffer -) -> Result<(), io::Error> { - // Check for exact match: - if listen_addrs.iter().find(|(ip, ..)| ip == &socket_addr.ip()).is_some() { - return Ok(()) - } - - // No exact match => check netmask - if listen_addrs.iter().find(|(_, net, _)| net.contains(&socket_addr.ip())).is_some() { - return Ok(()) - } - - // The local IP address of this socket is new to us. - // We check for changes in the set of host addresses and report new - // and expired addresses. - // - // TODO: We do not detect expired addresses unless there is a new address. - let old_listen_addrs = std::mem::replace(listen_addrs, host_addresses(listen_port)?); - - // Check for addresses no longer in use. - for (ip, _, ma) in old_listen_addrs.iter() { - if listen_addrs.iter().find(|(i, ..)| i == ip).is_none() { - debug!("Expired listen address: {}", ma); - pending.push_back(Ok(ListenerEvent::AddressExpired(ma.clone()))); - } - } - - // Check for new addresses. - for (ip, _, ma) in listen_addrs.iter() { - if old_listen_addrs.iter().find(|(i, ..)| i == ip).is_none() { - debug!("New listen address: {}", ma); - pending.push_back(Ok(ListenerEvent::NewAddress(ma.clone()))); - } - } - - // We should now be able to find the local address, if not something - // is seriously wrong and we report an error. - if listen_addrs.iter() - .find(|(ip, net, _)| ip == &socket_addr.ip() || net.contains(&socket_addr.ip())) - .is_none() - { - let msg = format!("{} does not match any listen address", socket_addr.ip()); - return Err(io::Error::new(io::ErrorKind::Other, msg)) - } - - Ok(()) + Multiaddr::empty() + .with(ip.into()) + .with(Protocol::Tcp(port)) } #[cfg(test)] mod tests { - use futures::prelude::*; - use libp2p_core::{Transport, multiaddr::{Multiaddr, Protocol}, transport::ListenerEvent}; - use std::net::{IpAddr, Ipv4Addr, SocketAddr}; - use super::multiaddr_to_socketaddr; - #[cfg(feature = "async-std")] - use super::TcpConfig; - - #[test] - #[cfg(feature = "async-std")] - fn wildcard_expansion() { - fn test(addr: Multiaddr) { - let mut listener = TcpConfig::new().listen_on(addr).expect("listener"); - - // Get the first address. - let addr = futures::executor::block_on_stream(listener.by_ref()) - .next() - .expect("some event") - .expect("no error") - .into_new_address() - .expect("listen address"); - - // Process all initial `NewAddress` events and make sure they - // do not contain wildcard address or port. - let server = listener - .take_while(|event| match event.as_ref().unwrap() { - ListenerEvent::NewAddress(a) => { - let mut iter = a.iter(); - match iter.next().expect("ip address") { - Protocol::Ip4(ip) => assert!(!ip.is_unspecified()), - Protocol::Ip6(ip) => assert!(!ip.is_unspecified()), - other => panic!("Unexpected protocol: {}", other) - } - if let Protocol::Tcp(port) = iter.next().expect("port") { - assert_ne!(0, port) - } else { - panic!("No TCP port in address: {}", a) - } - futures::future::ready(true) - } - _ => futures::future::ready(false) - }) - .for_each(|_| futures::future::ready(())); - - let client = TcpConfig::new().dial(addr).expect("dialer"); - async_std::task::block_on(futures::future::join(server, client)).1.unwrap(); - } - - test("/ip4/0.0.0.0/tcp/0".parse().unwrap()); - test("/ip6/::1/tcp/0".parse().unwrap()); - } + use futures::channel::mpsc; + use super::*; #[test] fn multiaddr_to_tcp_conversion() { - use std::net::Ipv6Addr; + use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; assert!( multiaddr_to_socketaddr(&"/ip4/127.0.0.1/udp/1234".parse::().unwrap()) @@ -602,45 +736,200 @@ mod tests { } #[test] - #[cfg(feature = "async-std")] fn communicating_between_dialer_and_listener() { + env_logger::try_init().ok(); + + async fn listener(addr: Multiaddr, mut ready_tx: mpsc::Sender) { + let tcp = GenTcpConfig::::new(); + let mut listener = tcp.listen_on(addr).unwrap(); + loop { + match listener.next().await.unwrap().unwrap() { + ListenerEvent::NewAddress(listen_addr) => { + ready_tx.send(listen_addr).await.unwrap(); + } + ListenerEvent::Upgrade { upgrade, .. } => { + let mut upgrade = upgrade.await.unwrap(); + let mut buf = [0u8; 3]; + upgrade.read_exact(&mut buf).await.unwrap(); + assert_eq!(buf, [1, 2, 3]); + upgrade.write_all(&[4, 5, 6]).await.unwrap(); + return + } + e => panic!("Unexpected listener event: {:?}", e), + } + } + } + + async fn dialer(mut ready_rx: mpsc::Receiver) { + let addr = ready_rx.next().await.unwrap(); + let tcp = GenTcpConfig::::new(); + + // Obtain a future socket through dialing + let mut socket = tcp.dial(addr.clone()).unwrap().await.unwrap(); + socket.write_all(&[0x1, 0x2, 0x3]).await.unwrap(); + + let mut buf = [0u8; 3]; + socket.read_exact(&mut buf).await.unwrap(); + assert_eq!(buf, [4, 5, 6]); + } + fn test(addr: Multiaddr) { - let (ready_tx, ready_rx) = futures::channel::oneshot::channel(); - let mut ready_tx = Some(ready_tx); + #[cfg(feature = "async-io")] + { + let (ready_tx, ready_rx) = mpsc::channel(1); + let listener = listener::(addr.clone(), ready_tx); + let dialer = dialer::(ready_rx); + let listener = async_std::task::spawn(listener); + async_std::task::block_on(dialer); + async_std::task::block_on(listener); + } - async_std::task::spawn(async move { - let tcp = TcpConfig::new(); - let mut listener = tcp.listen_on(addr).unwrap(); - - loop { - match listener.next().await.unwrap().unwrap() { - ListenerEvent::NewAddress(listen_addr) => { - ready_tx.take().unwrap().send(listen_addr).unwrap(); - }, - ListenerEvent::Upgrade { upgrade, .. } => { - let mut upgrade = upgrade.await.unwrap(); - let mut buf = [0u8; 3]; - upgrade.read_exact(&mut buf).await.unwrap(); - assert_eq!(buf, [1, 2, 3]); - upgrade.write_all(&[4, 5, 6]).await.unwrap(); - }, - _ => unreachable!() + #[cfg(feature = "tokio")] + { + let (ready_tx, ready_rx) = mpsc::channel(1); + let listener = listener::(addr.clone(), ready_tx); + let dialer = dialer::(ready_rx); + let rt = tokio_crate::runtime::Builder::new_current_thread().enable_io().build().unwrap(); + let tasks = tokio_crate::task::LocalSet::new(); + let listener = tasks.spawn_local(listener); + tasks.block_on(&rt, dialer); + tasks.block_on(&rt, listener).unwrap(); + } + } + + test("/ip4/127.0.0.1/tcp/0".parse().unwrap()); + test("/ip6/::1/tcp/0".parse().unwrap()); + } + + #[test] + fn wildcard_expansion() { + env_logger::try_init().ok(); + + async fn listener(addr: Multiaddr, mut ready_tx: mpsc::Sender) { + let tcp = GenTcpConfig::::new(); + let mut listener = tcp.listen_on(addr).unwrap(); + + loop { + match listener.next().await.unwrap().unwrap() { + ListenerEvent::NewAddress(a) => { + let mut iter = a.iter(); + match iter.next().expect("ip address") { + Protocol::Ip4(ip) => assert!(!ip.is_unspecified()), + Protocol::Ip6(ip) => assert!(!ip.is_unspecified()), + other => panic!("Unexpected protocol: {}", other), + } + if let Protocol::Tcp(port) = iter.next().expect("port") { + assert_ne!(0, port) + } else { + panic!("No TCP port in address: {}", a) + } + ready_tx.send(a).await.ok(); + return } + _ => {} } - }); + } + } - async_std::task::block_on(async move { - let addr = ready_rx.await.unwrap(); - let tcp = TcpConfig::new(); + async fn dialer(mut ready_rx: mpsc::Receiver) { + let dest_addr = ready_rx.next().await.unwrap(); + let tcp = GenTcpConfig::::new(); + tcp.dial(dest_addr).unwrap().await.unwrap(); + } + + fn test(addr: Multiaddr) { + #[cfg(feature = "async-io")] + { + let (ready_tx, ready_rx) = mpsc::channel(1); + let listener = listener::(addr.clone(), ready_tx); + let dialer = dialer::(ready_rx); + let listener = async_std::task::spawn(listener); + async_std::task::block_on(dialer); + async_std::task::block_on(listener); + } + + #[cfg(feature = "tokio")] + { + let (ready_tx, ready_rx) = mpsc::channel(1); + let listener = listener::(addr.clone(), ready_tx); + let dialer = dialer::(ready_rx); + let rt = tokio_crate::runtime::Builder::new_current_thread().enable_io().build().unwrap(); + let tasks = tokio_crate::task::LocalSet::new(); + let listener = tasks.spawn_local(listener); + tasks.block_on(&rt, dialer); + tasks.block_on(&rt, listener).unwrap(); + } + } - // Obtain a future socket through dialing - let mut socket = tcp.dial(addr.clone()).unwrap().await.unwrap(); - socket.write_all(&[0x1, 0x2, 0x3]).await.unwrap(); + test("/ip4/0.0.0.0/tcp/0".parse().unwrap()); + test("/ip6/::1/tcp/0".parse().unwrap()); + } + + #[test] + fn port_reuse_dialing() { + env_logger::try_init().ok(); + + async fn listener(addr: Multiaddr, mut ready_tx: mpsc::Sender) { + let tcp = GenTcpConfig::::new(); + let mut listener = tcp.listen_on(addr).unwrap(); + loop { + match listener.next().await.unwrap().unwrap() { + ListenerEvent::NewAddress(listen_addr) => { + ready_tx.send(listen_addr).await.ok(); + } + ListenerEvent::Upgrade { upgrade, .. } => { + let mut upgrade = upgrade.await.unwrap(); + let mut buf = [0u8; 3]; + upgrade.read_exact(&mut buf).await.unwrap(); + assert_eq!(buf, [1, 2, 3]); + upgrade.write_all(&[4, 5, 6]).await.unwrap(); + return + } + e => panic!("Unexpected event: {:?}", e), + } + } + } + + async fn dialer(addr: Multiaddr, mut ready_rx: mpsc::Receiver) { + let dest_addr = ready_rx.next().await.unwrap(); + let tcp = GenTcpConfig::::new().port_reuse(true); + let mut listener = tcp.clone().listen_on(addr).unwrap(); + match listener.next().await.unwrap().unwrap() { + ListenerEvent::NewAddress(_) => { + // Obtain a future socket through dialing + let mut socket = tcp.dial(dest_addr).unwrap().await.unwrap(); + socket.write_all(&[0x1, 0x2, 0x3]).await.unwrap(); + // socket.flush().await; + let mut buf = [0u8; 3]; + socket.read_exact(&mut buf).await.unwrap(); + assert_eq!(buf, [4, 5, 6]); + } + e => panic!("Unexpected listener event: {:?}", e) + } + } + + fn test(addr: Multiaddr) { + #[cfg(feature = "async-io")] + { + let (ready_tx, ready_rx) = mpsc::channel(1); + let listener = listener::(addr.clone(), ready_tx); + let dialer = dialer::(addr.clone(), ready_rx); + let listener = async_std::task::spawn(listener); + async_std::task::block_on(dialer); + async_std::task::block_on(listener); + } - let mut buf = [0u8; 3]; - socket.read_exact(&mut buf).await.unwrap(); - assert_eq!(buf, [4, 5, 6]); - }); + #[cfg(feature = "tokio")] + { + let (ready_tx, ready_rx) = mpsc::channel(1); + let listener = listener::(addr.clone(), ready_tx); + let dialer = dialer::(addr.clone(), ready_rx); + let rt = tokio_crate::runtime::Builder::new_current_thread().enable_io().build().unwrap(); + let tasks = tokio_crate::task::LocalSet::new(); + let listener = tasks.spawn_local(listener); + tasks.block_on(&rt, dialer); + tasks.block_on(&rt, listener).unwrap(); + } } test("/ip4/127.0.0.1/tcp/0".parse().unwrap()); @@ -648,49 +937,99 @@ mod tests { } #[test] - #[cfg(feature = "async-std")] - fn replace_port_0_in_returned_multiaddr_ipv4() { - let tcp = TcpConfig::new(); + fn port_reuse_listening() { + env_logger::try_init().ok(); + + async fn listen_twice(addr: Multiaddr) { + let tcp = GenTcpConfig::::new().port_reuse(true); + let mut listener1 = tcp.clone().listen_on(addr).unwrap(); + match listener1.next().await.unwrap().unwrap() { + ListenerEvent::NewAddress(addr1) => { + // Listen on the same address a second time. + let mut listener2 = tcp.clone().listen_on(addr1.clone()).unwrap(); + match listener2.next().await.unwrap().unwrap() { + ListenerEvent::NewAddress(addr2) => { + assert_eq!(addr1, addr2); + return + } + e => panic!("Unexpected listener event: {:?}", e), + } + } + e => panic!("Unexpected listener event: {:?}", e), + } + } - let addr = "/ip4/127.0.0.1/tcp/0".parse::().unwrap(); - assert!(addr.to_string().contains("tcp/0")); + fn test(addr: Multiaddr) { + #[cfg(feature = "async-io")] + { + let listener = listen_twice::(addr.clone()); + async_std::task::block_on(listener); + } - let new_addr = futures::executor::block_on_stream(tcp.listen_on(addr).unwrap()) - .next() - .expect("some event") - .expect("no error") - .into_new_address() - .expect("listen address"); + #[cfg(feature = "tokio")] + { + let listener = listen_twice::(addr.clone()); + let rt = tokio_crate::runtime::Builder::new_current_thread().enable_io().build().unwrap(); + rt.block_on(listener); + } + } - assert!(!new_addr.to_string().contains("tcp/0")); + test("/ip4/127.0.0.1/tcp/0".parse().unwrap()); } #[test] - #[cfg(feature = "async-std")] - fn replace_port_0_in_returned_multiaddr_ipv6() { - let tcp = TcpConfig::new(); + fn listen_port_0() { + env_logger::try_init().ok(); - let addr: Multiaddr = "/ip6/::1/tcp/0".parse().unwrap(); - assert!(addr.to_string().contains("tcp/0")); + async fn listen(addr: Multiaddr) -> Multiaddr { + GenTcpConfig::::new() + .listen_on(addr) + .unwrap() + .next() + .await + .expect("some event") + .expect("no error") + .into_new_address() + .expect("listen address") + } - let new_addr = futures::executor::block_on_stream(tcp.listen_on(addr).unwrap()) - .next() - .expect("some event") - .expect("no error") - .into_new_address() - .expect("listen address"); + fn test(addr: Multiaddr) { + #[cfg(feature = "async-io")] + { + let new_addr = async_std::task::block_on(listen::(addr.clone())); + assert!(!new_addr.to_string().contains("tcp/0")); + } - assert!(!new_addr.to_string().contains("tcp/0")); + #[cfg(feature = "tokio")] + { + let rt = tokio_crate::runtime::Builder::new_current_thread().enable_io().build().unwrap(); + let new_addr = rt.block_on(listen::(addr.clone())); + assert!(!new_addr.to_string().contains("tcp/0")); + } + } + + test("/ip6/::1/tcp/0".parse().unwrap()); + test("/ip4/127.0.0.1/tcp/0".parse().unwrap()); } #[test] - #[cfg(feature = "async-std")] - fn larger_addr_denied() { - let tcp = TcpConfig::new(); - - let addr = "/ip4/127.0.0.1/tcp/12345/tcp/12345" - .parse::() - .unwrap(); - assert!(tcp.listen_on(addr).is_err()); + fn listen_invalid_addr() { + env_logger::try_init().ok(); + + fn test(addr: Multiaddr) { + #[cfg(feature = "async-io")] + { + let tcp = TcpConfig::new(); + assert!(tcp.listen_on(addr.clone()).is_err()); + } + + #[cfg(feature = "tokio")] + { + let tcp = TokioTcpConfig::new(); + assert!(tcp.listen_on(addr.clone()).is_err()); + } + } + + test("/ip4/127.0.0.1/tcp/12345/tcp/12345".parse().unwrap()); } } diff --git a/transports/tcp/src/provider.rs b/transports/tcp/src/provider.rs new file mode 100644 index 00000000000..091a6691087 --- /dev/null +++ b/transports/tcp/src/provider.rs @@ -0,0 +1,81 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! The interface for providers of non-blocking TCP implementations. + +#[cfg(feature = "async-io")] +pub mod async_io; + +#[cfg(feature = "tokio")] +pub mod tokio; + +use futures::io::{AsyncRead, AsyncWrite}; +use futures::future::BoxFuture; +use ipnet::IpNet; +use std::task::{Context, Poll}; +use std::{fmt, io}; +use std::net::{SocketAddr, TcpListener, TcpStream}; + +/// An event relating to a change of availability of an address +/// on a network interface. +pub enum IfEvent { + Up(IpNet), + Down(IpNet), +} + +/// An incoming connection returned from [`Provider::poll_accept()`]. +pub struct Incoming { + pub stream: S, + pub local_addr: SocketAddr, + pub remote_addr: SocketAddr, +} + +/// The interface for non-blocking TCP I/O providers. +pub trait Provider: Clone + Send + 'static { + /// The type of TCP streams obtained from [`Provider::new_stream`] + /// and [`Provider::poll_accept`]. + type Stream: AsyncRead + AsyncWrite + Send + Unpin + fmt::Debug; + /// The type of TCP listeners obtained from [`Provider::new_listener`]. + type Listener: Send + Unpin; + /// The type of network interface observers obtained from [`Provider::if_watcher`]. + type IfWatcher: Send + Unpin; + + /// Creates an instance of [`Self::IfWatcher`] that can be polled for + /// network interface changes via [`Self::poll_interfaces`]. + fn if_watcher() -> BoxFuture<'static, io::Result>; + + /// Creates a new listener wrapping the given [`TcpListener`] that + /// can be polled for incoming connections via [`Self::poll_accept()`]. + fn new_listener(_: TcpListener) -> io::Result; + + /// Creates a new stream for an outgoing connection, wrapping the + /// given [`TcpStream`]. The given `TcpStream` is initiating a + /// connection, but implementations must wait for the connection + /// setup to complete, i.e. for the stream to be writable. + fn new_stream(_: TcpStream) -> BoxFuture<'static, io::Result>; + + /// Polls a [`Self::Listener`] for an incoming connection, ensuring a task wakeup, + /// if necessary. + fn poll_accept(_: &mut Self::Listener, _: &mut Context<'_>) -> Poll>>; + + /// Polls a [`Self::IfWatcher`] for network interface changes, ensuring a task wakeup, + /// if necessary. + fn poll_interfaces(_: &mut Self::IfWatcher, _: &mut Context<'_>) -> Poll>; +} diff --git a/transports/tcp/src/provider/async_io.rs b/transports/tcp/src/provider/async_io.rs new file mode 100644 index 00000000000..c868e196f6d --- /dev/null +++ b/transports/tcp/src/provider/async_io.rs @@ -0,0 +1,83 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use super::{Provider, IfEvent, Incoming}; + +use async_io_crate::Async; +use futures::{ + future::{BoxFuture, FutureExt}, + prelude::*, +}; +use std::io; +use std::task::{Poll, Context}; +use std::net; + +#[derive(Copy, Clone)] +pub enum Tcp {} + +impl Provider for Tcp { + type Stream = Async; + type Listener = Async; + type IfWatcher = if_watch::IfWatcher; + + fn if_watcher() -> BoxFuture<'static, io::Result> { + if_watch::IfWatcher::new().boxed() + } + + fn new_listener(l: net::TcpListener) -> io::Result { + Async::new(l) + } + + fn new_stream(s: net::TcpStream) -> BoxFuture<'static, io::Result> { + async move { + let stream = Async::new(s)?; + stream.writable().await?; + Ok(stream) + }.boxed() + } + + fn poll_accept(l: &mut Self::Listener, cx: &mut Context<'_>) -> Poll>> { + let (stream, remote_addr) = loop { + match l.poll_readable(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), + Poll::Ready(Ok(())) => match l.accept().now_or_never() { + Some(Err(e)) => return Poll::Ready(Err(e)), + Some(Ok(res)) => break res, + None => { + // Since it doesn't do any harm, account for false positives of + // `poll_readable` just in case, i.e. try again. + } + } + } + }; + + let local_addr = stream.get_ref().local_addr()?; + + Poll::Ready(Ok(Incoming { stream, local_addr, remote_addr })) + } + + fn poll_interfaces(w: &mut Self::IfWatcher, cx: &mut Context<'_>) -> Poll> { + w.next().map_ok(|e| match e { + if_watch::IfEvent::Up(a) => IfEvent::Up(a), + if_watch::IfEvent::Down(a) => IfEvent::Down(a), + }).boxed().poll_unpin(cx) + } +} diff --git a/transports/tcp/src/provider/tokio.rs b/transports/tcp/src/provider/tokio.rs new file mode 100644 index 00000000000..0e8136f2c60 --- /dev/null +++ b/transports/tcp/src/provider/tokio.rs @@ -0,0 +1,168 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use super::{Provider, IfEvent, Incoming}; + +use futures::{ + future::{self, BoxFuture, FutureExt}, + prelude::*, +}; +use futures_timer::Delay; +use if_addrs::{IfAddr, get_if_addrs}; +use ipnet::{IpNet, Ipv4Net, Ipv6Net}; +use std::collections::HashSet; +use std::convert::TryFrom; +use std::io; +use std::task::{Poll, Context}; +use std::time::Duration; +use std::net; +use std::pin::Pin; + +#[derive(Copy, Clone)] +pub enum Tcp {} + +pub struct IfWatcher { + addrs: HashSet, + delay: Delay, + pending: Vec, +} + +impl Provider for Tcp { + type Stream = TcpStream; + type Listener = tokio_crate::net::TcpListener; + type IfWatcher = IfWatcher; + + fn if_watcher() -> BoxFuture<'static, io::Result> { + future::ready(Ok( + IfWatcher { + addrs: HashSet::new(), + delay: Delay::new(Duration::from_secs(0)), + pending: Vec::new(), + } + )).boxed() + } + + fn new_listener(l: net::TcpListener) -> io::Result { + tokio_crate::net::TcpListener::try_from(l) + } + + fn new_stream(s: net::TcpStream) -> BoxFuture<'static, io::Result> { + async move { + let stream = tokio_crate::net::TcpStream::try_from(s)?; + stream.writable().await?; + Ok(TcpStream(stream)) + }.boxed() + } + + fn poll_accept(l: &mut Self::Listener, cx: &mut Context<'_>) + -> Poll>> + { + let (stream, remote_addr) = match l.poll_accept(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), + Poll::Ready(Ok((stream, remote_addr))) => (stream, remote_addr) + }; + + let local_addr = stream.local_addr()?; + let stream = TcpStream(stream); + + Poll::Ready(Ok(Incoming { stream, local_addr, remote_addr })) + } + + fn poll_interfaces(w: &mut Self::IfWatcher, cx: &mut Context<'_>) -> Poll> { + loop { + if let Some(event) = w.pending.pop() { + return Poll::Ready(Ok(event)) + } + + match Pin::new(&mut w.delay).poll(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(()) => { + let ifs = get_if_addrs()?; + let addrs = ifs.into_iter().map(|iface| match iface.addr { + IfAddr::V4(ip4) => { + let prefix_len = (!u32::from_be_bytes(ip4.netmask.octets())).leading_zeros(); + let ipnet = Ipv4Net::new(ip4.ip, prefix_len as u8) + .expect("prefix_len can not exceed 32"); + IpNet::V4(ipnet) + } + IfAddr::V6(ip6) => { + let prefix_len = (!u128::from_be_bytes(ip6.netmask.octets())).leading_zeros(); + let ipnet = Ipv6Net::new(ip6.ip, prefix_len as u8) + .expect("prefix_len can not exceed 128"); + IpNet::V6(ipnet) + } + }).collect::>(); + + for down in w.addrs.difference(&addrs) { + w.pending.push(IfEvent::Down(*down)); + } + + for up in addrs.difference(&w.addrs) { + w.pending.push(IfEvent::Up(*up)); + } + + w.addrs = addrs; + w.delay.reset(Duration::from_secs(10)); + } + } + } + } +} + +/// A [`tokio_crate::net::TcpStream`] that implements [`AsyncRead`] and [`AsyncWrite`]. +#[derive(Debug)] +pub struct TcpStream(pub tokio_crate::net::TcpStream); + +impl Into for TcpStream { + fn into(self: TcpStream) -> tokio_crate::net::TcpStream { + self.0 + } +} + +impl AsyncRead for TcpStream { + fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll> { + let mut read_buf = tokio_crate::io::ReadBuf::new(buf); + futures::ready!(tokio_crate::io::AsyncRead::poll_read(Pin::new(&mut self.0), cx, &mut read_buf))?; + Poll::Ready(Ok(read_buf.filled().len())) + } +} + +impl AsyncWrite for TcpStream { + fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll> { + tokio_crate::io::AsyncWrite::poll_write(Pin::new(&mut self.0), cx, buf) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + tokio_crate::io::AsyncWrite::poll_flush(Pin::new(&mut self.0), cx) + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + tokio_crate::io::AsyncWrite::poll_shutdown(Pin::new(&mut self.0), cx) + } + + fn poll_write_vectored( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[io::IoSlice<'_>] + ) -> Poll> { + tokio_crate::io::AsyncWrite::poll_write_vectored(Pin::new(&mut self.0), cx, bufs) + } +} diff --git a/transports/uds/src/lib.rs b/transports/uds/src/lib.rs index 05efae630bb..ce698e22f5c 100644 --- a/transports/uds/src/lib.rs +++ b/transports/uds/src/lib.rs @@ -109,6 +109,10 @@ impl Transport for $uds_config { Err(TransportError::MultiaddrNotSupported(addr)) } } + + fn address_translation(&self, _server: &Multiaddr, _observed: &Multiaddr) -> Option { + None + } } }; diff --git a/transports/wasm-ext/src/lib.rs b/transports/wasm-ext/src/lib.rs index 8c0e50129f0..4aeb906d06f 100644 --- a/transports/wasm-ext/src/lib.rs +++ b/transports/wasm-ext/src/lib.rs @@ -206,6 +206,10 @@ impl Transport for ExtTransport { inner: SendWrapper::new(promise.into()), }) } + + fn address_translation(&self, _server: &Multiaddr, _observed: &Multiaddr) -> Option { + None + } } /// Future that dial a remote through an external transport. diff --git a/transports/websocket/Cargo.toml b/transports/websocket/Cargo.toml index 18a4d8c9709..dde04af187c 100644 --- a/transports/websocket/Cargo.toml +++ b/transports/websocket/Cargo.toml @@ -24,4 +24,4 @@ webpki = "0.21" webpki-roots = "0.21" [dev-dependencies] -libp2p-tcp = { path = "../tcp", features = ["async-std"] } +libp2p-tcp = { path = "../tcp" } diff --git a/transports/websocket/src/framed.rs b/transports/websocket/src/framed.rs index 9f6d6efd3f3..718f7f95c5a 100644 --- a/transports/websocket/src/framed.rs +++ b/transports/websocket/src/framed.rs @@ -262,6 +262,10 @@ where Ok(Box::pin(future)) } + + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { + self.transport.address_translation(server, observed) + } } impl WsConfig @@ -586,4 +590,3 @@ where .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) } } - diff --git a/transports/websocket/src/lib.rs b/transports/websocket/src/lib.rs index 10026a4298f..0ee346fdfa8 100644 --- a/transports/websocket/src/lib.rs +++ b/transports/websocket/src/lib.rs @@ -113,6 +113,10 @@ where fn dial(self, addr: Multiaddr) -> Result> { self.transport.map(wrap_connection as WrapperFn).dial(addr) } + + fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { + self.transport.address_translation(server, observed) + } } /// Type alias corresponding to `framed::WsConfig::Listener`.