Skip to content

Commit

Permalink
feat(echo_service): find if peer is externally reachable if external IP
Browse files Browse the repository at this point in the history
and port is provided manually
  • Loading branch information
lionel-faber committed Feb 12, 2021
1 parent d09b14f commit a9989cc
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 30 deletions.
3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ serde_json = "1.0.59"
structopt = "~0.3.15"
thiserror = "1.0.23"
webpki = "~0.21.3"
flexi_logger = "~0.16.1"

[dependencies.bytes]
version = "1.0.1"
Expand Down Expand Up @@ -49,8 +48,8 @@ flexi_logger = "~0.16.1"
features = [ "rt-threaded", "sync", "time", "macros", "io-std", "io-util" ]

[dev-dependencies]
flexi_logger = "~0.16.1"
anyhow = "1.0.36"
assert_matches = "1.3"
rand = "~0.7.3"

[target."cfg(any(all(unix, not(any(target_os = \"android\", target_os = \"androideabi\", target_os = \"ios\"))), windows))".dependencies]
Expand Down
12 changes: 4 additions & 8 deletions src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ impl QuicP2p {
bootstrap_nodes: &[SocketAddr],
use_bootstrap_cache: bool,
) -> Result<Self> {
crate::utils::init_logging();
let cfg = unwrap_config_or_default(cfg)?;
debug!("Config passed in to qp2p: {:?}", cfg);

Expand Down Expand Up @@ -202,13 +201,10 @@ impl QuicP2p {
.iter()
.map(|addr| Box::pin(endpoint.connect_to(addr)));

future::select_ok(tasks)
.await
.map_err(|err| {
error!("Failed to bootstrap to the network: {}", err);
Error::BootstrapFailure
})?
.0;
future::select_ok(tasks).await.map_err(|err| {
error!("Failed to bootstrap to the network: {}", err);
Error::BootstrapFailure
})?;

Ok((
endpoint,
Expand Down
2 changes: 1 addition & 1 deletion src/connection_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl ConnectionPool {
.map
.range_mut(Key::min(*addr)..=Key::max(*addr))
.into_iter()
.map(|(key, _)| key.clone())
.map(|(key, _)| *key)
.collect::<Vec<_>>();

keys_to_remove
Expand Down
36 changes: 30 additions & 6 deletions src/connections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
// specific language governing permissions and limitations relating to use of the SAFE Network
// Software.

use crate::QuicP2p;

use super::{
connection_pool::{ConnectionPool, ConnectionRemover},
error::{Error, Result},
Expand All @@ -16,8 +18,11 @@ use bytes::Bytes;
use futures::stream::StreamExt;
use log::{error, trace};
use std::net::SocketAddr;
use tokio::select;
use tokio::sync::mpsc::UnboundedSender;
use tokio::{
select,
time::{timeout, Duration},
};

/// Connection instance to a node which can be used to send messages to it
#[derive(Clone)]
Expand Down Expand Up @@ -216,9 +221,7 @@ pub(super) fn listen_for_incoming_messages(
}

// Returns next message sent by peer in an unidirectional stream.
async fn next_on_uni_streams(
uni_streams: &mut quinn::IncomingUniStreams,
) -> Option<Bytes> {
async fn next_on_uni_streams(uni_streams: &mut quinn::IncomingUniStreams) -> Option<Bytes> {
match uni_streams.next().await {
None => None,
Some(Err(quinn::ConnectionError::ApplicationClosed { .. })) => {
Expand Down Expand Up @@ -273,7 +276,28 @@ async fn next_on_bi_streams(
address_sent,
peer_addr
);
let message = WireMsg::EndpointVerficationResp(address_sent == peer_addr);
// Verify if the peer's endpoint is reachable via EchoServiceReq
let qp2p = QuicP2p::with_config(Default::default(), &[], false).ok()?;
let (temporary_endpoint, _, _, _) = qp2p.new_endpoint().await.ok()?;
let (mut temp_send, mut temp_recv) = temporary_endpoint
.open_bidirectional_stream(&address_sent)
.await
.ok()?;
let message = WireMsg::EndpointEchoReq;
message
.write_to_stream(&mut temp_send.quinn_send_stream)
.await
.ok()?;
let verified = matches!(
timeout(
Duration::from_secs(30),
WireMsg::read_from_stream(&mut temp_recv.quinn_recv_stream)
)
.await,
Ok(Ok(WireMsg::EndpointEchoResp(_)))
);

let message = WireMsg::EndpointVerficationResp(verified);
message.write_to_stream(&mut send).await.ok()?;
trace!("Responded to Endpoint verification request");
Some(Bytes::new())
Expand Down Expand Up @@ -325,7 +349,7 @@ mod tests {

let connection = peer1
.get_connection(&peer2_addr)
.ok_or_else(|| Error::MissingConnection)?;
.ok_or(Error::MissingConnection)?;
let (mut send_stream, mut recv_stream) = connection.open_bi().await?;
let message = WireMsg::EndpointEchoReq;
message
Expand Down
24 changes: 13 additions & 11 deletions src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ use super::{
api::DEFAULT_UPNP_LEASE_DURATION_SEC,
connection_deduplicator::ConnectionDeduplicator,
connection_pool::ConnectionPool,
connections::{listen_for_incoming_connections, listen_for_incoming_messages, Connection, RecvStream, SendStream},
connections::{
listen_for_incoming_connections, listen_for_incoming_messages, Connection, RecvStream,
SendStream,
},
error::Result,
Config,
};
Expand Down Expand Up @@ -124,12 +127,12 @@ impl Endpoint {
// External IP and port number is provided
// This means that the user has performed manual port-forwarding
// Verify that the given socket address is reachable
if let Some(contact) = endpoint.bootstrap_nodes.iter().next() {
if let Some(contact) = endpoint.bootstrap_nodes.get(0) {
info!("Verifying provided public IP address");
endpoint.connect_to(contact).await?;
let connection = endpoint
.get_connection(&contact)
.ok_or_else(|| Error::BootstrapFailure)?; //FIXME
.ok_or(Error::MissingConnection)?;
let (mut send, mut recv) = connection.open_bi().await?;
send.send(WireMsg::EndpointVerificationReq(addr)).await?;
let response = WireMsg::read_from_stream(&mut recv.quinn_recv_stream).await?;
Expand Down Expand Up @@ -374,11 +377,14 @@ impl Endpoint {
}

/// Open a bi-directional peer with a given peer
pub async fn open_bidirectional_stream(&self, peer_addr: &SocketAddr) -> Result<(SendStream, RecvStream)> {
pub async fn open_bidirectional_stream(
&self,
peer_addr: &SocketAddr,
) -> Result<(SendStream, RecvStream)> {
self.connect_to(peer_addr).await?;
let connection = self
.get_connection(peer_addr)
.ok_or_else(|| Error::MissingConnection)?; // will never be None
.ok_or(Error::MissingConnection)?;
connection.open_bi().await
}

Expand All @@ -387,9 +393,7 @@ impl Endpoint {
pub async fn send_message(&self, msg: Bytes, dest: &SocketAddr) -> Result<()> {
self.connect_to(dest).await?;

let connection = self
.get_connection(dest)
.ok_or_else(|| Error::MissingConnection)?; // will never be None
let connection = self.get_connection(dest).ok_or(Error::MissingConnection)?;
connection.send_uni(msg).await?;
Ok(())
}
Expand All @@ -410,9 +414,7 @@ impl Endpoint {
for node in self.bootstrap_nodes.iter().cloned() {
debug!("Connecting to {:?}", &node);
self.connect_to(&node).await?;
let connection = self
.get_connection(&node)
.ok_or_else(|| Error::MissingConnection)?;
let connection = self.get_connection(&node).ok_or(Error::MissingConnection)?;
let task_handle = tokio::spawn(async move {
let (mut send_stream, mut recv_stream) = connection.open_bi().await?;
send_stream.send(WireMsg::EndpointEchoReq).await?;
Expand Down
2 changes: 1 addition & 1 deletion src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ where
Ok(())
}

// #[cfg(test)]
#[cfg(test)]
pub(crate) fn init_logging() {
use flexi_logger::{DeferredNow, Logger};
use log::Record;
Expand Down
2 changes: 1 addition & 1 deletion src/wire_msg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub enum WireMsg {
EndpointEchoReq,
EndpointEchoResp(SocketAddr),
EndpointVerificationReq(SocketAddr),
EndpointVerficationResp(bool), // Use result here?
EndpointVerficationResp(bool),
UserMsg(Bytes),
}

Expand Down

0 comments on commit a9989cc

Please sign in to comment.