diff --git a/Cargo.lock b/Cargo.lock index c0ada652504..652d88707e7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3109,7 +3109,7 @@ dependencies = [ [[package]] name = "libp2p-relay" -version = "0.18.0" +version = "0.18.1" dependencies = [ "asynchronous-codec", "bytes", diff --git a/Cargo.toml b/Cargo.toml index 8869505921d..38d045ab5a6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -97,7 +97,7 @@ libp2p-ping = { version = "0.45.0", path = "protocols/ping" } libp2p-plaintext = { version = "0.42.0", path = "transports/plaintext" } libp2p-pnet = { version = "0.25.0", path = "transports/pnet" } libp2p-quic = { version = "0.11.1", path = "transports/quic" } -libp2p-relay = { version = "0.18.0", path = "protocols/relay" } +libp2p-relay = { version = "0.18.1", path = "protocols/relay" } libp2p-rendezvous = { version = "0.15.0", path = "protocols/rendezvous" } libp2p-request-response = { version = "0.27.0", path = "protocols/request-response" } libp2p-server = { version = "0.12.8", path = "misc/server" } diff --git a/protocols/dcutr/tests/lib.rs b/protocols/dcutr/tests/lib.rs index 084ee744145..1a08f331506 100644 --- a/protocols/dcutr/tests/lib.rs +++ b/protocols/dcutr/tests/lib.rs @@ -57,8 +57,7 @@ async fn connect() { let dst_relayed_addr = relay_tcp_addr .with(Protocol::P2p(relay_peer_id)) - .with(Protocol::P2pCircuit) - .with(Protocol::P2p(dst_peer_id)); + .with(Protocol::P2pCircuit); dst.listen_on(dst_relayed_addr.clone()).unwrap(); wait_for_reservation( @@ -70,7 +69,8 @@ async fn connect() { .await; async_std::task::spawn(dst.loop_on_next()); - src.dial_and_wait(dst_relayed_addr.clone()).await; + src.dial_and_wait(dst_relayed_addr.with(Protocol::P2p(dst_peer_id))) + .await; let dst_addr = dst_tcp_addr.with(Protocol::P2p(dst_peer_id)); diff --git a/protocols/relay/CHANGELOG.md b/protocols/relay/CHANGELOG.md index fc71ccedad5..9bdb2e72348 100644 --- a/protocols/relay/CHANGELOG.md +++ b/protocols/relay/CHANGELOG.md @@ -1,3 +1,9 @@ +## 0.18.1 + +- Fix relayed `ExternalAddr` not expiring when stopping to listen through relay. + Removing `/p2p/` part from the listen and external addresses reported by the relay `Behaviour`. + See [PR 5577](https://github.com/libp2p/rust-libp2p/pull/5577). + ## 0.18.0 diff --git a/protocols/relay/Cargo.toml b/protocols/relay/Cargo.toml index 084fec07efd..16449f57df5 100644 --- a/protocols/relay/Cargo.toml +++ b/protocols/relay/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-relay" edition = "2021" rust-version = { workspace = true } description = "Communications relaying for libp2p" -version = "0.18.0" +version = "0.18.1" authors = ["Parity Technologies ", "Max Inden "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" diff --git a/protocols/relay/src/priv_client.rs b/protocols/relay/src/priv_client.rs index 8bbc813ec4c..9a43cc65a85 100644 --- a/protocols/relay/src/priv_client.rs +++ b/protocols/relay/src/priv_client.rs @@ -33,15 +33,15 @@ use futures::future::{BoxFuture, FutureExt}; use futures::io::{AsyncRead, AsyncWrite}; use futures::ready; use futures::stream::StreamExt; -use libp2p_core::multiaddr::Protocol; use libp2p_core::transport::PortUse; use libp2p_core::{Endpoint, Multiaddr}; use libp2p_identity::PeerId; use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, FromSwarm}; use libp2p_swarm::dial_opts::DialOpts; use libp2p_swarm::{ - dummy, ConnectionDenied, ConnectionHandler, ConnectionId, DialFailure, NetworkBehaviour, - NotifyHandler, Stream, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, + dummy, ConnectionDenied, ConnectionHandler, ConnectionId, DialFailure, ExpiredListenAddr, + NetworkBehaviour, NewListenAddr, NotifyHandler, Stream, THandler, THandlerInEvent, + THandlerOutEvent, ToSwarm, }; use std::collections::{hash_map, HashMap, VecDeque}; use std::io::{Error, ErrorKind, IoSlice}; @@ -71,12 +71,6 @@ pub enum Event { }, } -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -enum ReservationStatus { - Pending, - Confirmed, -} - /// [`NetworkBehaviour`] implementation of the relay client /// functionality of the circuit relay v2 protocol. pub struct Behaviour { @@ -87,11 +81,6 @@ pub struct Behaviour { /// connection. directly_connected_peers: HashMap>, - /// Stores the address of a pending or confirmed reservation. - /// - /// This is indexed by the [`ConnectionId`] to a relay server and the address is the `/p2p-circuit` address we reserved on it. - reservation_addresses: HashMap, - /// Queue of actions to return when polled. queued_actions: VecDeque>>, @@ -105,7 +94,6 @@ pub fn new(local_peer_id: PeerId) -> (Transport, Behaviour) { local_peer_id, from_transport, directly_connected_peers: Default::default(), - reservation_addresses: Default::default(), queued_actions: Default::default(), pending_handler_commands: Default::default(), }; @@ -140,12 +128,6 @@ impl Behaviour { unreachable!("`on_connection_closed` for unconnected peer.") } }; - if let Some((addr, ReservationStatus::Confirmed)) = - self.reservation_addresses.remove(&connection_id) - { - self.queued_actions - .push_back(ToSwarm::ExternalAddrExpired(addr)); - } } } } @@ -220,8 +202,19 @@ impl NetworkBehaviour for Behaviour { FromSwarm::ConnectionClosed(connection_closed) => { self.on_connection_closed(connection_closed) } + FromSwarm::NewListenAddr(NewListenAddr { addr, .. }) => { + if addr.is_relayed() { + self.queued_actions + .push_back(ToSwarm::ExternalAddrConfirmed(addr.clone())); + } + } + FromSwarm::ExpiredListenAddr(ExpiredListenAddr { addr, .. }) => { + if addr.is_relayed() { + self.queued_actions + .push_back(ToSwarm::ExternalAddrExpired(addr.clone())); + } + } FromSwarm::DialFailure(DialFailure { connection_id, .. }) => { - self.reservation_addresses.remove(&connection_id); self.pending_handler_commands.remove(&connection_id); } _ => {} @@ -231,7 +224,7 @@ impl NetworkBehaviour for Behaviour { fn on_connection_handler_event( &mut self, event_source: PeerId, - connection: ConnectionId, + _: ConnectionId, handler_event: THandlerOutEvent, ) { let handler_event = match handler_event { @@ -243,17 +236,6 @@ impl NetworkBehaviour for Behaviour { let event = match handler_event { handler::Event::ReservationReqAccepted { renewal, limit } => { - let (addr, status) = self - .reservation_addresses - .get_mut(&connection) - .expect("Relay connection exist"); - - if !renewal && *status == ReservationStatus::Pending { - *status = ReservationStatus::Confirmed; - self.queued_actions - .push_back(ToSwarm::ExternalAddrConfirmed(addr.clone())); - } - Event::ReservationReqAccepted { relay_peer_id: event_source, renewal, @@ -294,24 +276,11 @@ impl NetworkBehaviour for Behaviour { .get(&relay_peer_id) .and_then(|cs| cs.first()) { - Some(connection_id) => { - self.reservation_addresses.insert( - *connection_id, - ( - relay_addr - .with(Protocol::P2p(relay_peer_id)) - .with(Protocol::P2pCircuit) - .with(Protocol::P2p(self.local_peer_id)), - ReservationStatus::Pending, - ), - ); - - ToSwarm::NotifyHandler { - peer_id: relay_peer_id, - handler: NotifyHandler::One(*connection_id), - event: Either::Left(handler::In::Reserve { to_listener }), - } - } + Some(connection_id) => ToSwarm::NotifyHandler { + peer_id: relay_peer_id, + handler: NotifyHandler::One(*connection_id), + event: Either::Left(handler::In::Reserve { to_listener }), + }, None => { let opts = DialOpts::peer_id(relay_peer_id) .addresses(vec![relay_addr.clone()]) @@ -319,17 +288,6 @@ impl NetworkBehaviour for Behaviour { .build(); let relayed_connection_id = opts.connection_id(); - self.reservation_addresses.insert( - relayed_connection_id, - ( - relay_addr - .with(Protocol::P2p(relay_peer_id)) - .with(Protocol::P2pCircuit) - .with(Protocol::P2p(self.local_peer_id)), - ReservationStatus::Pending, - ), - ); - self.pending_handler_commands .insert(relayed_connection_id, handler::In::Reserve { to_listener }); ToSwarm::Dial { opts } diff --git a/protocols/relay/src/priv_client/transport.rs b/protocols/relay/src/priv_client/transport.rs index ec1e8ca5fb8..7fd19a211d4 100644 --- a/protocols/relay/src/priv_client/transport.rs +++ b/protocols/relay/src/priv_client/transport.rs @@ -33,7 +33,7 @@ use futures::stream::{Stream, StreamExt}; use libp2p_core::multiaddr::{Multiaddr, Protocol}; use libp2p_core::transport::{DialOpts, ListenerId, TransportError, TransportEvent}; use libp2p_identity::PeerId; -use std::collections::VecDeque; +use std::collections::{HashSet, VecDeque}; use std::pin::Pin; use std::task::{Context, Poll, Waker}; use thiserror::Error; @@ -154,6 +154,7 @@ impl libp2p_core::Transport for Transport { from_behaviour, is_closed: false, waker: None, + listened_addrs: Default::default(), }; self.listeners.push(listener); Ok(()) @@ -311,6 +312,8 @@ pub(crate) struct Listener { /// the sender side of the `from_behaviour` channel is dropped. is_closed: bool, waker: Option, + /// Addresses listened through relay + listened_addrs: HashSet, } impl Listener { @@ -368,14 +371,45 @@ impl Stream for Listener { self.queued_events.is_empty(), "Assert empty due to previous `pop_front` attempt." ); - // Returned as [`ListenerEvent::NewAddress`] in next iteration of loop. - self.queued_events = addrs + + let reserved_addresses = addrs .into_iter() - .map(|listen_addr| TransportEvent::NewAddress { - listener_id: self.listener_id, - listen_addr, + .map(|mut addr| { + // Every transport (tcp / quic / etc) gives addresses without the last + // p2p part, so pop it if present. + if matches!(addr.iter().last(), Some(Protocol::P2p(_))) { + addr.pop(); + } + addr }) - .collect(); + .collect::>(); + + let expired_addresses = self + .listened_addrs + .difference(&reserved_addresses) + .map(Clone::clone) + .collect::>(); + let new_addresses = reserved_addresses + .difference(&self.listened_addrs) + .map(Clone::clone) + .collect::>(); + let listener_id = self.listener_id; + + for listen_addr in expired_addresses { + self.listened_addrs.remove(&listen_addr); + self.queued_events + .push_back(TransportEvent::AddressExpired { + listener_id, + listen_addr, + }); + } + for listen_addr in new_addresses { + self.listened_addrs.insert(listen_addr.clone()); + self.queued_events.push_back(TransportEvent::NewAddress { + listener_id, + listen_addr, + }); + } } ToListenerMsg::IncomingRelayedConnection { stream, diff --git a/protocols/relay/tests/lib.rs b/protocols/relay/tests/lib.rs index 2b28d5a50cd..666be7dcecb 100644 --- a/protocols/relay/tests/lib.rs +++ b/protocols/relay/tests/lib.rs @@ -59,7 +59,6 @@ fn reservation() { .with(Protocol::P2p(relay_peer_id)) .with(Protocol::P2pCircuit); let mut client = build_client(); - let client_peer_id = *client.local_peer_id(); client.listen_on(client_addr.clone()).unwrap(); @@ -69,7 +68,7 @@ fn reservation() { // Wait for initial reservation. pool.run_until(wait_for_reservation( &mut client, - client_addr.clone().with(Protocol::P2p(client_peer_id)), + client_addr.clone(), relay_peer_id, false, // No renewal. )); @@ -77,7 +76,7 @@ fn reservation() { // Wait for renewal. pool.run_until(wait_for_reservation( &mut client, - client_addr.with(Protocol::P2p(client_peer_id)), + client_addr, relay_peer_id, true, // Renewal. )); @@ -99,11 +98,9 @@ fn new_reservation_to_same_relay_replaces_old() { spawn_swarm_on_pool(&pool, relay); let mut client = build_client(); - let client_peer_id = *client.local_peer_id(); let client_addr = relay_addr .with(Protocol::P2p(relay_peer_id)) .with(Protocol::P2pCircuit); - let client_addr_with_peer_id = client_addr.clone().with(Protocol::P2p(client_peer_id)); let old_listener = client.listen_on(client_addr.clone()).unwrap(); @@ -113,7 +110,7 @@ fn new_reservation_to_same_relay_replaces_old() { // Wait for first (old) reservation. pool.run_until(wait_for_reservation( &mut client, - client_addr_with_peer_id.clone(), + client_addr.clone(), relay_peer_id, false, // No renewal. )); @@ -123,12 +120,17 @@ fn new_reservation_to_same_relay_replaces_old() { // Wait for // - listener of old reservation to close + // - old external addr to expire // - new reservation to be accepted // - new listener address to be reported + // - new external addr to be reported pool.run_until(async { let mut old_listener_closed = false; + let mut old_external_addr_expired = false; let mut new_reservation_accepted = false; - let mut new_listener_address_reported = false; + let mut new_listen_address_reported = false; + let mut new_external_addr_confirmed = false; + loop { match client.select_next_some().await { SwarmEvent::ListenerClosed { @@ -136,13 +138,9 @@ fn new_reservation_to_same_relay_replaces_old() { listener_id, .. } => { - assert_eq!(addresses, vec![client_addr_with_peer_id.clone()]); + assert_eq!(addresses, vec![client_addr.clone()]); assert_eq!(listener_id, old_listener); - old_listener_closed = true; - if new_reservation_accepted && new_listener_address_reported { - break; - } } SwarmEvent::Behaviour(ClientEvent::Relay( relay::client::Event::ReservationReqAccepted { @@ -151,32 +149,35 @@ fn new_reservation_to_same_relay_replaces_old() { }, )) => { assert_eq!(relay_peer_id, peer_id); - new_reservation_accepted = true; - if old_listener_closed && new_listener_address_reported { - break; - } } SwarmEvent::NewListenAddr { address, listener_id, } => { - assert_eq!(address, client_addr_with_peer_id); + assert_eq!(address, client_addr); assert_eq!(listener_id, new_listener); - - new_listener_address_reported = true; - if old_listener_closed && new_reservation_accepted { - break; - } + new_listen_address_reported = true; } SwarmEvent::ExternalAddrConfirmed { address } => { - assert_eq!( - address, - client_addr.clone().with(Protocol::P2p(client_peer_id)) - ); + assert_eq!(address, client_addr); + new_external_addr_confirmed = true; + } + SwarmEvent::ExternalAddrExpired { address } => { + assert_eq!(address, client_addr); + old_external_addr_expired = true; } SwarmEvent::Behaviour(ClientEvent::Ping(_)) => {} e => panic!("{e:?}"), + }; + + if old_listener_closed + && old_external_addr_expired + && new_reservation_accepted + && new_listen_address_reported + && new_external_addr_confirmed + { + break; } } }); @@ -201,8 +202,7 @@ fn connect() { let dst_peer_id = *dst.local_peer_id(); let dst_addr = relay_addr .with(Protocol::P2p(relay_peer_id)) - .with(Protocol::P2pCircuit) - .with(Protocol::P2p(dst_peer_id)); + .with(Protocol::P2pCircuit); dst.listen_on(dst_addr.clone()).unwrap(); @@ -218,7 +218,7 @@ fn connect() { let mut src = build_client(); let src_peer_id = *src.local_peer_id(); - src.dial(dst_addr).unwrap(); + src.dial(dst_addr.with(Protocol::P2p(dst_peer_id))).unwrap(); pool.run_until(futures::future::join( connection_established_to(&mut src, relay_peer_id, dst_peer_id), @@ -418,7 +418,6 @@ fn reuse_connection() { let mut client = build_client_with_config( Config::with_async_std_executor().with_idle_connection_timeout(Duration::from_secs(1)), ); - let client_peer_id = *client.local_peer_id(); client.dial(relay_addr).unwrap(); assert!(pool.run_until(wait_for_dial(&mut client, relay_peer_id))); @@ -427,7 +426,7 @@ fn reuse_connection() { pool.run_until(wait_for_reservation( &mut client, - client_addr.with(Protocol::P2p(client_peer_id)), + client_addr, relay_peer_id, false, // No renewal. )); @@ -524,11 +523,13 @@ async fn wait_for_reservation( ) { let mut new_listen_addr = false; let mut reservation_req_accepted = false; + let mut external_addr_confirmed = false; loop { match client.select_next_some().await { SwarmEvent::ExternalAddrConfirmed { address } if !is_renewal => { assert_eq!(address, client_addr); + external_addr_confirmed = true; } SwarmEvent::Behaviour(ClientEvent::Relay( relay::client::Event::ReservationReqAccepted { @@ -538,19 +539,21 @@ async fn wait_for_reservation( }, )) if relay_peer_id == peer_id && renewal == is_renewal => { reservation_req_accepted = true; - if new_listen_addr { - break; - } } - SwarmEvent::NewListenAddr { address, .. } if address == client_addr => { + SwarmEvent::NewListenAddr { address, .. } if !is_renewal => { + assert_eq!(address, client_addr); new_listen_addr = true; - if reservation_req_accepted { - break; - } } SwarmEvent::Behaviour(ClientEvent::Ping(_)) => {} e => panic!("{e:?}"), } + + if reservation_req_accepted + && (external_addr_confirmed || is_renewal) + && (new_listen_addr || is_renewal) + { + break; + } } } diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 12280e99f07..149e41260d4 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -120,6 +120,7 @@ pub use handler::{ ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerSelect, OneShotHandler, OneShotHandlerConfig, StreamUpgradeError, SubstreamProtocol, }; +use libp2p_core::multiaddr::Protocol; #[cfg(feature = "macros")] pub use libp2p_swarm_derive::NetworkBehaviour; pub use listen_opts::ListenOpts; @@ -509,10 +510,25 @@ where } } - let mut unique_addresses = HashSet::new(); + let mut unique_addresses: HashSet<_> = HashSet::new(); addresses_from_opts.retain(|addr| { - !self.listened_addrs.values().flatten().any(|a| a == addr) - && unique_addresses.insert(addr.clone()) + if !unique_addresses.insert(addr.clone()) { + // Address already added, don't dial it twice. + false + } else { + // Check the address against the local peer listen addresses + // to prevent the local peer of dialing himself. + + if addr.iter().any(|p| p == Protocol::P2pCircuit) { + // Address is relayed. It's ok to dial a peer that listens + // on the same relay that the local peer does. + true + } else { + // Address is not relayed. Prevent dial on any addresses + // the local peer listens on. + !self.listened_addrs.values().flatten().any(|a| a == addr) + } + } }); if addresses_from_opts.is_empty() {