diff --git a/CHANGELOG.md b/CHANGELOG.md index 95e41bb020c..2aa74dae1df 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -43,6 +43,16 @@ # `libp2p` facade crate +# 0.49.0 - [unreleased] + +- Update to [`libp2p-tcp` `v0.37.0`](transports/tcp/CHANGELOG.md#0370). + +- Update to [`libp2p-swarm-derive` `v0.30.1`](swarm-derive/CHANGELOG.md#0301). + +- Update to [`libp2p-metrics` `v0.10.0`](misc/metrics/CHANGELOG.md#0100). + +- Update to [`libp2p-kad` `v0.41.0`](protocols/kad/CHANGELOG.md#0410). + # 0.48.0 - Update to [`libp2p-core` `v0.36.0`](core/CHANGELOG.md#0360). diff --git a/Cargo.toml b/Cargo.toml index 8abdc7203e0..c4184ea2e7b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p" edition = "2021" rust-version = "1.60.0" description = "Peer-to-peer networking library" -version = "0.48.0" +version = "0.49.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -87,10 +87,10 @@ libp2p-core = { version = "0.36.0", path = "core", default-features = false } libp2p-dcutr = { version = "0.6.0", path = "protocols/dcutr", optional = true } libp2p-floodsub = { version = "0.39.0", path = "protocols/floodsub", optional = true } libp2p-identify = { version = "0.39.0", path = "protocols/identify", optional = true } -libp2p-kad = { version = "0.40.0", path = "protocols/kad", optional = true } -libp2p-metrics = { version = "0.9.0", path = "misc/metrics", optional = true } +libp2p-kad = { version = "0.41.0", path = "protocols/kad", optional = true } +libp2p-metrics = { version = "0.10.0", path = "misc/metrics", optional = true } libp2p-mplex = { version = "0.36.0", path = "muxers/mplex", optional = true } -libp2p-noise = { version = "0.39.0", path = "transports/noise", optional = true } +libp2p-noise = { version = "0.39.1", path = "transports/noise", optional = true } libp2p-ping = { version = "0.39.0", path = "protocols/ping", optional = true } libp2p-plaintext = { version = "0.36.0", path = "transports/plaintext", optional = true } libp2p-pnet = { version = "0.22.0", path = "transports/pnet", optional = true } @@ -98,7 +98,7 @@ libp2p-relay = { version = "0.12.0", path = "protocols/relay", optional = true } libp2p-rendezvous = { version = "0.9.0", path = "protocols/rendezvous", optional = true } libp2p-request-response = { version = "0.21.0", path = "protocols/request-response", optional = true } libp2p-swarm = { version = "0.39.0", path = "swarm" } -libp2p-swarm-derive = { version = "0.30.0", path = "swarm-derive" } +libp2p-swarm-derive = { version = "0.30.1", path = "swarm-derive" } libp2p-uds = { version = "0.35.0", path = "transports/uds", optional = true } libp2p-wasm-ext = { version = "0.36.0", path = "transports/wasm-ext", default-features = false, optional = true } libp2p-yamux = { version = "0.40.0", path = "muxers/yamux", optional = true } @@ -113,7 +113,7 @@ libp2p-deflate = { version = "0.36.0", path = "transports/deflate", optional = t libp2p-dns = { version = "0.36.0", path = "transports/dns", optional = true, default-features = false } libp2p-mdns = { version = "0.40.0", path = "protocols/mdns", optional = true, default-features = false } libp2p-quic = { version = "0.7.0", path = "transports/quic", optional = true } -libp2p-tcp = { version = "0.36.0", path = "transports/tcp", default-features = false, optional = true } +libp2p-tcp = { version = "0.37.0", path = "transports/tcp", default-features = false, optional = true } libp2p-websocket = { version = "0.38.0", path = "transports/websocket", optional = true } [target.'cfg(not(target_os = "unknown"))'.dependencies] diff --git a/core/Cargo.toml b/core/Cargo.toml index 3e5285ef6f4..0970b3e74d6 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -45,7 +45,7 @@ ring = { version = "0.16.9", features = ["alloc", "std"], default-features = fal [dev-dependencies] async-std = { version = "1.6.2", features = ["attributes"] } base64 = "0.13.0" -criterion = "0.3" +criterion = "0.4" libp2p-mplex = { path = "../muxers/mplex" } libp2p-noise = { path = "../transports/noise" } libp2p-tcp = { path = "../transports/tcp" } diff --git a/core/tests/transport_upgrade.rs b/core/tests/transport_upgrade.rs index 723a04b0780..dac84534369 100644 --- a/core/tests/transport_upgrade.rs +++ b/core/tests/transport_upgrade.rs @@ -79,12 +79,9 @@ where fn upgrade_pipeline() { let listener_keys = identity::Keypair::generate_ed25519(); let listener_id = listener_keys.public().to_peer_id(); - let listener_noise_keys = noise::Keypair::::new() - .into_authentic(&listener_keys) - .unwrap(); let mut listener_transport = MemoryTransport::default() .upgrade(upgrade::Version::V1) - .authenticate(noise::NoiseConfig::xx(listener_noise_keys).into_authenticated()) + .authenticate(noise::NoiseAuthenticated::xx(&listener_keys).unwrap()) .apply(HelloUpgrade {}) .apply(HelloUpgrade {}) .apply(HelloUpgrade {}) @@ -93,12 +90,9 @@ fn upgrade_pipeline() { let dialer_keys = identity::Keypair::generate_ed25519(); let dialer_id = dialer_keys.public().to_peer_id(); - let dialer_noise_keys = noise::Keypair::::new() - .into_authentic(&dialer_keys) - .unwrap(); let mut dialer_transport = MemoryTransport::default() .upgrade(upgrade::Version::V1) - .authenticate(noise::NoiseConfig::xx(dialer_noise_keys).into_authenticated()) + .authenticate(noise::NoiseAuthenticated::xx(&dialer_keys).unwrap()) .apply(HelloUpgrade {}) .apply(HelloUpgrade {}) .apply(HelloUpgrade {}) diff --git a/examples/chat-tokio.rs b/examples/chat-tokio.rs index f82d30934c9..5ee00f9eedc 100644 --- a/examples/chat-tokio.rs +++ b/examples/chat-tokio.rs @@ -70,16 +70,14 @@ async fn main() -> Result<(), Box> { let peer_id = PeerId::from(id_keys.public()); println!("Local peer id: {:?}", peer_id); - // Create a keypair for authenticated encryption of the transport. - let noise_keys = noise::Keypair::::new() - .into_authentic(&id_keys) - .expect("Signing libp2p-noise static DH keypair failed."); - // Create a tokio-based TCP transport use noise for authenticated // encryption and Mplex for multiplexing of substreams on a TCP stream. let transport = TokioTcpTransport::new(GenTcpConfig::default().nodelay(true)) .upgrade(upgrade::Version::V1) - .authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated()) + .authenticate( + noise::NoiseAuthenticated::xx(&id_keys) + .expect("Signing libp2p-noise static DH keypair failed."), + ) .multiplex(mplex::MplexConfig::new()) .boxed(); diff --git a/examples/ipfs-private.rs b/examples/ipfs-private.rs index 93e73cd5976..c0596816919 100644 --- a/examples/ipfs-private.rs +++ b/examples/ipfs-private.rs @@ -57,10 +57,7 @@ pub fn build_transport( key_pair: identity::Keypair, psk: Option, ) -> transport::Boxed<(PeerId, StreamMuxerBox)> { - let noise_keys = noise::Keypair::::new() - .into_authentic(&key_pair) - .unwrap(); - let noise_config = noise::NoiseConfig::xx(noise_keys).into_authenticated(); + let noise_config = noise::NoiseAuthenticated::xx(&key_pair).unwrap(); let yamux_config = YamuxConfig::default(); let base_transport = TcpTransport::new(GenTcpConfig::default().nodelay(true)); diff --git a/misc/metrics/CHANGELOG.md b/misc/metrics/CHANGELOG.md index 1ce05054600..1b784ffa1f9 100644 --- a/misc/metrics/CHANGELOG.md +++ b/misc/metrics/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.10.0 [unreleased] + +- Update to `libp2p-kad` `v0.41.0`. + # 0.9.0 - Update to `libp2p-swarm` `v0.39.0`. diff --git a/misc/metrics/Cargo.toml b/misc/metrics/Cargo.toml index 30df869248c..1c29fceec24 100644 --- a/misc/metrics/Cargo.toml +++ b/misc/metrics/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-metrics" edition = "2021" rust-version = "1.56.1" description = "Metrics for libp2p" -version = "0.9.0" +version = "0.10.0" authors = ["Max Inden "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -22,7 +22,7 @@ dcutr = ["libp2p-dcutr"] libp2p-core = { version = "0.36.0", path = "../../core", default-features = false } libp2p-dcutr = { version = "0.6.0", path = "../../protocols/dcutr", optional = true } libp2p-identify = { version = "0.39.0", path = "../../protocols/identify", optional = true } -libp2p-kad = { version = "0.40.0", path = "../../protocols/kad", optional = true } +libp2p-kad = { version = "0.41.0", path = "../../protocols/kad", optional = true } libp2p-ping = { version = "0.39.0", path = "../../protocols/ping", optional = true } libp2p-relay = { version = "0.12.0", path = "../../protocols/relay", optional = true } libp2p-swarm = { version = "0.39.0", path = "../../swarm" } @@ -33,7 +33,7 @@ libp2p-gossipsub = { version = "0.41.0", path = "../../protocols/gossipsub", op [dev-dependencies] log = "0.4.0" -env_logger = "0.8.1" +env_logger = "0.9.0" futures = "0.3.1" libp2p = { path = "../../", default-features = false, features = ["metrics", "ping", "tcp-async-io", "dns-async-std", "websocket", "noise", "mplex", "yamux"] } hyper = { version="0.14", features = ["server", "tcp", "http1"] } diff --git a/muxers/mplex/Cargo.toml b/muxers/mplex/Cargo.toml index 1e4f6974aa3..d4ea5342262 100644 --- a/muxers/mplex/Cargo.toml +++ b/muxers/mplex/Cargo.toml @@ -24,7 +24,7 @@ unsigned-varint = { version = "0.7", features = ["asynchronous_codec"] } [dev-dependencies] async-std = "1.7.0" -criterion = "0.3" +criterion = "0.4" env_logger = "0.9" futures = "0.3" libp2p-tcp = { path = "../../transports/tcp" } diff --git a/protocols/dcutr/Cargo.toml b/protocols/dcutr/Cargo.toml index bc015ab0d80..dd059e41422 100644 --- a/protocols/dcutr/Cargo.toml +++ b/protocols/dcutr/Cargo.toml @@ -29,7 +29,7 @@ void = "1" prost-build = "0.11" [dev-dependencies] -env_logger = "0.8.3" +env_logger = "0.9.0" libp2p = { path = "../..", default-features = false, features = ["dcutr", "relay", "plaintext", "identify", "tcp-async-io", "ping", "noise", "dns-async-std"] } libp2p-identify = { path = "../identify" } libp2p-plaintext = { path = "../../transports/plaintext" } diff --git a/protocols/dcutr/examples/client.rs b/protocols/dcutr/examples/client.rs index dd73b7d3ac3..54448ff635d 100644 --- a/protocols/dcutr/examples/client.rs +++ b/protocols/dcutr/examples/client.rs @@ -89,10 +89,6 @@ fn main() -> Result<(), Box> { let (relay_transport, client) = Client::new_transport_and_behaviour(local_peer_id); - let noise_keys = noise::Keypair::::new() - .into_authentic(&local_key) - .expect("Signing libp2p-noise static DH keypair failed."); - let transport = OrTransport::new( relay_transport, block_on(DnsConfig::system(TcpTransport::new( @@ -101,7 +97,10 @@ fn main() -> Result<(), Box> { .unwrap(), ) .upgrade(upgrade::Version::V1) - .authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated()) + .authenticate( + noise::NoiseAuthenticated::xx(&local_key) + .expect("Signing libp2p-noise static DH keypair failed."), + ) .multiplex(libp2p_yamux::YamuxConfig::default()) .boxed(); diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 954c68ae24b..21dd77562df 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -649,7 +649,7 @@ where set.iter() .filter(|p| { self.explicit_peers.contains(*p) - || !self.score_below_threshold(*p, |ts| ts.publish_threshold).0 + || !self.score_below_threshold(p, |ts| ts.publish_threshold).0 }) .cloned(), ); @@ -946,14 +946,11 @@ where ); // remove explicit peers, peers with negative scores, and backoffed peers - peers = peers - .into_iter() - .filter(|p| { - !self.explicit_peers.contains(p) - && !self.score_below_threshold(p, |_| 0.0).0 - && !self.backoffs.is_backoff_with_slack(topic_hash, p) - }) - .collect(); + peers.retain(|p| { + !self.explicit_peers.contains(p) + && !self.score_below_threshold(p, |_| 0.0).0 + && !self.backoffs.is_backoff_with_slack(topic_hash, p) + }); // Add up to mesh_n of them them to the mesh // NOTE: These aren't randomly added, currently FIFO @@ -1625,7 +1622,7 @@ where // //TODO: Once signed records are spec'd: Can we use peerInfo without any IDs if they have a // signed peer record? - px = px.into_iter().filter(|p| p.peer_id.is_some()).collect(); + px.retain(|p| p.peer_id.is_some()); if px.len() > n { // only use at most prune_peers many random peers let mut rng = thread_rng(); @@ -3204,7 +3201,7 @@ where debug!("Peer disconnected: {}", peer_id); { let topics = match self.peer_topics.get(peer_id) { - Some(topics) => (topics), + Some(topics) => topics, None => { debug_assert!( self.blacklisted_peers.contains(peer_id), diff --git a/protocols/gossipsub/src/lib.rs b/protocols/gossipsub/src/lib.rs index d86263aace4..4022a23185d 100644 --- a/protocols/gossipsub/src/lib.rs +++ b/protocols/gossipsub/src/lib.rs @@ -97,10 +97,9 @@ //! //! // Set up an encrypted TCP Transport over the Mplex //! // This is test transport (memory). -//! let noise_keys = libp2p_noise::Keypair::::new().into_authentic(&local_key).unwrap(); //! let transport = MemoryTransport::default() //! .upgrade(libp2p_core::upgrade::Version::V1) -//! .authenticate(libp2p_noise::NoiseConfig::xx(noise_keys).into_authenticated()) +//! .authenticate(libp2p_noise::NoiseAuthenticated::xx(&local_key).unwrap()) //! .multiplex(libp2p_mplex::MplexConfig::new()) //! .boxed(); //! diff --git a/protocols/kad/CHANGELOG.md b/protocols/kad/CHANGELOG.md index d161d93de83..2f41a093019 100644 --- a/protocols/kad/CHANGELOG.md +++ b/protocols/kad/CHANGELOG.md @@ -1,3 +1,10 @@ +# 0.41.0 [unreleased] + +- Remove deprecated `set_protocol_name()` from `KademliaConfig` & `KademliaProtocolConfig`. + Use `set_protocol_names()` instead. See [PR 2866]. + +[PR 2866]: https://github.com/libp2p/rust-libp2p/pull/2866 + # 0.40.0 - Add support for multiple protocol names. Update `Kademlia`, `KademliaConfig`, diff --git a/protocols/kad/Cargo.toml b/protocols/kad/Cargo.toml index 9aec22c609a..c65e34ecdbc 100644 --- a/protocols/kad/Cargo.toml +++ b/protocols/kad/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-kad" edition = "2021" rust-version = "1.56.1" description = "Kademlia protocol for libp2p" -version = "0.40.0" +version = "0.41.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index feba94f550e..b267f87d386 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -228,16 +228,6 @@ impl KademliaConfig { self } - /// Sets a custom protocol name. - /// - /// Kademlia nodes only communicate with other nodes using the same protocol - /// name. Using a custom name therefore allows to segregate the DHT from - /// others, if that is desired. - #[deprecated(since = "0.40.0", note = "use `set_protocol_names()` instead")] - pub fn set_protocol_name(&mut self, name: impl Into>) -> &mut Self { - self.set_protocol_names(std::iter::once(name.into()).collect()) - } - /// Sets the timeout for a single query. /// /// > **Note**: A single query usually comprises at least as many requests diff --git a/protocols/kad/src/behaviour/test.rs b/protocols/kad/src/behaviour/test.rs index 1f67be5a19d..aab7fa0ef28 100644 --- a/protocols/kad/src/behaviour/test.rs +++ b/protocols/kad/src/behaviour/test.rs @@ -56,12 +56,9 @@ fn build_node() -> (Multiaddr, TestSwarm) { fn build_node_with_config(cfg: KademliaConfig) -> (Multiaddr, TestSwarm) { let local_key = identity::Keypair::generate_ed25519(); let local_public_key = local_key.public(); - let noise_keys = noise::Keypair::::new() - .into_authentic(&local_key) - .unwrap(); let transport = MemoryTransport::default() .upgrade(upgrade::Version::V1) - .authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated()) + .authenticate(noise::NoiseAuthenticated::xx(&local_key).unwrap()) .multiplex(yamux::YamuxConfig::default()) .boxed(); diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index bcadb57f44c..5be9ae17737 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -658,14 +658,7 @@ where let pos = self .inbound_substreams .iter() - .position(|state| match state { - InboundSubstreamState::WaitingUser(ref conn_id, _) - if conn_id == &request_id.connec_unique_id => - { - true - } - _ => false, - }); + .position(|state| matches!(state, InboundSubstreamState::WaitingUser(ref conn_id, _) if conn_id == &request_id.connec_unique_id)); if let Some(pos) = pos { let (conn_id, substream) = match self.inbound_substreams.remove(pos) { @@ -737,14 +730,7 @@ where let pos = self .inbound_substreams .iter() - .position(|state| match state { - InboundSubstreamState::WaitingUser(ref conn_id, _) - if conn_id == &request_id.connec_unique_id => - { - true - } - _ => false, - }); + .position(|state| matches!(state, InboundSubstreamState::WaitingUser(ref conn_id, _) if conn_id == &request_id.connec_unique_id)); if let Some(pos) = pos { let (conn_id, substream) = match self.inbound_substreams.remove(pos) { diff --git a/protocols/kad/src/protocol.rs b/protocols/kad/src/protocol.rs index 3c00a5059f2..707edd8fe02 100644 --- a/protocols/kad/src/protocol.rs +++ b/protocols/kad/src/protocol.rs @@ -159,13 +159,6 @@ impl KademliaProtocolConfig { self.protocol_names = names; } - /// Sets single protocol name used on the wire. Can be used to create incompatibilities - /// between networks on purpose. - #[deprecated(since = "0.40.0", note = "use `set_protocol_names()` instead")] - pub fn set_protocol_name(&mut self, name: impl Into>) { - self.set_protocol_names(std::iter::once(name.into()).collect()); - } - /// Modifies the maximum allowed size of a single Kademlia packet. pub fn set_max_packet_size(&mut self, size: usize) { self.max_packet_size = size; diff --git a/protocols/mdns/src/behaviour/iface.rs b/protocols/mdns/src/behaviour/iface.rs index c5bacced138..b2d0506b226 100644 --- a/protocols/mdns/src/behaviour/iface.rs +++ b/protocols/mdns/src/behaviour/iface.rs @@ -85,7 +85,7 @@ where socket.bind(&SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 5353).into())?; socket.set_multicast_loop_v4(true)?; socket.set_multicast_ttl_v4(255)?; - socket.join_multicast_v4(&*crate::IPV4_MDNS_MULTICAST_ADDRESS, &addr)?; + socket.join_multicast_v4(&crate::IPV4_MDNS_MULTICAST_ADDRESS, &addr)?; U::from_std(UdpSocket::from(socket))? } IpAddr::V6(_) => { @@ -96,7 +96,7 @@ where socket.bind(&SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 5353).into())?; socket.set_multicast_loop_v6(true)?; // TODO: find interface matching addr. - socket.join_multicast_v6(&*crate::IPV6_MDNS_MULTICAST_ADDRESS, 0)?; + socket.join_multicast_v6(&crate::IPV6_MDNS_MULTICAST_ADDRESS, 0)?; U::from_std(UdpSocket::from(socket))? } }; diff --git a/protocols/mdns/src/behaviour/iface/dns.rs b/protocols/mdns/src/behaviour/iface/dns.rs index 5bb190f4c2c..4590e1e266e 100644 --- a/protocols/mdns/src/behaviour/iface/dns.rs +++ b/protocols/mdns/src/behaviour/iface/dns.rs @@ -246,7 +246,7 @@ fn query_response_packet(id: u16, peer_id: &[u8], records: &[Vec], ttl: u32) fn duration_to_secs(duration: Duration) -> u32 { let secs = duration .as_secs() - .saturating_add(if duration.subsec_nanos() > 0 { 1 } else { 0 }); + .saturating_add(u64::from(duration.subsec_nanos() > 0)); cmp::min(secs, From::from(u32::max_value())) as u32 } diff --git a/protocols/ping/tests/ping.rs b/protocols/ping/tests/ping.rs index ac45949ced7..2f75c09fb3d 100644 --- a/protocols/ping/tests/ping.rs +++ b/protocols/ping/tests/ping.rs @@ -243,14 +243,11 @@ fn unsupported_doesnt_fail() { fn mk_transport(muxer: MuxerChoice) -> (PeerId, transport::Boxed<(PeerId, StreamMuxerBox)>) { let id_keys = identity::Keypair::generate_ed25519(); let peer_id = id_keys.public().to_peer_id(); - let noise_keys = noise::Keypair::::new() - .into_authentic(&id_keys) - .unwrap(); ( peer_id, TcpTransport::new(GenTcpConfig::default().nodelay(true)) .upgrade(upgrade::Version::V1) - .authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated()) + .authenticate(noise::NoiseAuthenticated::xx(&id_keys).unwrap()) .multiplex(match muxer { MuxerChoice::Yamux => upgrade::EitherUpgrade::A(yamux::YamuxConfig::default()), MuxerChoice::Mplex => upgrade::EitherUpgrade::B(mplex::MplexConfig::default()), diff --git a/protocols/relay/examples/relay_v2.rs b/protocols/relay/examples/relay_v2.rs index 25d0bb7fc94..b89c88b2829 100644 --- a/protocols/relay/examples/relay_v2.rs +++ b/protocols/relay/examples/relay_v2.rs @@ -48,13 +48,12 @@ fn main() -> Result<(), Box> { let tcp_transport = TcpTransport::default(); - let noise_keys = noise::Keypair::::new() - .into_authentic(&local_key) - .expect("Signing libp2p-noise static DH keypair failed."); - let transport = tcp_transport .upgrade(upgrade::Version::V1) - .authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated()) + .authenticate( + noise::NoiseAuthenticated::xx(&local_key) + .expect("Signing libp2p-noise static DH keypair failed."), + ) .multiplex(libp2p_yamux::YamuxConfig::default()) .boxed(); diff --git a/protocols/rendezvous/Cargo.toml b/protocols/rendezvous/Cargo.toml index fadcaf59580..3eee5c833e6 100644 --- a/protocols/rendezvous/Cargo.toml +++ b/protocols/rendezvous/Cargo.toml @@ -28,7 +28,7 @@ instant = "0.1.11" [dev-dependencies] async-trait = "0.1" -env_logger = "0.8" +env_logger = "0.9.0" libp2p = { path = "../..", default-features = false, features = ["ping", "identify", "tcp-async-io", "dns-async-std", "websocket", "noise", "mplex", "yamux", "rendezvous"] } rand = "0.8" tokio = { version = "1.15", features = [ "rt-multi-thread", "time", "macros", "sync", "process", "fs", "net" ] } diff --git a/protocols/rendezvous/tests/harness.rs b/protocols/rendezvous/tests/harness.rs index 555a5476bab..30dace245ff 100644 --- a/protocols/rendezvous/tests/harness.rs +++ b/protocols/rendezvous/tests/harness.rs @@ -27,7 +27,7 @@ use libp2p::core::transport::MemoryTransport; use libp2p::core::upgrade::SelectUpgrade; use libp2p::core::{identity, Multiaddr, PeerId, Transport}; use libp2p::mplex::MplexConfig; -use libp2p::noise::{Keypair, NoiseConfig, X25519Spec}; +use libp2p::noise::NoiseAuthenticated; use libp2p::swarm::{AddressScore, NetworkBehaviour, Swarm, SwarmBuilder, SwarmEvent}; use libp2p::yamux::YamuxConfig; use std::fmt::Debug; @@ -43,14 +43,9 @@ where let identity = identity::Keypair::generate_ed25519(); let peer_id = PeerId::from(identity.public()); - let dh_keys = Keypair::::new() - .into_authentic(&identity) - .expect("failed to create dh_keys"); - let noise = NoiseConfig::xx(dh_keys).into_authenticated(); - let transport = MemoryTransport::default() .upgrade(Version::V1) - .authenticate(noise) + .authenticate(NoiseAuthenticated::xx(&identity).unwrap()) .multiplex(SelectUpgrade::new( YamuxConfig::default(), MplexConfig::new(), diff --git a/protocols/request-response/tests/ping.rs b/protocols/request-response/tests/ping.rs index 8cbc06e7444..bfb8641c106 100644 --- a/protocols/request-response/tests/ping.rs +++ b/protocols/request-response/tests/ping.rs @@ -29,7 +29,7 @@ use libp2p_core::{ upgrade::{self, read_length_prefixed, write_length_prefixed}, Multiaddr, PeerId, }; -use libp2p_noise::{Keypair, NoiseConfig, X25519Spec}; +use libp2p_noise::NoiseAuthenticated; use libp2p_request_response::*; use libp2p_swarm::{Swarm, SwarmEvent}; use libp2p_tcp::{GenTcpConfig, TcpTransport}; @@ -295,14 +295,12 @@ fn emits_inbound_connection_closed_if_channel_is_dropped() { fn mk_transport() -> (PeerId, transport::Boxed<(PeerId, StreamMuxerBox)>) { let id_keys = identity::Keypair::generate_ed25519(); let peer_id = id_keys.public().to_peer_id(); - let noise_keys = Keypair::::new() - .into_authentic(&id_keys) - .unwrap(); + ( peer_id, TcpTransport::new(GenTcpConfig::default().nodelay(true)) .upgrade(upgrade::Version::V1) - .authenticate(NoiseConfig::xx(noise_keys).into_authenticated()) + .authenticate(NoiseAuthenticated::xx(&id_keys).unwrap()) .multiplex(libp2p_yamux::YamuxConfig::default()) .boxed(), ) diff --git a/src/lib.rs b/src/lib.rs index b374b79f86a..29b32b8abf6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -222,13 +222,9 @@ pub async fn development_transport( dns_tcp.or_transport(ws_dns_tcp) }; - let noise_keys = noise::Keypair::::new() - .into_authentic(&keypair) - .expect("Signing libp2p-noise static DH keypair failed."); - Ok(transport .upgrade(core::upgrade::Version::V1) - .authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated()) + .authenticate(noise::NoiseAuthenticated::xx(&keypair).unwrap()) .multiplex(core::upgrade::SelectUpgrade::new( yamux::YamuxConfig::default(), mplex::MplexConfig::default(), @@ -282,13 +278,9 @@ pub fn tokio_development_transport( dns_tcp.or_transport(ws_dns_tcp) }; - let noise_keys = noise::Keypair::::new() - .into_authentic(&keypair) - .expect("Signing libp2p-noise static DH keypair failed."); - Ok(transport .upgrade(core::upgrade::Version::V1) - .authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated()) + .authenticate(noise::NoiseAuthenticated::xx(&keypair).unwrap()) .multiplex(core::upgrade::SelectUpgrade::new( yamux::YamuxConfig::default(), mplex::MplexConfig::default(), diff --git a/swarm-derive/CHANGELOG.md b/swarm-derive/CHANGELOG.md index 79ddf8c8f2e..464bec7fe95 100644 --- a/swarm-derive/CHANGELOG.md +++ b/swarm-derive/CHANGELOG.md @@ -1,3 +1,10 @@ +# 0.30.1 [unreleased] + +- Fix an issue where the derive would generate bad code if the type parameters between the behaviour and a custom + out event differed. See [PR 2907]. + +[PR 2907]: https://github.com/libp2p/rust-libp2p/pull/2907 + # 0.30.0 - Remove support for removed `NetworkBehaviourEventProcess`. See [PR 2840]. diff --git a/swarm-derive/Cargo.toml b/swarm-derive/Cargo.toml index 89ee54447fc..1d2c54a9da1 100644 --- a/swarm-derive/Cargo.toml +++ b/swarm-derive/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-swarm-derive" edition = "2021" rust-version = "1.56.1" description = "Procedural macros of libp2p-core" -version = "0.30.0" +version = "0.30.1" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -22,3 +22,4 @@ syn = { version = "1.0.8", default-features = false, features = ["clone-impls", libp2p = { path = "../", default-features = false, features = ["ping", "identify", "kad"] } either = "1.6.0" futures = "0.3.1" +void = "1" diff --git a/swarm-derive/src/lib.rs b/swarm-derive/src/lib.rs index 3e3bf5b8067..6899ba7d79d 100644 --- a/swarm-derive/src/lib.rs +++ b/swarm-derive/src/lib.rs @@ -100,7 +100,7 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { .iter() .map(|field| { let ty = &field.ty; - quote! {#name #ty_generics: From< <#ty as #trait_to_impl>::OutEvent >} + quote! {#name: From< <#ty as #trait_to_impl>::OutEvent >} }) .collect::>(); (name, definition, from_clauses) @@ -537,6 +537,12 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { }) }); + let out_event_reference = if out_event_definition.is_some() { + quote! { #out_event_name #ty_generics } + } else { + quote! { #out_event_name } + }; + // Now the magic happens. let final_quote = quote! { #out_event_definition @@ -545,7 +551,7 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { #where_clause { type ConnectionHandler = #connection_handler_ty; - type OutEvent = #out_event_name #ty_generics; + type OutEvent = #out_event_reference; fn new_handler(&mut self) -> Self::ConnectionHandler { use #into_connection_handler; diff --git a/swarm-derive/tests/test.rs b/swarm-derive/tests/test.rs index dcddb3a6a3f..e0f77eefd30 100644 --- a/swarm-derive/tests/test.rs +++ b/swarm-derive/tests/test.rs @@ -350,3 +350,65 @@ fn generated_out_event_derive_debug() { require_debug::(); } + +#[test] +fn custom_out_event_no_type_parameters() { + use libp2p::core::connection::ConnectionId; + use libp2p::swarm::handler::DummyConnectionHandler; + use libp2p::swarm::{ + ConnectionHandler, IntoConnectionHandler, NetworkBehaviourAction, PollParameters, + }; + use libp2p::PeerId; + use std::task::Context; + use std::task::Poll; + + pub struct TemplatedBehaviour { + _data: T, + } + + impl NetworkBehaviour for TemplatedBehaviour { + type ConnectionHandler = DummyConnectionHandler; + type OutEvent = void::Void; + + fn new_handler(&mut self) -> Self::ConnectionHandler { + DummyConnectionHandler::default() + } + + fn inject_event( + &mut self, + _peer: PeerId, + _connection: ConnectionId, + message: <::Handler as ConnectionHandler>::OutEvent, + ) { + void::unreachable(message); + } + + fn poll( + &mut self, + _ctx: &mut Context, + _: &mut impl PollParameters, + ) -> Poll> { + Poll::Pending + } + } + + #[derive(NetworkBehaviour)] + #[behaviour(out_event = "OutEvent")] + struct Behaviour { + custom: TemplatedBehaviour, + } + + #[derive(Debug)] + enum OutEvent { + None, + } + + impl From for OutEvent { + fn from(_e: void::Void) -> Self { + Self::None + } + } + + require_net_behaviour::>(); + require_net_behaviour::>(); +} diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 230de4631db..7faaa43b44d 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -1624,7 +1624,6 @@ mod tests { use libp2p_core::transport::TransportEvent; use libp2p_core::Endpoint; use quickcheck::{quickcheck, Arbitrary, Gen, QuickCheck}; - use rand::prelude::SliceRandom; use rand::Rng; // Test execution state. @@ -2425,60 +2424,51 @@ mod tests { assert!(!swarm.is_connected(&peer_id)); } - #[test] - fn multiple_addresses_err() { + #[async_std::test] + async fn multiple_addresses_err() { // Tries dialing multiple addresses, and makes sure there's one dialing error per address. let target = PeerId::random(); let mut swarm = new_test_swarm::<_, ()>(DummyConnectionHandler::default()).build(); - let mut addresses = Vec::new(); - for _ in 0..3 { - addresses.push(multiaddr![Ip4([0, 0, 0, 0]), Tcp(rand::random::())]); - } - for _ in 0..5 { - addresses.push(multiaddr![Udp(rand::random::())]); - } - addresses.shuffle(&mut rand::thread_rng()); + let addresses = HashSet::from([ + multiaddr![Ip4([0, 0, 0, 0]), Tcp(rand::random::())], + multiaddr![Ip4([0, 0, 0, 0]), Tcp(rand::random::())], + multiaddr![Ip4([0, 0, 0, 0]), Tcp(rand::random::())], + multiaddr![Udp(rand::random::())], + multiaddr![Udp(rand::random::())], + multiaddr![Udp(rand::random::())], + multiaddr![Udp(rand::random::())], + multiaddr![Udp(rand::random::())], + ]); swarm .dial( DialOpts::peer_id(target) - .addresses(addresses.clone()) + .addresses(addresses.iter().cloned().collect()) .build(), ) .unwrap(); - futures::executor::block_on(future::poll_fn(|cx| -> Poll> { - loop { - match swarm.poll_next_unpin(cx) { - Poll::Ready(Some(SwarmEvent::OutgoingConnectionError { - peer_id, - // multiaddr, - error: DialError::Transport(errors), - })) => { - assert_eq!(peer_id.unwrap(), target); + match swarm.next().await.unwrap() { + SwarmEvent::OutgoingConnectionError { + peer_id, + // multiaddr, + error: DialError::Transport(errors), + } => { + assert_eq!(target, peer_id.unwrap()); - let failed_addresses = - errors.into_iter().map(|(addr, _)| addr).collect::>(); - assert_eq!( - failed_addresses, - addresses - .clone() - .into_iter() - .map(|addr| addr.with(Protocol::P2p(target.into()))) - .collect::>() - ); + let failed_addresses = errors.into_iter().map(|(addr, _)| addr).collect::>(); + let expected_addresses = addresses + .into_iter() + .map(|addr| addr.with(Protocol::P2p(target.into()))) + .collect::>(); - return Poll::Ready(Ok(())); - } - Poll::Ready(_) => unreachable!(), - Poll::Pending => break Poll::Pending, - } + assert_eq!(expected_addresses, failed_addresses); } - })) - .unwrap(); + e => panic!("Unexpected event: {e:?}"), + } } #[test] diff --git a/transports/noise/CHANGELOG.md b/transports/noise/CHANGELOG.md index de2c1034a9e..1416aab4e30 100644 --- a/transports/noise/CHANGELOG.md +++ b/transports/noise/CHANGELOG.md @@ -1,3 +1,10 @@ +# 0.39.1 [unreleased] + +- Introduce `NoiseAuthenticated::xx` constructor, assuming a X25519 DH key exchange. An XX key exchange and X25519 keys + are the most common way of using noise in libp2p and thus deserve a convenience constructor. See [PR 2887]. + +[PR 2887]: https://github.com/libp2p/rust-libp2p/pull/2887 + # 0.39.0 - Update to `libp2p-core` `v0.36.0`. diff --git a/transports/noise/Cargo.toml b/transports/noise/Cargo.toml index 5ee9330818d..8fef520cb9a 100644 --- a/transports/noise/Cargo.toml +++ b/transports/noise/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-noise" edition = "2021" rust-version = "1.56.1" description = "Cryptographic handshake protocol using the noise framework." -version = "0.39.0" +version = "0.39.1" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" diff --git a/transports/noise/src/lib.rs b/transports/noise/src/lib.rs index ee609fd028d..1712176d7ef 100644 --- a/transports/noise/src/lib.rs +++ b/transports/noise/src/lib.rs @@ -41,12 +41,11 @@ //! ``` //! use libp2p_core::{identity, Transport, upgrade}; //! use libp2p_tcp::TcpTransport; -//! use libp2p_noise::{Keypair, X25519Spec, NoiseConfig}; +//! use libp2p_noise::{Keypair, X25519Spec, NoiseAuthenticated}; //! //! # fn main() { //! let id_keys = identity::Keypair::generate_ed25519(); -//! let dh_keys = Keypair::::new().into_authentic(&id_keys).unwrap(); -//! let noise = NoiseConfig::xx(dh_keys).into_authenticated(); +//! let noise = NoiseAuthenticated::xx(&id_keys).unwrap(); //! let builder = TcpTransport::default().upgrade(upgrade::Version::V1).authenticate(noise); //! // let transport = builder.multiplex(...); //! # } @@ -357,6 +356,19 @@ pub struct NoiseAuthenticated { config: NoiseConfig, } +impl NoiseAuthenticated { + /// Create a new [`NoiseAuthenticated`] for the `XX` handshake pattern using X25519 DH keys. + /// + /// For now, this is the only combination that is guaranteed to be compatible with other libp2p implementations. + pub fn xx(id_keys: &identity::Keypair) -> Result { + let dh_keys = Keypair::::new(); + let noise_keys = dh_keys.into_authentic(id_keys)?; + let config = NoiseConfig::xx(noise_keys); + + Ok(config.into_authenticated()) + } +} + impl UpgradeInfo for NoiseAuthenticated where NoiseConfig: UpgradeInfo, diff --git a/transports/noise/tests/smoke.rs b/transports/noise/tests/smoke.rs index 0148d03b4d6..14d09621dd9 100644 --- a/transports/noise/tests/smoke.rs +++ b/transports/noise/tests/smoke.rs @@ -27,7 +27,8 @@ use libp2p_core::identity; use libp2p_core::transport::{self, Transport}; use libp2p_core::upgrade::{self, apply_inbound, apply_outbound, Negotiated}; use libp2p_noise::{ - Keypair, NoiseConfig, NoiseError, NoiseOutput, RemoteIdentity, X25519Spec, X25519, + Keypair, NoiseAuthenticated, NoiseConfig, NoiseError, NoiseOutput, RemoteIdentity, X25519Spec, + X25519, }; use libp2p_tcp::TcpTransport; use log::info; @@ -39,8 +40,7 @@ fn core_upgrade_compat() { // Tests API compaibility with the libp2p-core upgrade API, // i.e. if it compiles, the "test" is considered a success. let id_keys = identity::Keypair::generate_ed25519(); - let dh_keys = Keypair::::new().into_authentic(&id_keys).unwrap(); - let noise = NoiseConfig::xx(dh_keys).into_authenticated(); + let noise = NoiseAuthenticated::xx(&id_keys).unwrap(); let _ = TcpTransport::default() .upgrade(upgrade::Version::V1) .authenticate(noise); diff --git a/transports/quic/Cargo.toml b/transports/quic/Cargo.toml index 621ce8d8e05..a5bd2fad149 100644 --- a/transports/quic/Cargo.toml +++ b/transports/quic/Cargo.toml @@ -29,7 +29,7 @@ yasna = "0.5.0" anyhow = "1.0.41" async-std = { version = "1.10.0", features = ["attributes"] } async-trait = "0.1.50" -libp2p = { version = "0.48.0", default-features = false, features = ["request-response"], path = "../.." } +libp2p = { version = "0.49.0", default-features = false, features = ["request-response"], path = "../.." } rand = "0.8.4" tracing-subscriber = {version = "0.3.8", features = ["env-filter"] } quickcheck = "1" diff --git a/transports/tcp/CHANGELOG.md b/transports/tcp/CHANGELOG.md index 42cdd64acec..ff34ae49407 100644 --- a/transports/tcp/CHANGELOG.md +++ b/transports/tcp/CHANGELOG.md @@ -1,3 +1,10 @@ +# 0.37.0 - [unreleased] + +- Update to `if-watch` `v2.0.0`. Simplify `IfWatcher` integration. + Use `if_watch::IfWatcher` for all runtimes. See [PR 2813]. + +[PR 2813]: https://github.com/libp2p/rust-libp2p/pull/2813 + # 0.36.0 - Update to `libp2p-core` `v0.36.0`. diff --git a/transports/tcp/Cargo.toml b/transports/tcp/Cargo.toml index d4577c74252..948d9507f0a 100644 --- a/transports/tcp/Cargo.toml +++ b/transports/tcp/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-tcp" edition = "2021" rust-version = "1.56.1" description = "TCP/IP transport protocol for libp2p" -version = "0.36.0" +version = "0.37.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -14,9 +14,7 @@ categories = ["network-programming", "asynchronous"] async-io-crate = { package = "async-io", version = "1.2.0", optional = true } futures = "0.3.8" futures-timer = "3.0" -if-watch = { version = "1.1.1", optional = true } -if-addrs = { version = "0.7.0", optional = true } -ipnet = "2.0.0" +if-watch = "2.0.0" libc = "0.2.80" libp2p-core = { version = "0.36.0", path = "../../core", default-features = false } log = "0.4.11" @@ -25,10 +23,10 @@ tokio-crate = { package = "tokio", version = "1.19.0", default-features = false, [features] default = ["async-io"] -tokio = ["tokio-crate", "if-addrs"] -async-io = ["async-io-crate", "if-watch"] +tokio = ["tokio-crate"] +async-io = ["async-io-crate"] [dev-dependencies] async-std = { version = "1.6.5", features = ["attributes"] } -tokio-crate = { package = "tokio", version = "1.0.1", default-features = false, features = ["net", "rt"] } +tokio-crate = { package = "tokio", version = "1.0.1", default-features = false, features = ["net", "rt", "macros"] } env_logger = "0.9.0" diff --git a/transports/tcp/src/lib.rs b/transports/tcp/src/lib.rs index 981c896bcb5..f7b897c0d47 100644 --- a/transports/tcp/src/lib.rs +++ b/transports/tcp/src/lib.rs @@ -28,6 +28,7 @@ mod provider; +use if_watch::{IfEvent, IfWatcher}; #[cfg(feature = "async-io")] pub use provider::async_io; @@ -43,9 +44,8 @@ pub use provider::tokio; pub type TokioTcpTransport = GenTcpTransport; use futures::{ - future::{self, BoxFuture, Ready}, + future::{self, Ready}, prelude::*, - ready, }; use futures_timer::Delay; use libp2p_core::{ @@ -64,7 +64,7 @@ use std::{ time::Duration, }; -use provider::{IfEvent, Provider}; +use provider::{Incoming, Provider}; /// The configuration for a TCP/IP transport capability for libp2p. #[derive(Clone, Debug)] @@ -243,6 +243,9 @@ impl GenTcpConfig { /// # use libp2p_core::transport::{ListenerId, TransportEvent}; /// # use libp2p_core::{Multiaddr, Transport}; /// # use std::pin::Pin; + /// # #[cfg(not(feature = "async-io"))] + /// # fn main() {} + /// # /// #[cfg(feature = "async-io")] /// #[async_std::main] /// async fn main() -> std::io::Result<()> { @@ -368,7 +371,25 @@ where socket.bind(&socket_addr.into())?; socket.listen(self.config.backlog as _)?; socket.set_nonblocking(true)?; - TcpListenStream::::new(id, socket.into(), self.port_reuse.clone()) + let listener: TcpListener = socket.into(); + let local_addr = listener.local_addr()?; + + if local_addr.ip().is_unspecified() { + return TcpListenStream::::new( + id, + listener, + Some(IfWatcher::new()?), + self.port_reuse.clone(), + ); + } + + self.port_reuse.register(local_addr.ip(), local_addr.port()); + let listen_addr = ip_to_multiaddr(local_addr.ip(), local_addr.port()); + self.pending_events.push_back(TransportEvent::NewAddress { + listener_id: id, + listen_addr, + }); + TcpListenStream::::new(id, listener, None, self.port_reuse.clone()) } } @@ -398,7 +419,6 @@ impl Transport for GenTcpTransport where T: Provider + Send + 'static, T::Listener: Unpin, - T::IfWatcher: Unpin, T::Stream: Unpin, { type Output = T::Stream; @@ -605,25 +625,6 @@ pub enum TcpListenerEvent { Error(io::Error), } -enum IfWatch { - Pending(BoxFuture<'static, io::Result>), - Ready(TIfWatcher), -} - -/// The listening addresses of a [`TcpListenStream`]. -enum InAddr { - /// The stream accepts connections on a single interface. - One { - addr: IpAddr, - out: Option, - }, - /// The stream accepts connections on all interfaces. - Any { - addrs: HashSet, - if_watch: IfWatch, - }, -} - /// A stream of incoming connections on one or more interfaces. pub struct TcpListenStream where @@ -637,12 +638,12 @@ where listen_addr: SocketAddr, /// The async listening socket for incoming connections. listener: T::Listener, - /// The IP addresses of network interfaces on which the listening socket - /// is accepting connections. + /// Watcher for network interface changes. + /// Reports [`IfEvent`]s for new / deleted ip-addresses when interfaces + /// become or stop being available. /// - /// If the listen socket listens on all interfaces, these may change over - /// time as interfaces become available or unavailable. - in_addr: InAddr, + /// `None` if the socket is only listening on a single interface. + if_watcher: Option, /// The port reuse configuration for outgoing connections. /// /// If enabled, all IP addresses on which this listening stream @@ -666,27 +667,10 @@ where fn new( listener_id: ListenerId, listener: TcpListener, + if_watcher: Option, port_reuse: PortReuse, ) -> io::Result { let listen_addr = listener.local_addr()?; - - let in_addr = if match &listen_addr { - SocketAddr::V4(a) => a.ip().is_unspecified(), - SocketAddr::V6(a) => a.ip().is_unspecified(), - } { - // The `addrs` are populated via `if_watch` when the - // `TcpListenStream` is polled. - InAddr::Any { - addrs: HashSet::new(), - if_watch: IfWatch::Pending(T::if_watcher()), - } - } else { - InAddr::One { - out: Some(ip_to_multiaddr(listen_addr.ip(), listen_addr.port())), - addr: listen_addr.ip(), - } - }; - let listener = T::new_listener(listener)?; Ok(TcpListenStream { @@ -694,7 +678,7 @@ where listener, listener_id, listen_addr, - in_addr, + if_watcher, pause: None, sleep_on_error: Duration::from_millis(100), }) @@ -707,15 +691,16 @@ where /// /// Has no effect if port reuse is disabled. fn disable_port_reuse(&mut self) { - match &self.in_addr { - InAddr::One { addr, .. } => { - self.port_reuse.unregister(*addr, self.listen_addr.port()); - } - InAddr::Any { addrs, .. } => { - for addr in addrs { - self.port_reuse.unregister(*addr, self.listen_addr.port()); + match &self.if_watcher { + Some(if_watcher) => { + for ip_net in if_watcher.iter() { + self.port_reuse + .unregister(ip_net.addr(), self.listen_addr.port()); } } + None => self + .port_reuse + .unregister(self.listen_addr.ip(), self.listen_addr.port()), } } } @@ -734,116 +719,78 @@ where T: Provider, T::Listener: Unpin, T::Stream: Unpin, - T::IfWatcher: Unpin, { type Item = Result, io::Error>; fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let me = Pin::into_inner(self); - loop { - match &mut me.in_addr { - InAddr::Any { if_watch, addrs } => match if_watch { - // If we listen on all interfaces, wait for `if-watch` to be ready. - IfWatch::Pending(f) => match ready!(Pin::new(f).poll(cx)) { - Ok(w) => { - *if_watch = IfWatch::Ready(w); - continue; - } - Err(err) => { - log::debug! { - "Failed to begin observing interfaces: {:?}. Scheduling retry.", - err - }; - *if_watch = IfWatch::Pending(T::if_watcher()); - me.pause = Some(Delay::new(me.sleep_on_error)); - return Poll::Ready(Some(Ok(TcpListenerEvent::Error(err)))); - } - }, - // Consume all events for up/down interface changes. - IfWatch::Ready(watch) => { - while let Poll::Ready(ev) = T::poll_interfaces(watch, cx) { - match ev { - Ok(IfEvent::Up(inet)) => { - let ip = inet.addr(); - if me.listen_addr.is_ipv4() == ip.is_ipv4() && addrs.insert(ip) - { - let ma = ip_to_multiaddr(ip, me.listen_addr.port()); - log::debug!("New listen address: {}", ma); - me.port_reuse.register(ip, me.listen_addr.port()); - return Poll::Ready(Some(Ok( - TcpListenerEvent::NewAddress(ma), - ))); - } - } - Ok(IfEvent::Down(inet)) => { - let ip = inet.addr(); - if me.listen_addr.is_ipv4() == ip.is_ipv4() && addrs.remove(&ip) - { - let ma = ip_to_multiaddr(ip, me.listen_addr.port()); - log::debug!("Expired listen address: {}", ma); - me.port_reuse.unregister(ip, me.listen_addr.port()); - return Poll::Ready(Some(Ok( - TcpListenerEvent::AddressExpired(ma), - ))); - } - } - Err(err) => { - log::debug! { - "Failure polling interfaces: {:?}. Scheduling retry.", - err - }; - me.pause = Some(Delay::new(me.sleep_on_error)); - return Poll::Ready(Some(Ok(TcpListenerEvent::Error(err)))); - } - } - } - } - }, - // If the listener is bound to a single interface, make sure the - // address is registered for port reuse and reported once. - InAddr::One { addr, out } => { - if let Some(multiaddr) = out.take() { - me.port_reuse.register(*addr, me.listen_addr.port()); - return Poll::Ready(Some(Ok(TcpListenerEvent::NewAddress(multiaddr)))); - } + if let Some(mut pause) = me.pause.take() { + match pause.poll_unpin(cx) { + Poll::Ready(_) => {} + Poll::Pending => { + me.pause = Some(pause); + return Poll::Pending; } } + } - if let Some(mut pause) = me.pause.take() { - match Pin::new(&mut pause).poll(cx) { - Poll::Ready(_) => {} - Poll::Pending => { - me.pause = Some(pause); - return Poll::Pending; + if let Some(if_watcher) = me.if_watcher.as_mut() { + while let Poll::Ready(event) = if_watcher.poll_if_event(cx) { + match event { + Ok(IfEvent::Up(inet)) => { + let ip = inet.addr(); + if me.listen_addr.is_ipv4() == ip.is_ipv4() { + let ma = ip_to_multiaddr(ip, me.listen_addr.port()); + log::debug!("New listen address: {}", ma); + me.port_reuse.register(ip, me.listen_addr.port()); + return Poll::Ready(Some(Ok(TcpListenerEvent::NewAddress(ma)))); + } + } + Ok(IfEvent::Down(inet)) => { + let ip = inet.addr(); + if me.listen_addr.is_ipv4() == ip.is_ipv4() { + let ma = ip_to_multiaddr(ip, me.listen_addr.port()); + log::debug!("Expired listen address: {}", ma); + me.port_reuse.unregister(ip, me.listen_addr.port()); + return Poll::Ready(Some(Ok(TcpListenerEvent::AddressExpired(ma)))); + } + } + Err(err) => { + me.pause = Some(Delay::new(me.sleep_on_error)); + return Poll::Ready(Some(Ok(TcpListenerEvent::Error(err)))); } } } + } - // Take the pending connection from the backlog. - let incoming = match T::poll_accept(&mut me.listener, cx) { - Poll::Pending => return Poll::Pending, - Poll::Ready(Ok(incoming)) => incoming, - Poll::Ready(Err(e)) => { - // These errors are non-fatal for the listener stream. - log::error!("error accepting incoming connection: {}", e); - me.pause = Some(Delay::new(me.sleep_on_error)); - return Poll::Ready(Some(Ok(TcpListenerEvent::Error(e)))); - } - }; + // Take the pending connection from the backlog. + match T::poll_accept(&mut me.listener, cx) { + Poll::Ready(Ok(Incoming { + local_addr, + remote_addr, + stream, + })) => { + let local_addr = ip_to_multiaddr(local_addr.ip(), local_addr.port()); + let remote_addr = ip_to_multiaddr(remote_addr.ip(), remote_addr.port()); - let local_addr = ip_to_multiaddr(incoming.local_addr.ip(), incoming.local_addr.port()); - let remote_addr = - ip_to_multiaddr(incoming.remote_addr.ip(), incoming.remote_addr.port()); + log::debug!("Incoming connection from {} at {}", remote_addr, local_addr); - log::debug!("Incoming connection from {} at {}", remote_addr, local_addr); + return Poll::Ready(Some(Ok(TcpListenerEvent::Upgrade { + upgrade: future::ok(stream), + local_addr, + remote_addr, + }))); + } + Poll::Ready(Err(e)) => { + // These errors are non-fatal for the listener stream. + me.pause = Some(Delay::new(me.sleep_on_error)); + return Poll::Ready(Some(Ok(TcpListenerEvent::Error(e)))); + } + Poll::Pending => {} + }; - return Poll::Ready(Some(Ok(TcpListenerEvent::Upgrade { - upgrade: future::ok(incoming.stream), - local_addr, - remote_addr, - }))); - } + Poll::Pending } } @@ -991,7 +938,7 @@ mod tests { #[cfg(feature = "tokio")] { let (ready_tx, ready_rx) = mpsc::channel(1); - let listener = listener::(addr.clone(), ready_tx); + let listener = listener::(addr, ready_tx); let dialer = dialer::(ready_rx); let rt = tokio_crate::runtime::Builder::new_current_thread() .enable_io() @@ -1060,7 +1007,7 @@ mod tests { #[cfg(feature = "tokio")] { let (ready_tx, ready_rx) = mpsc::channel(1); - let listener = listener::(addr.clone(), ready_tx); + let listener = listener::(addr, ready_tx); let dialer = dialer::(ready_rx); let rt = tokio_crate::runtime::Builder::new_current_thread() .enable_io() @@ -1168,7 +1115,7 @@ mod tests { let (ready_tx, ready_rx) = mpsc::channel(1); let (port_reuse_tx, port_reuse_rx) = oneshot::channel(); let listener = listener::(addr.clone(), ready_tx, port_reuse_rx); - let dialer = dialer::(addr.clone(), ready_rx, port_reuse_tx); + let dialer = dialer::(addr, ready_rx, port_reuse_tx); let rt = tokio_crate::runtime::Builder::new_current_thread() .enable_io() .build() @@ -1209,10 +1156,7 @@ mod tests { match poll_fn(|cx| Pin::new(&mut tcp).poll(cx)).await { TransportEvent::NewAddress { listen_addr: addr2, .. - } => { - assert_eq!(addr1, addr2); - return; - } + } => assert_eq!(addr1, addr2), e => panic!("Unexpected transport event: {:?}", e), } } @@ -1229,7 +1173,7 @@ mod tests { #[cfg(feature = "tokio")] { - let listener = listen_twice::(addr.clone()); + let listener = listen_twice::(addr); let rt = tokio_crate::runtime::Builder::new_current_thread() .enable_io() .build() @@ -1267,7 +1211,7 @@ mod tests { .enable_io() .build() .unwrap(); - let new_addr = rt.block_on(listen::(addr.clone())); + let new_addr = rt.block_on(listen::(addr)); assert!(!new_addr.to_string().contains("tcp/0")); } } @@ -1290,7 +1234,7 @@ mod tests { #[cfg(feature = "tokio")] { let mut tcp = TokioTcpTransport::new(GenTcpConfig::new()); - assert!(tcp.listen_on(addr.clone()).is_err()); + assert!(tcp.listen_on(addr).is_err()); } } diff --git a/transports/tcp/src/provider.rs b/transports/tcp/src/provider.rs index 7ebeaa49ee8..a341026e7e6 100644 --- a/transports/tcp/src/provider.rs +++ b/transports/tcp/src/provider.rs @@ -28,18 +28,10 @@ pub mod tokio; use futures::future::BoxFuture; use futures::io::{AsyncRead, AsyncWrite}; -use ipnet::IpNet; use std::net::{SocketAddr, TcpListener, TcpStream}; use std::task::{Context, Poll}; use std::{fmt, io}; -/// An event relating to a change of availability of an address -/// on a network interface. -pub enum IfEvent { - Up(IpNet), - Down(IpNet), -} - /// An incoming connection returned from [`Provider::poll_accept()`]. pub struct Incoming { pub stream: S, @@ -54,12 +46,6 @@ pub trait Provider: Clone + Send + 'static { type Stream: AsyncRead + AsyncWrite + Send + Unpin + fmt::Debug; /// The type of TCP listeners obtained from [`Provider::new_listener`]. type Listener: Send + Unpin; - /// The type of network interface observers obtained from [`Provider::if_watcher`]. - type IfWatcher: Send + Unpin; - - /// Creates an instance of [`Self::IfWatcher`] that can be polled for - /// network interface changes via [`Self::poll_interfaces`]. - fn if_watcher() -> BoxFuture<'static, io::Result>; /// Creates a new listener wrapping the given [`TcpListener`] that /// can be polled for incoming connections via [`Self::poll_accept()`]. @@ -77,8 +63,4 @@ pub trait Provider: Clone + Send + 'static { _: &mut Self::Listener, _: &mut Context<'_>, ) -> Poll>>; - - /// Polls a [`Self::IfWatcher`] for network interface changes, ensuring a task wakeup, - /// if necessary. - fn poll_interfaces(_: &mut Self::IfWatcher, _: &mut Context<'_>) -> Poll>; } diff --git a/transports/tcp/src/provider/async_io.rs b/transports/tcp/src/provider/async_io.rs index acbb4fbdcca..fc613d8fe86 100644 --- a/transports/tcp/src/provider/async_io.rs +++ b/transports/tcp/src/provider/async_io.rs @@ -18,7 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use super::{IfEvent, Incoming, Provider}; +use super::{Incoming, Provider}; use async_io_crate::Async; use futures::future::{BoxFuture, FutureExt}; @@ -32,11 +32,6 @@ pub enum Tcp {} impl Provider for Tcp { type Stream = Async; type Listener = Async; - type IfWatcher = if_watch::IfWatcher; - - fn if_watcher() -> BoxFuture<'static, io::Result> { - if_watch::IfWatcher::new().boxed() - } fn new_listener(l: net::TcpListener) -> io::Result { Async::new(l) @@ -87,11 +82,4 @@ impl Provider for Tcp { remote_addr, })) } - - fn poll_interfaces(w: &mut Self::IfWatcher, cx: &mut Context<'_>) -> Poll> { - w.poll_unpin(cx).map_ok(|e| match e { - if_watch::IfEvent::Up(a) => IfEvent::Up(a), - if_watch::IfEvent::Down(a) => IfEvent::Down(a), - }) - } } diff --git a/transports/tcp/src/provider/tokio.rs b/transports/tcp/src/provider/tokio.rs index 564eebfa48b..994a12a33c7 100644 --- a/transports/tcp/src/provider/tokio.rs +++ b/transports/tcp/src/provider/tokio.rs @@ -18,45 +18,24 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use super::{IfEvent, Incoming, Provider}; +use super::{Incoming, Provider}; use futures::{ - future::{self, BoxFuture, FutureExt}, + future::{BoxFuture, FutureExt}, prelude::*, }; -use futures_timer::Delay; -use if_addrs::{get_if_addrs, IfAddr}; -use ipnet::{IpNet, Ipv4Net, Ipv6Net}; -use std::collections::HashSet; use std::convert::TryFrom; use std::io; use std::net; use std::pin::Pin; use std::task::{Context, Poll}; -use std::time::Duration; #[derive(Copy, Clone)] pub enum Tcp {} -pub struct IfWatcher { - addrs: HashSet, - delay: Delay, - pending: Vec, -} - impl Provider for Tcp { type Stream = TcpStream; type Listener = tokio_crate::net::TcpListener; - type IfWatcher = IfWatcher; - - fn if_watcher() -> BoxFuture<'static, io::Result> { - future::ready(Ok(IfWatcher { - addrs: HashSet::new(), - delay: Delay::new(Duration::from_secs(0)), - pending: Vec::new(), - })) - .boxed() - } fn new_listener(l: net::TcpListener) -> io::Result { tokio_crate::net::TcpListener::try_from(l) @@ -104,51 +83,6 @@ impl Provider for Tcp { remote_addr, })) } - - fn poll_interfaces(w: &mut Self::IfWatcher, cx: &mut Context<'_>) -> Poll> { - loop { - if let Some(event) = w.pending.pop() { - return Poll::Ready(Ok(event)); - } - - match Pin::new(&mut w.delay).poll(cx) { - Poll::Pending => return Poll::Pending, - Poll::Ready(()) => { - let ifs = get_if_addrs()?; - let addrs = ifs - .into_iter() - .map(|iface| match iface.addr { - IfAddr::V4(ip4) => { - let prefix_len = - (!u32::from_be_bytes(ip4.netmask.octets())).leading_zeros(); - let ipnet = Ipv4Net::new(ip4.ip, prefix_len as u8) - .expect("prefix_len can not exceed 32"); - IpNet::V4(ipnet) - } - IfAddr::V6(ip6) => { - let prefix_len = - (!u128::from_be_bytes(ip6.netmask.octets())).leading_zeros(); - let ipnet = Ipv6Net::new(ip6.ip, prefix_len as u8) - .expect("prefix_len can not exceed 128"); - IpNet::V6(ipnet) - } - }) - .collect::>(); - - for down in w.addrs.difference(&addrs) { - w.pending.push(IfEvent::Down(*down)); - } - - for up in addrs.difference(&w.addrs) { - w.pending.push(IfEvent::Up(*up)); - } - - w.addrs = addrs; - w.delay.reset(Duration::from_secs(10)); - } - } - } - } } /// A [`tokio_crate::net::TcpStream`] that implements [`AsyncRead`] and [`AsyncWrite`].