Skip to content

Commit

Permalink
simplify IfWatcher integration
Browse files Browse the repository at this point in the history
port of libp2p#2813
  • Loading branch information
melekes committed Sep 5, 2022
1 parent a2ae49f commit 31ae422
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 183 deletions.
2 changes: 1 addition & 1 deletion transports/webrtc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ futures = "0.3"
futures-lite = "1"
futures-timer = "3"
hex = "0.4"
if-watch = "0.2"
if-watch = "2.0"
libp2p-core = { version = "0.35.0", path = "../../core", default-features = false }
libp2p-noise = { version = "0.38.0", path = "../../transports/noise" }
log = "0.4"
Expand Down
120 changes: 0 additions & 120 deletions transports/webrtc/src/in_addr.rs

This file was deleted.

1 change: 0 additions & 1 deletion transports/webrtc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ pub mod error;
pub mod transport;

mod fingerprint;
mod in_addr;
mod req_res_chan;
mod sdp;
mod udp_mux;
Expand Down
111 changes: 51 additions & 60 deletions transports/webrtc/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use futures::{
stream::Stream,
TryFutureExt,
};
use if_watch::IfEvent;
use if_watch::{IfEvent, IfWatcher};
use libp2p_core::{
identity,
multiaddr::{Multiaddr, Protocol},
Expand All @@ -54,7 +54,6 @@ use crate::{
connection::PollDataChannel,
error::Error,
fingerprint::Fingerprint,
in_addr::InAddr,
udp_mux::{UDPMuxEvent, UDPMuxNewAddr},
webrtc_connection::WebRTCConnection,
};
Expand Down Expand Up @@ -107,13 +106,16 @@ impl WebRTCTransport {

let udp_mux = UDPMuxNewAddr::new(socket);

Ok(WebRTCListenStream::new(
return Ok(WebRTCListenStream::new(
listener_id,
listen_addr,
self.config.clone(),
udp_mux,
self.id_keys.clone(),
))
IfWatcher::new()
.map_err(Error::IoError)
.map_err(TransportError::Other)?,
));
}
}

Expand Down Expand Up @@ -247,13 +249,6 @@ pub struct WebRTCListenStream {
/// when listening on all interfaces for IPv4 respectively IPv6 connections.
listen_addr: SocketAddr,

/// The IP addresses of network interfaces on which the listening socket
/// is accepting connections.
///
/// If the listen socket listens on all interfaces, these may change over
/// time as interfaces become available or unavailable.
in_addr: InAddr,

/// The config which holds this peer's certificate(s).
config: WebRTCConfiguration,

Expand All @@ -268,6 +263,13 @@ pub struct WebRTCListenStream {
/// Optionally contains a [`TransportEvent::ListenerClosed`] that should be
/// reported before the listener's stream is terminated.
report_closed: Option<Option<<Self as Stream>::Item>>,

/// Watcher for network interface changes.
/// Reports [`IfEvent`]s for new / deleted ip-addresses when interfaces
/// become or stop being available.
///
/// `None` if the socket is only listening on a single interface.
if_watcher: IfWatcher,
}

impl WebRTCListenStream {
Expand All @@ -278,17 +280,16 @@ impl WebRTCListenStream {
config: WebRTCConfiguration,
udp_mux: UDPMuxNewAddr,
id_keys: identity::Keypair,
if_watcher: IfWatcher,
) -> Self {
let in_addr = InAddr::new(listen_addr.ip());

WebRTCListenStream {
listener_id,
listen_addr,
in_addr,
config,
udp_mux,
id_keys,
report_closed: None,
if_watcher,
}
}

Expand All @@ -309,58 +310,48 @@ impl WebRTCListenStream {
}
}

/// Poll for a next If Event.
fn poll_if_addr(&mut self, cx: &mut Context<'_>) -> Poll<<Self as Stream>::Item> {
loop {
let mut item = ready!(self.in_addr.poll_next_unpin(cx));
if let Some(item) = item.take() {
// Consume all events for up/down interface changes.
match item {
Ok(IfEvent::Up(inet)) => {
let ip = inet.addr();
if self.listen_addr.is_ipv4() == ip.is_ipv4()
|| self.listen_addr.is_ipv6() == ip.is_ipv6()
{
let socket_addr = SocketAddr::new(ip, self.listen_addr.port());
let ma = socketaddr_to_multiaddr(&socket_addr);
debug!("New listen address: {}", ma);
return Poll::Ready(TransportEvent::NewAddress {
listener_id: self.listener_id,
listen_addr: ma,
});
} else {
continue;
}
}
Ok(IfEvent::Down(inet)) => {
let ip = inet.addr();
if self.listen_addr.is_ipv4() == ip.is_ipv4()
|| self.listen_addr.is_ipv6() == ip.is_ipv6()
{
let socket_addr = SocketAddr::new(ip, self.listen_addr.port());
let ma = socketaddr_to_multiaddr(&socket_addr);
debug!("Expired listen address: {}", ma);
return Poll::Ready(TransportEvent::AddressExpired {
listener_id: self.listener_id,
listen_addr: ma,
});
} else {
continue;
}
fn poll_if_watcher(&mut self, cx: &mut Context<'_>) -> Poll<<Self as Stream>::Item> {
while let Poll::Ready(event) = self.if_watcher.poll_if_event(cx) {
match event {
Ok(IfEvent::Up(inet)) => {
let ip = inet.addr();
if self.listen_addr.is_ipv4() == ip.is_ipv4()
|| self.listen_addr.is_ipv6() == ip.is_ipv6()
{
let socket_addr = SocketAddr::new(ip, self.listen_addr.port());
let ma = socketaddr_to_multiaddr(&socket_addr);
log::debug!("New listen address: {}", ma);
return Poll::Ready(TransportEvent::NewAddress {
listener_id: self.listener_id,
listen_addr: ma,
});
}
Err(err) => {
debug! {
"Failure polling interfaces: {:?}.",
err
};
return Poll::Ready(TransportEvent::ListenerError {
}
Ok(IfEvent::Down(inet)) => {
let ip = inet.addr();
if self.listen_addr.is_ipv4() == ip.is_ipv4()
|| self.listen_addr.is_ipv6() == ip.is_ipv6()
{
let socket_addr = SocketAddr::new(ip, self.listen_addr.port());
let ma = socketaddr_to_multiaddr(&socket_addr);
log::debug!("Expired listen address: {}", ma);
return Poll::Ready(TransportEvent::AddressExpired {
listener_id: self.listener_id,
error: err.into(),
listen_addr: ma,
});
}
}
Err(err) => {
log::debug!("Error when polling network interfaces {}", err);
return Poll::Ready(TransportEvent::ListenerError {
listener_id: self.listener_id,
error: err.into(),
});
}
}
}

Poll::Pending
}
}

Expand All @@ -376,7 +367,7 @@ impl Stream for WebRTCListenStream {
return Poll::Ready(closed.take());
}

if let Poll::Ready(event) = self.poll_if_addr(cx) {
if let Poll::Ready(event) = self.poll_if_watcher(cx) {
return Poll::Ready(Some(event));
}

Expand Down
20 changes: 19 additions & 1 deletion transports/webrtc/tests/smoke.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,19 @@ async fn smoke() -> Result<()> {
e => panic!("{:?}", e),
};

// skip other interface addresses
while let Some(_) = a.next().now_or_never() {}

let addr = addr.with(Protocol::Certhash(fingerprint2multihash(&a_fingerprint)));

let _ = match b.next().await {
Some(SwarmEvent::NewListenAddr { address, .. }) => address,
e => panic!("{:?}", e),
};

// skip other interface addresses
while let Some(_) = b.next().now_or_never() {}

let mut data = vec![0; 4096];
rng.fill_bytes(&mut data);

Expand Down Expand Up @@ -312,13 +318,19 @@ async fn dial_failure() -> Result<()> {
e => panic!("{:?}", e),
};

// skip other interface addresses
while let Some(_) = a.next().now_or_never() {}

let addr = addr.with(Protocol::Certhash(fingerprint2multihash(&a_fingerprint)));

let _ = match b.next().await {
Some(SwarmEvent::NewListenAddr { address, .. }) => address,
e => panic!("{:?}", e),
};

// skip other interface addresses
while let Some(_) = b.next().now_or_never() {}

let a_peer_id = &Swarm::local_peer_id(&a).clone();
drop(a); // stop a swarm so b can never reach it

Expand Down Expand Up @@ -361,7 +373,7 @@ async fn concurrent_connections_and_streams() {
}

let mut pool = futures::executor::LocalPool::default();
let mut data = vec![0; 4096 * 10];
let mut data = vec![0; 4096];
rand::thread_rng().fill_bytes(&mut data);
let mut listeners = vec![];

Expand Down Expand Up @@ -416,6 +428,9 @@ async fn concurrent_connections_and_streams() {
log::debug!("listener ResponseSent");
}
Some(SwarmEvent::ConnectionClosed { .. }) => {}
Some(SwarmEvent::NewListenAddr { .. }) => {
log::debug!("listener NewListenAddr");
}
Some(e) => {
panic!("unexpected event {:?}", e);
}
Expand Down Expand Up @@ -489,6 +504,9 @@ async fn concurrent_connections_and_streams() {
Some(SwarmEvent::ConnectionClosed { .. }) => {
log::debug!("dialer ConnectionClosed");
}
Some(SwarmEvent::NewListenAddr { .. }) => {
log::debug!("dialer NewListenAddr");
}
e => {
panic!("unexpected event {:?}", e);
}
Expand Down

0 comments on commit 31ae422

Please sign in to comment.