diff --git a/Cargo.lock b/Cargo.lock index fb2a020a34e..49c9cb5e8f7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2143,7 +2143,7 @@ checksum = "7fc7aa29613bd6a620df431842069224d8bc9011086b1db4c0e0cd47fa03ec9a" [[package]] name = "libp2p" -version = "0.51.1" +version = "0.51.2" dependencies = [ "async-std", "async-trait", @@ -2156,6 +2156,7 @@ dependencies = [ "getrandom 0.2.8", "instant", "libp2p-autonat", + "libp2p-connection-limits", "libp2p-core", "libp2p-dcutr", "libp2p-deflate", @@ -2215,6 +2216,23 @@ dependencies = [ "rand 0.8.5", ] +[[package]] +name = "libp2p-connection-limits" +version = "0.1.0" +dependencies = [ + "async-std", + "libp2p-core", + "libp2p-identify", + "libp2p-identity", + "libp2p-ping", + "libp2p-swarm", + "libp2p-swarm-derive", + "libp2p-swarm-test", + "quickcheck-ext", + "rand 0.8.5", + "void", +] + [[package]] name = "libp2p-core" version = "0.39.1" @@ -2752,7 +2770,7 @@ dependencies = [ [[package]] name = "libp2p-swarm" -version = "0.42.0" +version = "0.42.1" dependencies = [ "async-std", "either", @@ -2769,6 +2787,7 @@ dependencies = [ "libp2p-ping", "libp2p-plaintext", "libp2p-swarm-derive", + "libp2p-swarm-test", "libp2p-yamux", "log", "quickcheck-ext", diff --git a/Cargo.toml b/Cargo.toml index dbe2947fb46..dcfb8b1b230 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,43 +11,43 @@ members = [ "examples/ping-example", "examples/rendezvous", "identity", + "interop-tests", + "misc/connection-limits", + "misc/keygen", "misc/metrics", "misc/multistream-select", - "misc/rw-stream-sink", - "misc/keygen", "misc/quick-protobuf-codec", "misc/quickcheck-ext", + "misc/rw-stream-sink", "muxers/mplex", - "muxers/yamux", "muxers/test-harness", - "protocols/dcutr", + "muxers/yamux", "protocols/autonat", + "protocols/dcutr", "protocols/floodsub", "protocols/gossipsub", - "protocols/rendezvous", "protocols/identify", "protocols/kad", "protocols/mdns", "protocols/perf", "protocols/ping", "protocols/relay", + "protocols/rendezvous", "protocols/request-response", "swarm", "swarm-derive", - "interop-tests", "swarm-test", "transports/deflate", "transports/dns", "transports/noise", - "transports/tls", "transports/plaintext", "transports/pnet", "transports/quic", "transports/tcp", + "transports/tls", "transports/uds", - "transports/websocket", "transports/wasm-ext", "transports/webrtc", - "interop-tests" + "transports/websocket", ] resolver = "2" diff --git a/libp2p/CHANGELOG.md b/libp2p/CHANGELOG.md index 64423ea46ee..36e4204ca88 100644 --- a/libp2p/CHANGELOG.md +++ b/libp2p/CHANGELOG.md @@ -1,3 +1,10 @@ +# 0.52.2 - unreleased + +- Introduce `libp2p::connection_limits` module. + See [PR 3386]. + +[PR 3386]: https://github.com/libp2p/rust-libp2p/pull/3386 + # 0.51.1 - Depend on `libp2p-tls` `v0.1.0`. diff --git a/libp2p/Cargo.toml b/libp2p/Cargo.toml index ee4d4a2d5c1..753257a7c5d 100644 --- a/libp2p/Cargo.toml +++ b/libp2p/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p" edition = "2021" rust-version = "1.65.0" description = "Peer-to-peer networking library" -version = "0.51.1" +version = "0.51.2" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -97,6 +97,7 @@ getrandom = "0.2.3" # Explicit dependency to be used in `wasm-bindgen` feature instant = "0.1.11" # Explicit dependency to be used in `wasm-bindgen` feature libp2p-autonat = { version = "0.10.0", path = "../protocols/autonat", optional = true } +libp2p-connection-limits = { version = "0.1.0", path = "../misc/connection-limits" } libp2p-core = { version = "0.39.0", path = "../core" } libp2p-dcutr = { version = "0.9.0", path = "../protocols/dcutr", optional = true } libp2p-floodsub = { version = "0.42.0", path = "../protocols/floodsub", optional = true } diff --git a/libp2p/src/lib.rs b/libp2p/src/lib.rs index 5a58b149e97..0fe8ee9c0fd 100644 --- a/libp2p/src/lib.rs +++ b/libp2p/src/lib.rs @@ -44,6 +44,8 @@ pub use multiaddr; #[doc(inline)] pub use libp2p_autonat as autonat; #[doc(inline)] +pub use libp2p_connection_limits as connection_limits; +#[doc(inline)] pub use libp2p_core as core; #[cfg(feature = "dcutr")] #[doc(inline)] diff --git a/misc/connection-limits/CHANGELOG.md b/misc/connection-limits/CHANGELOG.md new file mode 100644 index 00000000000..d1e4ec5a44e --- /dev/null +++ b/misc/connection-limits/CHANGELOG.md @@ -0,0 +1,3 @@ +# 0.1.0 - unreleased + +- Initial release. diff --git a/misc/connection-limits/Cargo.toml b/misc/connection-limits/Cargo.toml new file mode 100644 index 00000000000..3bf2675beb4 --- /dev/null +++ b/misc/connection-limits/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "libp2p-connection-limits" +edition = "2021" +rust-version = "1.62.0" +description = "Connection limits for libp2p." +version = "0.1.0" +license = "MIT" +repository = "https://github.com/libp2p/rust-libp2p" +keywords = ["peer-to-peer", "libp2p", "networking"] +categories = ["network-programming", "asynchronous"] + +[dependencies] +libp2p-core = { version = "0.39.0", path = "../../core" } +libp2p-swarm = { version = "0.42.0", path = "../../swarm" } +libp2p-identity = { version = "0.1.0", path = "../../identity", features = ["peerid"] } +void = "1" + +[dev-dependencies] +async-std = { version = "1.12.0", features = ["attributes"] } +libp2p-identify = { path = "../../protocols/identify" } +libp2p-ping = { path = "../../protocols/ping" } +libp2p-swarm-derive = { path = "../../swarm-derive" } +libp2p-swarm-test = { path = "../../swarm-test" } +quickcheck-ext = { path = "../quickcheck-ext" } +rand = "0.8.5" diff --git a/misc/connection-limits/src/lib.rs b/misc/connection-limits/src/lib.rs new file mode 100644 index 00000000000..1f6927b3bd2 --- /dev/null +++ b/misc/connection-limits/src/lib.rs @@ -0,0 +1,483 @@ +// Copyright 2023 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use libp2p_core::{Endpoint, Multiaddr}; +use libp2p_identity::PeerId; +use libp2p_swarm::{ + dummy, ConnectionClosed, ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, + NetworkBehaviourAction, PollParameters, THandler, THandlerInEvent, THandlerOutEvent, +}; +use std::collections::{HashMap, HashSet}; +use std::fmt; +use std::task::{Context, Poll}; +use void::Void; + +/// A [`NetworkBehaviour`] that enforces a set of [`ConnectionLimits`]. +/// +/// For these limits to take effect, this needs to be composed into the behaviour tree of your application. +/// +/// If a connection is denied due to a limit, either a [`SwarmEvent::IncomingConnectionError`](libp2p_swarm::SwarmEvent::IncomingConnectionError) +/// or [`SwarmEvent::OutgoingConnectionError`](libp2p_swarm::SwarmEvent::OutgoingConnectionError) will be emitted. +/// The [`ListenError::Denied`](libp2p_swarm::ListenError::Denied) and respectively the [`DialError::Denied`](libp2p_swarm::DialError::Denied) variant +/// contain a [`ConnectionDenied`](libp2p_swarm::ConnectionDenied) type that can be downcast to [`Exceeded`] error if (and only if) **this** +/// behaviour denied the connection. +/// +/// If you employ multiple [`NetworkBehaviour`]s that manage connections, it may also be a different error. +/// +/// # Example +/// +/// ```rust +/// # use libp2p_identify as identify; +/// # use libp2p_ping as ping; +/// # use libp2p_swarm_derive::NetworkBehaviour; +/// # use libp2p_connection_limits as connection_limits; +/// +/// #[derive(NetworkBehaviour)] +/// # #[behaviour(prelude = "libp2p_swarm::derive_prelude")] +/// struct MyBehaviour { +/// identify: identify::Behaviour, +/// ping: ping::Behaviour, +/// limits: connection_limits::Behaviour +/// } +/// ``` +pub struct Behaviour { + limits: ConnectionLimits, + + pending_inbound_connections: HashSet, + pending_outbound_connections: HashSet, + established_inbound_connections: HashSet, + established_outbound_connections: HashSet, + established_per_peer: HashMap>, +} + +impl Behaviour { + pub fn new(limits: ConnectionLimits) -> Self { + Self { + limits, + pending_inbound_connections: Default::default(), + pending_outbound_connections: Default::default(), + established_inbound_connections: Default::default(), + established_outbound_connections: Default::default(), + established_per_peer: Default::default(), + } + } + + fn check_limit( + &mut self, + limit: Option, + current: usize, + kind: Kind, + ) -> Result<(), ConnectionDenied> { + let limit = limit.unwrap_or(u32::MAX); + let current = current as u32; + + if current >= limit { + return Err(ConnectionDenied::new(Exceeded { limit, kind })); + } + + Ok(()) + } +} + +/// A connection limit has been exceeded. +#[derive(Debug, Clone, Copy)] +pub struct Exceeded { + limit: u32, + kind: Kind, +} + +impl Exceeded { + pub fn limit(&self) -> u32 { + self.limit + } +} + +impl fmt::Display for Exceeded { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "connection limit exceeded: at most {} {} are allowed", + self.limit, self.kind + ) + } +} + +#[derive(Debug, Clone, Copy)] +enum Kind { + PendingIncoming, + PendingOutgoing, + EstablishedIncoming, + EstablishedOutgoing, + EstablishedPerPeer, + EstablishedTotal, +} + +impl fmt::Display for Kind { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Kind::PendingIncoming => write!(f, "pending incoming connections"), + Kind::PendingOutgoing => write!(f, "pending outgoing connections"), + Kind::EstablishedIncoming => write!(f, "established incoming connections"), + Kind::EstablishedOutgoing => write!(f, "established outgoing connections"), + Kind::EstablishedPerPeer => write!(f, "established connections per peer"), + Kind::EstablishedTotal => write!(f, "established connections"), + } + } +} + +impl std::error::Error for Exceeded {} + +/// The configurable connection limits. +#[derive(Debug, Clone, Default)] +pub struct ConnectionLimits { + max_pending_incoming: Option, + max_pending_outgoing: Option, + max_established_incoming: Option, + max_established_outgoing: Option, + max_established_per_peer: Option, + max_established_total: Option, +} + +impl ConnectionLimits { + /// Configures the maximum number of concurrently incoming connections being established. + pub fn with_max_pending_incoming(mut self, limit: Option) -> Self { + self.max_pending_incoming = limit; + self + } + + /// Configures the maximum number of concurrently outgoing connections being established. + pub fn with_max_pending_outgoing(mut self, limit: Option) -> Self { + self.max_pending_outgoing = limit; + self + } + + /// Configures the maximum number of concurrent established inbound connections. + pub fn with_max_established_incoming(mut self, limit: Option) -> Self { + self.max_established_incoming = limit; + self + } + + /// Configures the maximum number of concurrent established outbound connections. + pub fn with_max_established_outgoing(mut self, limit: Option) -> Self { + self.max_established_outgoing = limit; + self + } + + /// Configures the maximum number of concurrent established connections (both + /// inbound and outbound). + /// + /// Note: This should be used in conjunction with + /// [`ConnectionLimits::with_max_established_incoming`] to prevent possible + /// eclipse attacks (all connections being inbound). + pub fn with_max_established(mut self, limit: Option) -> Self { + self.max_established_total = limit; + self + } + + /// Configures the maximum number of concurrent established connections per peer, + /// regardless of direction (incoming or outgoing). + pub fn with_max_established_per_peer(mut self, limit: Option) -> Self { + self.max_established_per_peer = limit; + self + } +} + +impl NetworkBehaviour for Behaviour { + type ConnectionHandler = dummy::ConnectionHandler; + type OutEvent = Void; + + fn handle_pending_inbound_connection( + &mut self, + connection_id: ConnectionId, + _: &Multiaddr, + _: &Multiaddr, + ) -> Result<(), ConnectionDenied> { + self.check_limit( + self.limits.max_pending_incoming, + self.pending_inbound_connections.len(), + Kind::PendingIncoming, + )?; + + self.pending_inbound_connections.insert(connection_id); + + Ok(()) + } + + fn handle_established_inbound_connection( + &mut self, + connection_id: ConnectionId, + peer: PeerId, + _: &Multiaddr, + _: &Multiaddr, + ) -> Result, ConnectionDenied> { + self.pending_inbound_connections.remove(&connection_id); + + self.check_limit( + self.limits.max_established_incoming, + self.established_inbound_connections.len(), + Kind::EstablishedIncoming, + )?; + self.check_limit( + self.limits.max_established_per_peer, + self.established_per_peer + .get(&peer) + .map(|connections| connections.len()) + .unwrap_or(0), + Kind::EstablishedPerPeer, + )?; + self.check_limit( + self.limits.max_established_total, + self.established_inbound_connections.len() + + self.established_outbound_connections.len(), + Kind::EstablishedTotal, + )?; + + self.established_inbound_connections.insert(connection_id); + self.established_per_peer + .entry(peer) + .or_default() + .insert(connection_id); + + Ok(dummy::ConnectionHandler) + } + + fn handle_pending_outbound_connection( + &mut self, + connection_id: ConnectionId, + _: Option, + _: &[Multiaddr], + _: Endpoint, + ) -> Result, ConnectionDenied> { + self.check_limit( + self.limits.max_pending_outgoing, + self.pending_outbound_connections.len(), + Kind::PendingOutgoing, + )?; + + self.pending_outbound_connections.insert(connection_id); + + Ok(vec![]) + } + + fn handle_established_outbound_connection( + &mut self, + connection_id: ConnectionId, + peer: PeerId, + _: &Multiaddr, + _: Endpoint, + ) -> Result, ConnectionDenied> { + self.pending_outbound_connections.remove(&connection_id); + + self.check_limit( + self.limits.max_established_outgoing, + self.established_outbound_connections.len(), + Kind::EstablishedOutgoing, + )?; + self.check_limit( + self.limits.max_established_per_peer, + self.established_per_peer + .get(&peer) + .map(|connections| connections.len()) + .unwrap_or(0), + Kind::EstablishedPerPeer, + )?; + self.check_limit( + self.limits.max_established_total, + self.established_inbound_connections.len() + + self.established_outbound_connections.len(), + Kind::EstablishedTotal, + )?; + + self.established_outbound_connections.insert(connection_id); + self.established_per_peer + .entry(peer) + .or_default() + .insert(connection_id); + + Ok(dummy::ConnectionHandler) + } + + fn on_swarm_event(&mut self, event: FromSwarm) { + match event { + FromSwarm::ConnectionClosed(ConnectionClosed { + peer_id, + connection_id, + .. + }) => { + self.established_inbound_connections.remove(&connection_id); + self.established_outbound_connections.remove(&connection_id); + self.established_per_peer + .entry(peer_id) + .or_default() + .remove(&connection_id); + } + FromSwarm::ConnectionEstablished(_) => {} + FromSwarm::AddressChange(_) => {} + FromSwarm::DialFailure(_) => {} + FromSwarm::ListenFailure(_) => {} + FromSwarm::NewListener(_) => {} + FromSwarm::NewListenAddr(_) => {} + FromSwarm::ExpiredListenAddr(_) => {} + FromSwarm::ListenerError(_) => {} + FromSwarm::ListenerClosed(_) => {} + FromSwarm::NewExternalAddr(_) => {} + FromSwarm::ExpiredExternalAddr(_) => {} + } + } + + fn on_connection_handler_event( + &mut self, + _id: PeerId, + _: ConnectionId, + event: THandlerOutEvent, + ) { + void::unreachable(event) + } + + fn poll( + &mut self, + _: &mut Context<'_>, + _: &mut impl PollParameters, + ) -> Poll>> { + Poll::Pending + } +} + +#[cfg(test)] +mod tests { + use super::*; + use libp2p_swarm::{dial_opts::DialOpts, DialError, ListenError, Swarm, SwarmEvent}; + use libp2p_swarm_test::SwarmExt; + use quickcheck_ext::*; + + #[test] + fn max_outgoing() { + use rand::Rng; + + let outgoing_limit = rand::thread_rng().gen_range(1..10); + + let mut network = Swarm::new_ephemeral(|_| { + Behaviour::new( + ConnectionLimits::default().with_max_pending_outgoing(Some(outgoing_limit)), + ) + }); + + let addr: Multiaddr = "/memory/1234".parse().unwrap(); + let target = PeerId::random(); + + for _ in 0..outgoing_limit { + network + .dial( + DialOpts::peer_id(target) + .addresses(vec![addr.clone()]) + .build(), + ) + .expect("Unexpected connection limit."); + } + + match network + .dial(DialOpts::peer_id(target).addresses(vec![addr]).build()) + .expect_err("Unexpected dialing success.") + { + DialError::Denied { cause } => { + let exceeded = cause + .downcast::() + .expect("connection denied because of limit"); + + assert_eq!(exceeded.limit(), outgoing_limit); + } + e => panic!("Unexpected error: {e:?}"), + } + + let info = network.network_info(); + assert_eq!(info.num_peers(), 0); + assert_eq!( + info.connection_counters().num_pending_outgoing(), + outgoing_limit + ); + } + + #[test] + fn max_established_incoming() { + fn prop(Limit(limit): Limit) { + let mut swarm1 = Swarm::new_ephemeral(|_| { + Behaviour::new( + ConnectionLimits::default().with_max_established_incoming(Some(limit)), + ) + }); + let mut swarm2 = Swarm::new_ephemeral(|_| { + Behaviour::new( + ConnectionLimits::default().with_max_established_incoming(Some(limit)), + ) + }); + + async_std::task::block_on(async { + let (listen_addr, _) = swarm1.listen().await; + + for _ in 0..limit { + swarm2.connect(&mut swarm1).await; + } + + swarm2.dial(listen_addr).unwrap(); + + async_std::task::spawn(swarm2.loop_on_next()); + + let cause = swarm1 + .wait(|event| match event { + SwarmEvent::IncomingConnectionError { + error: ListenError::Denied { cause }, + .. + } => Some(cause), + _ => None, + }) + .await; + + assert_eq!(cause.downcast::().unwrap().limit, limit); + }); + } + + #[derive(Debug, Clone)] + struct Limit(u32); + + impl Arbitrary for Limit { + fn arbitrary(g: &mut Gen) -> Self { + Self(g.gen_range(1..10)) + } + } + + quickcheck(prop as fn(_)); + } + + #[derive(libp2p_swarm_derive::NetworkBehaviour)] + #[behaviour(prelude = "libp2p_swarm::derive_prelude")] + struct Behaviour { + limits: super::Behaviour, + keep_alive: libp2p_swarm::keep_alive::Behaviour, + } + + impl Behaviour { + fn new(limits: ConnectionLimits) -> Self { + Self { + limits: super::Behaviour::new(limits), + keep_alive: libp2p_swarm::keep_alive::Behaviour, + } + } + } +} diff --git a/misc/metrics/src/swarm.rs b/misc/metrics/src/swarm.rs index ff19c650cd9..8f80c74d6ec 100644 --- a/misc/metrics/src/swarm.rs +++ b/misc/metrics/src/swarm.rs @@ -225,6 +225,7 @@ impl super::Recorder record(OutgoingConnectionError::Banned), + #[allow(deprecated)] libp2p_swarm::DialError::ConnectionLimit(_) => { record(OutgoingConnectionError::ConnectionLimit) } @@ -371,6 +372,7 @@ impl From<&libp2p_swarm::ListenError> for IncomingConnectionError { fn from(error: &libp2p_swarm::ListenError) -> Self { match error { libp2p_swarm::ListenError::WrongPeerId { .. } => IncomingConnectionError::WrongPeerId, + #[allow(deprecated)] libp2p_swarm::ListenError::ConnectionLimit(_) => { IncomingConnectionError::ConnectionLimit } diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index 6c13bf46403..9bb1cdf33e7 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -1924,7 +1924,6 @@ where match error { DialError::Banned - | DialError::ConnectionLimit(_) | DialError::LocalPeerId { .. } | DialError::InvalidPeerId { .. } | DialError::WrongPeerId { .. } @@ -1951,6 +1950,8 @@ where DialError::DialPeerConditionFalse(dial_opts::PeerCondition::Always) => { unreachable!("DialPeerCondition::Always can not trigger DialPeerConditionFalse."); } + #[allow(deprecated)] + DialError::ConnectionLimit(_) => {} } } diff --git a/swarm-test/src/lib.rs b/swarm-test/src/lib.rs index ebc8b0412d2..94bad497e8f 100644 --- a/swarm-test/src/lib.rs +++ b/swarm-test/src/lib.rs @@ -27,6 +27,7 @@ use libp2p_core::{ }; use libp2p_identity::PeerId; use libp2p_plaintext::PlainText2Config; +use libp2p_swarm::dial_opts::PeerCondition; use libp2p_swarm::{ dial_opts::DialOpts, AddressScore, NetworkBehaviour, Swarm, SwarmBuilder, SwarmEvent, THandlerErr, @@ -235,6 +236,7 @@ where let dial_opts = DialOpts::peer_id(*other.local_peer_id()) .addresses(external_addresses) + .condition(PeerCondition::Always) .build(); self.dial(dial_opts).unwrap(); diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index df04507376e..c3dcb1ec4f1 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -1,3 +1,10 @@ +# 0.42.1 [unreleased] + +- Deprecate `ConnectionLimits` in favor of `libp2p::connection_limits`. + See [PR 3386]. + +[PR 3386]: https://github.com/libp2p/rust-libp2p/pull/3386 + # 0.42.0 - Allow `NetworkBehaviour`s to manage connections. diff --git a/swarm/Cargo.toml b/swarm/Cargo.toml index ed1d03f31a9..47ab4489407 100644 --- a/swarm/Cargo.toml +++ b/swarm/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-swarm" edition = "2021" rust-version = "1.62.0" description = "The libp2p swarm" -version = "0.42.0" +version = "0.42.1" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -47,6 +47,7 @@ libp2p-kad = { path = "../protocols/kad" } libp2p-ping = { path = "../protocols/ping" } libp2p-plaintext = { path = "../transports/plaintext" } libp2p-swarm-derive = { path = "../swarm-derive" } +libp2p-swarm-test = { path = "../swarm-test" } libp2p-yamux = { path = "../muxers/yamux" } quickcheck = { package = "quickcheck-ext", path = "../misc/quickcheck-ext" } void = "1" diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index b42bdde9620..131f8715783 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -377,6 +377,7 @@ impl<'a> IncomingInfo<'a> { } /// Information about a connection limit. +#[deprecated(note = "Use `libp2p::connection_limits` instead.", since = "0.42.1")] #[derive(Debug, Clone, Copy)] pub struct ConnectionLimit { /// The maximum number of connections. @@ -385,6 +386,7 @@ pub struct ConnectionLimit { pub current: u32, } +#[allow(deprecated)] impl fmt::Display for ConnectionLimit { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!( @@ -396,6 +398,7 @@ impl fmt::Display for ConnectionLimit { } /// A `ConnectionLimit` can represent an error if it has been exceeded. +#[allow(deprecated)] impl std::error::Error for ConnectionLimit {} struct SubstreamUpgrade { diff --git a/swarm/src/connection/error.rs b/swarm/src/connection/error.rs index a5a6136f0b1..49a3ee65f12 100644 --- a/swarm/src/connection/error.rs +++ b/swarm/src/connection/error.rs @@ -18,9 +18,11 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +#[allow(deprecated)] +use crate::connection::ConnectionLimit; use crate::transport::TransportError; use crate::Multiaddr; -use crate::{connection::ConnectionLimit, ConnectedPoint, PeerId}; +use crate::{ConnectedPoint, PeerId}; use std::{fmt, io}; /// Errors that can occur in the context of an established `Connection`. @@ -90,6 +92,11 @@ pub enum PendingConnectionError { /// The connection was dropped because the connection limit /// for a peer has been reached. + #[deprecated( + note = "Use `libp2p::connection_limits` instead and handle `{Dial,Listen}Error::Denied::cause`.", + since = "0.42.1" + )] + #[allow(deprecated)] ConnectionLimit(ConnectionLimit), /// Pending connection attempt has been aborted. @@ -110,6 +117,7 @@ impl PendingConnectionError { pub fn map(self, f: impl FnOnce(T) -> U) -> PendingConnectionError { match self { PendingConnectionError::Transport(t) => PendingConnectionError::Transport(f(t)), + #[allow(deprecated)] PendingConnectionError::ConnectionLimit(l) => { PendingConnectionError::ConnectionLimit(l) } @@ -137,6 +145,7 @@ where "Pending connection: Transport error on connection: {err}" ) } + #[allow(deprecated)] PendingConnectionError::ConnectionLimit(l) => { write!(f, "Connection error: Connection limit: {l}.") } @@ -163,6 +172,7 @@ where PendingConnectionError::WrongPeerId { .. } => None, PendingConnectionError::LocalPeerId { .. } => None, PendingConnectionError::Aborted => None, + #[allow(deprecated)] PendingConnectionError::ConnectionLimit(..) => None, } } diff --git a/swarm/src/connection/pool.rs b/swarm/src/connection/pool.rs index 8d80a8f7870..5cb70173a92 100644 --- a/swarm/src/connection/pool.rs +++ b/swarm/src/connection/pool.rs @@ -18,12 +18,13 @@ // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::connection::{Connection, ConnectionId, PendingPoint}; +#[allow(deprecated)] +use crate::connection::{Connection, ConnectionId, ConnectionLimit, PendingPoint}; #[allow(deprecated)] use crate::IntoConnectionHandler; use crate::{ connection::{ - Connected, ConnectionError, ConnectionLimit, IncomingInfo, PendingConnectionError, + Connected, ConnectionError, IncomingInfo, PendingConnectionError, PendingInboundConnectionError, PendingOutboundConnectionError, }, transport::TransportError, @@ -304,6 +305,7 @@ where THandler: ConnectionHandler, { /// Creates a new empty `Pool`. + #[allow(deprecated)] pub fn new(local_id: PeerId, config: PoolConfig, limits: ConnectionLimits) -> Self { let (pending_connection_events_tx, pending_connection_events_rx) = mpsc::channel(0); let executor = match config.executor { @@ -407,6 +409,7 @@ where /// /// Returns an error if the limit of pending outgoing connections /// has been reached. + #[allow(deprecated)] pub fn add_outgoing( &mut self, dials: Vec< @@ -461,6 +464,7 @@ where /// /// Returns an error if the limit of pending incoming connections /// has been reached. + #[allow(deprecated)] pub fn add_incoming( &mut self, future: TFut, @@ -679,6 +683,8 @@ where ), }; + #[allow(deprecated)] + // Remove once `PendingConnectionError::ConnectionLimit` is gone. let error = self .counters // Check general established connection limit. @@ -865,6 +871,7 @@ impl Drop for NewConnection { #[derive(Debug, Clone)] pub struct ConnectionCounters { /// The effective connection limits. + #[allow(deprecated)] limits: ConnectionLimits, /// The current number of incoming connections. pending_incoming: u32, @@ -877,6 +884,7 @@ pub struct ConnectionCounters { } impl ConnectionCounters { + #[allow(deprecated)] fn new(limits: ConnectionLimits) -> Self { Self { limits, @@ -888,6 +896,8 @@ impl ConnectionCounters { } /// The effective connection limits. + #[deprecated(note = "Use the `libp2p::connection_limits` instead.")] + #[allow(deprecated)] pub fn limits(&self) -> &ConnectionLimits { &self.limits } @@ -975,14 +985,17 @@ impl ConnectionCounters { } } + #[allow(deprecated)] fn check_max_pending_outgoing(&self) -> Result<(), ConnectionLimit> { Self::check(self.pending_outgoing, self.limits.max_pending_outgoing) } + #[allow(deprecated)] fn check_max_pending_incoming(&self) -> Result<(), ConnectionLimit> { Self::check(self.pending_incoming, self.limits.max_pending_incoming) } + #[allow(deprecated)] fn check_max_established(&self, endpoint: &ConnectedPoint) -> Result<(), ConnectionLimit> { // Check total connection limit. Self::check(self.num_established(), self.limits.max_established_total)?; @@ -999,10 +1012,12 @@ impl ConnectionCounters { } } + #[allow(deprecated)] fn check_max_established_per_peer(&self, current: u32) -> Result<(), ConnectionLimit> { Self::check(current, self.limits.max_established_per_peer) } + #[allow(deprecated)] fn check(current: u32, limit: Option) -> Result<(), ConnectionLimit> { if let Some(limit) = limit { if current >= limit { @@ -1027,6 +1042,7 @@ fn num_peer_established( /// /// By default no connection limits apply. #[derive(Debug, Clone, Default)] +#[deprecated(note = "Use `libp2p::connection_limits` instead.", since = "0.42.1")] pub struct ConnectionLimits { max_pending_incoming: Option, max_pending_outgoing: Option, @@ -1036,6 +1052,7 @@ pub struct ConnectionLimits { max_established_total: Option, } +#[allow(deprecated)] impl ConnectionLimits { /// Configures the maximum number of concurrently incoming connections being established. pub fn with_max_pending_incoming(mut self, limit: Option) -> Self { diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 34928b15578..1180e7185c8 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -104,14 +104,17 @@ pub mod derive_prelude { pub use libp2p_identity::PeerId; } +#[allow(deprecated)] +pub use crate::connection::ConnectionLimit; pub use behaviour::{ AddressChange, CloseConnection, ConnectionClosed, DialFailure, ExpiredExternalAddr, ExpiredListenAddr, ExternalAddresses, FromSwarm, ListenAddresses, ListenFailure, ListenerClosed, ListenerError, NetworkBehaviour, NetworkBehaviourAction, NewExternalAddr, NewListenAddr, NotifyHandler, PollParameters, }; +#[allow(deprecated)] pub use connection::pool::{ConnectionCounters, ConnectionLimits}; -pub use connection::{ConnectionError, ConnectionId, ConnectionLimit}; +pub use connection::{ConnectionError, ConnectionId}; pub use executor::Executor; #[allow(deprecated)] pub use handler::IntoConnectionHandler; @@ -651,6 +654,7 @@ where ) { Ok(()) => Ok(()), Err(connection_limit) => { + #[allow(deprecated)] let error = DialError::ConnectionLimit(connection_limit); self.behaviour .on_swarm_event(FromSwarm::DialFailure(DialFailure { @@ -1089,6 +1093,7 @@ where }); } Err(connection_limit) => { + #[allow(deprecated)] let error = ListenError::ConnectionLimit(connection_limit); self.behaviour .on_swarm_event(FromSwarm::ListenFailure(ListenFailure { @@ -1511,6 +1516,7 @@ pub struct SwarmBuilder { transport: transport::Boxed<(PeerId, StreamMuxerBox)>, behaviour: TBehaviour, pool_config: PoolConfig, + #[allow(deprecated)] connection_limits: ConnectionLimits, } @@ -1655,6 +1661,7 @@ where } /// Configures the connection limits. + #[allow(deprecated)] pub fn connection_limits(mut self, limits: ConnectionLimits) -> Self { self.connection_limits = limits; self @@ -1712,6 +1719,11 @@ pub enum DialError { Banned, /// The configured limit for simultaneous outgoing connections /// has been reached. + #[deprecated( + note = "Use `libp2p::connection_limits` instead and handle `{Dial,Listen}Error::Denied::cause`.", + since = "0.42.1" + )] + #[allow(deprecated)] ConnectionLimit(ConnectionLimit), /// The peer identity obtained on the connection matches the local peer. LocalPeerId { @@ -1742,6 +1754,7 @@ pub enum DialError { impl From for DialError { fn from(error: PendingOutboundConnectionError) -> Self { match error { + #[allow(deprecated)] PendingConnectionError::ConnectionLimit(limit) => DialError::ConnectionLimit(limit), PendingConnectionError::Aborted => DialError::Aborted, PendingConnectionError::WrongPeerId { obtained, endpoint } => { @@ -1756,6 +1769,7 @@ impl From for DialError { impl fmt::Display for DialError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { + #[allow(deprecated)] DialError::ConnectionLimit(err) => write!(f, "Dial error: {err}"), DialError::NoAddresses => write!(f, "Dial error: no addresses for peer."), DialError::LocalPeerId { endpoint } => write!( @@ -1809,6 +1823,7 @@ fn print_error_chain(f: &mut fmt::Formatter<'_>, e: &dyn error::Error) -> fmt::R impl error::Error for DialError { fn source(&self) -> Option<&(dyn error::Error + 'static)> { match self { + #[allow(deprecated)] DialError::ConnectionLimit(err) => Some(err), DialError::LocalPeerId { .. } => None, DialError::NoAddresses => None, @@ -1828,6 +1843,11 @@ impl error::Error for DialError { pub enum ListenError { /// The configured limit for simultaneous outgoing connections /// has been reached. + #[deprecated( + note = "Use `libp2p::connection_limits` instead and handle `{Dial,Listen}Error::Denied::cause`.", + since = "0.42.1" + )] + #[allow(deprecated)] ConnectionLimit(ConnectionLimit), /// Pending connection attempt has been aborted. Aborted, @@ -1851,6 +1871,7 @@ impl From for ListenError { fn from(error: PendingInboundConnectionError) -> Self { match error { PendingInboundConnectionError::Transport(inner) => ListenError::Transport(inner), + #[allow(deprecated)] PendingInboundConnectionError::ConnectionLimit(inner) => { ListenError::ConnectionLimit(inner) } @@ -1868,6 +1889,7 @@ impl From for ListenError { impl fmt::Display for ListenError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { + #[allow(deprecated)] ListenError::ConnectionLimit(_) => write!(f, "Listen error"), ListenError::Aborted => write!( f, @@ -1893,6 +1915,7 @@ impl fmt::Display for ListenError { impl error::Error for ListenError { fn source(&self) -> Option<&(dyn error::Error + 'static)> { match self { + #[allow(deprecated)] ListenError::ConnectionLimit(err) => Some(err), ListenError::WrongPeerId { .. } => None, ListenError::Transport(err) => Some(err), @@ -1914,6 +1937,19 @@ impl ConnectionDenied { inner: Box::new(cause), } } + + /// Attempt to downcast to a particular reason for why the connection was denied. + pub fn downcast(self) -> Result + where + E: error::Error + Send + Sync + 'static, + { + let inner = self + .inner + .downcast::() + .map_err(|inner| ConnectionDenied { inner })?; + + Ok(*inner) + } } impl fmt::Display for ConnectionDenied { @@ -2485,6 +2521,7 @@ mod tests { } #[test] + #[allow(deprecated)] fn max_outgoing() { use rand::Rng; @@ -2512,6 +2549,7 @@ mod tests { .dial(DialOpts::peer_id(target).addresses(vec![addr]).build()) .expect_err("Unexpected dialing success.") { + #[allow(deprecated)] DialError::ConnectionLimit(limit) => { assert_eq!(limit.current, outgoing_limit); assert_eq!(limit.limit, outgoing_limit); @@ -2538,6 +2576,7 @@ mod tests { } } + #[allow(deprecated)] fn limits(limit: u32) -> ConnectionLimits { ConnectionLimits::default().with_max_established_incoming(Some(limit)) } @@ -2545,9 +2584,11 @@ mod tests { fn prop(limit: Limit) { let limit = limit.0; + #[allow(deprecated)] let mut network1 = new_test_swarm::<_, ()>(keep_alive::ConnectionHandler) .connection_limits(limits(limit)) .build(); + #[allow(deprecated)] let mut network2 = new_test_swarm::<_, ()>(keep_alive::ConnectionHandler) .connection_limits(limits(limit)) .build(); @@ -2580,6 +2621,7 @@ mod tests { Poll::Ready(Some(SwarmEvent::ConnectionEstablished { .. })) => { network_1_established = true; } + #[allow(deprecated)] Poll::Ready(Some(SwarmEvent::IncomingConnectionError { error: ListenError::ConnectionLimit(err), ..