From cf5bda29e29a281fdc91a91221a7286d8e39d2cd Mon Sep 17 00:00:00 2001 From: elenaf9 Date: Thu, 11 Aug 2022 16:29:38 +0200 Subject: [PATCH 01/21] transports/tcp: Update to `if-watch` v2.0.0 With if-watch `2.0.0` `IfWatcher::new` is not async anymore, hence the `IfWatch` wrapping logic is obsolete. --- transports/tcp/Cargo.toml | 2 +- transports/tcp/src/lib.rs | 93 +++++++++---------------- transports/tcp/src/provider.rs | 2 +- transports/tcp/src/provider/async_io.rs | 7 +- transports/tcp/src/provider/tokio.rs | 9 ++- 5 files changed, 42 insertions(+), 71 deletions(-) diff --git a/transports/tcp/Cargo.toml b/transports/tcp/Cargo.toml index 7273db58c51..9ef3940f634 100644 --- a/transports/tcp/Cargo.toml +++ b/transports/tcp/Cargo.toml @@ -14,7 +14,7 @@ categories = ["network-programming", "asynchronous"] async-io-crate = { package = "async-io", version = "1.2.0", optional = true } futures = "0.3.8" futures-timer = "3.0" -if-watch = { version = "1.1.1", optional = true } +if-watch = { version = "2.0.0", optional = true, git = "https://github.com/mxinden/if-watch.git" } if-addrs = { version = "0.7.0", optional = true } ipnet = "2.0.0" libc = "0.2.80" diff --git a/transports/tcp/src/lib.rs b/transports/tcp/src/lib.rs index 981c896bcb5..f0fe7776bed 100644 --- a/transports/tcp/src/lib.rs +++ b/transports/tcp/src/lib.rs @@ -43,9 +43,8 @@ pub use provider::tokio; pub type TokioTcpTransport = GenTcpTransport; use futures::{ - future::{self, BoxFuture, Ready}, + future::{self, Ready}, prelude::*, - ready, }; use futures_timer::Delay; use libp2p_core::{ @@ -605,11 +604,6 @@ pub enum TcpListenerEvent { Error(io::Error), } -enum IfWatch { - Pending(BoxFuture<'static, io::Result>), - Ready(TIfWatcher), -} - /// The listening addresses of a [`TcpListenStream`]. enum InAddr { /// The stream accepts connections on a single interface. @@ -620,7 +614,7 @@ enum InAddr { /// The stream accepts connections on all interfaces. Any { addrs: HashSet, - if_watch: IfWatch, + if_watch: TIfWatcher, }, } @@ -678,7 +672,7 @@ where // `TcpListenStream` is polled. InAddr::Any { addrs: HashSet::new(), - if_watch: IfWatch::Pending(T::if_watcher()), + if_watch: T::if_watcher()?, } } else { InAddr::One { @@ -743,63 +737,40 @@ where loop { match &mut me.in_addr { - InAddr::Any { if_watch, addrs } => match if_watch { - // If we listen on all interfaces, wait for `if-watch` to be ready. - IfWatch::Pending(f) => match ready!(Pin::new(f).poll(cx)) { - Ok(w) => { - *if_watch = IfWatch::Ready(w); - continue; - } - Err(err) => { - log::debug! { - "Failed to begin observing interfaces: {:?}. Scheduling retry.", - err - }; - *if_watch = IfWatch::Pending(T::if_watcher()); - me.pause = Some(Delay::new(me.sleep_on_error)); - return Poll::Ready(Some(Ok(TcpListenerEvent::Error(err)))); - } - }, - // Consume all events for up/down interface changes. - IfWatch::Ready(watch) => { - while let Poll::Ready(ev) = T::poll_interfaces(watch, cx) { - match ev { - Ok(IfEvent::Up(inet)) => { - let ip = inet.addr(); - if me.listen_addr.is_ipv4() == ip.is_ipv4() && addrs.insert(ip) - { - let ma = ip_to_multiaddr(ip, me.listen_addr.port()); - log::debug!("New listen address: {}", ma); - me.port_reuse.register(ip, me.listen_addr.port()); - return Poll::Ready(Some(Ok( - TcpListenerEvent::NewAddress(ma), - ))); - } + InAddr::Any { if_watch, addrs } => { + while let Poll::Ready(ev) = T::poll_interfaces(if_watch, cx) { + match ev { + Ok(IfEvent::Up(inet)) => { + let ip = inet.addr(); + if me.listen_addr.is_ipv4() == ip.is_ipv4() && addrs.insert(ip) { + let ma = ip_to_multiaddr(ip, me.listen_addr.port()); + log::debug!("New listen address: {}", ma); + me.port_reuse.register(ip, me.listen_addr.port()); + return Poll::Ready(Some(Ok(TcpListenerEvent::NewAddress(ma)))); } - Ok(IfEvent::Down(inet)) => { - let ip = inet.addr(); - if me.listen_addr.is_ipv4() == ip.is_ipv4() && addrs.remove(&ip) - { - let ma = ip_to_multiaddr(ip, me.listen_addr.port()); - log::debug!("Expired listen address: {}", ma); - me.port_reuse.unregister(ip, me.listen_addr.port()); - return Poll::Ready(Some(Ok( - TcpListenerEvent::AddressExpired(ma), - ))); - } - } - Err(err) => { - log::debug! { - "Failure polling interfaces: {:?}. Scheduling retry.", - err - }; - me.pause = Some(Delay::new(me.sleep_on_error)); - return Poll::Ready(Some(Ok(TcpListenerEvent::Error(err)))); + } + Ok(IfEvent::Down(inet)) => { + let ip = inet.addr(); + if me.listen_addr.is_ipv4() == ip.is_ipv4() && addrs.remove(&ip) { + let ma = ip_to_multiaddr(ip, me.listen_addr.port()); + log::debug!("Expired listen address: {}", ma); + me.port_reuse.unregister(ip, me.listen_addr.port()); + return Poll::Ready(Some(Ok( + TcpListenerEvent::AddressExpired(ma), + ))); } } + Err(err) => { + log::debug! { + "Failure polling interfaces: {:?}. Scheduling retry.", + err + }; + me.pause = Some(Delay::new(me.sleep_on_error)); + return Poll::Ready(Some(Ok(TcpListenerEvent::Error(err)))); + } } } - }, + } // If the listener is bound to a single interface, make sure the // address is registered for port reuse and reported once. InAddr::One { addr, out } => { diff --git a/transports/tcp/src/provider.rs b/transports/tcp/src/provider.rs index 7ebeaa49ee8..3989d2cce66 100644 --- a/transports/tcp/src/provider.rs +++ b/transports/tcp/src/provider.rs @@ -59,7 +59,7 @@ pub trait Provider: Clone + Send + 'static { /// Creates an instance of [`Self::IfWatcher`] that can be polled for /// network interface changes via [`Self::poll_interfaces`]. - fn if_watcher() -> BoxFuture<'static, io::Result>; + fn if_watcher() -> io::Result; /// Creates a new listener wrapping the given [`TcpListener`] that /// can be polled for incoming connections via [`Self::poll_accept()`]. diff --git a/transports/tcp/src/provider/async_io.rs b/transports/tcp/src/provider/async_io.rs index acbb4fbdcca..1d0f273a3bd 100644 --- a/transports/tcp/src/provider/async_io.rs +++ b/transports/tcp/src/provider/async_io.rs @@ -22,6 +22,7 @@ use super::{IfEvent, Incoming, Provider}; use async_io_crate::Async; use futures::future::{BoxFuture, FutureExt}; +use if_watch::IfWatcher; use std::io; use std::net; use std::task::{Context, Poll}; @@ -34,8 +35,8 @@ impl Provider for Tcp { type Listener = Async; type IfWatcher = if_watch::IfWatcher; - fn if_watcher() -> BoxFuture<'static, io::Result> { - if_watch::IfWatcher::new().boxed() + fn if_watcher() -> io::Result { + if_watch::IfWatcher::new() } fn new_listener(l: net::TcpListener) -> io::Result { @@ -89,7 +90,7 @@ impl Provider for Tcp { } fn poll_interfaces(w: &mut Self::IfWatcher, cx: &mut Context<'_>) -> Poll> { - w.poll_unpin(cx).map_ok(|e| match e { + IfWatcher::poll_next(std::pin::Pin::new(w), cx).map_ok(|e| match e { if_watch::IfEvent::Up(a) => IfEvent::Up(a), if_watch::IfEvent::Down(a) => IfEvent::Down(a), }) diff --git a/transports/tcp/src/provider/tokio.rs b/transports/tcp/src/provider/tokio.rs index 564eebfa48b..74dbbefa5d4 100644 --- a/transports/tcp/src/provider/tokio.rs +++ b/transports/tcp/src/provider/tokio.rs @@ -21,7 +21,7 @@ use super::{IfEvent, Incoming, Provider}; use futures::{ - future::{self, BoxFuture, FutureExt}, + future::{BoxFuture, FutureExt}, prelude::*, }; use futures_timer::Delay; @@ -49,13 +49,12 @@ impl Provider for Tcp { type Listener = tokio_crate::net::TcpListener; type IfWatcher = IfWatcher; - fn if_watcher() -> BoxFuture<'static, io::Result> { - future::ready(Ok(IfWatcher { + fn if_watcher() -> io::Result { + Ok(IfWatcher { addrs: HashSet::new(), delay: Delay::new(Duration::from_secs(0)), pending: Vec::new(), - })) - .boxed() + }) } fn new_listener(l: net::TcpListener) -> io::Result { From f832d084bd13c329324fcf2d628f463043269f8c Mon Sep 17 00:00:00 2001 From: elenaf9 Date: Thu, 11 Aug 2022 16:51:27 +0200 Subject: [PATCH 02/21] transports/tcp: use `if-watcher` in all runtimes --- transports/tcp/Cargo.toml | 10 ++-- transports/tcp/src/lib.rs | 15 +++--- transports/tcp/src/provider.rs | 18 ------- transports/tcp/src/provider/async_io.rs | 15 +----- transports/tcp/src/provider/tokio.rs | 67 +------------------------ 5 files changed, 13 insertions(+), 112 deletions(-) diff --git a/transports/tcp/Cargo.toml b/transports/tcp/Cargo.toml index 9ef3940f634..ba27c8f7bb1 100644 --- a/transports/tcp/Cargo.toml +++ b/transports/tcp/Cargo.toml @@ -14,9 +14,7 @@ categories = ["network-programming", "asynchronous"] async-io-crate = { package = "async-io", version = "1.2.0", optional = true } futures = "0.3.8" futures-timer = "3.0" -if-watch = { version = "2.0.0", optional = true, git = "https://github.com/mxinden/if-watch.git" } -if-addrs = { version = "0.7.0", optional = true } -ipnet = "2.0.0" +if-watch = { version = "2.0.0", git = "https://github.com/mxinden/if-watch.git" } libc = "0.2.80" libp2p-core = { version = "0.35.0", path = "../../core", default-features = false } log = "0.4.11" @@ -25,10 +23,10 @@ tokio-crate = { package = "tokio", version = "1.19.0", default-features = false, [features] default = ["async-io"] -tokio = ["tokio-crate", "if-addrs"] -async-io = ["async-io-crate", "if-watch"] +tokio = ["tokio-crate"] +async-io = ["async-io-crate"] [dev-dependencies] async-std = { version = "1.6.5", features = ["attributes"] } -tokio-crate = { package = "tokio", version = "1.0.1", default-features = false, features = ["net", "rt"] } +tokio-crate = { package = "tokio", version = "1.0.1", default-features = false, features = ["net", "rt", "macros"] } env_logger = "0.9.0" diff --git a/transports/tcp/src/lib.rs b/transports/tcp/src/lib.rs index f0fe7776bed..df5da39a95a 100644 --- a/transports/tcp/src/lib.rs +++ b/transports/tcp/src/lib.rs @@ -28,6 +28,7 @@ mod provider; +use if_watch::{IfEvent, IfWatcher}; #[cfg(feature = "async-io")] pub use provider::async_io; @@ -63,7 +64,7 @@ use std::{ time::Duration, }; -use provider::{IfEvent, Provider}; +use provider::Provider; /// The configuration for a TCP/IP transport capability for libp2p. #[derive(Clone, Debug)] @@ -397,7 +398,6 @@ impl Transport for GenTcpTransport where T: Provider + Send + 'static, T::Listener: Unpin, - T::IfWatcher: Unpin, T::Stream: Unpin, { type Output = T::Stream; @@ -605,7 +605,7 @@ pub enum TcpListenerEvent { } /// The listening addresses of a [`TcpListenStream`]. -enum InAddr { +enum InAddr { /// The stream accepts connections on a single interface. One { addr: IpAddr, @@ -614,7 +614,7 @@ enum InAddr { /// The stream accepts connections on all interfaces. Any { addrs: HashSet, - if_watch: TIfWatcher, + if_watch: IfWatcher, }, } @@ -636,7 +636,7 @@ where /// /// If the listen socket listens on all interfaces, these may change over /// time as interfaces become available or unavailable. - in_addr: InAddr, + in_addr: InAddr, /// The port reuse configuration for outgoing connections. /// /// If enabled, all IP addresses on which this listening stream @@ -672,7 +672,7 @@ where // `TcpListenStream` is polled. InAddr::Any { addrs: HashSet::new(), - if_watch: T::if_watcher()?, + if_watch: IfWatcher::new()?, } } else { InAddr::One { @@ -728,7 +728,6 @@ where T: Provider, T::Listener: Unpin, T::Stream: Unpin, - T::IfWatcher: Unpin, { type Item = Result, io::Error>; @@ -738,7 +737,7 @@ where loop { match &mut me.in_addr { InAddr::Any { if_watch, addrs } => { - while let Poll::Ready(ev) = T::poll_interfaces(if_watch, cx) { + while let Poll::Ready(ev) = IfWatcher::poll_next(Pin::new(if_watch), cx) { match ev { Ok(IfEvent::Up(inet)) => { let ip = inet.addr(); diff --git a/transports/tcp/src/provider.rs b/transports/tcp/src/provider.rs index 3989d2cce66..a341026e7e6 100644 --- a/transports/tcp/src/provider.rs +++ b/transports/tcp/src/provider.rs @@ -28,18 +28,10 @@ pub mod tokio; use futures::future::BoxFuture; use futures::io::{AsyncRead, AsyncWrite}; -use ipnet::IpNet; use std::net::{SocketAddr, TcpListener, TcpStream}; use std::task::{Context, Poll}; use std::{fmt, io}; -/// An event relating to a change of availability of an address -/// on a network interface. -pub enum IfEvent { - Up(IpNet), - Down(IpNet), -} - /// An incoming connection returned from [`Provider::poll_accept()`]. pub struct Incoming { pub stream: S, @@ -54,12 +46,6 @@ pub trait Provider: Clone + Send + 'static { type Stream: AsyncRead + AsyncWrite + Send + Unpin + fmt::Debug; /// The type of TCP listeners obtained from [`Provider::new_listener`]. type Listener: Send + Unpin; - /// The type of network interface observers obtained from [`Provider::if_watcher`]. - type IfWatcher: Send + Unpin; - - /// Creates an instance of [`Self::IfWatcher`] that can be polled for - /// network interface changes via [`Self::poll_interfaces`]. - fn if_watcher() -> io::Result; /// Creates a new listener wrapping the given [`TcpListener`] that /// can be polled for incoming connections via [`Self::poll_accept()`]. @@ -77,8 +63,4 @@ pub trait Provider: Clone + Send + 'static { _: &mut Self::Listener, _: &mut Context<'_>, ) -> Poll>>; - - /// Polls a [`Self::IfWatcher`] for network interface changes, ensuring a task wakeup, - /// if necessary. - fn poll_interfaces(_: &mut Self::IfWatcher, _: &mut Context<'_>) -> Poll>; } diff --git a/transports/tcp/src/provider/async_io.rs b/transports/tcp/src/provider/async_io.rs index 1d0f273a3bd..fc613d8fe86 100644 --- a/transports/tcp/src/provider/async_io.rs +++ b/transports/tcp/src/provider/async_io.rs @@ -18,11 +18,10 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use super::{IfEvent, Incoming, Provider}; +use super::{Incoming, Provider}; use async_io_crate::Async; use futures::future::{BoxFuture, FutureExt}; -use if_watch::IfWatcher; use std::io; use std::net; use std::task::{Context, Poll}; @@ -33,11 +32,6 @@ pub enum Tcp {} impl Provider for Tcp { type Stream = Async; type Listener = Async; - type IfWatcher = if_watch::IfWatcher; - - fn if_watcher() -> io::Result { - if_watch::IfWatcher::new() - } fn new_listener(l: net::TcpListener) -> io::Result { Async::new(l) @@ -88,11 +82,4 @@ impl Provider for Tcp { remote_addr, })) } - - fn poll_interfaces(w: &mut Self::IfWatcher, cx: &mut Context<'_>) -> Poll> { - IfWatcher::poll_next(std::pin::Pin::new(w), cx).map_ok(|e| match e { - if_watch::IfEvent::Up(a) => IfEvent::Up(a), - if_watch::IfEvent::Down(a) => IfEvent::Down(a), - }) - } } diff --git a/transports/tcp/src/provider/tokio.rs b/transports/tcp/src/provider/tokio.rs index 74dbbefa5d4..994a12a33c7 100644 --- a/transports/tcp/src/provider/tokio.rs +++ b/transports/tcp/src/provider/tokio.rs @@ -18,44 +18,24 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use super::{IfEvent, Incoming, Provider}; +use super::{Incoming, Provider}; use futures::{ future::{BoxFuture, FutureExt}, prelude::*, }; -use futures_timer::Delay; -use if_addrs::{get_if_addrs, IfAddr}; -use ipnet::{IpNet, Ipv4Net, Ipv6Net}; -use std::collections::HashSet; use std::convert::TryFrom; use std::io; use std::net; use std::pin::Pin; use std::task::{Context, Poll}; -use std::time::Duration; #[derive(Copy, Clone)] pub enum Tcp {} -pub struct IfWatcher { - addrs: HashSet, - delay: Delay, - pending: Vec, -} - impl Provider for Tcp { type Stream = TcpStream; type Listener = tokio_crate::net::TcpListener; - type IfWatcher = IfWatcher; - - fn if_watcher() -> io::Result { - Ok(IfWatcher { - addrs: HashSet::new(), - delay: Delay::new(Duration::from_secs(0)), - pending: Vec::new(), - }) - } fn new_listener(l: net::TcpListener) -> io::Result { tokio_crate::net::TcpListener::try_from(l) @@ -103,51 +83,6 @@ impl Provider for Tcp { remote_addr, })) } - - fn poll_interfaces(w: &mut Self::IfWatcher, cx: &mut Context<'_>) -> Poll> { - loop { - if let Some(event) = w.pending.pop() { - return Poll::Ready(Ok(event)); - } - - match Pin::new(&mut w.delay).poll(cx) { - Poll::Pending => return Poll::Pending, - Poll::Ready(()) => { - let ifs = get_if_addrs()?; - let addrs = ifs - .into_iter() - .map(|iface| match iface.addr { - IfAddr::V4(ip4) => { - let prefix_len = - (!u32::from_be_bytes(ip4.netmask.octets())).leading_zeros(); - let ipnet = Ipv4Net::new(ip4.ip, prefix_len as u8) - .expect("prefix_len can not exceed 32"); - IpNet::V4(ipnet) - } - IfAddr::V6(ip6) => { - let prefix_len = - (!u128::from_be_bytes(ip6.netmask.octets())).leading_zeros(); - let ipnet = Ipv6Net::new(ip6.ip, prefix_len as u8) - .expect("prefix_len can not exceed 128"); - IpNet::V6(ipnet) - } - }) - .collect::>(); - - for down in w.addrs.difference(&addrs) { - w.pending.push(IfEvent::Down(*down)); - } - - for up in addrs.difference(&w.addrs) { - w.pending.push(IfEvent::Up(*up)); - } - - w.addrs = addrs; - w.delay.reset(Duration::from_secs(10)); - } - } - } - } } /// A [`tokio_crate::net::TcpStream`] that implements [`AsyncRead`] and [`AsyncWrite`]. From e72b2fe7fd81de256f6c5333d4187ffadb0e2659 Mon Sep 17 00:00:00 2001 From: elenaf9 Date: Thu, 11 Aug 2022 16:53:45 +0200 Subject: [PATCH 03/21] transports/tcp: fix docs for not feature async-io --- transports/tcp/src/lib.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/transports/tcp/src/lib.rs b/transports/tcp/src/lib.rs index df5da39a95a..b91b39b1811 100644 --- a/transports/tcp/src/lib.rs +++ b/transports/tcp/src/lib.rs @@ -243,6 +243,9 @@ impl GenTcpConfig { /// # use libp2p_core::transport::{ListenerId, TransportEvent}; /// # use libp2p_core::{Multiaddr, Transport}; /// # use std::pin::Pin; + /// # #[cfg(not(feature = "async-io"))] + /// # fn main() {} + /// # /// #[cfg(feature = "async-io")] /// #[async_std::main] /// async fn main() -> std::io::Result<()> { From 9b1dc6b2b6ea7cac7db96212d6c28110111aa70d Mon Sep 17 00:00:00 2001 From: elenaf9 Date: Thu, 11 Aug 2022 17:10:20 +0200 Subject: [PATCH 04/21] tcp/transports: remove addr(s) from `InAddr` - In case of `InAddr::One` we can use `self.listen_addr`. - In case of `InAddr::Any` we can use `IfWatcher::iter`. --- transports/tcp/src/lib.rs | 45 ++++++++++++++++----------------------- 1 file changed, 18 insertions(+), 27 deletions(-) diff --git a/transports/tcp/src/lib.rs b/transports/tcp/src/lib.rs index b91b39b1811..79dfc75494b 100644 --- a/transports/tcp/src/lib.rs +++ b/transports/tcp/src/lib.rs @@ -610,15 +610,9 @@ pub enum TcpListenerEvent { /// The listening addresses of a [`TcpListenStream`]. enum InAddr { /// The stream accepts connections on a single interface. - One { - addr: IpAddr, - out: Option, - }, + One(Option), /// The stream accepts connections on all interfaces. - Any { - addrs: HashSet, - if_watch: IfWatcher, - }, + Any(IfWatcher), } /// A stream of incoming connections on one or more interfaces. @@ -673,15 +667,9 @@ where } { // The `addrs` are populated via `if_watch` when the // `TcpListenStream` is polled. - InAddr::Any { - addrs: HashSet::new(), - if_watch: IfWatcher::new()?, - } + InAddr::Any(IfWatcher::new()?) } else { - InAddr::One { - out: Some(ip_to_multiaddr(listen_addr.ip(), listen_addr.port())), - addr: listen_addr.ip(), - } + InAddr::One(Some(ip_to_multiaddr(listen_addr.ip(), listen_addr.port()))) }; let listener = T::new_listener(listener)?; @@ -705,12 +693,14 @@ where /// Has no effect if port reuse is disabled. fn disable_port_reuse(&mut self) { match &self.in_addr { - InAddr::One { addr, .. } => { - self.port_reuse.unregister(*addr, self.listen_addr.port()); + InAddr::One(_) => { + self.port_reuse + .unregister(self.listen_addr.ip(), self.listen_addr.port()); } - InAddr::Any { addrs, .. } => { - for addr in addrs { - self.port_reuse.unregister(*addr, self.listen_addr.port()); + InAddr::Any(if_watcher) => { + for ip_net in if_watcher.iter() { + self.port_reuse + .unregister(ip_net.addr(), self.listen_addr.port()); } } } @@ -739,12 +729,12 @@ where loop { match &mut me.in_addr { - InAddr::Any { if_watch, addrs } => { - while let Poll::Ready(ev) = IfWatcher::poll_next(Pin::new(if_watch), cx) { + InAddr::Any(if_watcher) => { + while let Poll::Ready(ev) = IfWatcher::poll_next(Pin::new(if_watcher), cx) { match ev { Ok(IfEvent::Up(inet)) => { let ip = inet.addr(); - if me.listen_addr.is_ipv4() == ip.is_ipv4() && addrs.insert(ip) { + if me.listen_addr.is_ipv4() == ip.is_ipv4() { let ma = ip_to_multiaddr(ip, me.listen_addr.port()); log::debug!("New listen address: {}", ma); me.port_reuse.register(ip, me.listen_addr.port()); @@ -753,7 +743,7 @@ where } Ok(IfEvent::Down(inet)) => { let ip = inet.addr(); - if me.listen_addr.is_ipv4() == ip.is_ipv4() && addrs.remove(&ip) { + if me.listen_addr.is_ipv4() == ip.is_ipv4() { let ma = ip_to_multiaddr(ip, me.listen_addr.port()); log::debug!("Expired listen address: {}", ma); me.port_reuse.unregister(ip, me.listen_addr.port()); @@ -775,9 +765,10 @@ where } // If the listener is bound to a single interface, make sure the // address is registered for port reuse and reported once. - InAddr::One { addr, out } => { + InAddr::One(out) => { if let Some(multiaddr) = out.take() { - me.port_reuse.register(*addr, me.listen_addr.port()); + me.port_reuse + .register(me.listen_addr.ip(), me.listen_addr.port()); return Poll::Ready(Some(Ok(TcpListenerEvent::NewAddress(multiaddr)))); } } From 2198640fdc062fba5ed2b70741ca76f7ee9c6f79 Mon Sep 17 00:00:00 2001 From: elenaf9 Date: Thu, 18 Aug 2022 21:00:27 +0200 Subject: [PATCH 05/21] transports/tcp: fix clippy --- transports/tcp/src/lib.rs | 133 ++++++++++++++++++-------------------- 1 file changed, 64 insertions(+), 69 deletions(-) diff --git a/transports/tcp/src/lib.rs b/transports/tcp/src/lib.rs index 79dfc75494b..06c1ef07b89 100644 --- a/transports/tcp/src/lib.rs +++ b/transports/tcp/src/lib.rs @@ -612,7 +612,7 @@ enum InAddr { /// The stream accepts connections on a single interface. One(Option), /// The stream accepts connections on all interfaces. - Any(IfWatcher), + Any(Box), } /// A stream of incoming connections on one or more interfaces. @@ -667,7 +667,7 @@ where } { // The `addrs` are populated via `if_watch` when the // `TcpListenStream` is polled. - InAddr::Any(IfWatcher::new()?) + InAddr::Any(Box::new(IfWatcher::new()?)) } else { InAddr::One(Some(ip_to_multiaddr(listen_addr.ip(), listen_addr.port()))) }; @@ -727,87 +727,82 @@ where fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let me = Pin::into_inner(self); - loop { - match &mut me.in_addr { - InAddr::Any(if_watcher) => { - while let Poll::Ready(ev) = IfWatcher::poll_next(Pin::new(if_watcher), cx) { - match ev { - Ok(IfEvent::Up(inet)) => { - let ip = inet.addr(); - if me.listen_addr.is_ipv4() == ip.is_ipv4() { - let ma = ip_to_multiaddr(ip, me.listen_addr.port()); - log::debug!("New listen address: {}", ma); - me.port_reuse.register(ip, me.listen_addr.port()); - return Poll::Ready(Some(Ok(TcpListenerEvent::NewAddress(ma)))); - } - } - Ok(IfEvent::Down(inet)) => { - let ip = inet.addr(); - if me.listen_addr.is_ipv4() == ip.is_ipv4() { - let ma = ip_to_multiaddr(ip, me.listen_addr.port()); - log::debug!("Expired listen address: {}", ma); - me.port_reuse.unregister(ip, me.listen_addr.port()); - return Poll::Ready(Some(Ok( - TcpListenerEvent::AddressExpired(ma), - ))); - } + match &mut me.in_addr { + InAddr::Any(if_watcher) => { + while let Poll::Ready(ev) = IfWatcher::poll_next(Pin::new(if_watcher), cx) { + match ev { + Ok(IfEvent::Up(inet)) => { + let ip = inet.addr(); + if me.listen_addr.is_ipv4() == ip.is_ipv4() { + let ma = ip_to_multiaddr(ip, me.listen_addr.port()); + log::debug!("New listen address: {}", ma); + me.port_reuse.register(ip, me.listen_addr.port()); + return Poll::Ready(Some(Ok(TcpListenerEvent::NewAddress(ma)))); } - Err(err) => { - log::debug! { - "Failure polling interfaces: {:?}. Scheduling retry.", - err - }; - me.pause = Some(Delay::new(me.sleep_on_error)); - return Poll::Ready(Some(Ok(TcpListenerEvent::Error(err)))); + } + Ok(IfEvent::Down(inet)) => { + let ip = inet.addr(); + if me.listen_addr.is_ipv4() == ip.is_ipv4() { + let ma = ip_to_multiaddr(ip, me.listen_addr.port()); + log::debug!("Expired listen address: {}", ma); + me.port_reuse.unregister(ip, me.listen_addr.port()); + return Poll::Ready(Some(Ok(TcpListenerEvent::AddressExpired(ma)))); } } + Err(err) => { + log::debug! { + "Failure polling interfaces: {:?}. Scheduling retry.", + err + }; + me.pause = Some(Delay::new(me.sleep_on_error)); + return Poll::Ready(Some(Ok(TcpListenerEvent::Error(err)))); + } } } - // If the listener is bound to a single interface, make sure the - // address is registered for port reuse and reported once. - InAddr::One(out) => { - if let Some(multiaddr) = out.take() { - me.port_reuse - .register(me.listen_addr.ip(), me.listen_addr.port()); - return Poll::Ready(Some(Ok(TcpListenerEvent::NewAddress(multiaddr)))); - } + } + // If the listener is bound to a single interface, make sure the + // address is registered for port reuse and reported once. + InAddr::One(out) => { + if let Some(multiaddr) = out.take() { + me.port_reuse + .register(me.listen_addr.ip(), me.listen_addr.port()); + return Poll::Ready(Some(Ok(TcpListenerEvent::NewAddress(multiaddr)))); } } + } - if let Some(mut pause) = me.pause.take() { - match Pin::new(&mut pause).poll(cx) { - Poll::Ready(_) => {} - Poll::Pending => { - me.pause = Some(pause); - return Poll::Pending; - } + if let Some(mut pause) = me.pause.take() { + match Pin::new(&mut pause).poll(cx) { + Poll::Ready(_) => {} + Poll::Pending => { + me.pause = Some(pause); + return Poll::Pending; } } + } - // Take the pending connection from the backlog. - let incoming = match T::poll_accept(&mut me.listener, cx) { - Poll::Pending => return Poll::Pending, - Poll::Ready(Ok(incoming)) => incoming, - Poll::Ready(Err(e)) => { - // These errors are non-fatal for the listener stream. - log::error!("error accepting incoming connection: {}", e); - me.pause = Some(Delay::new(me.sleep_on_error)); - return Poll::Ready(Some(Ok(TcpListenerEvent::Error(e)))); - } - }; + // Take the pending connection from the backlog. + let incoming = match T::poll_accept(&mut me.listener, cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(Ok(incoming)) => incoming, + Poll::Ready(Err(e)) => { + // These errors are non-fatal for the listener stream. + log::error!("error accepting incoming connection: {}", e); + me.pause = Some(Delay::new(me.sleep_on_error)); + return Poll::Ready(Some(Ok(TcpListenerEvent::Error(e)))); + } + }; - let local_addr = ip_to_multiaddr(incoming.local_addr.ip(), incoming.local_addr.port()); - let remote_addr = - ip_to_multiaddr(incoming.remote_addr.ip(), incoming.remote_addr.port()); + let local_addr = ip_to_multiaddr(incoming.local_addr.ip(), incoming.local_addr.port()); + let remote_addr = ip_to_multiaddr(incoming.remote_addr.ip(), incoming.remote_addr.port()); - log::debug!("Incoming connection from {} at {}", remote_addr, local_addr); + log::debug!("Incoming connection from {} at {}", remote_addr, local_addr); - return Poll::Ready(Some(Ok(TcpListenerEvent::Upgrade { - upgrade: future::ok(incoming.stream), - local_addr, - remote_addr, - }))); - } + Poll::Ready(Some(Ok(TcpListenerEvent::Upgrade { + upgrade: future::ok(incoming.stream), + local_addr, + remote_addr, + }))) } } From a139f7675e9ed1accb8e404081aa9a6aa7716f0a Mon Sep 17 00:00:00 2001 From: elenaf9 Date: Fri, 19 Aug 2022 09:37:48 +0200 Subject: [PATCH 06/21] transports/tcp: adapt to latest `if-watch` master --- transports/tcp/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/transports/tcp/src/lib.rs b/transports/tcp/src/lib.rs index 06c1ef07b89..8ce13008d86 100644 --- a/transports/tcp/src/lib.rs +++ b/transports/tcp/src/lib.rs @@ -729,7 +729,7 @@ where match &mut me.in_addr { InAddr::Any(if_watcher) => { - while let Poll::Ready(ev) = IfWatcher::poll_next(Pin::new(if_watcher), cx) { + while let Poll::Ready(ev) = if_watcher.as_mut().poll_if_event(cx) { match ev { Ok(IfEvent::Up(inet)) => { let ip = inet.addr(); From e59d66550b76e4cb189efbc7f2627529803b1e96 Mon Sep 17 00:00:00 2001 From: elenaf9 Date: Sat, 20 Aug 2022 09:33:59 +0200 Subject: [PATCH 07/21] transports/tcp: Use `if-watch` from crates.io --- transports/tcp/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/transports/tcp/Cargo.toml b/transports/tcp/Cargo.toml index ba27c8f7bb1..97ea0d92bdf 100644 --- a/transports/tcp/Cargo.toml +++ b/transports/tcp/Cargo.toml @@ -14,7 +14,7 @@ categories = ["network-programming", "asynchronous"] async-io-crate = { package = "async-io", version = "1.2.0", optional = true } futures = "0.3.8" futures-timer = "3.0" -if-watch = { version = "2.0.0", git = "https://github.com/mxinden/if-watch.git" } +if-watch = "2.0.0" libc = "0.2.80" libp2p-core = { version = "0.35.0", path = "../../core", default-features = false } log = "0.4.11" From 64a31659b087119e0b0cf0bd65ce4ace026de51e Mon Sep 17 00:00:00 2001 From: elenaf9 Date: Thu, 11 Aug 2022 17:36:30 +0200 Subject: [PATCH 08/21] transports/tcp: remove `InAddr` --- transports/tcp/src/lib.rs | 121 +++++++++++++++----------------------- 1 file changed, 49 insertions(+), 72 deletions(-) diff --git a/transports/tcp/src/lib.rs b/transports/tcp/src/lib.rs index 8ce13008d86..df3cac4e7dc 100644 --- a/transports/tcp/src/lib.rs +++ b/transports/tcp/src/lib.rs @@ -371,7 +371,20 @@ where socket.bind(&socket_addr.into())?; socket.listen(self.config.backlog as _)?; socket.set_nonblocking(true)?; - TcpListenStream::::new(id, socket.into(), self.port_reuse.clone()) + let listener: TcpListener = socket.into(); + let local_addr = listener.local_addr()?; + let if_watcher = if local_addr.ip().is_unspecified() { + Some(IfWatcher::new()?) + } else { + self.port_reuse.register(local_addr.ip(), local_addr.port()); + let listen_addr = ip_to_multiaddr(local_addr.ip(), local_addr.port()); + self.pending_events.push_back(TransportEvent::NewAddress { + listener_id: id, + listen_addr, + }); + None + }; + TcpListenStream::::new(id, listener, if_watcher, self.port_reuse.clone()) } } @@ -607,14 +620,6 @@ pub enum TcpListenerEvent { Error(io::Error), } -/// The listening addresses of a [`TcpListenStream`]. -enum InAddr { - /// The stream accepts connections on a single interface. - One(Option), - /// The stream accepts connections on all interfaces. - Any(Box), -} - /// A stream of incoming connections on one or more interfaces. pub struct TcpListenStream where @@ -628,12 +633,8 @@ where listen_addr: SocketAddr, /// The async listening socket for incoming connections. listener: T::Listener, - /// The IP addresses of network interfaces on which the listening socket - /// is accepting connections. - /// - /// If the listen socket listens on all interfaces, these may change over - /// time as interfaces become available or unavailable. - in_addr: InAddr, + + if_watcher: Option, /// The port reuse configuration for outgoing connections. /// /// If enabled, all IP addresses on which this listening stream @@ -657,21 +658,10 @@ where fn new( listener_id: ListenerId, listener: TcpListener, + if_watcher: Option, port_reuse: PortReuse, ) -> io::Result { let listen_addr = listener.local_addr()?; - - let in_addr = if match &listen_addr { - SocketAddr::V4(a) => a.ip().is_unspecified(), - SocketAddr::V6(a) => a.ip().is_unspecified(), - } { - // The `addrs` are populated via `if_watch` when the - // `TcpListenStream` is polled. - InAddr::Any(Box::new(IfWatcher::new()?)) - } else { - InAddr::One(Some(ip_to_multiaddr(listen_addr.ip(), listen_addr.port()))) - }; - let listener = T::new_listener(listener)?; Ok(TcpListenStream { @@ -679,7 +669,7 @@ where listener, listener_id, listen_addr, - in_addr, + if_watcher, pause: None, sleep_on_error: Duration::from_millis(100), }) @@ -692,17 +682,16 @@ where /// /// Has no effect if port reuse is disabled. fn disable_port_reuse(&mut self) { - match &self.in_addr { - InAddr::One(_) => { - self.port_reuse - .unregister(self.listen_addr.ip(), self.listen_addr.port()); - } - InAddr::Any(if_watcher) => { + match &self.if_watcher { + Some(if_watcher) => { for ip_net in if_watcher.iter() { self.port_reuse .unregister(ip_net.addr(), self.listen_addr.port()); } } + None => self + .port_reuse + .unregister(self.listen_addr.ip(), self.listen_addr.port()), } } } @@ -726,47 +715,35 @@ where fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let me = Pin::into_inner(self); - - match &mut me.in_addr { - InAddr::Any(if_watcher) => { - while let Poll::Ready(ev) = if_watcher.as_mut().poll_if_event(cx) { - match ev { - Ok(IfEvent::Up(inet)) => { - let ip = inet.addr(); - if me.listen_addr.is_ipv4() == ip.is_ipv4() { - let ma = ip_to_multiaddr(ip, me.listen_addr.port()); - log::debug!("New listen address: {}", ma); - me.port_reuse.register(ip, me.listen_addr.port()); - return Poll::Ready(Some(Ok(TcpListenerEvent::NewAddress(ma)))); - } - } - Ok(IfEvent::Down(inet)) => { - let ip = inet.addr(); - if me.listen_addr.is_ipv4() == ip.is_ipv4() { - let ma = ip_to_multiaddr(ip, me.listen_addr.port()); - log::debug!("Expired listen address: {}", ma); - me.port_reuse.unregister(ip, me.listen_addr.port()); - return Poll::Ready(Some(Ok(TcpListenerEvent::AddressExpired(ma)))); - } + if let Some(if_watcher) = me.if_watcher.as_mut() { + while let Poll::Ready(ev) = if_watcher.poll_if_event(cx) { + match ev { + Ok(IfEvent::Up(inet)) => { + let ip = inet.addr(); + if me.listen_addr.is_ipv4() == ip.is_ipv4() { + let ma = ip_to_multiaddr(ip, me.listen_addr.port()); + log::debug!("New listen address: {}", ma); + me.port_reuse.register(ip, me.listen_addr.port()); + return Poll::Ready(Some(Ok(TcpListenerEvent::NewAddress(ma)))); } - Err(err) => { - log::debug! { - "Failure polling interfaces: {:?}. Scheduling retry.", - err - }; - me.pause = Some(Delay::new(me.sleep_on_error)); - return Poll::Ready(Some(Ok(TcpListenerEvent::Error(err)))); + } + Ok(IfEvent::Down(inet)) => { + let ip = inet.addr(); + if me.listen_addr.is_ipv4() == ip.is_ipv4() { + let ma = ip_to_multiaddr(ip, me.listen_addr.port()); + log::debug!("Expired listen address: {}", ma); + me.port_reuse.unregister(ip, me.listen_addr.port()); + return Poll::Ready(Some(Ok(TcpListenerEvent::AddressExpired(ma)))); } } - } - } - // If the listener is bound to a single interface, make sure the - // address is registered for port reuse and reported once. - InAddr::One(out) => { - if let Some(multiaddr) = out.take() { - me.port_reuse - .register(me.listen_addr.ip(), me.listen_addr.port()); - return Poll::Ready(Some(Ok(TcpListenerEvent::NewAddress(multiaddr)))); + Err(err) => { + log::debug! { + "Failure polling interfaces: {:?}. Scheduling retry.", + err + }; + me.pause = Some(Delay::new(me.sleep_on_error)); + return Poll::Ready(Some(Ok(TcpListenerEvent::Error(err)))); + } } } } From 8d16ad7936ef3fb74bd58161761a4139f9550fc5 Mon Sep 17 00:00:00 2001 From: elenaf9 Date: Sat, 20 Aug 2022 16:15:01 +0200 Subject: [PATCH 09/21] transports/tcp: consistent poll style --- transports/tcp/src/lib.rs | 45 ++++++++++++++++++++++----------------- 1 file changed, 26 insertions(+), 19 deletions(-) diff --git a/transports/tcp/src/lib.rs b/transports/tcp/src/lib.rs index df3cac4e7dc..478f8f80b37 100644 --- a/transports/tcp/src/lib.rs +++ b/transports/tcp/src/lib.rs @@ -64,7 +64,7 @@ use std::{ time::Duration, }; -use provider::Provider; +use provider::{Incoming, Provider}; /// The configuration for a TCP/IP transport capability for libp2p. #[derive(Clone, Debug)] @@ -716,9 +716,9 @@ where fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let me = Pin::into_inner(self); if let Some(if_watcher) = me.if_watcher.as_mut() { - while let Poll::Ready(ev) = if_watcher.poll_if_event(cx) { - match ev { - Ok(IfEvent::Up(inet)) => { + loop { + match if_watcher.poll_if_event(cx) { + Poll::Ready(Ok(IfEvent::Up(inet))) => { let ip = inet.addr(); if me.listen_addr.is_ipv4() == ip.is_ipv4() { let ma = ip_to_multiaddr(ip, me.listen_addr.port()); @@ -727,7 +727,7 @@ where return Poll::Ready(Some(Ok(TcpListenerEvent::NewAddress(ma)))); } } - Ok(IfEvent::Down(inet)) => { + Poll::Ready(Ok(IfEvent::Down(inet))) => { let ip = inet.addr(); if me.listen_addr.is_ipv4() == ip.is_ipv4() { let ma = ip_to_multiaddr(ip, me.listen_addr.port()); @@ -736,7 +736,7 @@ where return Poll::Ready(Some(Ok(TcpListenerEvent::AddressExpired(ma)))); } } - Err(err) => { + Poll::Ready(Err(err)) => { log::debug! { "Failure polling interfaces: {:?}. Scheduling retry.", err @@ -744,6 +744,7 @@ where me.pause = Some(Delay::new(me.sleep_on_error)); return Poll::Ready(Some(Ok(TcpListenerEvent::Error(err)))); } + Poll::Pending => break, } } } @@ -759,27 +760,33 @@ where } // Take the pending connection from the backlog. - let incoming = match T::poll_accept(&mut me.listener, cx) { - Poll::Pending => return Poll::Pending, - Poll::Ready(Ok(incoming)) => incoming, + match T::poll_accept(&mut me.listener, cx) { + Poll::Ready(Ok(Incoming { + local_addr, + remote_addr, + stream, + })) => { + let local_addr = ip_to_multiaddr(local_addr.ip(), local_addr.port()); + let remote_addr = ip_to_multiaddr(remote_addr.ip(), remote_addr.port()); + + log::debug!("Incoming connection from {} at {}", remote_addr, local_addr); + + return Poll::Ready(Some(Ok(TcpListenerEvent::Upgrade { + upgrade: future::ok(stream), + local_addr, + remote_addr, + }))); + } Poll::Ready(Err(e)) => { // These errors are non-fatal for the listener stream. log::error!("error accepting incoming connection: {}", e); me.pause = Some(Delay::new(me.sleep_on_error)); return Poll::Ready(Some(Ok(TcpListenerEvent::Error(e)))); } + Poll::Pending => {} }; - let local_addr = ip_to_multiaddr(incoming.local_addr.ip(), incoming.local_addr.port()); - let remote_addr = ip_to_multiaddr(incoming.remote_addr.ip(), incoming.remote_addr.port()); - - log::debug!("Incoming connection from {} at {}", remote_addr, local_addr); - - Poll::Ready(Some(Ok(TcpListenerEvent::Upgrade { - upgrade: future::ok(incoming.stream), - local_addr, - remote_addr, - }))) + Poll::Pending } } From 603b39cb76dd0ed16119b0b8c984c1d02606348c Mon Sep 17 00:00:00 2001 From: elenaf9 Date: Sat, 20 Aug 2022 16:16:10 +0200 Subject: [PATCH 10/21] transports/tcp: poll `pause` before `IfWatcher` --- transports/tcp/src/lib.rs | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/transports/tcp/src/lib.rs b/transports/tcp/src/lib.rs index 478f8f80b37..7ea6c78f358 100644 --- a/transports/tcp/src/lib.rs +++ b/transports/tcp/src/lib.rs @@ -715,6 +715,17 @@ where fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let me = Pin::into_inner(self); + + if let Some(mut pause) = me.pause.take() { + match Pin::new(&mut pause).poll(cx) { + Poll::Ready(_) => {} + Poll::Pending => { + me.pause = Some(pause); + return Poll::Pending; + } + } + } + if let Some(if_watcher) = me.if_watcher.as_mut() { loop { match if_watcher.poll_if_event(cx) { @@ -749,16 +760,6 @@ where } } - if let Some(mut pause) = me.pause.take() { - match Pin::new(&mut pause).poll(cx) { - Poll::Ready(_) => {} - Poll::Pending => { - me.pause = Some(pause); - return Poll::Pending; - } - } - } - // Take the pending connection from the backlog. match T::poll_accept(&mut me.listener, cx) { Poll::Ready(Ok(Incoming { From 9f3b14e36e77ab7d67f864225bbb94440925cc01 Mon Sep 17 00:00:00 2001 From: elenaf9 Date: Sat, 20 Aug 2022 16:19:21 +0200 Subject: [PATCH 11/21] transports/tcp/tests: fix clippy --- transports/tcp/src/lib.rs | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/transports/tcp/src/lib.rs b/transports/tcp/src/lib.rs index 7ea6c78f358..441fcbd94da 100644 --- a/transports/tcp/src/lib.rs +++ b/transports/tcp/src/lib.rs @@ -935,7 +935,7 @@ mod tests { #[cfg(feature = "tokio")] { let (ready_tx, ready_rx) = mpsc::channel(1); - let listener = listener::(addr.clone(), ready_tx); + let listener = listener::(addr, ready_tx); let dialer = dialer::(ready_rx); let rt = tokio_crate::runtime::Builder::new_current_thread() .enable_io() @@ -1004,7 +1004,7 @@ mod tests { #[cfg(feature = "tokio")] { let (ready_tx, ready_rx) = mpsc::channel(1); - let listener = listener::(addr.clone(), ready_tx); + let listener = listener::(addr, ready_tx); let dialer = dialer::(ready_rx); let rt = tokio_crate::runtime::Builder::new_current_thread() .enable_io() @@ -1112,7 +1112,7 @@ mod tests { let (ready_tx, ready_rx) = mpsc::channel(1); let (port_reuse_tx, port_reuse_rx) = oneshot::channel(); let listener = listener::(addr.clone(), ready_tx, port_reuse_rx); - let dialer = dialer::(addr.clone(), ready_rx, port_reuse_tx); + let dialer = dialer::(addr, ready_rx, port_reuse_tx); let rt = tokio_crate::runtime::Builder::new_current_thread() .enable_io() .build() @@ -1153,10 +1153,7 @@ mod tests { match poll_fn(|cx| Pin::new(&mut tcp).poll(cx)).await { TransportEvent::NewAddress { listen_addr: addr2, .. - } => { - assert_eq!(addr1, addr2); - return; - } + } => assert_eq!(addr1, addr2), e => panic!("Unexpected transport event: {:?}", e), } } @@ -1173,7 +1170,7 @@ mod tests { #[cfg(feature = "tokio")] { - let listener = listen_twice::(addr.clone()); + let listener = listen_twice::(addr); let rt = tokio_crate::runtime::Builder::new_current_thread() .enable_io() .build() @@ -1211,7 +1208,7 @@ mod tests { .enable_io() .build() .unwrap(); - let new_addr = rt.block_on(listen::(addr.clone())); + let new_addr = rt.block_on(listen::(addr)); assert!(!new_addr.to_string().contains("tcp/0")); } } @@ -1234,7 +1231,7 @@ mod tests { #[cfg(feature = "tokio")] { let mut tcp = TokioTcpTransport::new(GenTcpConfig::new()); - assert!(tcp.listen_on(addr.clone()).is_err()); + assert!(tcp.listen_on(addr).is_err()); } } From e2ca55d492013aabb662a76db343d75eaae80c94 Mon Sep 17 00:00:00 2001 From: Elena Frank Date: Sat, 20 Aug 2022 16:23:17 +0200 Subject: [PATCH 12/21] transports/tcp: use `poll_unpin` to poll `pause` Co-authored-by: Thomas Eizinger --- transports/tcp/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/transports/tcp/src/lib.rs b/transports/tcp/src/lib.rs index 441fcbd94da..34e90bac8ff 100644 --- a/transports/tcp/src/lib.rs +++ b/transports/tcp/src/lib.rs @@ -717,7 +717,7 @@ where let me = Pin::into_inner(self); if let Some(mut pause) = me.pause.take() { - match Pin::new(&mut pause).poll(cx) { + match pause.poll_unpin(cx) { Poll::Ready(_) => {} Poll::Pending => { me.pause = Some(pause); From 2e5b11d354c63c7768765367c9f4ab59a238edaf Mon Sep 17 00:00:00 2001 From: elenaf9 Date: Sat, 20 Aug 2022 16:51:14 +0200 Subject: [PATCH 13/21] transports/tcp: add changelog entry --- transports/tcp/CHANGELOG.md | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/transports/tcp/CHANGELOG.md b/transports/tcp/CHANGELOG.md index bf7a5e0daba..23c5ebcd7e8 100644 --- a/transports/tcp/CHANGELOG.md +++ b/transports/tcp/CHANGELOG.md @@ -2,7 +2,10 @@ - Update to `libp2p-core` `v0.35.0`. -- Update to `if-watch` `v1.1.1`. +- Update to `if-watch` `v2.0.0`. Simplify `IfWatcher` integration. + Use `if_watch::IfWatcher` for all runtimes. See [PR 2813]. + +[PR 2813]: https://github.com/libp2p/rust-libp2p/pull/2813 # 0.34.0 From 582582a8d47da631d7c1c49b6138bc2045b6a708 Mon Sep 17 00:00:00 2001 From: elenaf9 Date: Sat, 20 Aug 2022 17:02:19 +0200 Subject: [PATCH 14/21] transports/tcp: docs --- transports/tcp/src/lib.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/transports/tcp/src/lib.rs b/transports/tcp/src/lib.rs index 34e90bac8ff..09331f7d82a 100644 --- a/transports/tcp/src/lib.rs +++ b/transports/tcp/src/lib.rs @@ -633,7 +633,11 @@ where listen_addr: SocketAddr, /// The async listening socket for incoming connections. listener: T::Listener, - + /// Watcher for network interface changes. + /// Reports [`IfEvent`]s for new / deleted ip-addresses when interfaces + /// become or stop being available. + /// + /// `None` if the socket is only listening on a single interface. if_watcher: Option, /// The port reuse configuration for outgoing connections. /// From 856065bd46203fb918e178a6c84fef0ff9665cc8 Mon Sep 17 00:00:00 2001 From: elenaf9 Date: Sat, 20 Aug 2022 17:04:20 +0200 Subject: [PATCH 15/21] transports/tcp: fmt --- transports/tcp/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/transports/tcp/src/lib.rs b/transports/tcp/src/lib.rs index 09331f7d82a..0c3fec6e380 100644 --- a/transports/tcp/src/lib.rs +++ b/transports/tcp/src/lib.rs @@ -634,9 +634,9 @@ where /// The async listening socket for incoming connections. listener: T::Listener, /// Watcher for network interface changes. - /// Reports [`IfEvent`]s for new / deleted ip-addresses when interfaces + /// Reports [`IfEvent`]s for new / deleted ip-addresses when interfaces /// become or stop being available. - /// + /// /// `None` if the socket is only listening on a single interface. if_watcher: Option, /// The port reuse configuration for outgoing connections. From e2d001fb6d3afacce423294a013221163967c36f Mon Sep 17 00:00:00 2001 From: elenaf9 Date: Tue, 23 Aug 2022 20:34:54 +0200 Subject: [PATCH 16/21] transports/tcp: minor style change --- transports/tcp/src/lib.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/transports/tcp/src/lib.rs b/transports/tcp/src/lib.rs index 0c3fec6e380..c276de636b1 100644 --- a/transports/tcp/src/lib.rs +++ b/transports/tcp/src/lib.rs @@ -731,9 +731,9 @@ where } if let Some(if_watcher) = me.if_watcher.as_mut() { - loop { - match if_watcher.poll_if_event(cx) { - Poll::Ready(Ok(IfEvent::Up(inet))) => { + while let Poll::Ready(event) = if_watcher.poll_if_event(cx) { + match event { + Ok(IfEvent::Up(inet)) => { let ip = inet.addr(); if me.listen_addr.is_ipv4() == ip.is_ipv4() { let ma = ip_to_multiaddr(ip, me.listen_addr.port()); @@ -742,7 +742,7 @@ where return Poll::Ready(Some(Ok(TcpListenerEvent::NewAddress(ma)))); } } - Poll::Ready(Ok(IfEvent::Down(inet))) => { + Ok(IfEvent::Down(inet)) => { let ip = inet.addr(); if me.listen_addr.is_ipv4() == ip.is_ipv4() { let ma = ip_to_multiaddr(ip, me.listen_addr.port()); @@ -751,7 +751,7 @@ where return Poll::Ready(Some(Ok(TcpListenerEvent::AddressExpired(ma)))); } } - Poll::Ready(Err(err)) => { + Err(err) => { log::debug! { "Failure polling interfaces: {:?}. Scheduling retry.", err @@ -759,7 +759,6 @@ where me.pause = Some(Delay::new(me.sleep_on_error)); return Poll::Ready(Some(Ok(TcpListenerEvent::Error(err)))); } - Poll::Pending => break, } } } From 4a63d84975276fed338b0db097505e12b61aa917 Mon Sep 17 00:00:00 2001 From: elenaf9 Date: Tue, 30 Aug 2022 09:42:35 +0200 Subject: [PATCH 17/21] transports/tcp: return early if ip is unspecified --- transports/tcp/src/lib.rs | 29 +++++++++++++++++------------ 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/transports/tcp/src/lib.rs b/transports/tcp/src/lib.rs index c276de636b1..96c942882c0 100644 --- a/transports/tcp/src/lib.rs +++ b/transports/tcp/src/lib.rs @@ -373,18 +373,23 @@ where socket.set_nonblocking(true)?; let listener: TcpListener = socket.into(); let local_addr = listener.local_addr()?; - let if_watcher = if local_addr.ip().is_unspecified() { - Some(IfWatcher::new()?) - } else { - self.port_reuse.register(local_addr.ip(), local_addr.port()); - let listen_addr = ip_to_multiaddr(local_addr.ip(), local_addr.port()); - self.pending_events.push_back(TransportEvent::NewAddress { - listener_id: id, - listen_addr, - }); - None - }; - TcpListenStream::::new(id, listener, if_watcher, self.port_reuse.clone()) + + if local_addr.ip().is_unspecified() { + return TcpListenStream::::new( + id, + listener, + Some(IfWatcher::new()?), + self.port_reuse.clone(), + ); + } + + self.port_reuse.register(local_addr.ip(), local_addr.port()); + let listen_addr = ip_to_multiaddr(local_addr.ip(), local_addr.port()); + self.pending_events.push_back(TransportEvent::NewAddress { + listener_id: id, + listen_addr, + }); + TcpListenStream::::new(id, listener, None, self.port_reuse.clone()) } } From 036f56fe1268d806db44ed168dedf51166c4e43f Mon Sep 17 00:00:00 2001 From: elenaf9 Date: Tue, 30 Aug 2022 09:54:22 +0200 Subject: [PATCH 18/21] transports/tcp: don't log error before returning --- transports/tcp/src/lib.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/transports/tcp/src/lib.rs b/transports/tcp/src/lib.rs index 96c942882c0..f7b897c0d47 100644 --- a/transports/tcp/src/lib.rs +++ b/transports/tcp/src/lib.rs @@ -757,10 +757,6 @@ where } } Err(err) => { - log::debug! { - "Failure polling interfaces: {:?}. Scheduling retry.", - err - }; me.pause = Some(Delay::new(me.sleep_on_error)); return Poll::Ready(Some(Ok(TcpListenerEvent::Error(err)))); } @@ -788,7 +784,6 @@ where } Poll::Ready(Err(e)) => { // These errors are non-fatal for the listener stream. - log::error!("error accepting incoming connection: {}", e); me.pause = Some(Delay::new(me.sleep_on_error)); return Poll::Ready(Some(Ok(TcpListenerEvent::Error(e)))); } From 99fa0cba238a5b472c4dd8a3e9e34db94bdd6784 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Fri, 9 Sep 2022 17:34:04 +0900 Subject: [PATCH 19/21] transports/tcp: Move changelog entry and bump version --- Cargo.toml | 2 +- transports/tcp/CHANGELOG.md | 12 ++++++++---- transports/tcp/Cargo.toml | 4 ++-- 3 files changed, 11 insertions(+), 7 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 62b8df404f8..3d5b8f42d5f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -110,7 +110,7 @@ smallvec = "1.6.1" libp2p-deflate = { version = "0.36.0", path = "transports/deflate", optional = true } libp2p-dns = { version = "0.36.0", path = "transports/dns", optional = true, default-features = false } libp2p-mdns = { version = "0.40.0", path = "protocols/mdns", optional = true, default-features = false } -libp2p-tcp = { version = "0.36.0", path = "transports/tcp", default-features = false, optional = true } +libp2p-tcp = { version = "0.37.0", path = "transports/tcp", default-features = false, optional = true } libp2p-websocket = { version = "0.38.0", path = "transports/websocket", optional = true } [target.'cfg(not(target_os = "unknown"))'.dependencies] diff --git a/transports/tcp/CHANGELOG.md b/transports/tcp/CHANGELOG.md index 5d4a083833a..cf1ef3af5ba 100644 --- a/transports/tcp/CHANGELOG.md +++ b/transports/tcp/CHANGELOG.md @@ -1,3 +1,10 @@ +# 0.37.0 + +- Update to `if-watch` `v2.0.0`. Simplify `IfWatcher` integration. + Use `if_watch::IfWatcher` for all runtimes. See [PR 2813]. + +[PR 2813]: https://github.com/libp2p/rust-libp2p/pull/2813 + # 0.36.0 - Update to `libp2p-core` `v0.36.0`. @@ -6,10 +13,7 @@ - Update to `libp2p-core` `v0.35.0`. -- Update to `if-watch` `v2.0.0`. Simplify `IfWatcher` integration. - Use `if_watch::IfWatcher` for all runtimes. See [PR 2813]. - -[PR 2813]: https://github.com/libp2p/rust-libp2p/pull/2813 +- Update to `if-watch` `v1.1.1`. # 0.34.0 diff --git a/transports/tcp/Cargo.toml b/transports/tcp/Cargo.toml index 95296e41b42..948d9507f0a 100644 --- a/transports/tcp/Cargo.toml +++ b/transports/tcp/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-tcp" edition = "2021" rust-version = "1.56.1" description = "TCP/IP transport protocol for libp2p" -version = "0.36.0" +version = "0.37.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -14,7 +14,7 @@ categories = ["network-programming", "asynchronous"] async-io-crate = { package = "async-io", version = "1.2.0", optional = true } futures = "0.3.8" futures-timer = "3.0" -if-watch = "2.0.0" +if-watch = "2.0.0" libc = "0.2.80" libp2p-core = { version = "0.36.0", path = "../../core", default-features = false } log = "0.4.11" From 9f802b2b4caaf951ac5831ef70d032f2404ec57f Mon Sep 17 00:00:00 2001 From: Max Inden Date: Fri, 9 Sep 2022 17:38:56 +0900 Subject: [PATCH 20/21] transports/tcp/CHANGELOG: Mark entry as unreleased --- transports/tcp/CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/transports/tcp/CHANGELOG.md b/transports/tcp/CHANGELOG.md index cf1ef3af5ba..ff34ae49407 100644 --- a/transports/tcp/CHANGELOG.md +++ b/transports/tcp/CHANGELOG.md @@ -1,4 +1,4 @@ -# 0.37.0 +# 0.37.0 - [unreleased] - Update to `if-watch` `v2.0.0`. Simplify `IfWatcher` integration. Use `if_watch::IfWatcher` for all runtimes. See [PR 2813]. From d5af2cab1f4bb18da5eafd0befec59afbaf403bf Mon Sep 17 00:00:00 2001 From: Max Inden Date: Fri, 9 Sep 2022 18:00:15 +0900 Subject: [PATCH 21/21] Cargo.toml: Bump version --- CHANGELOG.md | 4 ++++ Cargo.toml | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 95e41bb020c..d28138e1e7e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -43,6 +43,10 @@ # `libp2p` facade crate +# 0.49.0 - [unreleased] + +- Update to [`libp2p-tcp` `v0.37.0`](transports/tcp/CHANGELOG.md#0370). + # 0.48.0 - Update to [`libp2p-core` `v0.36.0`](core/CHANGELOG.md#0360). diff --git a/Cargo.toml b/Cargo.toml index 3d5b8f42d5f..89e63a4e5cd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p" edition = "2021" rust-version = "1.60.0" description = "Peer-to-peer networking library" -version = "0.48.0" +version = "0.49.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p"