Skip to content

Commit

Permalink
fix(echo_service): respond to echo service request and expand test
Browse files Browse the repository at this point in the history
  • Loading branch information
lionel-faber committed Nov 10, 2020
1 parent 5ea6ca2 commit 40217e1
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 89 deletions.
30 changes: 14 additions & 16 deletions src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ impl QuicP2p {
#[cfg(feature = "upnp")]
upnp_lease_duration,
#[cfg(feature = "upnp")]
nodes
nodes,
)
.await
});
Expand Down Expand Up @@ -288,7 +288,6 @@ impl QuicP2p {
/// }
/// ```
pub async fn connect_to(&mut self, node_addr: &SocketAddr) -> Result<(Endpoint, Connection)> {

#[cfg(feature = "upnp")]
let bootstrap_nodes: Vec<SocketAddr> = self
.bootstrap_cache
Expand All @@ -307,8 +306,8 @@ impl QuicP2p {
self.allow_random_port,
#[cfg(feature = "upnp")]
self.upnp_lease_duration,
#[cfg(feature = "upnp")]
bootstrap_nodes
#[cfg(feature = "upnp")]
bootstrap_nodes,
)
.await
}
Expand Down Expand Up @@ -375,10 +374,8 @@ async fn new_connection_to(
client_cfg: quinn::ClientConfig,
local_addr: SocketAddr,
allow_random_port: bool,
#[cfg(feature = "upnp")]
upnp_lease_duration: u32,
#[cfg(feature = "upnp")]
bootstrap_nodes: Vec<SocketAddr>,
#[cfg(feature = "upnp")] upnp_lease_duration: u32,
#[cfg(feature = "upnp")] bootstrap_nodes: Vec<SocketAddr>,
) -> Result<(Endpoint, Connection)> {
trace!("Attempting to connect to peer: {}", node_addr);

Expand Down Expand Up @@ -432,16 +429,17 @@ fn bind(
}

// Unwrap the config if provided by the user, otherwise construct the default one
#[cfg(not(feature = "upnp"))]
fn unwrap_config_or_default(cfg: Option<Config>) -> Result<Config> {
cfg.map_or(Config::read_or_construct_default(None), Ok)
}

// #[cfg(feature = "upnp")]
// fn unwrap_config_or_default(cfg: Option<Config>) -> Result<Config> {
// let mut cfg = cfg.map_or(Config::read_or_construct_default(None)?, |cfg| cfg);
// if cfg.ip.is_none() {
// cfg.ip = igd::get_local_ip().ok();
// };
#[cfg(feature = "upnp")]
fn unwrap_config_or_default(cfg: Option<Config>) -> Result<Config> {
let mut cfg = cfg.map_or(Config::read_or_construct_default(None)?, |cfg| cfg);
if cfg.ip.is_none() {
cfg.ip = crate::igd::get_local_ip().ok();
};

// Ok(cfg)
// }
Ok(cfg)
}
43 changes: 29 additions & 14 deletions src/connections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ impl IncomingMessages {
src,
recv: RecvStream::new(recv)
}),
next_bi = Self::next_on_bi_streams(&mut self.bi_streams) =>
next_bi = Self::next_on_bi_streams(&mut self.bi_streams, self.peer_addr) =>
next_bi.map(|(bytes, send, recv)| Message::BiStream {
bytes,
src,
Expand All @@ -202,7 +202,11 @@ impl IncomingMessages {
None
}
Some(Ok(mut recv)) => match read_bytes(&mut recv).await {
Ok(bytes) => Some((bytes, recv)),
Ok(WireMsg::UserMsg(bytes)) => Some((bytes, recv)),
Ok(msg) => {
error!("Unexpected message type: {:?}", msg);
None
}
Err(err) => {
error!("{}", err);
None
Expand All @@ -214,6 +218,7 @@ impl IncomingMessages {
// Returns next message sent by peer in a bidirectional stream.
async fn next_on_bi_streams(
bi_streams: &mut quinn::IncomingBiStreams,
peer_addr: SocketAddr,
) -> Option<(Bytes, quinn::SendStream, quinn::RecvStream)> {
match bi_streams.next().await {
None => None,
Expand All @@ -225,8 +230,17 @@ impl IncomingMessages {
error!("Failed to read incoming message on bi-stream: {}", err);
None
}
Some(Ok((send, mut recv))) => match read_bytes(&mut recv).await {
Ok(bytes) => Some((bytes, send, recv)),
Some(Ok((mut send, mut recv))) => match read_bytes(&mut recv).await {
Ok(WireMsg::UserMsg(bytes)) => Some((bytes, send, recv)),
Ok(WireMsg::EndpointEchoReq) => {
let message = WireMsg::EndpointEchoResp(peer_addr);
message.write_to_stream(&mut send).await.ok()?;
Some((Bytes::from(""), send, recv))
}
Ok(msg) => {
error!("Unexpected message type: {:?}", msg);
None
}
Err(err) => {
error!("{}", err);
None
Expand All @@ -248,7 +262,14 @@ impl RecvStream {

/// Read next message from the stream
pub async fn next(&mut self) -> Result<Bytes> {
read_bytes(&mut self.quinn_recv_stream).await
match read_bytes(&mut self.quinn_recv_stream).await {
Ok(WireMsg::UserMsg(bytes)) => Ok(bytes),
Ok(msg) => Err(Error::Unexpected(format!(
"Unexpected message type: {:?}",
msg
))),
Err(error) => Err(error),
}
}
}

Expand All @@ -266,7 +287,7 @@ impl SendStream {
pub async fn send_user_msg(&mut self, msg: Bytes) -> Result<()> {
send_msg(&mut self.quinn_send_stream, msg).await
}

/// Send a wire message
pub async fn send(&mut self, msg: WireMsg) -> Result<()> {
msg.write_to_stream(&mut self.quinn_send_stream).await
Expand All @@ -275,17 +296,11 @@ impl SendStream {
pub async fn finish(mut self) -> Result<()> {
self.quinn_send_stream.finish().await.map_err(Error::from)
}

}

// Helper to read the message's bytes from the provided stream
async fn read_bytes(recv: &mut quinn::RecvStream) -> Result<Bytes> {
match WireMsg::read_from_stream(recv).await? {
WireMsg::UserMsg(msg_bytes) => Ok(msg_bytes),
WireMsg::EndpointEchoReq | WireMsg::EndpointEchoResp(_) => {
Err(Error::UnexpectedMessageType)
}
}
async fn read_bytes(recv: &mut quinn::RecvStream) -> Result<WireMsg> {
WireMsg::read_from_stream(recv).await
}

// Helper to send bytes to peer using the provided stream.
Expand Down
51 changes: 27 additions & 24 deletions src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,19 @@
// specific language governing permissions and limitations relating to use of the SAFE Network
// Software.

#[cfg(feature = "upnp")]
use super::igd::forward_port;
#[cfg(feature = "upnp")]
use log::{debug, info};
#[cfg(feature = "upnp")]
use super::error::Error;
#[cfg(feature = "upnp")]
use super::igd::forward_port;
use super::wire_msg::WireMsg;
use super::{
connections::{Connection, IncomingConnections},
error::Result,
};
use super::wire_msg::WireMsg;
use futures::lock::Mutex;
use log::trace;
#[cfg(feature = "upnp")]
use log::{debug, info};
use std::{net::SocketAddr, sync::Arc};

/// Host name of the Quic communication certificate used by peers
Expand Down Expand Up @@ -55,10 +55,8 @@ impl Endpoint {
quic_endpoint: quinn::Endpoint,
quic_incoming: quinn::Incoming,
client_cfg: quinn::ClientConfig,
#[cfg(feature = "upnp")]
upnp_lease_duration: u32,
#[cfg(feature = "upnp")]
bootstrap_nodes: Vec<SocketAddr>,
#[cfg(feature = "upnp")] upnp_lease_duration: u32,
#[cfg(feature = "upnp")] bootstrap_nodes: Vec<SocketAddr>,
) -> Result<Self> {
let local_addr = quic_endpoint.local_addr()?;
dbg!(local_addr);
Expand Down Expand Up @@ -88,9 +86,8 @@ impl Endpoint {
/// such an address cannot be reached and hence not useful.
#[cfg(feature = "upnp")]
pub async fn our_endpoint(&mut self) -> Result<SocketAddr> {

// Skip port forwarding
if self.local_addr.ip().is_loopback() || !self.local_addr.ip().is_unspecified() {
if self.local_addr.ip().is_loopback() {
return Ok(self.local_addr);
}

Expand All @@ -110,14 +107,12 @@ impl Endpoint {

// Try to contact an echo service
match self.query_ip_echo_service().await {
Ok(echo_res) => {
match addr {
None => {
addr = Some(echo_res);
},
Some(address) => {
info!("Got response from echo service: {:?}, but IGD has already provided our external address: {:?}", echo_res, address);
}
Ok(echo_res) => match addr {
None => {
addr = Some(echo_res);
}
Some(address) => {
info!("Got response from echo service: {:?}, but IGD has already provided our external address: {:?}", echo_res, address);
}
},
Err(err) => {
Expand All @@ -127,7 +122,9 @@ impl Endpoint {
if let Some(socket_addr) = addr {
Ok(socket_addr)
} else {
Err(Error::Unexpected("No response from echo service".to_string()))
Err(Error::Unexpected(
"No response from echo service".to_string(),
))
}
}

Expand Down Expand Up @@ -174,20 +171,26 @@ impl Endpoint {

let mut tasks = Vec::default();
for node in self.bootstrap_nodes.iter().cloned() {
debug!("Connecting to {:?}", &node);
let connection = self.connect_to(&node).await?; // TODO: move into loop
let task_handle = tokio::spawn(async move {
let (mut send_stream, mut recv_stream) = connection.open_bi_stream().await?;
send_stream.send(WireMsg::EndpointEchoReq).await?;
match WireMsg::read_from_stream(&mut recv_stream.quinn_recv_stream).await {
Ok(WireMsg::EndpointEchoResp(socket_addr)) => Ok(socket_addr),
Ok(WireMsg::EndpointEchoResp(socket_addr)) => {
send_stream.send_user_msg(bytes::Bytes::from("OK")).await?;
Ok(socket_addr)
}
Ok(_) => Err(Error::Unexpected("Unexpected message".to_string())),
Err(err) => Err(err),
}
});
tasks.push(task_handle);
}

self.local_addr()

let (result, _) = futures::future::select_ok(tasks).await.map_err(|err| {
log::error!("Failed to contact echo service: {}", err);
Error::BootstrapFailure
})?;
result
}
}
2 changes: 2 additions & 0 deletions src/igd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ pub(crate) async fn add_port(local_addr: SocketAddr, lease_duration: u32) -> Res

debug!("Found IGD gateway: {:?}", gateway);

debug!("Our local address: {:?}", local_addr);

if let SocketAddr::V4(socket_addr) = local_addr {
let ext_addr = gateway
.get_any_address(
Expand Down
45 changes: 24 additions & 21 deletions src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ use crate::{
dirs::Dirs,
error::{Error, Result},
};
use flexi_logger::{DeferredNow, Logger};
use log::Record;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::fs::File;
use std::io::{BufReader, BufWriter, Write};
use std::path::Path;
use flexi_logger::{DeferredNow, Logger};
use log::Record;

/// Get the project directory
#[cfg(any(
Expand Down Expand Up @@ -96,24 +96,27 @@ where
}

pub(crate) fn init_logging() {
// Custom formatter for logs
let do_format = move |writer: &mut dyn Write, clock: &mut DeferredNow, record: &Record| {
let handle = std::thread::current();
write!(
writer,
"[{}] {} {} [{}:{}] {}",
handle
.name()
.unwrap_or(&format!("Thread-{:?}", handle.id())),
record.level(),
clock.now().to_rfc3339(),
record.file().unwrap_or_default(),
record.line().unwrap_or_default(),
record.args()
)
};
// Custom formatter for logs
let do_format = move |writer: &mut dyn Write, clock: &mut DeferredNow, record: &Record| {
let handle = std::thread::current();
write!(
writer,
"[{}] {} {} [{}:{}] {}",
handle
.name()
.unwrap_or(&format!("Thread-{:?}", handle.id())),
record.level(),
clock.now().to_rfc3339(),
record.file().unwrap_or_default(),
record.line().unwrap_or_default(),
record.args()
)
};

Logger::with_env()
.format(do_format)
.suppress_timestamp().start().map(|_| ()).unwrap_or(());
Logger::with_env()
.format(do_format)
.suppress_timestamp()
.start()
.map(|_| ())
.unwrap_or(());
}
Loading

0 comments on commit 40217e1

Please sign in to comment.