Skip to content

Commit

Permalink
refactor!: remove bootstrapping; external IP/port
Browse files Browse the repository at this point in the history
  • Loading branch information
b-zee authored and joshuef committed Jan 20, 2023
1 parent c8714f9 commit 5b2aaac
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 351 deletions.
5 changes: 2 additions & 3 deletions examples/p2p_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,8 @@ async fn main() -> Result<()> {
let args: Vec<String> = env::args().collect();

// create an endpoint for us to listen on and send from.
let (node, mut incoming_conns, _contact) = Endpoint::new_peer(
let (node, mut incoming_conns) = Endpoint::new_peer(
SocketAddr::from((Ipv4Addr::LOCALHOST, 0)),
&[],
Config {
idle_timeout: Duration::from_secs(60 * 60).into(), // 1 hour idle timeout.
..Default::default()
Expand Down Expand Up @@ -67,7 +66,7 @@ async fn main() -> Result<()> {
}

println!("\n---");
println!("Listening on: {:?}", node.public_addr());
println!("Listening on: {:?}", node.local_addr());
println!("---\n");

// loop over incoming connections
Expand Down
9 changes: 1 addition & 8 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,6 @@ fn parse_millis(millis: &str) -> Result<Duration, std::num::ParseIntError> {
pub(crate) struct InternalConfig {
pub(crate) client: quinn::ClientConfig,
pub(crate) server: quinn::ServerConfig,
pub(crate) external_port: Option<u16>,
pub(crate) external_ip: Option<IpAddr>,
}

impl InternalConfig {
Expand Down Expand Up @@ -152,12 +150,7 @@ impl InternalConfig {
let mut client = quinn::ClientConfig::new(Arc::new(client_crypto));
let _ = client.transport_config(transport);

Ok(Self {
client,
server,
external_port: config.external_port,
external_ip: config.external_ip,
})
Ok(Self { client, server })
}

fn new_transport_config(
Expand Down
253 changes: 4 additions & 249 deletions src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use super::{
connection::Connection,
error::{ClientEndpointError, ConnectionError, EndpointError, RpcError},
};
use std::net::{IpAddr, SocketAddr};
use std::net::SocketAddr;
use tokio::sync::mpsc::{self, error::TryRecvError, Receiver};
use tokio::time::{timeout, Duration};
use tracing::{error, info, trace, warn};
Expand Down Expand Up @@ -50,7 +50,6 @@ impl IncomingConnections {
pub struct Endpoint {
pub(crate) inner: quinn::Endpoint,
pub(crate) local_addr: SocketAddr,
pub(crate) public_addr: Option<SocketAddr>,
}

impl std::fmt::Debug for Endpoint {
Expand All @@ -67,31 +66,10 @@ impl Endpoint {
///
/// A peer endpoint, unlike a [client](Self::new_client) endpoint, can receive incoming
/// connections.
///
/// # Bootstrapping
///
/// When given a non-empty list of `contacts`, this will attempt to 'bootstrap' against them.
/// This involves connecting to all the contacts concurrently and selecting the first
/// successfully connected peer (if any), whose `SocketAddr` will be returned.
///
/// If bootstrapping is successful, the connected peer will be used to perform a reachability
/// check to validate that this endpoint can be reached at its
/// [`public_addr`](Self::public_addr).
///
/// **Note:** if no contacts are given, the [`public_addr`](Self::public_addr) of the endpoint
/// will not have been validated to be reachable by anyone
pub async fn new_peer(
local_addr: impl Into<SocketAddr>,
contacts: &[SocketAddr],
config: Config,
) -> Result<
(
Self,
IncomingConnections,
Option<(Connection, ConnectionIncoming)>,
),
EndpointError,
> {
) -> Result<(Self, IncomingConnections), EndpointError> {
let config = InternalConfig::try_from_config(config)?;

let mut quinn_endpoint = quinn::Endpoint::server(config.server.clone(), local_addr.into())?;
Expand All @@ -102,39 +80,16 @@ impl Endpoint {
// Get actual socket address.
let local_addr = quinn_endpoint.local_addr()?;

let mut endpoint = Self {
let endpoint = Self {
local_addr,
public_addr: None, // we'll set this below
inner: quinn_endpoint,
};

let contact = endpoint.connect_to_any(contacts).await;
let contact_ref = contact.as_ref().map(|c| &c.0);

let public_addr = endpoint
.resolve_public_addr(config.external_ip, config.external_port, contact_ref)
.await?;

endpoint.public_addr = Some(public_addr);

let (connection_tx, connection_rx) = mpsc::channel(STANDARD_CHANNEL_SIZE);

listen_for_incoming_connections(endpoint.inner.clone(), connection_tx);

if let Some(contact) = contact_ref {
let valid = endpoint
.endpoint_verification(contact, public_addr)
.await
.map_err(|error| EndpointError::EndpointVerification {
peer: contact.remote_address(),
error,
})?;
if !valid {
return Err(EndpointError::Unreachable { public_addr });
}
}

Ok((endpoint, IncomingConnections(connection_rx), contact))
Ok((endpoint, IncomingConnections(connection_rx)))
}

/// Create a client endpoint at the given address.
Expand All @@ -157,7 +112,6 @@ impl Endpoint {

let endpoint = Self {
local_addr,
public_addr: None, // we're a client
inner: quinn_endpoint,
};

Expand All @@ -169,11 +123,6 @@ impl Endpoint {
self.local_addr
}

/// Get the public address of the endpoint.
pub fn public_addr(&self) -> SocketAddr {
self.public_addr.unwrap_or(self.local_addr)
}

/// Connect to a peer.
///
/// Atttempts to connect to a peer at the given address.
Expand Down Expand Up @@ -282,118 +231,6 @@ impl Endpoint {
Ok(new_conn)
}

// set an appropriate public address based on `config` and a reachability check.
async fn resolve_public_addr(
&mut self,
config_external_ip: Option<IpAddr>,
config_external_port: Option<u16>,
contact: Option<&Connection>,
) -> Result<SocketAddr, EndpointError> {
let mut public_addr = self.local_addr;

// get the IP seen for us by our contact
let visible_addr = if let Some(contact) = contact {
Some(self.endpoint_echo(contact).await.map_err(|error| {
EndpointError::EndpointEcho {
peer: contact.remote_address(),
error,
}
})?)
} else {
None
};

if let Some(external_ip) = config_external_ip {
// set the public IP based on config
public_addr.set_ip(external_ip);

if let Some(visible_addr) = visible_addr {
// if we set a different external IP than peers can see, we will have a bad time
if visible_addr.ip() != external_ip {
warn!(
"Configured external IP ({}) does not match that seen by peers ({})",
external_ip,
visible_addr.ip()
);
}
}
} else if let Some(visible_addr) = visible_addr {
// set the public IP based on that seen by the peer
public_addr.set_ip(visible_addr.ip());
} else {
// we have no good source for public IP, leave it as the local IP and warn
warn!(
"Could not determine better public IP than local IP ({})",
public_addr.ip()
);
}

if let Some(external_port) = config_external_port {
// set the public port based on config
public_addr.set_port(external_port);

if let Some(visible_addr) = visible_addr {
// if we set a different external IP than peers can see, we will have a bad time
if visible_addr.port() != external_port {
warn!(
"Configured external port ({}) does not match that seen by peers ({})",
external_port,
visible_addr.port()
);
}
}
} else if let Some(visible_addr) = visible_addr {
// set the public port based on that seen by the peer
public_addr.set_port(visible_addr.port());
} else {
// we have no good source for public port, leave it as the local port and warn
warn!(
"Could not determine better public port than local port ({})",
public_addr.port()
);
}

self.public_addr = Some(public_addr);

// Return the address so callers can avoid the optionality of `self.public_addr`
Ok(public_addr)
}

/// Perform the endpoint echo RPC with the given contact.
async fn endpoint_echo(&self, contact: &Connection) -> Result<SocketAddr, RpcError> {
let (mut send, mut recv) = contact.open_bi().await?;

send.send_wire_msg(WireMsg::EndpointEchoReq).await?;

match timeout(ECHO_SERVICE_QUERY_TIMEOUT, recv.read_wire_msg()).await?? {
WireMsg::EndpointEchoResp(addr) => Ok(addr),
msg => Err(RpcError::EchoResponseMissing {
peer: contact.remote_address(),
response: Some(msg.to_string()),
}),
}
}

/// Perform the endpoint verification RPC with the given peer.
async fn endpoint_verification(
&self,
contact: &Connection,
public_addr: SocketAddr,
) -> Result<bool, RpcError> {
let (mut send, mut recv) = contact.open_bi().await?;

send.send_wire_msg(WireMsg::EndpointVerificationReq(public_addr))
.await?;

match timeout(ECHO_SERVICE_QUERY_TIMEOUT, recv.read_wire_msg()).await?? {
WireMsg::EndpointVerificationResp(valid) => Ok(valid),
msg => Err(RpcError::EndpointVerificationRespMissing {
peer: contact.remote_address(),
response: Some(msg.to_string()),
}),
}
}

/// Builder to create an `Endpoint`.
pub fn builder() -> EndpointBuilder {
EndpointBuilder::default()
Expand Down Expand Up @@ -426,85 +263,3 @@ pub(super) fn listen_for_incoming_connections(
);
});
}

#[cfg(test)]
mod tests {
use super::Endpoint;
use crate::{tests::local_addr, Config};
use color_eyre::eyre::Result;
use std::net::SocketAddr;

#[tokio::test]
async fn new_without_external_addr() -> Result<()> {
let (endpoint, _, _) = Endpoint::new_peer(
local_addr(),
&[],
Config {
external_ip: None,
external_port: None,
..Default::default()
},
)
.await?;
assert_eq!(endpoint.public_addr(), endpoint.local_addr());

Ok(())
}

#[tokio::test]
async fn new_with_external_ip() -> Result<()> {
let (endpoint, _, _) = Endpoint::new_peer(
local_addr(),
&[],
Config {
external_ip: Some([123u8, 123, 123, 123].into()),
external_port: None,
..Default::default()
},
)
.await?;
assert_eq!(
endpoint.public_addr(),
SocketAddr::new([123u8, 123, 123, 123].into(), endpoint.local_addr().port())
);

Ok(())
}

#[tokio::test]
async fn new_with_external_port() -> Result<()> {
let (endpoint, _, _) = Endpoint::new_peer(
local_addr(),
&[],
Config {
external_ip: None,
external_port: Some(123),
..Default::default()
},
)
.await?;
assert_eq!(
endpoint.public_addr(),
SocketAddr::new(endpoint.local_addr().ip(), 123)
);

Ok(())
}

#[tokio::test]
async fn new_with_external_addr() -> Result<()> {
let (endpoint, _, _) = Endpoint::new_peer(
local_addr(),
&[],
Config {
external_ip: Some([123u8, 123, 123, 123].into()),
external_port: Some(123),
..Default::default()
},
)
.await?;
assert_eq!(endpoint.public_addr(), "123.123.123.123:123".parse()?);

Ok(())
}
}
2 changes: 0 additions & 2 deletions src/endpoint_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ impl EndpointBuilder {
Ok((
Endpoint {
local_addr: self.addr,
public_addr: None,
inner: endpoint,
},
IncomingConnections(connection_rx),
Expand All @@ -102,7 +101,6 @@ impl EndpointBuilder {

Ok(Endpoint {
local_addr: self.addr,
public_addr: None,
inner: endpoint,
})
}
Expand Down
Loading

0 comments on commit 5b2aaac

Please sign in to comment.