diff --git a/Cargo.toml b/Cargo.toml index b96d3885..94365d12 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,7 @@ unwrap = "1.2.1" bincode = "1.2.1" crossbeam-channel = "~0.4.2" serde_json = "1.0.59" +flexi_logger = "~0.16.1" structopt = "~0.3.15" rcgen = "~0.8.4" log = "~0.4.8" diff --git a/src/api.rs b/src/api.rs index 9be5503c..601196a4 100644 --- a/src/api.rs +++ b/src/api.rs @@ -7,8 +7,6 @@ // specific language governing permissions and limitations relating to use of the SAFE Network // Software. -#[cfg(feature = "upnp")] -use super::igd; use super::{ bootstrap_cache::BootstrapCache, config::{Config, SerialisableCertificate}, @@ -17,6 +15,7 @@ use super::{ endpoint::Endpoint, error::{Error, Result}, peer_config::{self, DEFAULT_IDLE_TIMEOUT_MSEC, DEFAULT_KEEP_ALIVE_INTERVAL_MSEC}, + utils::init_logging, }; use bytes::Bytes; use futures::future::select_ok; @@ -113,6 +112,7 @@ impl QuicP2p { bootstrap_nodes: &[SocketAddr], use_bootstrap_cache: bool, ) -> Result { + init_logging(); let cfg = unwrap_config_or_default(cfg)?; debug!("Config passed in to qp2p: {:?}", cfg); @@ -231,11 +231,13 @@ impl QuicP2p { trace!("Bootstrapping with nodes {:?}", bootstrap_nodes); // Attempt to connect to all nodes and return the first one to succeed let mut tasks = Vec::default(); - for node_addr in bootstrap_nodes { + for node_addr in bootstrap_nodes.iter().cloned() { + let nodes = bootstrap_nodes.clone(); let endpoint_cfg = self.endpoint_cfg.clone(); let client_cfg = self.client_cfg.clone(); let local_addr = self.local_addr; let allow_random_port = self.allow_random_port; + #[cfg(feature = "upnp")] let upnp_lease_duration = self.upnp_lease_duration; let task_handle = tokio::spawn(async move { new_connection_to( @@ -244,7 +246,10 @@ impl QuicP2p { client_cfg, local_addr, allow_random_port, + #[cfg(feature = "upnp")] upnp_lease_duration, + #[cfg(feature = "upnp")] + nodes ) .await }); @@ -282,14 +287,28 @@ impl QuicP2p { /// Ok(()) /// } /// ``` - pub async fn connect_to(&self, node_addr: &SocketAddr) -> Result<(Endpoint, Connection)> { + pub async fn connect_to(&mut self, node_addr: &SocketAddr) -> Result<(Endpoint, Connection)> { + + #[cfg(feature = "upnp")] + let bootstrap_nodes: Vec = self + .bootstrap_cache + .peers() + .iter() + .rev() + .chain(self.bootstrap_cache.hard_coded_contacts().iter()) + .cloned() + .collect(); + new_connection_to( node_addr, self.endpoint_cfg.clone(), self.client_cfg.clone(), self.local_addr, self.allow_random_port, + #[cfg(feature = "upnp")] self.upnp_lease_duration, + #[cfg(feature = "upnp")] + bootstrap_nodes ) .await } @@ -314,6 +333,17 @@ impl QuicP2p { /// ``` pub fn new_endpoint(&self) -> Result { trace!("Creating a new enpoint"); + + #[cfg(feature = "upnp")] + let bootstrap_nodes: Vec = self + .bootstrap_cache + .peers() + .iter() + .rev() + .chain(self.bootstrap_cache.hard_coded_contacts().iter()) + .cloned() + .collect(); + let (quinn_endpoint, quinn_incoming) = bind( self.endpoint_cfg.clone(), self.local_addr, @@ -326,7 +356,10 @@ impl QuicP2p { quinn_endpoint, quinn_incoming, self.client_cfg.clone(), + #[cfg(feature = "upnp")] self.upnp_lease_duration, + #[cfg(feature = "upnp")] + bootstrap_nodes, )?; Ok(endpoint) @@ -342,7 +375,10 @@ async fn new_connection_to( client_cfg: quinn::ClientConfig, local_addr: SocketAddr, allow_random_port: bool, + #[cfg(feature = "upnp")] upnp_lease_duration: u32, + #[cfg(feature = "upnp")] + bootstrap_nodes: Vec, ) -> Result<(Endpoint, Connection)> { trace!("Attempting to connect to peer: {}", node_addr); @@ -354,7 +390,10 @@ async fn new_connection_to( quinn_endpoint, quinn_incoming, client_cfg, + #[cfg(feature = "upnp")] upnp_lease_duration, + #[cfg(feature = "upnp")] + bootstrap_nodes, )?; let connection = endpoint.connect_to(node_addr).await?; @@ -393,17 +432,16 @@ fn bind( } // Unwrap the config if provided by the user, otherwise construct the default one -#[cfg(not(feature = "upnp"))] fn unwrap_config_or_default(cfg: Option) -> Result { cfg.map_or(Config::read_or_construct_default(None), Ok) } -#[cfg(feature = "upnp")] -fn unwrap_config_or_default(cfg: Option) -> Result { - let mut cfg = cfg.map_or(Config::read_or_construct_default(None)?, |cfg| cfg); - if cfg.ip.is_none() { - cfg.ip = igd::get_local_ip().ok(); - }; +// #[cfg(feature = "upnp")] +// fn unwrap_config_or_default(cfg: Option) -> Result { +// let mut cfg = cfg.map_or(Config::read_or_construct_default(None)?, |cfg| cfg); +// if cfg.ip.is_none() { +// cfg.ip = igd::get_local_ip().ok(); +// }; - Ok(cfg) -} +// Ok(cfg) +// } diff --git a/src/connections.rs b/src/connections.rs index fcd57e91..287b24c9 100644 --- a/src/connections.rs +++ b/src/connections.rs @@ -91,7 +91,7 @@ impl Connection { /// This returns the streams to send additional messages / read responses sent using the same stream. pub async fn send(&self, msg: Bytes) -> Result<(SendStream, RecvStream)> { let (mut send_stream, recv_stream) = self.open_bi_stream().await?; - send_stream.send(msg).await?; + send_stream.send_user_msg(msg).await?; Ok((send_stream, recv_stream)) } @@ -238,7 +238,7 @@ impl IncomingMessages { /// Stream to receive multiple messages pub struct RecvStream { - quinn_recv_stream: quinn::RecvStream, + pub(crate) quinn_recv_stream: quinn::RecvStream, } impl RecvStream { @@ -262,15 +262,20 @@ impl SendStream { Self { quinn_send_stream } } - /// Send a message using the bi-directional stream created by the initiator - pub async fn send(&mut self, msg: Bytes) -> Result<()> { + /// Send a message using the stream created by the initiator + pub async fn send_user_msg(&mut self, msg: Bytes) -> Result<()> { send_msg(&mut self.quinn_send_stream, msg).await } - + + /// Send a wire message + pub async fn send(&mut self, msg: WireMsg) -> Result<()> { + msg.write_to_stream(&mut self.quinn_send_stream).await + } /// Gracefully finish current stream pub async fn finish(mut self) -> Result<()> { self.quinn_send_stream.finish().await.map_err(Error::from) } + } // Helper to read the message's bytes from the provided stream @@ -278,8 +283,7 @@ async fn read_bytes(recv: &mut quinn::RecvStream) -> Result { match WireMsg::read_from_stream(recv).await? { WireMsg::UserMsg(msg_bytes) => Ok(msg_bytes), WireMsg::EndpointEchoReq | WireMsg::EndpointEchoResp(_) => { - // TODO: handle the echo request/response message - unimplemented!("echo message type not supported yet"); + Err(Error::UnexpectedMessageType) } } } diff --git a/src/endpoint.rs b/src/endpoint.rs index 0ae08a64..26768ef9 100644 --- a/src/endpoint.rs +++ b/src/endpoint.rs @@ -9,14 +9,17 @@ #[cfg(feature = "upnp")] use super::igd::forward_port; +#[cfg(feature = "upnp")] +use log::{debug, info}; +#[cfg(feature = "upnp")] +use super::error::Error; use super::{ connections::{Connection, IncomingConnections}, - error::{Error, Result}, + error::Result, }; +use super::wire_msg::WireMsg; use futures::lock::Mutex; use log::trace; -#[cfg(feature = "upnp")] -use log::{debug, info}; use std::{net::SocketAddr, sync::Arc}; /// Host name of the Quic communication certificate used by peers @@ -30,7 +33,10 @@ pub struct Endpoint { quic_endpoint: quinn::Endpoint, quic_incoming: Arc>, client_cfg: quinn::ClientConfig, + #[cfg(feature = "upnp")] upnp_lease_duration: u32, + #[cfg(feature = "upnp")] + bootstrap_nodes: Vec, } impl std::fmt::Debug for Endpoint { @@ -49,23 +55,23 @@ impl Endpoint { quic_endpoint: quinn::Endpoint, quic_incoming: quinn::Incoming, client_cfg: quinn::ClientConfig, + #[cfg(feature = "upnp")] upnp_lease_duration: u32, + #[cfg(feature = "upnp")] + bootstrap_nodes: Vec, ) -> Result { let local_addr = quic_endpoint.local_addr()?; - if local_addr.ip().is_unspecified() { - Err(Error::Configuration( - "No IP specified in the config and IGD detection is disabled or not available." - .to_string(), - )) - } else { - Ok(Self { - local_addr, - quic_endpoint, - quic_incoming: Arc::new(Mutex::new(quic_incoming)), - client_cfg, - upnp_lease_duration, - }) - } + dbg!(local_addr); + Ok(Self { + local_addr, + quic_endpoint, + quic_incoming: Arc::new(Mutex::new(quic_incoming)), + client_cfg, + #[cfg(feature = "upnp")] + upnp_lease_duration, + #[cfg(feature = "upnp")] + bootstrap_nodes, + }) } /// Endpoint local address @@ -81,50 +87,53 @@ impl Endpoint { /// Note that if such an obtained address is of unspecified category we will ignore that as /// such an address cannot be reached and hence not useful. #[cfg(feature = "upnp")] - pub async fn our_addr(&self) -> Result { - // Make use of UPnP to detect our public addr - let is_loopback = self.local_addr.ip().is_loopback(); - - // In parallel, try to contact an echo service - let echo_service_res = match self.query_ip_echo_service() { - Ok(addr) => Ok(addr), - Err(Error::NoEndpointEchoServerFound) => Ok(self.local_addr), - Err(err) => { - info!("Could not contact echo service: {} - {:?}", err, err); - Err(err) - } - }; + pub async fn our_endpoint(&mut self) -> Result { + + // Skip port forwarding + if self.local_addr.ip().is_loopback() || !self.local_addr.ip().is_unspecified() { + return Ok(self.local_addr); + } let mut addr = None; - // Skip port forwarding if we are running locally - if !is_loopback { - // Attempt to use IGD for port forwarding - match forward_port(self.local_addr, self.upnp_lease_duration).await { - Ok(public_sa) => { - debug!("IGD success: {:?}", SocketAddr::V4(public_sa)); - let mut local_addr = self.local_addr; - local_addr.set_port(public_sa.port()); - addr = Some(local_addr) - } - Err(e) => { - info!("IGD request failed: {} - {:?}", e, e); - return Err(Error::IgdNotSupported); - } + + // Attempt to use IGD for port forwarding + match forward_port(self.local_addr, self.upnp_lease_duration).await { + Ok(public_sa) => { + debug!("IGD success: {:?}", SocketAddr::V4(public_sa)); + addr = Some(SocketAddr::V4(public_sa)); + } + Err(e) => { + info!("IGD request failed: {} - {:?}", e, e); + // return Err(Error::IgdNotSupported); } } - echo_service_res.map(|echo_srvc_addr| { - if let Some(our_address) = addr { - our_address - } else { - echo_srvc_addr + // Try to contact an echo service + match self.query_ip_echo_service().await { + Ok(echo_res) => { + match addr { + None => { + addr = Some(echo_res); + }, + Some(address) => { + info!("Got response from echo service: {:?}, but IGD has already provided our external address: {:?}", echo_res, address); + } + } + }, + Err(err) => { + info!("Could not contact echo service: {} - {:?}", err, err); } - }) + }; + if let Some(socket_addr) = addr { + Ok(socket_addr) + } else { + Err(Error::Unexpected("No response from echo service".to_string())) + } } /// Endpoint local address to give others for them to connect to us. #[cfg(not(feature = "upnp"))] - pub fn our_addr(&self) -> Result { + pub async fn our_endpoint(&mut self) -> Result { self.local_addr() } @@ -157,76 +166,28 @@ impl Endpoint { // Private helper #[cfg(feature = "upnp")] - fn query_ip_echo_service(&self) -> Result { - Ok(self.local_addr) - /* // Bail out early if we don't have any contacts. - if self.cfg.hard_coded_contacts.is_empty() { - return Err(QuicP2pError::NoEndpointEchoServerFound); + async fn query_ip_echo_service(&self) -> Result { + // Bail out early if we don't have any contacts. + if self.bootstrap_nodes.is_empty() { + return Err(Error::NoEndpointEchoServerFound); } - // Create a separate event stream for the IP echo request. - let (echo_resp_tx, echo_resp_rx) = mpsc::channel(); - - let idle_timeout_msec = Duration::from_millis( - self.cfg - .idle_timeout_msec - .unwrap_or(DEFAULT_IDLE_TIMEOUT_MSEC), - ); - - self.el.post(move || { - ctx_mut(|ctx| { - ctx.our_ext_addr_tx = Some(echo_resp_tx); + let mut tasks = Vec::default(); + for node in self.bootstrap_nodes.iter().cloned() { + let connection = self.connect_to(&node).await?; // TODO: move into loop + let task_handle = tokio::spawn(async move { + let (mut send_stream, mut recv_stream) = connection.open_bi_stream().await?; + send_stream.send(WireMsg::EndpointEchoReq).await?; + match WireMsg::read_from_stream(&mut recv_stream.quinn_recv_stream).await { + Ok(WireMsg::EndpointEchoResp(socket_addr)) => Ok(socket_addr), + Ok(_) => Err(Error::Unexpected("Unexpected message".to_string())), + Err(err) => Err(err), + } }); - }); - - debug!( - "Querying IP echo service to find our public IP address (contact list: {:?})", - self.cfg.hard_coded_contacts - ); - - loop { - if self.cfg.hard_coded_contacts.is_empty() { - return Err(QuicP2pError::NoEndpointEchoServerFound); - } - - let (notify_tx, notify_rx) = utils::new_unbounded_channels(); + tasks.push(task_handle); + } - self.el.post(move || { - let _ = bootstrap::echo_request(notify_tx); - }); + self.local_addr() - match notify_rx.recv_timeout(idle_timeout_msec) { - Ok(Event::BootstrapFailure { .. }) => { - debug!("BootstrapFailure"); - break Err(QuicP2pError::NoEndpointEchoServerFound); - } - Ok(Event::BootstrappedTo { node }) => { - debug!("BootstrappedTo {{ node: {:?} }}", node); - match echo_resp_rx.recv_timeout(idle_timeout_msec) { - Ok(res) => { - debug!("Found our address: {:?}", res); - break Ok(res); - } - Err(_e) => { - // This node hasn't replied in a timely manner, so remove it from our bootstrap list and try again. - let _ = self.cfg.hard_coded_contacts.remove(&node); - info!( - "Node {} is unresponsive, removing it from bootstrap contacts; {} contacts left", - node, - self.cfg.hard_coded_contacts.len() - ); - } - } - } - Ok(ev) => { - debug!("Unexpected event: {:?}", ev); - break Err(QuicP2pError::NoEndpointEchoServerFound); - } - Err(err) => { - debug!("Unexpected error: {:?}", err); - break Err(QuicP2pError::NoEndpointEchoServerFound); - } - } - }*/ } } diff --git a/src/lib.rs b/src/lib.rs index d1ff573e..07ba8c55 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -15,7 +15,7 @@ mutable_transmutes, no_mangle_const_items, unknown_crate_types, - warnings + // warnings )] #![deny( bad_style, diff --git a/src/utils.rs b/src/utils.rs index 2108f812..9aeebede 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -14,8 +14,10 @@ use crate::{ use serde::de::DeserializeOwned; use serde::Serialize; use std::fs::File; -use std::io::{BufReader, BufWriter}; +use std::io::{BufReader, BufWriter, Write}; use std::path::Path; +use flexi_logger::{DeferredNow, Logger}; +use log::Record; /// Get the project directory #[cfg(any( @@ -92,3 +94,26 @@ where Ok(()) } + +pub(crate) fn init_logging() { + // Custom formatter for logs + let do_format = move |writer: &mut dyn Write, clock: &mut DeferredNow, record: &Record| { + let handle = std::thread::current(); + write!( + writer, + "[{}] {} {} [{}:{}] {}", + handle + .name() + .unwrap_or(&format!("Thread-{:?}", handle.id())), + record.level(), + clock.now().to_rfc3339(), + record.file().unwrap_or_default(), + record.line().unwrap_or_default(), + record.args() + ) + }; + + Logger::with_env() + .format(do_format) + .suppress_timestamp().start().map(|_| ()).unwrap_or(()); +} diff --git a/tests/quic_p2p.rs b/tests/common.rs similarity index 88% rename from tests/quic_p2p.rs rename to tests/common.rs index ed1ef3cf..25182290 100644 --- a/tests/quic_p2p.rs +++ b/tests/common.rs @@ -6,7 +6,7 @@ use std::{ }; /// Constructs a `QuicP2p` node with some sane defaults for testing. -fn new_qp2p() -> QuicP2p { +pub fn new_qp2p() -> QuicP2p { new_qp2p_with_hcc(Default::default()) } @@ -33,10 +33,10 @@ fn random_msg() -> Bytes { #[tokio::test] async fn successful_connection() -> Result<()> { let qp2p = new_qp2p(); - let peer1 = qp2p.new_endpoint()?; - let peer1_addr = peer1.our_addr()?; + let mut peer1 = qp2p.new_endpoint()?; + let peer1_addr = peer1.our_endpoint().await?; - let peer2 = qp2p.new_endpoint()?; + let mut peer2 = qp2p.new_endpoint()?; let _connection = peer2.connect_to(&peer1_addr).await?; let mut incoming_conn = peer1.listen()?; @@ -45,7 +45,7 @@ async fn successful_connection() -> Result<()> { .await .ok_or_else(|| Error::Unexpected("No incoming connection".to_string()))?; - assert_eq!(incoming_messages.remote_addr(), peer2.our_addr()?); + assert_eq!(incoming_messages.remote_addr(), peer2.our_endpoint().await?); Ok(()) } @@ -53,8 +53,8 @@ async fn successful_connection() -> Result<()> { #[tokio::test] async fn bi_directional_streams() -> Result<()> { let qp2p = new_qp2p(); - let peer1 = qp2p.new_endpoint()?; - let peer1_addr = peer1.our_addr()?; + let mut peer1 = qp2p.new_endpoint()?; + let peer1_addr = peer1.our_endpoint().await?; let peer2 = qp2p.new_endpoint()?; let connection = peer2.connect_to(&peer1_addr).await?; @@ -88,7 +88,7 @@ async fn bi_directional_streams() -> Result<()> { // Peer 2 should be able to re-use the stream to send an additional message let msg = random_msg(); - send_stream2.send(msg.clone()).await?; + send_stream2.send_user_msg(msg.clone()).await?; // Peer 1 should recieve the message in the stream recieved along with the // previous message @@ -97,7 +97,7 @@ async fn bi_directional_streams() -> Result<()> { // Peer 1 responds using the send stream let response_msg = random_msg(); - send_stream1.send(response_msg.clone()).await?; + send_stream1.send_user_msg(response_msg.clone()).await?; let received_response = recv_stream2.next().await?; @@ -109,12 +109,12 @@ async fn bi_directional_streams() -> Result<()> { #[tokio::test] async fn uni_directional_streams() -> Result<()> { let qp2p = new_qp2p(); - let peer1 = qp2p.new_endpoint()?; - let peer1_addr = peer1.our_addr()?; + let mut peer1 = qp2p.new_endpoint()?; + let peer1_addr = peer1.our_endpoint().await?; let mut incoming_conn_peer1 = peer1.listen()?; - let peer2 = qp2p.new_endpoint()?; - let peer2_addr = peer2.our_addr()?; + let mut peer2 = qp2p.new_endpoint()?; + let peer2_addr = peer2.our_endpoint().await?; let mut incoming_conn_peer2 = peer2.listen()?; // Peer 2 sends a message diff --git a/tests/igd.rs b/tests/igd.rs new file mode 100644 index 00000000..01702703 --- /dev/null +++ b/tests/igd.rs @@ -0,0 +1,39 @@ +#![cfg(feature = "upnp")] + +use qp2p::{Error, QuicP2p, Config}; +use std::net::{IpAddr, Ipv4Addr}; +use common::new_qp2p; + +mod common; + + + +#[tokio::test] +async fn echo_service() -> Result<(), Error> { + let qp2p = QuicP2p::with_config( + Some(Config { + port: Some(0), + ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)), + ..Default::default() + }), + Default::default(), + false, + )?; + let peer1 = qp2p.new_endpoint()?; + let peer_addr = peer1.local_addr()?; + + let qp2p = QuicP2p::with_config( + Some(Config { + port: Some(0), + ip: None, + ..Default::default() + }), + vec![peer_addr].into(), + false, + )?; + + let mut peer2 = qp2p.new_endpoint()?; + let addr = peer2.our_endpoint().await?; + + Ok(()) +} \ No newline at end of file