Skip to content

Commit

Permalink
feat(api): add support for manual port forwarding by passing additional
Browse files Browse the repository at this point in the history
arguments
  • Loading branch information
lionel-faber committed Feb 12, 2021
1 parent 4ddd218 commit 9dca7b9
Show file tree
Hide file tree
Showing 11 changed files with 162 additions and 64 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ serde_json = "1.0.59"
structopt = "~0.3.15"
thiserror = "1.0.23"
webpki = "~0.21.3"
flexi_logger = "~0.16.1"

[dependencies.bytes]
version = "1.0.1"
Expand Down Expand Up @@ -50,7 +51,6 @@ webpki = "~0.21.3"
[dev-dependencies]
anyhow = "1.0.36"
assert_matches = "1.3"
flexi_logger = "~0.16.1"
rand = "~0.7.3"

[target."cfg(any(all(unix, not(any(target_os = \"android\", target_os = \"androideabi\", target_os = \"ios\"))), windows))".dependencies]
Expand Down
56 changes: 44 additions & 12 deletions examples/echo_service.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use anyhow::{anyhow, Error, Result};
use qp2p::{Config, QuicP2p};
use qp2p::{Config, QuicP2p, Error as QP2pError};
use std::collections::HashSet;
use std::net::Ipv4Addr;
use std::net::{IpAddr, SocketAddr};
Expand All @@ -19,17 +19,28 @@ pub struct Args {
)]
pub hard_coded_contacts: HashSet<SocketAddr>,
/// Port we want to reserve for QUIC. If none supplied we'll use the OS given random port.
/// If external port is provided it means that the user is carrying out manual port forwarding and this field is mandatory.
/// This will be the internal port number mapped to the process
#[structopt(short, long)]
pub port: Option<u16>,
/// IP address for the listener. If none supplied and `forward_port` is enabled, we will use IGD to realize the
pub local_port: Option<u16>,
/// IP address for the listener. If none is supplied and `forward_port` is enabled, we will use IGD to realize the
/// local IP address of the machine. If IGD fails the application will exit.
#[structopt(long)]
pub ip: Option<IpAddr>,
#[structopt(short, long)]
pub local_ip: Option<IpAddr>,
/// Specify if port forwarding via UPnP should be done or not. This can be set to false if the network
/// is run locally on the network loopback or on a local area network.
#[structopt(long)]
pub forward_port: bool,
/// External port number assigned to the socket address of the program.
/// If this is provided, QP2p considers that the local port provided has been mapped to the
/// provided external port number and automatic port forwarding will be skipped.
#[structopt(short, long)]
pub external_port: Option<u16>,
/// External IP address of the computer on the WAN. This field is mandatory if the node is the genesis node and
/// port forwarding is not available. In case of non-genesis nodes, the external IP address will be resolved
/// using the Echo service.
#[structopt(short, long)]
pub external_ip: Option<IpAddr>,
/// Is the network meant to run on the local loopback network?
/// If this is set to true, the IP address will be set to `127.0.0.1`
#[structopt(long)]
Expand All @@ -44,23 +55,37 @@ async fn main() -> Result<(), Error> {
let mut args = Args::from_args();

if args.local {
args.ip = Some(IpAddr::V4(Ipv4Addr::LOCALHOST));
args.local_ip = Some(IpAddr::V4(Ipv4Addr::LOCALHOST));
args.forward_port = false;
}

let qp2p = QuicP2p::with_config(
let qp2p_res = QuicP2p::with_config(
Some(Config {
ip: args.ip,
port: args.port,
local_ip: args.local_ip,
local_port: args.local_port,
external_ip: args.external_ip,
external_port: args.external_port,
forward_port: args.forward_port,
hard_coded_contacts: args.hard_coded_contacts,
..Default::default()
}),
&[],
false,
)?;
);

let qp2p = match qp2p_res {
Ok(qp2p) => qp2p,
Err(err) => {
if let QP2pError::IgdSearch(_) = err {
println!("The program encountered an error while automatically trying to realize local IP Address.");
println!("This is because IGD is not supported by your router.");
println!("Please restart the program by manually forwarding the port and providing the required information by passing --local-ip x.x.x.x --external-ip x.x.x.x --local-port xxxx --external-port xxxx");
};
return Err(Error::from(err));
}
};

let mut endpoint = qp2p.new_endpoint()?;
let mut endpoint = qp2p.new_endpoint().await?;
let local_addr = endpoint.local_addr();

let public_addr = if args.forward_port {
Expand All @@ -75,6 +100,13 @@ async fn main() -> Result<(), Error> {
public_addr
);

// Message is declared here and later assigned so that it is held in memory
// long enough for the peer to read the Echo Service responses before it is
// dropped and the streams are closed
#[allow(unused)]
let mut message: qp2p::Message;

#[allow(unused)]
loop {
println!("Waiting for connections...\n");
let mut incoming = endpoint.listen();
Expand All @@ -85,7 +117,7 @@ async fn main() -> Result<(), Error> {
let connecting_peer = messages.remote_addr();
println!("Incoming connection from: {}", &connecting_peer);

let _message = messages
message = messages
.next()
.await
.ok_or_else(|| anyhow!("Missing expected incomming message"))?;
Expand Down
27 changes: 14 additions & 13 deletions src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,15 +114,16 @@ impl QuicP2p {
bootstrap_nodes: &[SocketAddr],
use_bootstrap_cache: bool,
) -> Result<Self> {
crate::utils::init_logging();
let cfg = unwrap_config_or_default(cfg)?;
debug!("Config passed in to qp2p: {:?}", cfg);

let (port, allow_random_port) = cfg
.port
.local_port
.map(|p| (p, false))
.unwrap_or((DEFAULT_PORT_TO_TRY, true));

let ip = cfg.ip.unwrap_or_else(|| {
let ip = cfg.local_ip.unwrap_or_else(|| {
let mut our_ip = IpAddr::V4(Ipv4Addr::UNSPECIFIED);

// check hard coded contacts for being local (aka loopback)
Expand Down Expand Up @@ -182,8 +183,8 @@ impl QuicP2p {
.upnp_lease_duration
.unwrap_or(DEFAULT_UPNP_LEASE_DURATION_SEC);

qp2p_config.ip = Some(ip);
qp2p_config.port = Some(port);
qp2p_config.local_ip = Some(ip);
qp2p_config.local_port = Some(port);
qp2p_config.keep_alive_interval_msec = Some(keep_alive_interval_msec);
qp2p_config.idle_timeout_msec = Some(idle_timeout_msec);
qp2p_config.upnp_lease_duration = Some(upnp_lease_duration);
Expand Down Expand Up @@ -219,7 +220,7 @@ impl QuicP2p {
/// config.ip = Some(IpAddr::V4(Ipv4Addr::LOCALHOST));
/// config.port = Some(3000);
/// let mut quic_p2p = QuicP2p::with_config(Some(config.clone()), Default::default(), true)?;
/// let mut endpoint = quic_p2p.new_endpoint()?;
/// let mut endpoint = quic_p2p.new_endpoint().await?;
/// let peer_addr = endpoint.socket_addr().await?;
///
/// config.port = Some(3001);
Expand All @@ -229,7 +230,7 @@ impl QuicP2p {
/// }
/// ```
pub async fn bootstrap(&self) -> Result<(Endpoint, Connection, IncomingMessages)> {
let endpoint = self.new_endpoint()?;
let endpoint = self.new_endpoint().await?;

trace!("Bootstrapping with nodes {:?}", endpoint.bootstrap_nodes());
if endpoint.bootstrap_nodes().is_empty() {
Expand Down Expand Up @@ -273,15 +274,15 @@ impl QuicP2p {
/// let mut config = Config::default();
/// config.ip = Some(IpAddr::V4(Ipv4Addr::LOCALHOST));
/// let mut quic_p2p = QuicP2p::with_config(Some(config.clone()), Default::default(), true)?;
/// let mut peer_1 = quic_p2p.new_endpoint()?;
/// let mut peer_1 = quic_p2p.new_endpoint().await?;
/// let peer1_addr = peer_1.socket_addr().await?;
///
/// let (peer_2, connection) = quic_p2p.connect_to(&peer1_addr).await?;
/// Ok(())
/// }
/// ```
pub async fn connect_to(&self, node_addr: &SocketAddr) -> Result<(Endpoint, Connection)> {
let endpoint = self.new_endpoint()?;
let endpoint = self.new_endpoint().await?;
let (conn, _) = endpoint.connect_to(node_addr).await?;

Ok((endpoint, conn))
Expand All @@ -301,11 +302,11 @@ impl QuicP2p {
/// let mut config = Config::default();
/// config.ip = Some(IpAddr::V4(Ipv4Addr::LOCALHOST));
/// let mut quic_p2p = QuicP2p::with_config(Some(config.clone()), Default::default(), true)?;
/// let endpoint = quic_p2p.new_endpoint()?;
/// let endpoint = quic_p2p.new_endpoint().await?;
/// Ok(())
/// }
/// ```
pub fn new_endpoint(&self) -> Result<Endpoint> {
pub async fn new_endpoint(&self) -> Result<Endpoint> {
trace!("Creating a new endpoint");

let bootstrap_nodes: Vec<SocketAddr> = self
Expand All @@ -331,7 +332,7 @@ impl QuicP2p {
self.client_cfg.clone(),
bootstrap_nodes,
self.qp2p_config.clone(),
)?;
).await?;

Ok(endpoint)
}
Expand Down Expand Up @@ -372,8 +373,8 @@ fn bind(

fn unwrap_config_or_default(cfg: Option<Config>) -> Result<Config> {
let mut cfg = cfg.map_or(Config::read_or_construct_default(None)?, |cfg| cfg);
if cfg.ip.is_none() {
cfg.ip = crate::igd::get_local_ip().ok();
if cfg.local_ip.is_none() {
cfg.local_ip = Some(crate::igd::get_local_ip()?);
};
if cfg.clean {
Config::clear_config_from_disk(None)?;
Expand Down
26 changes: 20 additions & 6 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,28 @@ pub struct Config {
)]
pub hard_coded_contacts: HashSet<SocketAddr>,
/// Port we want to reserve for QUIC. If none supplied we'll use the OS given random port.
/// If external port is provided it means that the user is carrying out manual port forwarding and this field is mandatory.
/// This will be the internal port number mapped to the process
#[structopt(short, long)]
pub port: Option<u16>,
/// IP address for the listener. If none supplied we'll use the default address (0.0.0.0).
pub local_port: Option<u16>,
/// IP address for the listener. If none is supplied and `forward_port` is enabled, we will use IGD to realize the
/// local IP address of the machine. If IGD fails the application will exit.
#[structopt(short, long)]
pub local_ip: Option<IpAddr>,
/// Specify if port forwarding via UPnP should be done or not. This can be set to false if the network
/// is run locally on the network loopback or on a local area network.
#[structopt(long)]
pub ip: Option<IpAddr>,
pub forward_port: bool,
/// External port number assigned to the socket address of the program.
/// If this is provided, QP2p considers that the local port provided has been mapped to the
/// provided external port number and automatic port forwarding will be skipped.
#[structopt(short, long)]
pub external_port: Option<u16>,
/// External IP address of the computer on the WAN. This field is mandatory if the node is the genesis node and
/// port forwarding is not available. In case of non-genesis nodes, the external IP address will be resolved
/// using the Echo service.
#[structopt(short, long)]
pub external_ip: Option<IpAddr>,
/// This is the maximum message size we'll allow the peer to send to us. Any bigger message and
/// we'll error out probably shutting down the connection to the peer. If none supplied we'll
/// default to the documented constant.
Expand All @@ -62,9 +79,6 @@ pub struct Config {
/// Duration of a UPnP port mapping.
#[structopt(long)]
pub upnp_lease_duration: Option<u32>,
/// Specify if port forwarding via UPnP should be done or not
#[structopt(long)]
pub forward_port: bool,
/// Use a fresh config without re-using any config available on disk
#[structopt(long)]
pub fresh: bool,
Expand Down
13 changes: 11 additions & 2 deletions src/connections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl Connection {
/// let mut config = Config::default();
/// config.ip = Some(IpAddr::V4(Ipv4Addr::LOCALHOST));
/// let mut quic_p2p = QuicP2p::with_config(Some(config.clone()), Default::default(), true)?;
/// let mut peer_1 = quic_p2p.new_endpoint()?;
/// let mut peer_1 = quic_p2p.new_endpoint().await?;
/// let peer1_addr = peer_1.socket_addr().await?;
///
/// let (peer_2, connection) = quic_p2p.connect_to(&peer1_addr).await?;
Expand All @@ -71,7 +71,7 @@ impl Connection {
/// let mut config = Config::default();
/// config.ip = Some(IpAddr::V4(Ipv4Addr::LOCALHOST));
/// let mut quic_p2p = QuicP2p::with_config(Some(config.clone()), Default::default(), true)?;
/// let mut peer_1 = quic_p2p.new_endpoint()?;
/// let mut peer_1 = quic_p2p.new_endpoint().await?;
/// let peer1_addr = peer_1.socket_addr().await?;
///
/// let (peer_2, connection) = quic_p2p.connect_to(&peer1_addr).await?;
Expand Down Expand Up @@ -259,10 +259,19 @@ impl IncomingMessages {
Some(Ok((mut send, mut recv))) => match read_bytes(&mut recv).await {
Ok(WireMsg::UserMsg(bytes)) => Some((bytes, send, recv)),
Ok(WireMsg::EndpointEchoReq) => {
trace!("Received Echo Request");
let message = WireMsg::EndpointEchoResp(peer_addr);
message.write_to_stream(&mut send).await.ok()?;
trace!("Responded to Echo request");
Some((Bytes::new(), send, recv))
}
Ok(WireMsg::EndpointVerificationReq(address_sent)) => {
trace!("Received Endpoint verification request {:?} from {:?}", address_sent, peer_addr);
let message = WireMsg::EndpointVerficationResp(address_sent == peer_addr);
message.write_to_stream(&mut send).await.ok()?;
trace!("Responded to Endpoint verification request");
Some((Bytes::new(), send, recv))
},
Ok(msg) => {
error!("Unexpected message type: {:?}", msg);
Some((Bytes::new(), send, recv))
Expand Down
47 changes: 41 additions & 6 deletions src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ use super::{
Config,
};
use futures::lock::Mutex;
use log::trace;
use log::{debug, info};
use log::{debug, info, warn, trace, error};
use std::{net::SocketAddr, sync::Arc, time::Duration};
use tokio::time::timeout;

Expand Down Expand Up @@ -54,25 +53,61 @@ impl std::fmt::Debug for Endpoint {
}

impl Endpoint {
pub(crate) fn new(
pub(crate) async fn new(
quic_endpoint: quinn::Endpoint,
quic_incoming: quinn::Incoming,
client_cfg: quinn::ClientConfig,
bootstrap_nodes: Vec<SocketAddr>,
qp2p_config: Config,
) -> Result<Self> {
let local_addr = quic_endpoint.local_addr()?;
Ok(Self {
let public_addr = match (qp2p_config.external_ip, qp2p_config.external_port) {
(Some(ip), Some(port)) => Some(SocketAddr::new(ip, port)),
_ => None
};
let endpoint = Self {
local_addr,
public_addr: None,
public_addr,
quic_endpoint,
quic_incoming: Arc::new(Mutex::new(quic_incoming)),
client_cfg,
bootstrap_nodes,
qp2p_config,
connection_pool: ConnectionPool::new(),
connection_deduplicator: ConnectionDeduplicator::new(),
})
};
if let Some(addr) = endpoint.public_addr {
// External IP and port number is provided
// This means that the user has performed manual port-forwarding
// Verify that the given socket address is reachable
if let Some(contact) = endpoint.bootstrap_nodes.iter().next() {
info!("Verifying provided public IP address");
let (connection, _incoming) = endpoint.connect_to(contact).await?;
let (mut send, mut recv) = connection.open_bi().await?;
send.send(WireMsg::EndpointVerificationReq(addr)).await?;
let response = WireMsg::read_from_stream(&mut recv.quinn_recv_stream).await?;
match response {
WireMsg::EndpointVerficationResp(valid) => {
if valid {
info!("Endpoint verification successful! {} is reachable.", addr);
Ok(endpoint)
} else {
error!("Endpoint verification failed! {} is not reachable.", addr);
Err(Error::IncorrectPublicAddress)
}
},
other => {
error!("Unexpected message when verifying public endpoint: {}", other);
Err(Error::UnexpectedMessageType(other))
}
}
} else {
warn!("Public IP address not verified since bootstrap contacts are empty");
Ok(endpoint)
}
} else {
Ok(endpoint)
}
}

/// Endpoint local address
Expand Down
5 changes: 4 additions & 1 deletion src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ pub enum Error {
#[error("Type of the message received was not the expected one: {0}")]
UnexpectedMessageType(WireMsg),
/// The message exceeds the maximum message length allowed.
#[error("Maximum data length exceeded, lengh: {0}")]
#[error("Maximum data length exceeded, length: {0}")]
MaxLengthExceeded(usize),
/// Incorrect Public Address provided
#[error("Incorrect Public Address provided")]
IncorrectPublicAddress,
}
Loading

0 comments on commit 9dca7b9

Please sign in to comment.