Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

swarm: Extend NetworkBehaviour callbacks. #2011

Merged
merged 5 commits into from
Mar 24, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ where
///
/// The translation is transport-specific. See [`Transport::address_translation`].
pub fn address_translation<'a>(&'a self, observed_addr: &'a Multiaddr)
-> impl Iterator<Item = Multiaddr> + 'a
-> Vec<Multiaddr>
romanb marked this conversation as resolved.
Show resolved Hide resolved
where
TMuxer: 'a,
THandler: 'a,
Expand All @@ -201,7 +201,7 @@ where
addrs.sort_unstable();
addrs.dedup();

addrs.into_iter()
addrs
}

/// Returns the peer id of the local node.
Expand Down
6 changes: 3 additions & 3 deletions protocols/identify/src/identify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use libp2p_core::{
Multiaddr,
PeerId,
PublicKey,
connection::ConnectionId,
connection::{ConnectionId, ListenerId},
upgrade::UpgradeError
};
use libp2p_swarm::{
Expand Down Expand Up @@ -233,13 +233,13 @@ impl NetworkBehaviour for Identify {
self.pending_push.remove(peer_id);
}

fn inject_new_listen_addr(&mut self, _addr: &Multiaddr) {
fn inject_new_listen_addr(&mut self, _id: ListenerId, _addr: &Multiaddr) {
if self.config.push_listen_addr_updates {
self.pending_push.extend(self.connected.keys());
}
}

fn inject_expired_listen_addr(&mut self, _addr: &Multiaddr) {
fn inject_expired_listen_addr(&mut self, _id: ListenerId, _addr: &Multiaddr) {
if self.config.push_listen_addr_updates {
self.pending_push.extend(self.connected.keys());
}
Expand Down
6 changes: 3 additions & 3 deletions protocols/kad/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use crate::protocol::{KademliaProtocolConfig, KadConnectionType, KadPeer};
use crate::query::{Query, QueryId, QueryPool, QueryConfig, QueryPoolState};
use crate::record::{self, store::{self, RecordStore}, Record, ProviderRecord};
use fnv::{FnvHashMap, FnvHashSet};
use libp2p_core::{ConnectedPoint, Multiaddr, PeerId, connection::ConnectionId};
use libp2p_core::{ConnectedPoint, Multiaddr, PeerId, connection::{ConnectionId, ListenerId}};
use libp2p_swarm::{
DialPeerCondition,
NetworkBehaviour,
Expand Down Expand Up @@ -1888,11 +1888,11 @@ where
};
}

fn inject_new_listen_addr(&mut self, addr: &Multiaddr) {
fn inject_new_listen_addr(&mut self, _id: ListenerId, addr: &Multiaddr) {
self.local_addrs.insert(addr.clone());
}

fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) {
fn inject_expired_listen_addr(&mut self, _id: ListenerId, addr: &Multiaddr) {
self.local_addrs.remove(addr);
}

Expand Down
48 changes: 42 additions & 6 deletions swarm-derive/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,20 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
})
};

// Build the list of statements to put in the body of `inject_new_listener()`.
let inject_new_listener_stmts = {
data_struct.fields.iter().enumerate().filter_map(move |(field_n, field)| {
if is_ignored(&field) {
return None;
}

Some(match field.ident {
Some(ref i) => quote!{ self.#i.inject_new_listener(id); },
None => quote!{ self.#field_n.inject_new_listener(id); },
})
})
};

// Build the list of statements to put in the body of `inject_new_listen_addr()`.
let inject_new_listen_addr_stmts = {
data_struct.fields.iter().enumerate().filter_map(move |(field_n, field)| {
Expand All @@ -252,8 +266,8 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
}

Some(match field.ident {
Some(ref i) => quote!{ self.#i.inject_new_listen_addr(addr); },
None => quote!{ self.#field_n.inject_new_listen_addr(addr); },
Some(ref i) => quote!{ self.#i.inject_new_listen_addr(id, addr); },
None => quote!{ self.#field_n.inject_new_listen_addr(id, addr); },
})
})
};
Expand All @@ -266,8 +280,8 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
}

Some(match field.ident {
Some(ref i) => quote!{ self.#i.inject_expired_listen_addr(addr); },
None => quote!{ self.#field_n.inject_expired_listen_addr(addr); },
Some(ref i) => quote!{ self.#i.inject_expired_listen_addr(id, addr); },
None => quote!{ self.#field_n.inject_expired_listen_addr(id, addr); },
})
})
};
Expand All @@ -286,6 +300,20 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
})
};

// Build the list of statements to put in the body of `inject_expired_external_addr()`.
let inject_expired_external_addr_stmts = {
data_struct.fields.iter().enumerate().filter_map(move |(field_n, field)| {
if is_ignored(&field) {
return None;
}

Some(match field.ident {
Some(ref i) => quote!{ self.#i.inject_expired_external_addr(addr); },
None => quote!{ self.#field_n.inject_expired_external_addr(addr); },
})
})
};

// Build the list of statements to put in the body of `inject_listener_error()`.
let inject_listener_error_stmts = {
data_struct.fields.iter().enumerate().filter_map(move |(field_n, field)| {
Expand Down Expand Up @@ -504,18 +532,26 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
#(#inject_dial_failure_stmts);*
}

fn inject_new_listen_addr(&mut self, addr: &#multiaddr) {
fn inject_new_listener(&mut self, id: #listener_id) {
#(#inject_new_listener_stmts);*
}

fn inject_new_listen_addr(&mut self, id: #listener_id, addr: &#multiaddr) {
#(#inject_new_listen_addr_stmts);*
}

fn inject_expired_listen_addr(&mut self, addr: &#multiaddr) {
fn inject_expired_listen_addr(&mut self, id: #listener_id, addr: &#multiaddr) {
#(#inject_expired_listen_addr_stmts);*
}

fn inject_new_external_addr(&mut self, addr: &#multiaddr) {
#(#inject_new_external_addr_stmts);*
}

fn inject_expired_external_addr(&mut self, addr: &#multiaddr) {
#(#inject_expired_external_addr_stmts);*
}

fn inject_listener_error(&mut self, id: #listener_id, err: &(dyn std::error::Error + 'static)) {
#(#inject_listener_error_stmts);*
}
Expand Down
22 changes: 15 additions & 7 deletions swarm/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,17 +147,17 @@ pub trait NetworkBehaviour: Send + 'static {
fn inject_dial_failure(&mut self, _peer_id: &PeerId) {
}

/// Indicates to the behaviour that we have started listening on a new multiaddr.
fn inject_new_listen_addr(&mut self, _addr: &Multiaddr) {
/// Indicates to the behaviour that a new listener was created.
fn inject_new_listener(&mut self, _id: ListenerId) {
}

/// Indicates to the behaviour that a new multiaddr we were listening on has expired,
/// which means that we are no longer listening in it.
fn inject_expired_listen_addr(&mut self, _addr: &Multiaddr) {
/// Indicates to the behaviour that we have started listening on a new multiaddr.
fn inject_new_listen_addr(&mut self, _id: ListenerId, _addr: &Multiaddr) {
}

/// Indicates to the behaviour that we have discovered a new external address for us.
fn inject_new_external_addr(&mut self, _addr: &Multiaddr) {
/// Indicates to the behaviour that a multiaddr we were listening on has expired,
/// which means that we are no longer listening in it.
fn inject_expired_listen_addr(&mut self, _id: ListenerId, _addr: &Multiaddr) {
}

/// A listener experienced an error.
Expand All @@ -168,6 +168,14 @@ pub trait NetworkBehaviour: Send + 'static {
fn inject_listener_closed(&mut self, _id: ListenerId, _reason: Result<(), &std::io::Error>) {
}

/// Indicates to the behaviour that we have discovered a new external address for us.
fn inject_new_external_addr(&mut self, _addr: &Multiaddr) {
}

/// Indicates to the behaviour that an external address was removed.
fn inject_expired_external_addr(&mut self, _addr: &Multiaddr) {
}

/// Polls for things that swarm should do.
///
/// This API mimics the API of the `Stream` trait. The method may register the current task in
Expand Down
35 changes: 25 additions & 10 deletions swarm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,9 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
///
/// Returns an error if the address is not supported.
pub fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<io::Error>> {
self.network.listen_on(addr)
let id = self.network.listen_on(addr)?;
self.behaviour.inject_new_listener(id);
Ok(id)
}

/// Remove some listener.
Expand Down Expand Up @@ -412,7 +414,18 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
/// [`NetworkBehaviourAction::ReportObservedAddr`] or explicitly
/// through this method.
pub fn add_external_address(&mut self, a: Multiaddr, s: AddressScore) -> AddAddressResult {
self.external_addrs.add(a, s)
let result = self.external_addrs.add(a.clone(), s);
let expired = match &result {
AddAddressResult::Inserted { expired } => {
self.behaviour.inject_new_external_addr(&a);
expired
}
AddAddressResult::Updated { expired } => expired,
};
for a in expired {
self.behaviour.inject_expired_external_addr(&a.addr);
}
result
}

/// Removes an external address of the local node, regardless of
Expand All @@ -422,7 +435,12 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
/// Returns `true` if the address existed and was removed, `false`
/// otherwise.
pub fn remove_external_address(&mut self, addr: &Multiaddr) -> bool {
self.external_addrs.remove(addr)
if self.external_addrs.remove(addr) {
self.behaviour.inject_expired_external_addr(addr);
true
} else {
false
}
}

/// Bans a peer by its peer ID.
Expand Down Expand Up @@ -565,19 +583,19 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
if !this.listened_addrs.contains(&listen_addr) {
this.listened_addrs.push(listen_addr.clone())
}
this.behaviour.inject_new_listen_addr(&listen_addr);
this.behaviour.inject_new_listen_addr(listener_id, &listen_addr);
return Poll::Ready(SwarmEvent::NewListenAddr(listen_addr));
}
Poll::Ready(NetworkEvent::ExpiredListenerAddress { listener_id, listen_addr }) => {
log::debug!("Listener {:?}; Expired address {:?}.", listener_id, listen_addr);
this.listened_addrs.retain(|a| a != &listen_addr);
this.behaviour.inject_expired_listen_addr(&listen_addr);
this.behaviour.inject_expired_listen_addr(listener_id, &listen_addr);
return Poll::Ready(SwarmEvent::ExpiredListenAddr(listen_addr));
}
Poll::Ready(NetworkEvent::ListenerClosed { listener_id, addresses, reason }) => {
log::debug!("Listener {:?}; Closed by {:?}.", listener_id, reason);
for addr in addresses.iter() {
this.behaviour.inject_expired_listen_addr(addr);
this.behaviour.inject_expired_listen_addr(listener_id, addr);
}
this.behaviour.inject_listener_closed(listener_id, match &reason {
Ok(()) => Ok(()),
Expand Down Expand Up @@ -732,10 +750,7 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
},
Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address, score }) => {
for addr in this.network.address_translation(&address) {
if this.external_addrs.iter().all(|a| a.addr != addr) {
this.behaviour.inject_new_external_addr(&addr);
}
this.external_addrs.add(addr, score);
this.add_external_address(addr, score);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

}
},
}
Expand Down
13 changes: 8 additions & 5 deletions swarm/src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,8 @@ impl Default for Addresses {
/// The result of adding an address to an ordered list of
/// addresses with associated scores.
pub enum AddAddressResult {
Inserted,
Updated,
Inserted { expired: SmallVec<[AddressRecord; 8]> },
Updated { expired: SmallVec<[AddressRecord; 8]> },
}

impl Addresses {
Expand Down Expand Up @@ -206,8 +206,11 @@ impl Addresses {
}

// Remove addresses that have a score of 0.
let mut expired = SmallVec::new();
while self.registry.last().map(|e| e.score.is_zero()).unwrap_or(false) {
self.registry.pop();
if let Some(addr) = self.registry.pop() {
expired.push(addr);
}
}

// If the address score is finite, remember this report.
Expand All @@ -220,13 +223,13 @@ impl Addresses {
if r.addr == addr {
r.score = r.score + score;
isort(&mut self.registry);
return AddAddressResult::Updated
return AddAddressResult::Updated { expired }
}
}

// It is a new record.
self.registry.push(AddressRecord::new(addr, score));
AddAddressResult::Inserted
AddAddressResult::Inserted { expired }
}

/// Explicitly remove an address from the collection.
Expand Down
30 changes: 22 additions & 8 deletions swarm/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,11 @@ where
pub inject_event: Vec<(PeerId, ConnectionId, <<TInner::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutEvent)>,
pub inject_addr_reach_failure: Vec<(Option<PeerId>, Multiaddr)>,
pub inject_dial_failure: Vec<PeerId>,
pub inject_new_listen_addr: Vec<Multiaddr>,
pub inject_new_listener: Vec<ListenerId>,
pub inject_new_listen_addr: Vec<(ListenerId, Multiaddr)>,
pub inject_new_external_addr: Vec<Multiaddr>,
pub inject_expired_listen_addr: Vec<Multiaddr>,
pub inject_expired_listen_addr: Vec<(ListenerId, Multiaddr)>,
pub inject_expired_external_addr: Vec<Multiaddr>,
pub inject_listener_error: Vec<ListenerId>,
pub inject_listener_closed: Vec<(ListenerId, bool)>,
pub poll: usize,
Expand All @@ -138,9 +140,11 @@ where
inject_event: Vec::new(),
inject_addr_reach_failure: Vec::new(),
inject_dial_failure: Vec::new(),
inject_new_listener: Vec::new(),
inject_new_listen_addr: Vec::new(),
inject_new_external_addr: Vec::new(),
inject_expired_listen_addr: Vec::new(),
inject_expired_external_addr: Vec::new(),
inject_listener_error: Vec::new(),
inject_listener_closed: Vec::new(),
poll: 0,
Expand Down Expand Up @@ -217,21 +221,31 @@ where
self.inner.inject_dial_failure(p);
}

fn inject_new_listen_addr(&mut self, a: &Multiaddr) {
self.inject_new_listen_addr.push(a.clone());
self.inner.inject_new_listen_addr(a);
fn inject_new_listener(&mut self, id: ListenerId) {
self.inject_new_listener.push(id);
self.inner.inject_new_listener(id);
}

fn inject_expired_listen_addr(&mut self, a: &Multiaddr) {
self.inject_expired_listen_addr.push(a.clone());
self.inner.inject_expired_listen_addr(a);
fn inject_new_listen_addr(&mut self, id: ListenerId, a: &Multiaddr) {
self.inject_new_listen_addr.push((id, a.clone()));
self.inner.inject_new_listen_addr(id, a);
}

fn inject_expired_listen_addr(&mut self, id: ListenerId, a: &Multiaddr) {
self.inject_expired_listen_addr.push((id, a.clone()));
self.inner.inject_expired_listen_addr(id, a);
}

fn inject_new_external_addr(&mut self, a: &Multiaddr) {
self.inject_new_external_addr.push(a.clone());
self.inner.inject_new_external_addr(a);
}

fn inject_expired_external_addr(&mut self, a: &Multiaddr) {
self.inject_expired_external_addr.push(a.clone());
self.inner.inject_expired_external_addr(a);
}

fn inject_listener_error(&mut self, l: ListenerId, e: &(dyn std::error::Error + 'static)) {
self.inject_listener_error.push(l.clone());
self.inner.inject_listener_error(l, e);
Expand Down
Loading