Skip to content

Commit

Permalink
test(echo_service): complete echo service and upnp impl. with tests
Browse files Browse the repository at this point in the history
  • Loading branch information
lionel-faber committed Nov 10, 2020
1 parent a19cd51 commit 5ea6ca2
Show file tree
Hide file tree
Showing 8 changed files with 220 additions and 152 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ unwrap = "1.2.1"
bincode = "1.2.1"
crossbeam-channel = "~0.4.2"
serde_json = "1.0.59"
flexi_logger = "~0.16.1"
structopt = "~0.3.15"
rcgen = "~0.8.4"
log = "~0.4.8"
Expand Down
64 changes: 51 additions & 13 deletions src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
// specific language governing permissions and limitations relating to use of the SAFE Network
// Software.

#[cfg(feature = "upnp")]
use super::igd;
use super::{
bootstrap_cache::BootstrapCache,
config::{Config, SerialisableCertificate},
Expand All @@ -17,6 +15,7 @@ use super::{
endpoint::Endpoint,
error::{Error, Result},
peer_config::{self, DEFAULT_IDLE_TIMEOUT_MSEC, DEFAULT_KEEP_ALIVE_INTERVAL_MSEC},
utils::init_logging,
};
use bytes::Bytes;
use futures::future::select_ok;
Expand Down Expand Up @@ -113,6 +112,7 @@ impl QuicP2p {
bootstrap_nodes: &[SocketAddr],
use_bootstrap_cache: bool,
) -> Result<Self> {
init_logging();
let cfg = unwrap_config_or_default(cfg)?;
debug!("Config passed in to qp2p: {:?}", cfg);

Expand Down Expand Up @@ -231,11 +231,13 @@ impl QuicP2p {
trace!("Bootstrapping with nodes {:?}", bootstrap_nodes);
// Attempt to connect to all nodes and return the first one to succeed
let mut tasks = Vec::default();
for node_addr in bootstrap_nodes {
for node_addr in bootstrap_nodes.iter().cloned() {
let nodes = bootstrap_nodes.clone();
let endpoint_cfg = self.endpoint_cfg.clone();
let client_cfg = self.client_cfg.clone();
let local_addr = self.local_addr;
let allow_random_port = self.allow_random_port;
#[cfg(feature = "upnp")]
let upnp_lease_duration = self.upnp_lease_duration;
let task_handle = tokio::spawn(async move {
new_connection_to(
Expand All @@ -244,7 +246,10 @@ impl QuicP2p {
client_cfg,
local_addr,
allow_random_port,
#[cfg(feature = "upnp")]
upnp_lease_duration,
#[cfg(feature = "upnp")]
nodes
)
.await
});
Expand Down Expand Up @@ -282,14 +287,28 @@ impl QuicP2p {
/// Ok(())
/// }
/// ```
pub async fn connect_to(&self, node_addr: &SocketAddr) -> Result<(Endpoint, Connection)> {
pub async fn connect_to(&mut self, node_addr: &SocketAddr) -> Result<(Endpoint, Connection)> {

#[cfg(feature = "upnp")]
let bootstrap_nodes: Vec<SocketAddr> = self
.bootstrap_cache
.peers()
.iter()
.rev()
.chain(self.bootstrap_cache.hard_coded_contacts().iter())
.cloned()
.collect();

new_connection_to(
node_addr,
self.endpoint_cfg.clone(),
self.client_cfg.clone(),
self.local_addr,
self.allow_random_port,
#[cfg(feature = "upnp")]
self.upnp_lease_duration,
#[cfg(feature = "upnp")]
bootstrap_nodes
)
.await
}
Expand All @@ -314,6 +333,17 @@ impl QuicP2p {
/// ```
pub fn new_endpoint(&self) -> Result<Endpoint> {
trace!("Creating a new enpoint");

#[cfg(feature = "upnp")]
let bootstrap_nodes: Vec<SocketAddr> = self
.bootstrap_cache
.peers()
.iter()
.rev()
.chain(self.bootstrap_cache.hard_coded_contacts().iter())
.cloned()
.collect();

let (quinn_endpoint, quinn_incoming) = bind(
self.endpoint_cfg.clone(),
self.local_addr,
Expand All @@ -326,7 +356,10 @@ impl QuicP2p {
quinn_endpoint,
quinn_incoming,
self.client_cfg.clone(),
#[cfg(feature = "upnp")]
self.upnp_lease_duration,
#[cfg(feature = "upnp")]
bootstrap_nodes,
)?;

Ok(endpoint)
Expand All @@ -342,7 +375,10 @@ async fn new_connection_to(
client_cfg: quinn::ClientConfig,
local_addr: SocketAddr,
allow_random_port: bool,
#[cfg(feature = "upnp")]
upnp_lease_duration: u32,
#[cfg(feature = "upnp")]
bootstrap_nodes: Vec<SocketAddr>,
) -> Result<(Endpoint, Connection)> {
trace!("Attempting to connect to peer: {}", node_addr);

Expand All @@ -354,7 +390,10 @@ async fn new_connection_to(
quinn_endpoint,
quinn_incoming,
client_cfg,
#[cfg(feature = "upnp")]
upnp_lease_duration,
#[cfg(feature = "upnp")]
bootstrap_nodes,
)?;
let connection = endpoint.connect_to(node_addr).await?;

Expand Down Expand Up @@ -393,17 +432,16 @@ fn bind(
}

// Unwrap the config if provided by the user, otherwise construct the default one
#[cfg(not(feature = "upnp"))]
fn unwrap_config_or_default(cfg: Option<Config>) -> Result<Config> {
cfg.map_or(Config::read_or_construct_default(None), Ok)
}

#[cfg(feature = "upnp")]
fn unwrap_config_or_default(cfg: Option<Config>) -> Result<Config> {
let mut cfg = cfg.map_or(Config::read_or_construct_default(None)?, |cfg| cfg);
if cfg.ip.is_none() {
cfg.ip = igd::get_local_ip().ok();
};
// #[cfg(feature = "upnp")]
// fn unwrap_config_or_default(cfg: Option<Config>) -> Result<Config> {
// let mut cfg = cfg.map_or(Config::read_or_construct_default(None)?, |cfg| cfg);
// if cfg.ip.is_none() {
// cfg.ip = igd::get_local_ip().ok();
// };

Ok(cfg)
}
// Ok(cfg)
// }
18 changes: 11 additions & 7 deletions src/connections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ impl Connection {
/// This returns the streams to send additional messages / read responses sent using the same stream.
pub async fn send(&self, msg: Bytes) -> Result<(SendStream, RecvStream)> {
let (mut send_stream, recv_stream) = self.open_bi_stream().await?;
send_stream.send(msg).await?;
send_stream.send_user_msg(msg).await?;
Ok((send_stream, recv_stream))
}

Expand Down Expand Up @@ -238,7 +238,7 @@ impl IncomingMessages {

/// Stream to receive multiple messages
pub struct RecvStream {
quinn_recv_stream: quinn::RecvStream,
pub(crate) quinn_recv_stream: quinn::RecvStream,
}

impl RecvStream {
Expand All @@ -262,24 +262,28 @@ impl SendStream {
Self { quinn_send_stream }
}

/// Send a message using the bi-directional stream created by the initiator
pub async fn send(&mut self, msg: Bytes) -> Result<()> {
/// Send a message using the stream created by the initiator
pub async fn send_user_msg(&mut self, msg: Bytes) -> Result<()> {
send_msg(&mut self.quinn_send_stream, msg).await
}


/// Send a wire message
pub async fn send(&mut self, msg: WireMsg) -> Result<()> {
msg.write_to_stream(&mut self.quinn_send_stream).await
}
/// Gracefully finish current stream
pub async fn finish(mut self) -> Result<()> {
self.quinn_send_stream.finish().await.map_err(Error::from)
}

}

// Helper to read the message's bytes from the provided stream
async fn read_bytes(recv: &mut quinn::RecvStream) -> Result<Bytes> {
match WireMsg::read_from_stream(recv).await? {
WireMsg::UserMsg(msg_bytes) => Ok(msg_bytes),
WireMsg::EndpointEchoReq | WireMsg::EndpointEchoResp(_) => {
// TODO: handle the echo request/response message
unimplemented!("echo message type not supported yet");
Err(Error::UnexpectedMessageType)
}
}
}
Expand Down
Loading

0 comments on commit 5ea6ca2

Please sign in to comment.