Skip to content

Commit

Permalink
protocols/autonat/behaviour: smaller fixes, docs
Browse files Browse the repository at this point in the history
  • Loading branch information
elenaf9 committed Oct 3, 2021
1 parent 1cc2fe1 commit 8650b97
Showing 1 changed file with 58 additions and 51 deletions.
109 changes: 58 additions & 51 deletions protocols/autonat/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ use std::{

type FiniteAddrScore = u32;

/// Config for the [`Behaviour`].
pub struct Config {
// Timeout for requests.
timeout: Duration,
}

Expand All @@ -53,18 +55,26 @@ impl Default for Config {
}

impl Config {
/// Set the timeout for dial-requests.
pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.timeout = timeout;
self
}
}

/// Network Behaviour for AutoNAT.
pub struct Behaviour {
// Inner protocol for sending requests and receiving the response.
inner: RequestResponse<AutoNatCodec>,
local_addresses: HashMap<Multiaddr, FiniteAddrScore>,
pending_inbound: HashMap<PeerId, ResponseChannel<DialResponse>>,
pending_outbound: HashSet<PeerId>,
send_request: VecDeque<PeerId>,
// Local listening addresses with a score indicating their reachability.
// The score increases each time a remote peer successfully dials this address.
addresses: HashMap<Multiaddr, FiniteAddrScore>,
// Ongoing inbound requests, where no response has been sent back to the remote yet.
ongoing_inbound: HashMap<PeerId, ResponseChannel<DialResponse>>,
// Ongoing outbound dial-requests, where no response has been received from the remote yet.
ongoing_outbound: HashSet<PeerId>,
// Recently connected peers to which we want to send a dial-request.
pending_requests: VecDeque<PeerId>,
}

impl Default for Behaviour {
Expand All @@ -81,18 +91,27 @@ impl Behaviour {
let inner = RequestResponse::new(AutoNatCodec, protocols, cfg);
Self {
inner,
local_addresses: HashMap::default(),
pending_inbound: HashMap::default(),
pending_outbound: HashSet::default(),
send_request: VecDeque::default(),
addresses: HashMap::default(),
ongoing_inbound: HashMap::default(),
ongoing_outbound: HashSet::default(),
pending_requests: VecDeque::default(),
}
}

/// Add a new address to the address list that is send to remote peers in a dial-request.
pub fn add_local_address(&mut self, address: Multiaddr) {
if self.local_addresses.get(&address).is_none() {
self.local_addresses.insert(address, 1);
if self.addresses.get(&address).is_none() {
self.addresses.insert(address, 1);
}
}

/// Get the list of local addresses with their current score.
///
/// The score of an address increases each time a remote peer successfully dialed us via this address.
/// Therefore higher scores indicate a higher reachability.
pub fn address_list(&self) -> impl Iterator<Item = (&Multiaddr, &FiniteAddrScore)> {
self.addresses.iter()
}
}

impl NetworkBehaviour for Behaviour {
Expand All @@ -112,7 +131,8 @@ impl NetworkBehaviour for Behaviour {
}

fn inject_disconnected(&mut self, peer: &PeerId) {
self.inner.inject_disconnected(peer)
self.inner.inject_disconnected(peer);
self.ongoing_inbound.remove(peer);
}

fn inject_connection_established(
Expand All @@ -123,11 +143,16 @@ impl NetworkBehaviour for Behaviour {
) {
self.inner
.inject_connection_established(peer, conn, endpoint);
if !self.pending_outbound.contains(peer) {
self.send_request.push_back(*peer);

// Initiate a new dial request if there is none pending.
if !self.ongoing_outbound.contains(peer) {
self.pending_requests.push_back(*peer);
}

if let ConnectedPoint::Dialer { address } = endpoint {
if let Some(channel) = self.pending_inbound.remove(peer) {
if let Some(channel) = self.ongoing_inbound.remove(peer) {
// Successfully dialed one of the addresses from the remote peer.
// TODO: Check if the address was part of the list received in the dial-request.
let _ = self
.inner
.send_response(channel, DialResponse::Ok(address.clone()));
Expand All @@ -144,9 +169,6 @@ impl NetworkBehaviour for Behaviour {
) {
self.inner
.inject_connection_closed(peer, conn, endpoint, handler);
// Channel can be dropped, as the underlying substream already closed.
self.pending_inbound.remove(peer);
self.send_request.retain(|p| p != peer);
}

fn inject_address_change(
Expand All @@ -157,24 +179,6 @@ impl NetworkBehaviour for Behaviour {
new: &ConnectedPoint,
) {
self.inner.inject_address_change(peer, conn, old, new);
if let ConnectedPoint::Listener {
local_addr: old_addr,
..
} = old
{
match new {
ConnectedPoint::Listener {
local_addr: new_addr,
..
} if old_addr != new_addr => {
self.local_addresses.remove(old_addr);
if !self.local_addresses.contains_key(new_addr) {
self.local_addresses.insert(new_addr.clone(), 1);
}
}
_ => {}
}
}
}

fn inject_event(
Expand Down Expand Up @@ -202,7 +206,8 @@ impl NetworkBehaviour for Behaviour {
error: libp2p_swarm::DialError,
) {
self.inner.inject_dial_failure(peer_id, handler, error);
if let Some(channel) = self.pending_inbound.remove(peer_id) {
if let Some(channel) = self.ongoing_inbound.remove(peer_id) {
// Failed to dial any of the addresses sent by the remote peer in their dial-request.
let _ = self
.inner
.send_response(channel, DialResponse::Err(ResponseError::DialError));
Expand All @@ -225,14 +230,14 @@ impl NetworkBehaviour for Behaviour {

fn inject_new_listen_addr(&mut self, id: ListenerId, addr: &Multiaddr) {
self.inner.inject_new_listen_addr(id, addr);
if !self.local_addresses.contains_key(addr) {
self.local_addresses.insert(addr.clone(), 0);
if !self.addresses.contains_key(addr) {
self.addresses.insert(addr.clone(), 0);
}
}

fn inject_expired_listen_addr(&mut self, id: ListenerId, addr: &Multiaddr) {
self.inner.inject_expired_listen_addr(id, addr);
self.local_addresses.remove(addr);
self.addresses.remove(addr);
}

fn inject_listener_error(&mut self, id: ListenerId, err: &(dyn std::error::Error + 'static)) {
Expand All @@ -245,18 +250,19 @@ impl NetworkBehaviour for Behaviour {

fn inject_new_external_addr(&mut self, addr: &Multiaddr) {
self.inner.inject_new_external_addr(addr);
match self.local_addresses.get_mut(addr) {
// Add the address to the local address list.
match self.addresses.get_mut(addr) {
Some(score) if *score == 0 => *score = 1,
Some(_) => {}
None => {
self.local_addresses.insert(addr.clone(), 1);
self.addresses.insert(addr.clone(), 1);
}
}
}

fn inject_expired_external_addr(&mut self, addr: &Multiaddr) {
self.inner.inject_expired_external_addr(addr);
self.local_addresses.remove(addr);
self.addresses.remove(addr);
}

fn poll(
Expand All @@ -265,9 +271,8 @@ impl NetworkBehaviour for Behaviour {
params: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ProtocolsHandler>> {
loop {
if let Some(peer_id) = self.send_request.pop_front() {
let mut scores: Vec<(Multiaddr, FiniteAddrScore)> =
self.local_addresses.clone().into_iter().collect();
if let Some(peer_id) = self.pending_requests.pop_front() {
let mut scores: Vec<_> = self.addresses.clone().into_iter().collect();
// Sort so that the address with the highest score will be dialed first by the remote.
scores.sort_by(|(_, score_a), (_, score_b)| score_b.cmp(score_a));
let addrs = scores.into_iter().map(|(a, _)| a).collect();
Expand All @@ -283,11 +288,12 @@ impl NetworkBehaviour for Behaviour {
request: DialRequest { peer_id, addrs },
channel,
} => {
// Add all addresses to the address book.
for addr in addrs {
self.inner.add_address(&peer, addr)
}
// TODO: Handle if there is already a pending request.
self.pending_inbound.insert(peer_id, channel);
// TODO: Handle if there is already a ongoing request.
self.ongoing_inbound.insert(peer_id, channel);
return Poll::Ready(NetworkBehaviourAction::DialPeer {
peer_id: peer,
handler: self.inner.new_handler(),
Expand All @@ -298,9 +304,10 @@ impl NetworkBehaviour for Behaviour {
request_id: _,
response,
} => {
self.pending_outbound.remove(&peer);
self.ongoing_outbound.remove(&peer);
if let DialResponse::Ok(address) = response {
let score = self.local_addresses.entry(address.clone()).or_insert(1);
// Increase score of the successfully dialed address.
let score = self.addresses.entry(address.clone()).or_insert(1);
*score += 1;
return Poll::Ready(NetworkBehaviourAction::ReportObservedAddr {
address,
Expand All @@ -315,12 +322,12 @@ impl NetworkBehaviour for Behaviour {
Poll::Ready(NetworkBehaviourAction::GenerateEvent(
RequestResponseEvent::OutboundFailure { peer, .. },
)) => {
self.pending_outbound.remove(&peer);
self.ongoing_outbound.remove(&peer);
}
Poll::Ready(NetworkBehaviourAction::GenerateEvent(
RequestResponseEvent::InboundFailure { peer, .. },
)) => {
self.pending_inbound.remove(&peer);
self.ongoing_inbound.remove(&peer);
}
Poll::Ready(NetworkBehaviourAction::DialAddress { address, handler }) => {
return Poll::Ready(NetworkBehaviourAction::DialAddress { address, handler })
Expand Down

0 comments on commit 8650b97

Please sign in to comment.