From 8650b97f54488bcaf12ee4c762b3cdace60a1fba Mon Sep 17 00:00:00 2001 From: elenaf9 Date: Sun, 3 Oct 2021 22:22:23 +0200 Subject: [PATCH] protocols/autonat/behaviour: smaller fixes, docs --- protocols/autonat/src/behaviour.rs | 109 +++++++++++++++-------------- 1 file changed, 58 insertions(+), 51 deletions(-) diff --git a/protocols/autonat/src/behaviour.rs b/protocols/autonat/src/behaviour.rs index 88694aeb47b..f16355535d3 100644 --- a/protocols/autonat/src/behaviour.rs +++ b/protocols/autonat/src/behaviour.rs @@ -40,7 +40,9 @@ use std::{ type FiniteAddrScore = u32; +/// Config for the [`Behaviour`]. pub struct Config { + // Timeout for requests. timeout: Duration, } @@ -53,18 +55,26 @@ impl Default for Config { } impl Config { + /// Set the timeout for dial-requests. pub fn with_timeout(mut self, timeout: Duration) -> Self { self.timeout = timeout; self } } +/// Network Behaviour for AutoNAT. pub struct Behaviour { + // Inner protocol for sending requests and receiving the response. inner: RequestResponse, - local_addresses: HashMap, - pending_inbound: HashMap>, - pending_outbound: HashSet, - send_request: VecDeque, + // Local listening addresses with a score indicating their reachability. + // The score increases each time a remote peer successfully dials this address. + addresses: HashMap, + // Ongoing inbound requests, where no response has been sent back to the remote yet. + ongoing_inbound: HashMap>, + // Ongoing outbound dial-requests, where no response has been received from the remote yet. + ongoing_outbound: HashSet, + // Recently connected peers to which we want to send a dial-request. + pending_requests: VecDeque, } impl Default for Behaviour { @@ -81,18 +91,27 @@ impl Behaviour { let inner = RequestResponse::new(AutoNatCodec, protocols, cfg); Self { inner, - local_addresses: HashMap::default(), - pending_inbound: HashMap::default(), - pending_outbound: HashSet::default(), - send_request: VecDeque::default(), + addresses: HashMap::default(), + ongoing_inbound: HashMap::default(), + ongoing_outbound: HashSet::default(), + pending_requests: VecDeque::default(), } } + /// Add a new address to the address list that is send to remote peers in a dial-request. pub fn add_local_address(&mut self, address: Multiaddr) { - if self.local_addresses.get(&address).is_none() { - self.local_addresses.insert(address, 1); + if self.addresses.get(&address).is_none() { + self.addresses.insert(address, 1); } } + + /// Get the list of local addresses with their current score. + /// + /// The score of an address increases each time a remote peer successfully dialed us via this address. + /// Therefore higher scores indicate a higher reachability. + pub fn address_list(&self) -> impl Iterator { + self.addresses.iter() + } } impl NetworkBehaviour for Behaviour { @@ -112,7 +131,8 @@ impl NetworkBehaviour for Behaviour { } fn inject_disconnected(&mut self, peer: &PeerId) { - self.inner.inject_disconnected(peer) + self.inner.inject_disconnected(peer); + self.ongoing_inbound.remove(peer); } fn inject_connection_established( @@ -123,11 +143,16 @@ impl NetworkBehaviour for Behaviour { ) { self.inner .inject_connection_established(peer, conn, endpoint); - if !self.pending_outbound.contains(peer) { - self.send_request.push_back(*peer); + + // Initiate a new dial request if there is none pending. + if !self.ongoing_outbound.contains(peer) { + self.pending_requests.push_back(*peer); } + if let ConnectedPoint::Dialer { address } = endpoint { - if let Some(channel) = self.pending_inbound.remove(peer) { + if let Some(channel) = self.ongoing_inbound.remove(peer) { + // Successfully dialed one of the addresses from the remote peer. + // TODO: Check if the address was part of the list received in the dial-request. let _ = self .inner .send_response(channel, DialResponse::Ok(address.clone())); @@ -144,9 +169,6 @@ impl NetworkBehaviour for Behaviour { ) { self.inner .inject_connection_closed(peer, conn, endpoint, handler); - // Channel can be dropped, as the underlying substream already closed. - self.pending_inbound.remove(peer); - self.send_request.retain(|p| p != peer); } fn inject_address_change( @@ -157,24 +179,6 @@ impl NetworkBehaviour for Behaviour { new: &ConnectedPoint, ) { self.inner.inject_address_change(peer, conn, old, new); - if let ConnectedPoint::Listener { - local_addr: old_addr, - .. - } = old - { - match new { - ConnectedPoint::Listener { - local_addr: new_addr, - .. - } if old_addr != new_addr => { - self.local_addresses.remove(old_addr); - if !self.local_addresses.contains_key(new_addr) { - self.local_addresses.insert(new_addr.clone(), 1); - } - } - _ => {} - } - } } fn inject_event( @@ -202,7 +206,8 @@ impl NetworkBehaviour for Behaviour { error: libp2p_swarm::DialError, ) { self.inner.inject_dial_failure(peer_id, handler, error); - if let Some(channel) = self.pending_inbound.remove(peer_id) { + if let Some(channel) = self.ongoing_inbound.remove(peer_id) { + // Failed to dial any of the addresses sent by the remote peer in their dial-request. let _ = self .inner .send_response(channel, DialResponse::Err(ResponseError::DialError)); @@ -225,14 +230,14 @@ impl NetworkBehaviour for Behaviour { fn inject_new_listen_addr(&mut self, id: ListenerId, addr: &Multiaddr) { self.inner.inject_new_listen_addr(id, addr); - if !self.local_addresses.contains_key(addr) { - self.local_addresses.insert(addr.clone(), 0); + if !self.addresses.contains_key(addr) { + self.addresses.insert(addr.clone(), 0); } } fn inject_expired_listen_addr(&mut self, id: ListenerId, addr: &Multiaddr) { self.inner.inject_expired_listen_addr(id, addr); - self.local_addresses.remove(addr); + self.addresses.remove(addr); } fn inject_listener_error(&mut self, id: ListenerId, err: &(dyn std::error::Error + 'static)) { @@ -245,18 +250,19 @@ impl NetworkBehaviour for Behaviour { fn inject_new_external_addr(&mut self, addr: &Multiaddr) { self.inner.inject_new_external_addr(addr); - match self.local_addresses.get_mut(addr) { + // Add the address to the local address list. + match self.addresses.get_mut(addr) { Some(score) if *score == 0 => *score = 1, Some(_) => {} None => { - self.local_addresses.insert(addr.clone(), 1); + self.addresses.insert(addr.clone(), 1); } } } fn inject_expired_external_addr(&mut self, addr: &Multiaddr) { self.inner.inject_expired_external_addr(addr); - self.local_addresses.remove(addr); + self.addresses.remove(addr); } fn poll( @@ -265,9 +271,8 @@ impl NetworkBehaviour for Behaviour { params: &mut impl PollParameters, ) -> Poll> { loop { - if let Some(peer_id) = self.send_request.pop_front() { - let mut scores: Vec<(Multiaddr, FiniteAddrScore)> = - self.local_addresses.clone().into_iter().collect(); + if let Some(peer_id) = self.pending_requests.pop_front() { + let mut scores: Vec<_> = self.addresses.clone().into_iter().collect(); // Sort so that the address with the highest score will be dialed first by the remote. scores.sort_by(|(_, score_a), (_, score_b)| score_b.cmp(score_a)); let addrs = scores.into_iter().map(|(a, _)| a).collect(); @@ -283,11 +288,12 @@ impl NetworkBehaviour for Behaviour { request: DialRequest { peer_id, addrs }, channel, } => { + // Add all addresses to the address book. for addr in addrs { self.inner.add_address(&peer, addr) } - // TODO: Handle if there is already a pending request. - self.pending_inbound.insert(peer_id, channel); + // TODO: Handle if there is already a ongoing request. + self.ongoing_inbound.insert(peer_id, channel); return Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id: peer, handler: self.inner.new_handler(), @@ -298,9 +304,10 @@ impl NetworkBehaviour for Behaviour { request_id: _, response, } => { - self.pending_outbound.remove(&peer); + self.ongoing_outbound.remove(&peer); if let DialResponse::Ok(address) = response { - let score = self.local_addresses.entry(address.clone()).or_insert(1); + // Increase score of the successfully dialed address. + let score = self.addresses.entry(address.clone()).or_insert(1); *score += 1; return Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address, @@ -315,12 +322,12 @@ impl NetworkBehaviour for Behaviour { Poll::Ready(NetworkBehaviourAction::GenerateEvent( RequestResponseEvent::OutboundFailure { peer, .. }, )) => { - self.pending_outbound.remove(&peer); + self.ongoing_outbound.remove(&peer); } Poll::Ready(NetworkBehaviourAction::GenerateEvent( RequestResponseEvent::InboundFailure { peer, .. }, )) => { - self.pending_inbound.remove(&peer); + self.ongoing_inbound.remove(&peer); } Poll::Ready(NetworkBehaviourAction::DialAddress { address, handler }) => { return Poll::Ready(NetworkBehaviourAction::DialAddress { address, handler })