Skip to content

Commit

Permalink
fix(manual-port-forwarding): use the existing endpoint to validate
Browse files Browse the repository at this point in the history
EndpointVerificationReq
  • Loading branch information
lionel-faber authored and joshuef committed Apr 2, 2021
1 parent bbf0a31 commit 56e6626
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 10 deletions.
25 changes: 15 additions & 10 deletions src/connections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
// specific language governing permissions and limitations relating to use of the SAFE Network
// Software.

use crate::QuicP2p;
use crate::Endpoint;

use super::{
connection_pool::{ConnectionPool, ConnectionRemover},
Expand Down Expand Up @@ -144,6 +144,7 @@ pub(super) fn listen_for_incoming_connections(
message_tx: UnboundedSender<(SocketAddr, Bytes)>,
connection_tx: UnboundedSender<SocketAddr>,
disconnection_tx: UnboundedSender<SocketAddr>,
endpoint: Endpoint,
) {
let _ = tokio::spawn(async move {
loop {
Expand All @@ -164,6 +165,7 @@ pub(super) fn listen_for_incoming_connections(
pool_handle,
message_tx.clone(),
disconnection_tx.clone(),
endpoint.clone(),
);
}
Err(err) => {
Expand All @@ -186,12 +188,13 @@ pub(super) fn listen_for_incoming_messages(
remover: ConnectionRemover,
message_tx: UnboundedSender<(SocketAddr, Bytes)>,
disconnection_tx: UnboundedSender<SocketAddr>,
endpoint: Endpoint,
) {
let src = *remover.remote_addr();
let _ = tokio::spawn(async move {
let _ = future::join(
read_on_uni_streams(&mut uni_streams, src, message_tx.clone()),
read_on_bi_streams(&mut bi_streams, src, message_tx),
read_on_bi_streams(&mut bi_streams, src, message_tx, &endpoint),
)
.await;

Expand Down Expand Up @@ -245,6 +248,7 @@ async fn read_on_bi_streams(
bi_streams: &mut quinn::IncomingBiStreams,
peer_addr: SocketAddr,
message_tx: UnboundedSender<(SocketAddr, Bytes)>,
endpoint: &Endpoint,
) {
while let Some(result) = bi_streams.next().await {
match result {
Expand Down Expand Up @@ -273,9 +277,13 @@ async fn read_on_bi_streams(
}
}
Ok(WireMsg::EndpointVerificationReq(address_sent)) => {
if let Err(error) =
handle_endpoint_verification_req(peer_addr, address_sent, &mut send)
.await
if let Err(error) = handle_endpoint_verification_req(
peer_addr,
address_sent,
&mut send,
endpoint,
)
.await
{
error!("Failed to handle Endpoint verification request for peer {:?} with error: {}", peer_addr, error);
}
Expand Down Expand Up @@ -315,18 +323,15 @@ async fn handle_endpoint_verification_req(
peer_addr: SocketAddr,
addr_sent: SocketAddr,
send_stream: &mut quinn::SendStream,
endpoint: &Endpoint,
) -> Result<()> {
trace!(
"Received Endpoint verification request {:?} from {:?}",
addr_sent,
peer_addr
);
// Verify if the peer's endpoint is reachable via EchoServiceReq
let qp2p = QuicP2p::with_config(Default::default(), &[], false)?;
let (temporary_endpoint, _, _, _) = qp2p.new_endpoint().await?;
let (mut temp_send, mut temp_recv) = temporary_endpoint
.open_bidirectional_stream(&addr_sent)
.await?;
let (mut temp_send, mut temp_recv) = endpoint.open_bidirectional_stream(&addr_sent).await?;
let message = WireMsg::EndpointEchoReq;
message
.write_to_stream(&mut temp_send.quinn_send_stream)
Expand Down
2 changes: 2 additions & 0 deletions src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ impl Endpoint {
message_tx,
connection_tx,
disconnection_tx,
endpoint.clone(),
);

Ok((
Expand Down Expand Up @@ -367,6 +368,7 @@ impl Endpoint {
guard,
self.message_tx.clone(),
self.disconnection_tx.clone(),
self.clone(),
);

self.connection_deduplicator
Expand Down

0 comments on commit 56e6626

Please sign in to comment.