Skip to content

Commit

Permalink
feat(port_forwarding): refactor IGD and echo service to be async
Browse files Browse the repository at this point in the history
  • Loading branch information
bochaco authored and lionel-faber committed Nov 10, 2020
1 parent 10b7c25 commit a19cd51
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 25 deletions.
41 changes: 30 additions & 11 deletions src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket};
/// before using a random port.
pub const DEFAULT_PORT_TO_TRY: u16 = 12000;

/// Default duration of a UPnP lease, in seconds.
pub const DEFAULT_UPNP_LEASE_DURATION_SEC: u32 = 120;

/// Message received from a peer
pub enum Message {
/// A message sent by peer on a uni-directional stream
Expand Down Expand Up @@ -68,6 +71,7 @@ pub struct QuicP2p {
bootstrap_cache: BootstrapCache,
endpoint_cfg: quinn::ServerConfig,
client_cfg: quinn::ClientConfig,
upnp_lease_duration: u32,
}

impl QuicP2p {
Expand All @@ -80,7 +84,6 @@ impl QuicP2p {
///
/// let quic_p2p = QuicP2p::new().expect("Error initializing QuicP2p");
/// ```
pub fn new() -> Result<Self> {
Self::with_config(None, Default::default(), true)
}
Expand Down Expand Up @@ -170,15 +173,18 @@ impl QuicP2p {

let client_cfg = peer_config::new_client_cfg(idle_timeout_msec, keep_alive_interval_msec);

let qp2p = Self {
let upnp_lease_duration = cfg
.upnp_lease_duration
.unwrap_or(DEFAULT_UPNP_LEASE_DURATION_SEC);

Ok(Self {
local_addr: SocketAddr::new(ip, port),
allow_random_port,
bootstrap_cache,
endpoint_cfg,
client_cfg,
};

Ok(qp2p)
upnp_lease_duration,
})
}

/// Bootstrap to the network.
Expand Down Expand Up @@ -230,13 +236,15 @@ impl QuicP2p {
let client_cfg = self.client_cfg.clone();
let local_addr = self.local_addr;
let allow_random_port = self.allow_random_port;
let upnp_lease_duration = self.upnp_lease_duration;
let task_handle = tokio::spawn(async move {
new_connection_to(
&node_addr,
endpoint_cfg,
client_cfg,
local_addr,
allow_random_port,
upnp_lease_duration,
)
.await
});
Expand Down Expand Up @@ -281,6 +289,7 @@ impl QuicP2p {
self.client_cfg.clone(),
self.local_addr,
self.allow_random_port,
self.upnp_lease_duration,
)
.await
}
Expand Down Expand Up @@ -313,27 +322,40 @@ impl QuicP2p {

trace!("Bound endpoint to local address: {}", self.local_addr);

let endpoint = Endpoint::new(quinn_endpoint, quinn_incoming, self.client_cfg.clone())?;
let endpoint = Endpoint::new(
quinn_endpoint,
quinn_incoming,
self.client_cfg.clone(),
self.upnp_lease_duration,
)?;

Ok(endpoint)
}
}

// Private helpers

// Creates a new Connection
async fn new_connection_to(
node_addr: &SocketAddr,
endpoint_cfg: quinn::ServerConfig,
client_cfg: quinn::ClientConfig,
local_addr: SocketAddr,
allow_random_port: bool,
upnp_lease_duration: u32,
) -> Result<(Endpoint, Connection)> {
trace!("Attempting to connect to peer: {}", node_addr);

let (quinn_endpoint, quinn_incoming) = bind(endpoint_cfg, local_addr, allow_random_port)?;

trace!("Bound connection to local address: {}", local_addr);

let endpoint = Endpoint::new(quinn_endpoint, quinn_incoming, client_cfg)?;
let endpoint = Endpoint::new(
quinn_endpoint,
quinn_incoming,
client_cfg,
upnp_lease_duration,
)?;
let connection = endpoint.connect_to(node_addr).await?;

Ok((endpoint, connection))
Expand All @@ -346,7 +368,6 @@ fn bind(
allow_random_port: bool,
) -> Result<(quinn::Endpoint, quinn::Incoming)> {
let mut endpoint_builder = quinn::Endpoint::builder();
// TODO: allow to optionally accept incoming conns, needed for clients
let _ = endpoint_builder.listen(endpoint_cfg);

match UdpSocket::bind(&local_addr) {
Expand All @@ -371,9 +392,7 @@ fn bind(
}
}

// Private helpers

// Unwrap the conffig if provided by the user, otherwise construct the default one
// Unwrap the config if provided by the user, otherwise construct the default one
#[cfg(not(feature = "upnp"))]
fn unwrap_config_or_default(cfg: Option<Config>) -> Result<Config> {
cfg.map_or(Config::read_or_construct_default(None), Ok)
Expand Down
124 changes: 121 additions & 3 deletions src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,16 @@
// specific language governing permissions and limitations relating to use of the SAFE Network
// Software.

#[cfg(feature = "upnp")]
use super::igd::forward_port;
use super::{
connections::{Connection, IncomingConnections},
error::{Error, Result},
};
use futures::lock::Mutex;
use log::trace;
#[cfg(feature = "upnp")]
use log::{debug, info};
use std::{net::SocketAddr, sync::Arc};

/// Host name of the Quic communication certificate used by peers
Expand All @@ -26,6 +30,7 @@ pub struct Endpoint {
quic_endpoint: quinn::Endpoint,
quic_incoming: Arc<Mutex<quinn::Incoming>>,
client_cfg: quinn::ClientConfig,
upnp_lease_duration: u32,
}

impl std::fmt::Debug for Endpoint {
Expand All @@ -44,6 +49,7 @@ impl Endpoint {
quic_endpoint: quinn::Endpoint,
quic_incoming: quinn::Incoming,
client_cfg: quinn::ClientConfig,
upnp_lease_duration: u32,
) -> Result<Self> {
let local_addr = quic_endpoint.local_addr()?;
if local_addr.ip().is_unspecified() {
Expand All @@ -57,6 +63,7 @@ impl Endpoint {
quic_endpoint,
quic_incoming: Arc::new(Mutex::new(quic_incoming)),
client_cfg,
upnp_lease_duration,
})
}
}
Expand All @@ -74,9 +81,45 @@ impl Endpoint {
/// Note that if such an obtained address is of unspecified category we will ignore that as
/// such an address cannot be reached and hence not useful.
#[cfg(feature = "upnp")]
pub fn our_addr(&self) -> Result<SocketAddr> {
// TODO: make use of UPnP
self.local_addr()
pub async fn our_addr(&self) -> Result<SocketAddr> {
// Make use of UPnP to detect our public addr
let is_loopback = self.local_addr.ip().is_loopback();

// In parallel, try to contact an echo service
let echo_service_res = match self.query_ip_echo_service() {
Ok(addr) => Ok(addr),
Err(Error::NoEndpointEchoServerFound) => Ok(self.local_addr),
Err(err) => {
info!("Could not contact echo service: {} - {:?}", err, err);
Err(err)
}
};

let mut addr = None;
// Skip port forwarding if we are running locally
if !is_loopback {
// Attempt to use IGD for port forwarding
match forward_port(self.local_addr, self.upnp_lease_duration).await {
Ok(public_sa) => {
debug!("IGD success: {:?}", SocketAddr::V4(public_sa));
let mut local_addr = self.local_addr;
local_addr.set_port(public_sa.port());
addr = Some(local_addr)
}
Err(e) => {
info!("IGD request failed: {} - {:?}", e, e);
return Err(Error::IgdNotSupported);
}
}
}

echo_service_res.map(|echo_srvc_addr| {
if let Some(our_address) = addr {
our_address
} else {
echo_srvc_addr
}
})
}

/// Endpoint local address to give others for them to connect to us.
Expand Down Expand Up @@ -111,4 +154,79 @@ impl Endpoint {
);
IncomingConnections::new(Arc::clone(&self.quic_incoming))
}

// Private helper
#[cfg(feature = "upnp")]
fn query_ip_echo_service(&self) -> Result<SocketAddr> {
Ok(self.local_addr)
/* // Bail out early if we don't have any contacts.
if self.cfg.hard_coded_contacts.is_empty() {
return Err(QuicP2pError::NoEndpointEchoServerFound);
}
// Create a separate event stream for the IP echo request.
let (echo_resp_tx, echo_resp_rx) = mpsc::channel();
let idle_timeout_msec = Duration::from_millis(
self.cfg
.idle_timeout_msec
.unwrap_or(DEFAULT_IDLE_TIMEOUT_MSEC),
);
self.el.post(move || {
ctx_mut(|ctx| {
ctx.our_ext_addr_tx = Some(echo_resp_tx);
});
});
debug!(
"Querying IP echo service to find our public IP address (contact list: {:?})",
self.cfg.hard_coded_contacts
);
loop {
if self.cfg.hard_coded_contacts.is_empty() {
return Err(QuicP2pError::NoEndpointEchoServerFound);
}
let (notify_tx, notify_rx) = utils::new_unbounded_channels();
self.el.post(move || {
let _ = bootstrap::echo_request(notify_tx);
});
match notify_rx.recv_timeout(idle_timeout_msec) {
Ok(Event::BootstrapFailure { .. }) => {
debug!("BootstrapFailure");
break Err(QuicP2pError::NoEndpointEchoServerFound);
}
Ok(Event::BootstrappedTo { node }) => {
debug!("BootstrappedTo {{ node: {:?} }}", node);
match echo_resp_rx.recv_timeout(idle_timeout_msec) {
Ok(res) => {
debug!("Found our address: {:?}", res);
break Ok(res);
}
Err(_e) => {
// This node hasn't replied in a timely manner, so remove it from our bootstrap list and try again.
let _ = self.cfg.hard_coded_contacts.remove(&node);
info!(
"Node {} is unresponsive, removing it from bootstrap contacts; {} contacts left",
node,
self.cfg.hard_coded_contacts.len()
);
}
}
}
Ok(ev) => {
debug!("Unexpected event: {:?}", ev);
break Err(QuicP2pError::NoEndpointEchoServerFound);
}
Err(err) => {
debug!("Unexpected error: {:?}", err);
break Err(QuicP2pError::NoEndpointEchoServerFound);
}
}
}*/
}
}
7 changes: 0 additions & 7 deletions src/igd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,12 @@
// specific language governing permissions and limitations relating to use of the SAFE Network
// Software.

#![allow(unused)]

use crate::error::{Error, Result};
use log::{debug, info, warn};
use std::net::{IpAddr, SocketAddr, SocketAddrV4};
use std::time::Duration;
use tokio::time::{self, Instant};

/// Default duration of a UPnP lease, in seconds.
pub const DEFAULT_UPNP_LEASE_DURATION_SEC: u32 = 120;
/// Duration of wait for a UPnP result to come back.
pub const UPNP_RESPONSE_TIMEOUT_MSEC: u64 = 3_000;

/// Automatically forwards a port and setups a tokio task to renew it periodically.
pub async fn forward_port(local_addr: SocketAddr, lease_duration: u32) -> Result<SocketAddrV4> {
let igd_res = add_port(local_addr, lease_duration).await;
Expand Down
5 changes: 1 addition & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
mutable_transmutes,
no_mangle_const_items,
unknown_crate_types,
//warnings
warnings
)]
#![deny(
bad_style,
Expand Down Expand Up @@ -63,12 +63,9 @@ mod test_utils;
mod utils;
mod wire_msg;

#[cfg(feature = "upnp")]
pub use crate::igd::{DEFAULT_UPNP_LEASE_DURATION_SEC, UPNP_RESPONSE_TIMEOUT_MSEC};
pub use api::{Message, QuicP2p};
pub use config::Config;
pub use connections::{Connection, IncomingConnections, IncomingMessages, RecvStream, SendStream};
pub use dirs::{Dirs, OverRide};
pub use endpoint::Endpoint;
pub use error::{Error, Result};
pub use peer_config::{DEFAULT_IDLE_TIMEOUT_MSEC, DEFAULT_KEEP_ALIVE_INTERVAL_MSEC};

0 comments on commit a19cd51

Please sign in to comment.