From 14978dd0c4e04e246b6bce0b32e5c4c7811997fc Mon Sep 17 00:00:00 2001 From: Roland Kuhn Date: Wed, 9 Nov 2022 11:43:29 +0100 Subject: [PATCH 1/4] tcp: udpate if-watch 3.0.0, pass through features --- transports/tcp/Cargo.toml | 14 ++++----- transports/tcp/src/lib.rs | 28 +++++++++--------- transports/tcp/src/provider.rs | 10 +++++++ transports/tcp/src/provider/async_io.rs | 11 ++++++- transports/tcp/src/provider/tokio.rs | 39 +++++++++++++++---------- 5 files changed, 65 insertions(+), 37 deletions(-) diff --git a/transports/tcp/Cargo.toml b/transports/tcp/Cargo.toml index 502e05cdf3a..585d5a91480 100644 --- a/transports/tcp/Cargo.toml +++ b/transports/tcp/Cargo.toml @@ -11,26 +11,26 @@ keywords = ["peer-to-peer", "libp2p", "networking"] categories = ["network-programming", "asynchronous"] [dependencies] -async-io-crate = { package = "async-io", version = "1.2.0", optional = true } +async-io = { version = "1.2.0", optional = true } futures = "0.3.8" futures-timer = "3.0" -if-watch = "2.0.0" +if-watch = "3.0.0" libc = "0.2.80" libp2p-core = { version = "0.38.0", path = "../../core" } log = "0.4.11" socket2 = { version = "0.4.0", features = ["all"] } -tokio-crate = { package = "tokio", version = "1.19.0", default-features = false, features = ["net"], optional = true } +tokio = { version = "1.19.0", default-features = false, features = ["net"], optional = true } [features] -tokio = ["tokio-crate"] -async-io = ["async-io-crate"] +tokio = ["dep:tokio", "if-watch/tokio"] +async-io = ["dep:async-io", "if-watch/smol"] [dev-dependencies] async-std = { version = "1.6.5", features = ["attributes"] } -tokio-crate = { package = "tokio", version = "1.0.1", default-features = false, features = ["full"] } +tokio = { version = "1.0.1", default-features = false, features = ["full"] } env_logger = "0.9.0" -# Passing arguments to the docsrs builder in order to properly document cfg's. +# Passing arguments to the docsrs builder in order to properly document cfg's. # More information: https://docs.rs/about/builds#cross-compiling [package.metadata.docs.rs] all-features = true diff --git a/transports/tcp/src/lib.rs b/transports/tcp/src/lib.rs index 34ea22377fb..c6464dc4086 100644 --- a/transports/tcp/src/lib.rs +++ b/transports/tcp/src/lib.rs @@ -41,7 +41,7 @@ use futures::{ prelude::*, }; use futures_timer::Delay; -use if_watch::{IfEvent, IfWatcher}; +use if_watch::IfEvent; use libp2p_core::{ address_translation, multiaddr::{Multiaddr, Protocol}, @@ -385,7 +385,7 @@ where return TcpListenStream::::new( id, listener, - Some(IfWatcher::new()?), + Some(T::new_if_watcher()?), self.port_reuse.clone(), ); } @@ -656,7 +656,7 @@ where /// become or stop being available. /// /// `None` if the socket is only listening on a single interface. - if_watcher: Option, + if_watcher: Option, /// The port reuse configuration for outgoing connections. /// /// If enabled, all IP addresses on which this listening stream @@ -680,7 +680,7 @@ where fn new( listener_id: ListenerId, listener: TcpListener, - if_watcher: Option, + if_watcher: Option, port_reuse: PortReuse, ) -> io::Result { let listen_addr = listener.local_addr()?; @@ -706,7 +706,7 @@ where fn disable_port_reuse(&mut self) { match &self.if_watcher { Some(if_watcher) => { - for ip_net in if_watcher.iter() { + for ip_net in T::addrs(if_watcher) { self.port_reuse .unregister(ip_net.addr(), self.listen_addr.port()); } @@ -749,7 +749,7 @@ where } if let Some(if_watcher) = me.if_watcher.as_mut() { - while let Poll::Ready(event) = if_watcher.poll_if_event(cx) { + while let Poll::Ready(Some(event)) = if_watcher.poll_next_unpin(cx) { match event { Ok(IfEvent::Up(inet)) => { let ip = inet.addr(); @@ -986,11 +986,11 @@ mod tests { let (ready_tx, ready_rx) = mpsc::channel(1); let listener = listener::(addr, ready_tx); let dialer = dialer::(ready_rx); - let rt = tokio_crate::runtime::Builder::new_current_thread() + let rt = ::tokio::runtime::Builder::new_current_thread() .enable_io() .build() .unwrap(); - let tasks = tokio_crate::task::LocalSet::new(); + let tasks = ::tokio::task::LocalSet::new(); let listener = tasks.spawn_local(listener); tasks.block_on(&rt, dialer); tasks.block_on(&rt, listener).unwrap(); @@ -1055,11 +1055,11 @@ mod tests { let (ready_tx, ready_rx) = mpsc::channel(1); let listener = listener::(addr, ready_tx); let dialer = dialer::(ready_rx); - let rt = tokio_crate::runtime::Builder::new_current_thread() + let rt = ::tokio::runtime::Builder::new_current_thread() .enable_io() .build() .unwrap(); - let tasks = tokio_crate::task::LocalSet::new(); + let tasks = ::tokio::task::LocalSet::new(); let listener = tasks.spawn_local(listener); tasks.block_on(&rt, dialer); tasks.block_on(&rt, listener).unwrap(); @@ -1162,11 +1162,11 @@ mod tests { let (port_reuse_tx, port_reuse_rx) = oneshot::channel(); let listener = listener::(addr.clone(), ready_tx, port_reuse_rx); let dialer = dialer::(addr, ready_rx, port_reuse_tx); - let rt = tokio_crate::runtime::Builder::new_current_thread() + let rt = ::tokio::runtime::Builder::new_current_thread() .enable_io() .build() .unwrap(); - let tasks = tokio_crate::task::LocalSet::new(); + let tasks = ::tokio::task::LocalSet::new(); let listener = tasks.spawn_local(listener); tasks.block_on(&rt, dialer); tasks.block_on(&rt, listener).unwrap(); @@ -1220,7 +1220,7 @@ mod tests { #[cfg(feature = "tokio")] { let listener = listen_twice::(addr); - let rt = tokio_crate::runtime::Builder::new_current_thread() + let rt = ::tokio::runtime::Builder::new_current_thread() .enable_io() .build() .unwrap(); @@ -1253,7 +1253,7 @@ mod tests { #[cfg(feature = "tokio")] { - let rt = tokio_crate::runtime::Builder::new_current_thread() + let rt = ::tokio::runtime::Builder::new_current_thread() .enable_io() .build() .unwrap(); diff --git a/transports/tcp/src/provider.rs b/transports/tcp/src/provider.rs index a341026e7e6..d94da7a6fc3 100644 --- a/transports/tcp/src/provider.rs +++ b/transports/tcp/src/provider.rs @@ -28,6 +28,8 @@ pub mod tokio; use futures::future::BoxFuture; use futures::io::{AsyncRead, AsyncWrite}; +use futures::Stream; +use if_watch::{IfEvent, IpNet}; use std::net::{SocketAddr, TcpListener, TcpStream}; use std::task::{Context, Poll}; use std::{fmt, io}; @@ -46,6 +48,14 @@ pub trait Provider: Clone + Send + 'static { type Stream: AsyncRead + AsyncWrite + Send + Unpin + fmt::Debug; /// The type of TCP listeners obtained from [`Provider::new_listener`]. type Listener: Send + Unpin; + /// The type of IfWatcher obtained from [`Provider::new_if_watcher`]. + type IfWatcher: Stream> + Send + Unpin; + + /// Create a new IfWatcher responsible for detecting IP address changes. + fn new_if_watcher() -> io::Result; + + /// An iterator over all currently discovered addresses. + fn addrs(_: &Self::IfWatcher) -> Vec; /// Creates a new listener wrapping the given [`TcpListener`] that /// can be polled for incoming connections via [`Self::poll_accept()`]. diff --git a/transports/tcp/src/provider/async_io.rs b/transports/tcp/src/provider/async_io.rs index 0fc1102ff42..590f109d3c3 100644 --- a/transports/tcp/src/provider/async_io.rs +++ b/transports/tcp/src/provider/async_io.rs @@ -20,7 +20,7 @@ use super::{Incoming, Provider}; -use async_io_crate::Async; +use async_io::Async; use futures::future::{BoxFuture, FutureExt}; use std::io; use std::net; @@ -55,6 +55,15 @@ pub enum Tcp {} impl Provider for Tcp { type Stream = Async; type Listener = Async; + type IfWatcher = if_watch::smol::IfWatcher; + + fn new_if_watcher() -> io::Result { + Self::IfWatcher::new() + } + + fn addrs(if_watcher: &Self::IfWatcher) -> Vec { + if_watcher.iter().copied().collect() + } fn new_listener(l: net::TcpListener) -> io::Result { Async::new(l) diff --git a/transports/tcp/src/provider/tokio.rs b/transports/tcp/src/provider/tokio.rs index 48647833892..0fe8a514388 100644 --- a/transports/tcp/src/provider/tokio.rs +++ b/transports/tcp/src/provider/tokio.rs @@ -39,7 +39,7 @@ use std::task::{Context, Poll}; /// # use libp2p_core::Transport; /// # use futures::future; /// # use std::pin::Pin; -/// # use tokio_crate as tokio; +/// # use tokio as tokio; /// # /// # #[tokio::main] /// # async fn main() { @@ -59,17 +59,26 @@ pub enum Tcp {} impl Provider for Tcp { type Stream = TcpStream; - type Listener = tokio_crate::net::TcpListener; + type Listener = tokio::net::TcpListener; + type IfWatcher = if_watch::tokio::IfWatcher; + + fn new_if_watcher() -> io::Result { + Self::IfWatcher::new() + } + + fn addrs(if_watcher: &Self::IfWatcher) -> Vec { + if_watcher.iter().copied().collect() + } fn new_listener(l: net::TcpListener) -> io::Result { - tokio_crate::net::TcpListener::try_from(l) + tokio::net::TcpListener::try_from(l) } fn new_stream(s: net::TcpStream) -> BoxFuture<'static, io::Result> { async move { - // Taken from [`tokio_crate::net::TcpStream::connect_mio`]. + // Taken from [`tokio::net::TcpStream::connect_mio`]. - let stream = tokio_crate::net::TcpStream::try_from(s)?; + let stream = tokio::net::TcpStream::try_from(s)?; // Once we've connected, wait for the stream to be writable as // that's when the actual connection has been initiated. Once we're @@ -109,12 +118,12 @@ impl Provider for Tcp { } } -/// A [`tokio_crate::net::TcpStream`] that implements [`AsyncRead`] and [`AsyncWrite`]. +/// A [`tokio::net::TcpStream`] that implements [`AsyncRead`] and [`AsyncWrite`]. #[derive(Debug)] -pub struct TcpStream(pub tokio_crate::net::TcpStream); +pub struct TcpStream(pub tokio::net::TcpStream); -impl From for tokio_crate::net::TcpStream { - fn from(t: TcpStream) -> tokio_crate::net::TcpStream { +impl From for tokio::net::TcpStream { + fn from(t: TcpStream) -> tokio::net::TcpStream { t.0 } } @@ -125,8 +134,8 @@ impl AsyncRead for TcpStream { cx: &mut Context, buf: &mut [u8], ) -> Poll> { - let mut read_buf = tokio_crate::io::ReadBuf::new(buf); - futures::ready!(tokio_crate::io::AsyncRead::poll_read( + let mut read_buf = tokio::io::ReadBuf::new(buf); + futures::ready!(tokio::io::AsyncRead::poll_read( Pin::new(&mut self.0), cx, &mut read_buf @@ -141,15 +150,15 @@ impl AsyncWrite for TcpStream { cx: &mut Context, buf: &[u8], ) -> Poll> { - tokio_crate::io::AsyncWrite::poll_write(Pin::new(&mut self.0), cx, buf) + tokio::io::AsyncWrite::poll_write(Pin::new(&mut self.0), cx, buf) } fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - tokio_crate::io::AsyncWrite::poll_flush(Pin::new(&mut self.0), cx) + tokio::io::AsyncWrite::poll_flush(Pin::new(&mut self.0), cx) } fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - tokio_crate::io::AsyncWrite::poll_shutdown(Pin::new(&mut self.0), cx) + tokio::io::AsyncWrite::poll_shutdown(Pin::new(&mut self.0), cx) } fn poll_write_vectored( @@ -157,6 +166,6 @@ impl AsyncWrite for TcpStream { cx: &mut Context<'_>, bufs: &[io::IoSlice<'_>], ) -> Poll> { - tokio_crate::io::AsyncWrite::poll_write_vectored(Pin::new(&mut self.0), cx, bufs) + tokio::io::AsyncWrite::poll_write_vectored(Pin::new(&mut self.0), cx, bufs) } } From 1aa13fa31b2c8662e2ab5f08c41c8b0449aa052b Mon Sep 17 00:00:00 2001 From: Roland Kuhn Date: Wed, 9 Nov 2022 11:49:14 +0100 Subject: [PATCH 2/4] tcp: add CHANGELOG entry --- transports/tcp/CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/transports/tcp/CHANGELOG.md b/transports/tcp/CHANGELOG.md index 4a9ed144bda..e0f03577a8d 100644 --- a/transports/tcp/CHANGELOG.md +++ b/transports/tcp/CHANGELOG.md @@ -1,11 +1,14 @@ # 0.38.0 [unreleased] +- Update `if-watch` to version 3.0.0 and pass through `tokio` and `async-io` features. See [PR 3101]. + - Deprecate types with `Tcp` prefix (`GenTcpConfig`, `TcpTransport` and `TokioTcpTransport`) in favor of referencing them by module / crate. See [PR 2961]. - Remove `TcpListenStream` and `TcpListenerEvent` from public API. See [PR 2961]. - Update to `libp2p-core` `v0.38.0`. +[PR 3101]: https://github.com/libp2p/rust-libp2p/pull/3101 [PR 2961]: https://github.com/libp2p/rust-libp2p/pull/2961 # 0.37.0 From a1d93917e36b14f5e25e2f88d4928183d8fe9946 Mon Sep 17 00:00:00 2001 From: Roland Kuhn Date: Wed, 9 Nov 2022 16:10:41 +0100 Subject: [PATCH 3/4] Update transports/tcp/CHANGELOG.md MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: João Oliveira --- transports/tcp/CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/transports/tcp/CHANGELOG.md b/transports/tcp/CHANGELOG.md index e0f03577a8d..58fb99808f7 100644 --- a/transports/tcp/CHANGELOG.md +++ b/transports/tcp/CHANGELOG.md @@ -1,6 +1,6 @@ # 0.38.0 [unreleased] -- Update `if-watch` to version 3.0.0 and pass through `tokio` and `async-io` features. See [PR 3101]. +- Update to `if-watch` `v3.0.0` and pass through `tokio` and `async-io` features. See [PR 3101]. - Deprecate types with `Tcp` prefix (`GenTcpConfig`, `TcpTransport` and `TokioTcpTransport`) in favor of referencing them by module / crate. See [PR 2961]. From ab1a9ae2e45437cadeb0addaaa6f427b36aa3b78 Mon Sep 17 00:00:00 2001 From: Roland Kuhn Date: Wed, 9 Nov 2022 16:12:57 +0100 Subject: [PATCH 4/4] tcp: remove unused import in doc sample --- transports/tcp/src/provider/tokio.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/transports/tcp/src/provider/tokio.rs b/transports/tcp/src/provider/tokio.rs index 0fe8a514388..e4b75c8d814 100644 --- a/transports/tcp/src/provider/tokio.rs +++ b/transports/tcp/src/provider/tokio.rs @@ -39,7 +39,6 @@ use std::task::{Context, Poll}; /// # use libp2p_core::Transport; /// # use futures::future; /// # use std::pin::Pin; -/// # use tokio as tokio; /// # /// # #[tokio::main] /// # async fn main() {