From 4597cb36e0be5ffcb5ae21a42e4a37648d455aad Mon Sep 17 00:00:00 2001 From: Franz Heinzmann Date: Mon, 3 Jul 2023 19:26:05 +0200 Subject: [PATCH] feat: add MagicEndpoint to iroh-net --- iroh-bytes/src/get.rs | 12 +- iroh-net/Cargo.toml | 1 + iroh-net/examples/magic.rs | 137 +++++++ iroh-net/src/client.rs | 145 -------- iroh-net/src/defaults.rs | 30 ++ iroh-net/src/hp/cfg.rs | 2 +- iroh-net/src/hp/hostinfo.rs | 2 +- iroh-net/src/hp/magicsock.rs | 5 +- iroh-net/src/hp/magicsock/conn.rs | 132 ++++--- iroh-net/src/hp/netmap.rs | 2 +- iroh-net/src/lib.rs | 5 +- iroh-net/src/magic_endpoint.rs | 576 ++++++++++++++++++++++++++++++ iroh-net/src/tls.rs | 2 +- iroh/src/commands.rs | 22 +- iroh/src/commands/doctor.rs | 105 ++---- iroh/src/config.rs | 26 +- iroh/src/node.rs | 69 ++-- iroh/tests/provide.rs | 7 +- 18 files changed, 916 insertions(+), 364 deletions(-) create mode 100644 iroh-net/examples/magic.rs delete mode 100644 iroh-net/src/client.rs create mode 100644 iroh-net/src/defaults.rs create mode 100644 iroh-net/src/magic_endpoint.rs diff --git a/iroh-bytes/src/get.rs b/iroh-bytes/src/get.rs index 7e7099299b..afd8f37a63 100644 --- a/iroh-bytes/src/get.rs +++ b/iroh-bytes/src/get.rs @@ -69,12 +69,12 @@ pub async fn run_ticket( keylog: bool, derp_map: Option, ) -> Result { - let connection = iroh_net::client::dial_peer( - ticket.addrs(), + let connection = iroh_net::MagicEndpoint::dial_peer( ticket.peer(), &crate::P2P_ALPN, - keylog, + ticket.addrs(), derp_map, + keylog, ) .await?; @@ -624,12 +624,12 @@ pub async fn run( request: AnyGetRequest, opts: Options, ) -> anyhow::Result { - let connection = iroh_net::client::dial_peer( - &opts.addrs, + let connection = iroh_net::MagicEndpoint::dial_peer( opts.peer_id, &crate::P2P_ALPN, - opts.keylog, + &opts.addrs, opts.derp_map, + opts.keylog, ) .await?; Ok(run_connection(connection, request)) diff --git a/iroh-net/Cargo.toml b/iroh-net/Cargo.toml index 713ea8f5d4..66e06b9057 100644 --- a/iroh-net/Cargo.toml +++ b/iroh-net/Cargo.toml @@ -80,6 +80,7 @@ rtnetlink = "0.12.0" wmi = "0.13" [dev-dependencies] +clap = { version = "4", features = ["derive"] } tokio = { version = "1", features = ["io-util", "sync", "rt", "net", "fs", "macros", "time", "test-util"] } tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/iroh-net/examples/magic.rs b/iroh-net/examples/magic.rs new file mode 100644 index 0000000000..0bcbbf6ab8 --- /dev/null +++ b/iroh-net/examples/magic.rs @@ -0,0 +1,137 @@ +use std::net::SocketAddr; + +use clap::Parser; +use ed25519_dalek::SigningKey as SecretKey; +use iroh_net::{ + defaults::default_derp_map, + hp::derp::{DerpMap, UseIpv4, UseIpv6}, + magic_endpoint::accept_conn, + tls::{Keypair, PeerId}, + MagicEndpoint, +}; +use tracing::{debug, info}; +use url::Url; + +const EXAMPLE_ALPN: &[u8] = b"n0/iroh/examples/magic/0"; + +#[derive(Debug, Parser)] +struct Cli { + #[clap(short, long)] + secret: Option, + #[clap(short, long, default_value = "n0/iroh/examples/magic/0")] + alpn: String, + #[clap(short, long, default_value = "0")] + bind_port: u16, + #[clap(short, long)] + derp_url: Option, + #[clap(subcommand)] + command: Command, +} + +#[derive(Debug, Parser)] +enum Command { + Listen, + Connect { + peer_id: String, + addrs: Option>, + }, +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt::init(); + let args = Cli::parse(); + let keypair = match args.secret { + None => { + let keypair = Keypair::generate(); + println!("our secret key: {}", fmt_secret(&keypair)); + keypair + } + Some(key) => parse_secret(&key)?, + }; + + let derp_map = match args.derp_url { + None => default_derp_map(), + Some(url) => { + // TODO: This should be done by the DERP client. + let derp_port = match url.port() { + Some(port) => port, + None => match url.scheme() { + "http" => 80, + "https" => 443, + _ => anyhow::bail!( + "Invalid scheme in DERP URL, only http: and https: schemes are supported." + ), + }, + }; + DerpMap::default_from_node(url, 3478, derp_port, UseIpv4::None, UseIpv6::None) + } + }; + + let endpoint = MagicEndpoint::builder() + .keypair(keypair) + .alpns(vec![args.alpn.to_string().into_bytes()]) + .derp_map(Some(derp_map)) + .bind(args.bind_port) + .await?; + + let me = endpoint.peer_id(); + let local_addr = endpoint.local_addr()?; + println!("magic socket listening on {local_addr:?}"); + println!("our peer id: {me}"); + + match args.command { + Command::Listen => { + while let Some(conn) = endpoint.accept().await { + let (peer_id, alpn, conn) = accept_conn(conn).await?; + info!( + "new connection from {peer_id} with ALPN {alpn} (coming from {})", + conn.remote_address() + ); + tokio::spawn(async move { + let (mut send, mut recv) = conn.accept_bi().await?; + debug!("accepted bi stream, waiting for data..."); + let message = recv.read_to_end(1000).await?; + let message = String::from_utf8(message)?; + println!("received: {message}"); + + let message = format!("hi! you connected to {me}. bye bye"); + send.write_all(message.as_bytes()).await?; + send.finish().await?; + + Ok::<_, anyhow::Error>(()) + }); + } + } + Command::Connect { peer_id, addrs } => { + let peer_id: PeerId = peer_id.parse()?; + let addrs = addrs.unwrap_or_default(); + let conn = endpoint.connect(peer_id, EXAMPLE_ALPN, &addrs).await?; + info!("connected"); + + let (mut send, mut recv) = conn.open_bi().await?; + + let message = format!("hello here's {me}"); + send.write_all(message.as_bytes()).await?; + send.finish().await?; + let message = recv.read_to_end(100).await?; + let message = String::from_utf8(message)?; + println!("received: {message}"); + } + } + Ok(()) +} + +fn fmt_secret(keypair: &Keypair) -> String { + let mut text = data_encoding::BASE32_NOPAD.encode(&keypair.secret().to_bytes()); + text.make_ascii_lowercase(); + text +} +fn parse_secret(secret: &str) -> anyhow::Result { + let bytes: [u8; 32] = data_encoding::BASE32_NOPAD + .decode(secret.to_ascii_uppercase().as_bytes())? + .try_into() + .map_err(|_| anyhow::anyhow!("Invalid secret"))?; + let key = SecretKey::from_bytes(&bytes); + Ok(key.into()) +} diff --git a/iroh-net/src/client.rs b/iroh-net/src/client.rs deleted file mode 100644 index 57615ebe0c..0000000000 --- a/iroh-net/src/client.rs +++ /dev/null @@ -1,145 +0,0 @@ -use std::{ - net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}, - sync::Arc, - time::Duration, -}; - -use anyhow::{Context, Result}; -use tracing::debug; - -use crate::{ - hp::{self, cfg, derp::DerpMap, netmap}, - tls::{self, Keypair, PeerId}, -}; - -/// Create a quinn client endpoint -/// -/// The *bind_addr* is the address that should be bound locally. Even though this is an -/// outgoing connection a socket must be bound and this is explicit. The main choice to -/// make here is the address family: IPv4 or IPv6. Otherwise you normally bind to the -/// `UNSPECIFIED` address on port `0` thus allowing the kernel to do the right thing. -/// -/// If *peer_id* is present it will verify during the TLS connection setup that the remote -/// connected to has the required [`PeerId`], otherwise this will connect to any peer. -/// -/// The *alpn_protocols* are the list of Application-Layer Protocol Neotiation identifiers -/// you are happy to accept. -/// -/// If *keylog* is `true` and the KEYLOGFILE environment variable is present it will be -/// considered a filename to which the TLS pre-master keys are logged. This can be useful -/// to be able to decrypt captured traffic for debugging purposes. -/// -/// Finally the *derp_map* specifies the DERP servers that can be used to establish this -/// connection. -pub async fn create_endpoint( - bind_addr: SocketAddr, - peer_id: PeerId, - alpn_protocols: Vec>, - keylog: bool, - derp_map: Option, -) -> Result<(quinn::Endpoint, hp::magicsock::Conn)> { - let keypair = Keypair::generate(); - - let tls_client_config = - tls::make_client_config(&keypair, Some(peer_id), alpn_protocols, keylog)?; - let mut client_config = quinn::ClientConfig::new(Arc::new(tls_client_config)); - - let conn = hp::magicsock::Conn::new(hp::magicsock::Options { - port: bind_addr.port(), - private_key: keypair.secret().clone().into(), - ..Default::default() - }) - .await?; - conn.set_derp_map(derp_map).await?; - - let mut endpoint = quinn::Endpoint::new_with_abstract_socket( - quinn::EndpointConfig::default(), - None, - conn.clone(), - Arc::new(quinn::TokioRuntime), - )?; - - let mut transport_config = quinn::TransportConfig::default(); - transport_config.keep_alive_interval(Some(Duration::from_secs(1))); - client_config.transport_config(Arc::new(transport_config)); - - endpoint.set_default_client_config(client_config); - Ok((endpoint, conn)) -} - -/// Establishes a QUIC connection to the provided peer. -pub async fn dial_peer( - addrs: &[SocketAddr], - peer_id: PeerId, - alpn_protocol: &[u8], - keylog: bool, - derp_map: Option, -) -> Result { - let bind_addr = if addrs.iter().any(|addr| addr.ip().is_ipv6()) { - SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, 0, 0, 0).into() - } else { - SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0).into() - }; - - let (endpoint, magicsock) = create_endpoint( - bind_addr, - peer_id, - vec![alpn_protocol.to_vec()], - keylog, - derp_map, - ) - .await?; - - // Only a single peer in our network currently. - let node_key: hp::key::node::PublicKey = peer_id.into(); - const DEFAULT_DERP_REGION: u16 = 1; - - let mut addresses = Vec::new(); - let mut endpoints = Vec::new(); - - // Add the provided address as a starting point. - for addr in addrs { - addresses.push(addr.ip()); - endpoints.push(*addr); - } - magicsock - .set_network_map(netmap::NetworkMap { - peers: vec![cfg::Node { - name: None, - addresses, - key: node_key.clone(), - endpoints, - derp: Some(DEFAULT_DERP_REGION), - }], - }) - .await?; - - let addr = magicsock - .get_mapping_addr(&node_key) - .await - .expect("just inserted"); - debug!("connecting to {}: (via {} - {:?})", peer_id, addr, addrs); - let connect = endpoint.connect(addr, "localhost")?; - let connection = connect.await.context("failed connecting to provider")?; - - Ok(connection) -} - -pub fn create_quinn_client( - bind_addr: SocketAddr, - peer_id: Option, - alpn_protocols: Vec>, - keylog: bool, -) -> Result { - let keypair = Keypair::generate(); - - let tls_client_config = tls::make_client_config(&keypair, peer_id, alpn_protocols, keylog)?; - let mut client_config = quinn::ClientConfig::new(Arc::new(tls_client_config)); - let mut endpoint = quinn::Endpoint::client(bind_addr)?; - let mut transport_config = quinn::TransportConfig::default(); - transport_config.keep_alive_interval(Some(Duration::from_secs(1))); - client_config.transport_config(Arc::new(transport_config)); - - endpoint.set_default_client_config(client_config); - Ok(endpoint) -} diff --git a/iroh-net/src/defaults.rs b/iroh-net/src/defaults.rs new file mode 100644 index 0000000000..60a5b14523 --- /dev/null +++ b/iroh-net/src/defaults.rs @@ -0,0 +1,30 @@ +use std::collections::HashMap; + +use crate::hp::derp::{DerpMap, DerpNode, DerpRegion, UseIpv4, UseIpv6}; + +pub fn default_derp_map() -> DerpMap { + DerpMap { + regions: HashMap::from_iter([(1, default_derp_region())].into_iter()), + } +} + +pub fn default_derp_region() -> DerpRegion { + // The default derper run by number0. + let default_n0_derp = DerpNode { + name: "default-1".into(), + region_id: 1, + host_name: "https://derp.iroh.network".parse().unwrap(), + stun_only: false, + stun_port: 3478, + ipv4: UseIpv4::Some([35, 175, 99, 113].into()), + ipv6: UseIpv6::None, + derp_port: 443, + stun_test_ip: None, + }; + DerpRegion { + region_id: 1, + nodes: vec![default_n0_derp], + avoid: false, + region_code: "default-1".into(), + } +} diff --git a/iroh-net/src/hp/cfg.rs b/iroh-net/src/hp/cfg.rs index 1e9534f9bf..d564e0ec02 100644 --- a/iroh-net/src/hp/cfg.rs +++ b/iroh-net/src/hp/cfg.rs @@ -150,7 +150,7 @@ pub struct PingResult { /// A node or peer in the iroh network. /// /// Nodes are primarily identified by their [`Node::key`]. -#[derive(Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq)] pub struct Node { /// The public key or PeerID, the primary identifier of this node. pub key: key::node::PublicKey, diff --git a/iroh-net/src/hp/hostinfo.rs b/iroh-net/src/hp/hostinfo.rs index e4afcb34c4..6238e3721b 100644 --- a/iroh-net/src/hp/hostinfo.rs +++ b/iroh-net/src/hp/hostinfo.rs @@ -11,7 +11,7 @@ const RUST_VERSION: &str = env!("RUSTC_VERSION"); const GIT_COMMIT: &str = env!("GIT_COMMIT"); /// Contains a summary of the host we are running on. -#[derive(Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq)] pub struct Hostinfo { /// Version of this code. pub version: String, diff --git a/iroh-net/src/hp/magicsock.rs b/iroh-net/src/hp/magicsock.rs index 4035d9f91f..423d155343 100644 --- a/iroh-net/src/hp/magicsock.rs +++ b/iroh-net/src/hp/magicsock.rs @@ -9,6 +9,9 @@ mod rebinding_conn; mod timer; mod udp_actor; -pub use self::conn::{Conn, Options}; +pub use self::conn::{Callbacks, Conn, Options}; pub use self::endpoint::EndpointInfo; pub use self::timer::Timer; + +#[cfg(test)] +pub(crate) use conn::tests as conn_tests; diff --git a/iroh-net/src/hp/magicsock/conn.rs b/iroh-net/src/hp/magicsock/conn.rs index 35a9246005..f2a9774599 100644 --- a/iroh-net/src/hp/magicsock/conn.rs +++ b/iroh-net/src/hp/magicsock/conn.rs @@ -101,6 +101,16 @@ pub struct Options { /// Zero means to pick one automatically. pub port: u16, + /// Private key for this node. + pub private_key: key::node::SecretKey, + + /// Callbacks to emit on various socket events + pub callbacks: Callbacks, +} + +/// Contains options for `Conn::listen`. +#[derive(derive_more::Debug, Default)] +pub struct Callbacks { /// Optionally provides a func to be called when endpoints change. #[allow(clippy::type_complexity)] #[debug("on_endpoints: Option>")] @@ -113,18 +123,14 @@ pub struct Options { /// A callback that provides a `cfg::NetInfo` when discovered network conditions change. #[debug("on_net_info: Option>")] pub on_net_info: Option>, - /// Private key for this node. - pub private_key: key::node::SecretKey, } impl Default for Options { fn default() -> Self { Options { port: 0, - on_endpoints: None, - on_derp_active: None, - on_net_info: None, private_key: key::node::SecretKey::generate(), + callbacks: Default::default(), } } } @@ -259,10 +265,13 @@ impl Conn { let Options { port, - on_endpoints, - on_derp_active, - on_net_info, private_key, + callbacks: + Callbacks { + on_endpoints, + on_derp_active, + on_net_info, + }, } = opts; let (network_recv_ch_sender, network_recv_ch_receiver) = flume::bounded(128); @@ -2480,7 +2489,7 @@ impl std::fmt::Display for QuicMappedAddr { } #[cfg(test)] -mod tests { +pub(crate) mod tests { use anyhow::Context; use rand::RngCore; use std::net::Ipv4Addr; @@ -2494,7 +2503,7 @@ mod tests { derp::{DerpNode, DerpRegion, UseIpv4, UseIpv6}, stun, }, - tls, + tls, MagicEndpoint, }; fn make_transmit(destination: SocketAddr) -> quinn_udp::Transmit { @@ -2619,7 +2628,7 @@ mod tests { stun_ip: IpAddr, } - async fn run_derp_and_stun( + pub async fn run_derp_and_stun( stun_ip: IpAddr, ) -> Result<(DerpMap, impl FnOnce() -> BoxFuture<'static, ()>)> { // TODO: pass a mesh_key? @@ -2676,9 +2685,8 @@ mod tests { #[derive(Clone)] struct MagicStack { ep_ch: flume::Receiver>, - key: key::node::SecretKey, - conn: Conn, - quic_ep: quinn::Endpoint, + keypair: tls::Keypair, + endpoint: MagicEndpoint, } const ALPN: [u8; 9] = *b"n0/test/1"; @@ -2687,55 +2695,40 @@ mod tests { async fn new(derp_map: DerpMap) -> Result { let (on_derp_s, mut on_derp_r) = mpsc::channel(8); let (ep_s, ep_r) = flume::bounded(16); - let opts = Options { - on_endpoints: Some(Box::new(move |eps: &[cfg::Endpoint]| { + + let keypair = tls::Keypair::generate(); + + let mut transport_config = quinn::TransportConfig::default(); + transport_config.max_idle_timeout(Some(Duration::from_secs(10).try_into().unwrap())); + + let endpoint = MagicEndpoint::builder() + .keypair(keypair.clone()) + .on_endpoints(Box::new(move |eps: &[cfg::Endpoint]| { let _ = ep_s.send(eps.to_vec()); - })), - on_derp_active: Some(Box::new(move || { + })) + .on_derp_active(Box::new(move || { on_derp_s.try_send(()).ok(); - })), - ..Default::default() - }; - let key = opts.private_key.clone(); - let conn = Conn::new(opts).await?; - conn.set_derp_map(Some(derp_map)).await?; + })) + .transport_config(transport_config) + .derp_map(Some(derp_map)) + .alpns(vec![ALPN.to_vec()]) + .bind(0) + .await?; tokio::time::timeout(Duration::from_secs(10), on_derp_r.recv()) .await .context("wait for derp connection")?; - let tls_server_config = - tls::make_server_config(&key.clone().into(), vec![ALPN.to_vec()], false)?; - let mut server_config = quinn::ServerConfig::with_crypto(Arc::new(tls_server_config)); - let mut transport_config = quinn::TransportConfig::default(); - transport_config.keep_alive_interval(Some(Duration::from_secs(5))); - transport_config.max_idle_timeout(Some(Duration::from_secs(10).try_into().unwrap())); - server_config.transport_config(Arc::new(transport_config)); - let mut quic_ep = quinn::Endpoint::new_with_abstract_socket( - quinn::EndpointConfig::default(), - Some(server_config), - conn.clone(), - Arc::new(quinn::TokioRuntime), - )?; - - let tls_client_config = - tls::make_client_config(&key.clone().into(), None, vec![ALPN.to_vec()], false)?; - let mut client_config = quinn::ClientConfig::new(Arc::new(tls_client_config)); - let mut transport_config = quinn::TransportConfig::default(); - transport_config.max_idle_timeout(Some(Duration::from_secs(10).try_into().unwrap())); - client_config.transport_config(Arc::new(transport_config)); - quic_ep.set_default_client_config(client_config); - Ok(Self { ep_ch: ep_r, - key, - conn, - quic_ep, + keypair, + endpoint, }) } async fn tracked_endpoints(&self) -> Vec { - self.conn + self.endpoint + .conn() .tracked_endpoints() .await .unwrap_or_default() @@ -2745,7 +2738,8 @@ mod tests { } fn public(&self) -> key::node::PublicKey { - self.key.public_key() + let key: key::node::SecretKey = self.keypair.secret().clone().into(); + key.public_key() } } @@ -2773,7 +2767,7 @@ mod tests { peers.push(cfg::Node { addresses: addresses.clone(), name: Some(format!("node{}", i + 1)), - key: peer.key.public_key(), + key: peer.public(), endpoints: eps[i].iter().map(|ep| ep.addr).collect(), derp: Some(1), }); @@ -2793,7 +2787,7 @@ mod tests { for (i, m) in ms.iter().enumerate() { let nm = build_netmap(eps, ms, i).await; - let _ = m.conn.set_network_map(nm).await; + let _ = m.endpoint.conn().set_network_map(nm).await; } } @@ -2826,7 +2820,7 @@ mod tests { }) } - fn setup_logging() { + pub fn setup_logging() { tracing_subscriber::registry() .with(tracing_subscriber::fmt::layer().with_writer(std::io::stderr)) .with(EnvFilter::from_default_env()) @@ -2872,11 +2866,12 @@ mod tests { let a_name = stringify!($a); let b_name = stringify!($b); println!("{} -> {} ({} bytes)", a_name, b_name, $msg.len()); - println!("[{}] {:?}", a_name, a.conn.local_addr()); - println!("[{}] {:?}", b_name, b.conn.local_addr()); + println!("[{}] {:?}", a_name, a.endpoint.local_addr()); + println!("[{}] {:?}", b_name, b.endpoint.local_addr()); - let a_addr = b.conn.get_mapping_addr(&a.public()).await.unwrap(); - let b_addr = a.conn.get_mapping_addr(&b.public()).await.unwrap(); + let a_addr = b.endpoint.conn().get_mapping_addr(&a.public()).await.unwrap(); + let b_addr = a.endpoint.conn().get_mapping_addr(&b.public()).await.unwrap(); + let b_peer_id = b.endpoint.peer_id(); println!("{}: {}, {}: {}", a_name, a_addr, b_name, b_addr); @@ -2884,7 +2879,7 @@ mod tests { let b_task = tokio::task::spawn( async move { println!("[{}] accepting conn", b_name); - let conn = b.quic_ep.accept().await.expect("no conn"); + let conn = b.endpoint.accept().await.expect("no conn"); println!("[{}] connecting", b_name); let conn = conn @@ -2933,8 +2928,8 @@ mod tests { async move { println!("[{}] connecting to {}", a_name, b_addr); let conn = a - .quic_ep - .connect(b_addr, "localhost")? + .endpoint + .connect(b_peer_id, &ALPN, &[b_addr]) .await .with_context(|| format!("[{}] connect", a_name))?; @@ -2975,7 +2970,7 @@ mod tests { println!("[{}] close", a_name); conn.close(0u32.into(), b"done"); println!("[{}] wait idle", a_name); - a.quic_ep.wait_idle().await; + a.endpoint.endpoint().wait_idle().await; println!("[{}] waiting for channel", a_name); b_task.await??; Ok(()) @@ -3039,16 +3034,11 @@ mod tests { .context("failed to connect peers")?; println!("closing endpoints"); - m1.quic_ep.close(0u32.into(), b"done"); - m2.quic_ep.close(0u32.into(), b"done"); - - println!("closing connection m1"); - m1.conn.close().await?; - assert!(m1.conn.is_closed()); + m1.endpoint.close(0u32.into(), b"done").await?; + m2.endpoint.close(0u32.into(), b"done").await?; - println!("closing connection m2"); - m2.conn.close().await?; - assert!(m2.conn.is_closed()); + assert!(m1.endpoint.conn().is_closed()); + assert!(m2.endpoint.conn().is_closed()); println!("cleaning up"); cleanup().await; diff --git a/iroh-net/src/hp/netmap.rs b/iroh-net/src/hp/netmap.rs index 3f3f60112d..a0270b8b70 100644 --- a/iroh-net/src/hp/netmap.rs +++ b/iroh-net/src/hp/netmap.rs @@ -5,7 +5,7 @@ use super::cfg; /// The local view of the iroh network. /// /// This contains all the peers the local node knows about. -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct NetworkMap { pub peers: Vec, } diff --git a/iroh-net/src/lib.rs b/iroh-net/src/lib.rs index bd205141e5..681dd9e9bf 100644 --- a/iroh-net/src/lib.rs +++ b/iroh-net/src/lib.rs @@ -1,7 +1,10 @@ #![recursion_limit = "256"] -pub mod client; +pub mod defaults; pub mod hp; +pub mod magic_endpoint; pub mod net; pub mod tls; pub mod util; + +pub use magic_endpoint::MagicEndpoint; diff --git a/iroh-net/src/magic_endpoint.rs b/iroh-net/src/magic_endpoint.rs new file mode 100644 index 0000000000..8b42126115 --- /dev/null +++ b/iroh-net/src/magic_endpoint.rs @@ -0,0 +1,576 @@ +use std::{ + net::SocketAddr, + sync::{Arc, Mutex}, + time::Duration, +}; + +use anyhow::{anyhow, Context}; +use quinn_proto::VarInt; +use tracing::{debug, trace}; + +use crate::{ + hp::{ + self, cfg, + derp::DerpMap, + magicsock::{Callbacks, Conn}, + netmap::NetworkMap, + }, + tls::{self, Keypair, PeerId}, +}; + +/// Builder for [MagicEndpoint] +#[derive(Debug, Default)] +pub struct MagicEndpointBuilder { + keypair: Option, + derp_map: Option, + alpn_protocols: Vec>, + transport_config: Option, + concurrent_connections: Option, + keylog: bool, + callbacks: Callbacks, +} + +impl MagicEndpointBuilder { + /// Set a keypair to authenticate with other peers. + /// + /// This keypair's public key will be the [PeerId] of this endpoint. + /// + /// If not set, a new keypair will be generated. + pub fn keypair(mut self, keypair: Keypair) -> Self { + self.keypair = Some(keypair); + self + } + + /// Set the ALPN protocols that this endpoint will accept on incoming connections. + pub fn alpns(mut self, alpn_protocols: Vec>) -> Self { + self.alpn_protocols = alpn_protocols; + self + } + + /// If *keylog* is `true` and the KEYLOGFILE environment variable is present it will be + /// considered a filename to which the TLS pre-master keys are logged. This can be useful + /// to be able to decrypt captured traffic for debugging purposes. + pub fn keylog(mut self, keylog: bool) -> Self { + self.keylog = keylog; + self + } + + /// Specify the DERP servers that are used by this endpoint. + /// + /// DERP servers are used to discover other peers by [`PeerId`] and also + /// help establish connections between peers by being an initial relay + /// for traffic while assisting in holepunching to establish a direct + /// connection between the peers. + pub fn derp_map(mut self, derp_map: Option) -> Self { + self.derp_map = derp_map; + self + } + + /// Set a custom [quinn::TransportConfig] for this endpoint. + /// + /// The transport config contains parameters governing the QUIC state machine. + /// + /// If unset, the default config is used. Default values should be suitable for most internet + /// applications. Applications protocols which forbid remotely-initiated streams should set + /// `max_concurrent_bidi_streams` and `max_concurrent_uni_streams` to zero. + pub fn transport_config(mut self, transport_config: quinn::TransportConfig) -> Self { + self.transport_config = Some(transport_config); + self + } + + /// Maximum number of simultaneous connections to accept. + /// + /// New incoming connections are only accepted if the total number of incoming or outgoing + /// connections is less than this. Outgoing connections are unaffected. + pub fn concurrent_connections(mut self, concurrent_connections: u32) -> Self { + self.concurrent_connections = Some(concurrent_connections); + self + } + + /// Optionally set a callback function to be called when endpoints change. + #[allow(clippy::type_complexity)] + pub fn on_endpoints( + mut self, + on_endpoints: Box, + ) -> Self { + self.callbacks.on_endpoints = Some(on_endpoints); + self + } + + /// Optionally set a callback funcion to be called when a connection is made to a DERP server. + pub fn on_derp_active(mut self, on_derp_active: Box) -> Self { + self.callbacks.on_derp_active = Some(on_derp_active); + self + } + + /// Optionally set a callback function that provides a [cfg::NetInfo] when discovered network conditions change. + pub fn on_net_info( + mut self, + on_net_info: Box, + ) -> Self { + self.callbacks.on_net_info = Some(on_net_info); + self + } + + /// Bind the magic endpoint on the specified socket address. + /// + /// The *bind_port* is the port that should be bound locally. + /// The port will be used to bind an IPv4 and, if supported, and IPv6 socket. + /// You can pass `0` to let the operating system chosse a free port for you. + /// NOTE: This will be improved soon to add support for binding on specific addresses. + pub async fn bind(self, bind_port: u16) -> anyhow::Result { + let keypair = self.keypair.unwrap_or_else(Keypair::generate); + let mut server_config = make_server_config( + &keypair, + self.alpn_protocols, + self.transport_config, + self.keylog, + )?; + if let Some(c) = self.concurrent_connections { + server_config.concurrent_connections(c); + } + MagicEndpoint::bind( + keypair, + bind_port, + Some(server_config), + self.derp_map, + Some(self.callbacks), + self.keylog, + ) + .await + } +} + +fn make_server_config( + keypair: &Keypair, + alpn_protocols: Vec>, + transport_config: Option, + keylog: bool, +) -> anyhow::Result { + let tls_server_config = tls::make_server_config(keypair, alpn_protocols, keylog)?; + let mut server_config = quinn::ServerConfig::with_crypto(Arc::new(tls_server_config)); + server_config.transport_config(Arc::new(transport_config.unwrap_or_default())); + Ok(server_config) +} + +#[derive(Clone, Debug)] +pub struct MagicEndpoint { + keypair: Arc, + conn: Conn, + endpoint: quinn::Endpoint, + netmap: Arc>, + keylog: bool, +} + +impl MagicEndpoint { + /// Build a MagicEndpoint + pub fn builder() -> MagicEndpointBuilder { + MagicEndpointBuilder::default() + } + + /// Connect to a remote endpoint, creating an endpoint on the fly. + /// + /// The PeerId and the ALPN protocol are required. If you happen to know dialable addresses of + /// the remote endpoint, they can be specified and will be used to try and establish a direct + /// connection without involving a DERP server. If no addresses are specified, the endpoint + /// will try to dial the peer through the configured DERP servers. + /// + /// If *derp_map* is set, these DERP servers are used to discover the dialed peer by its + /// [`PeerId`], help establish the connection being an initial relay for traffic and assist in + /// holepunching. + /// + /// If *keylog* is `true` and the KEYLOGFILE environment variable is present it will be + /// considered a filename to which the TLS pre-master keys are logged. This can be useful + /// to be able to decrypt captured traffic for debugging purposes. + pub async fn dial_peer( + peer_id: PeerId, + alpn_protocol: &[u8], + known_addrs: &[SocketAddr], + derp_map: Option, + keylog: bool, + ) -> anyhow::Result { + let endpoint = + MagicEndpoint::bind(Keypair::generate(), 0, None, derp_map, None, keylog).await?; + endpoint + .connect(peer_id, alpn_protocol, known_addrs) + .await + .context("failed to connect to provider") + } + + /// Create a quinn endpoint backed by a magicsock. + /// + /// This is for internal use, the public interface is the [MagicEndpointBuilder] obtained from + /// [Self::builder]. See the methods on the builder for documentation of the parameters. + async fn bind( + keypair: Keypair, + bind_port: u16, + server_config: Option, + derp_map: Option, + callbacks: Option, + keylog: bool, + ) -> anyhow::Result { + let conn = hp::magicsock::Conn::new(hp::magicsock::Options { + port: bind_port, + private_key: keypair.secret().clone().into(), + callbacks: callbacks.unwrap_or_default(), + }) + .await?; + trace!("created magicsock"); + + let derp_map = derp_map.unwrap_or_default(); + conn.set_derp_map(Some(derp_map)) + .await + .context("setting derp map")?; + + let endpoint = quinn::Endpoint::new_with_abstract_socket( + quinn::EndpointConfig::default(), + server_config, + conn.clone(), + Arc::new(quinn::TokioRuntime), + )?; + trace!("created quinn endpoint"); + + Ok(Self { + keypair: Arc::new(keypair), + conn, + endpoint, + netmap: Arc::new(Mutex::new(NetworkMap { peers: vec![] })), + keylog, + }) + } + + /// Accept an incoming connection on the socket. + pub fn accept(&self) -> quinn::Accept<'_> { + self.endpoint.accept() + } + + /// Get the peer id of this endpoint. + pub fn peer_id(&self) -> PeerId { + self.keypair.public().into() + } + + /// Get the keypair of this endpoint. + pub fn keypair(&self) -> &Keypair { + &self.keypair + } + + /// Get the local endpoint addresses on which the underlying magic socket is bound. + /// + /// Returns a tuple of the IPv4 and the optional IPv6 address. + pub fn local_addr(&self) -> anyhow::Result<(SocketAddr, Option)> { + self.conn.local_addr() + } + + /// Get the local and discovered endpoint addresses on which the underlying + /// magic socket is reachable. + /// + /// This list contains both the locally-bound addresses and the endpoint's + /// publicly-reachable addresses, if they could be discovered through + /// STUN or port mapping. + pub async fn local_endpoints(&self) -> anyhow::Result> { + self.conn.local_endpoints().await + } + + /// Connect to a remote endpoint. + /// + /// The PeerId and the ALPN protocol are required. If you happen to know dialable addresses of + /// the remote endpoint, they can be specified and will be used to try and establish a direct + /// connection without involving a DERP server. If no addresses are specified, the endpoint + /// will try to dial the peer through the configured DERP servers. + pub async fn connect( + &self, + peer_id: PeerId, + alpn: &[u8], + known_addrs: &[SocketAddr], + ) -> anyhow::Result { + self.add_known_addrs(peer_id, known_addrs).await?; + + let node_key: hp::key::node::PublicKey = peer_id.into(); + let addr = self.conn.get_mapping_addr(&node_key).await.ok_or_else(|| { + anyhow!("failed to retrieve the mapped address from the magic socket") + })?; + + let client_config = { + let alpn_protocols = vec![alpn.to_vec()]; + let tls_client_config = + tls::make_client_config(&self.keypair, Some(peer_id), alpn_protocols, self.keylog)?; + let mut client_config = quinn::ClientConfig::new(Arc::new(tls_client_config)); + let mut transport_config = quinn::TransportConfig::default(); + transport_config.keep_alive_interval(Some(Duration::from_secs(1))); + client_config.transport_config(Arc::new(transport_config)); + client_config + }; + + debug!( + "connecting to {}: (via {} - {:?})", + peer_id, addr, known_addrs + ); + + // TODO: We'd eventually want to replace "localhost" with something that makes more sense. + let connect = self + .endpoint + .connect_with(client_config, addr, "localhost")?; + + connect.await.context("failed connecting to provider") + } + + /// Inform the magic socket about addresses of the peer. + /// + /// This updates the magic socket's *netmap* with these addresses, which are used as candidates + /// when connecting to this peer (in addition to addresses obtained from a derp server). + pub async fn add_known_addrs( + &self, + peer_id: PeerId, + endpoints: &[SocketAddr], + ) -> anyhow::Result<()> { + const DEFAULT_DERP_REGION: u16 = 1; + + let node_key: hp::key::node::PublicKey = peer_id.into(); + let netmap = { + let mut netmap = self.netmap.lock().unwrap(); + let node = netmap.peers.iter_mut().find(|peer| peer.key == node_key); + if let Some(node) = node { + for endpoint in endpoints { + if !node.endpoints.contains(endpoint) { + node.endpoints.push(*endpoint); + node.addresses.push(endpoint.ip()); + } + } + } else { + let endpoints = endpoints.to_vec(); + let addresses = endpoints.iter().map(|ep| ep.ip()).collect(); + let node = cfg::Node { + name: None, + addresses, + endpoints, + key: node_key.clone(), + derp: Some(DEFAULT_DERP_REGION), + }; + netmap.peers.push(node) + } + netmap.clone() + }; + self.conn.set_network_map(netmap).await?; + Ok(()) + } + + /// Close the QUIC endpoint and the magic socket. + /// + /// This will close all open QUIC connections with the provided error_code and reason. See + /// [quinn::Connection] for details on how these are interpreted. + /// + /// It will then wait for all connections to actually be shutdown, and afterwards + /// close the magic socket. + /// + /// Returns an error if closing the magic socket failed. + /// TODO: Document error cases. + pub async fn close(&self, error_code: VarInt, reason: &[u8]) -> anyhow::Result<()> { + self.endpoint.close(error_code, reason); + self.endpoint.wait_idle().await; + self.conn.close().await?; + Ok(()) + } + + #[cfg(test)] + pub(crate) fn conn(&self) -> &Conn { + &self.conn + } + #[cfg(test)] + pub(crate) fn endpoint(&self) -> &quinn::Endpoint { + &self.endpoint + } +} + +/// Accept an incoming connection and extract the client-provided [`PeerId`] and ALPN protocol. +pub async fn accept_conn( + mut conn: quinn::Connecting, +) -> anyhow::Result<(PeerId, String, quinn::Connection)> { + let alpn = get_alpn(&mut conn).await?; + let conn = conn.await?; + let peer_id = get_peer_id(&conn).await?; + Ok((peer_id, alpn, conn)) +} + +/// Extract the ALPN protocol from the peer's TLS certificate. +pub async fn get_alpn(connecting: &mut quinn::Connecting) -> anyhow::Result { + let data = connecting.handshake_data().await?; + match data.downcast::() { + Ok(data) => match data.protocol { + Some(protocol) => std::string::String::from_utf8(protocol).map_err(Into::into), + None => anyhow::bail!("no ALPN protocol available"), + }, + Err(_) => anyhow::bail!("unknown handshake type"), + } +} + +/// Extract the [`PeerId`] from the peer's TLS certificate. +pub async fn get_peer_id(connection: &quinn::Connection) -> anyhow::Result { + let data = connection.peer_identity(); + match data { + None => anyhow::bail!("no peer certificate found"), + Some(data) => match data.downcast::>() { + Ok(certs) => { + if certs.len() != 1 { + anyhow::bail!( + "expected a single peer certificate, but {} found", + certs.len() + ); + } + let cert = tls::certificate::parse(&certs[0])?; + Ok(cert.peer_id()) + } + Err(_) => anyhow::bail!("invalid peer certificate"), + }, + } +} + +#[cfg(test)] +mod test { + use futures::future::BoxFuture; + + use super::{accept_conn, MagicEndpoint}; + use crate::hp::magicsock::conn_tests::{run_derp_and_stun, setup_logging}; + + const TEST_ALPN: &[u8] = b"n0/iroh/test"; + + async fn setup_pair() -> anyhow::Result<( + MagicEndpoint, + MagicEndpoint, + impl FnOnce() -> BoxFuture<'static, ()>, + )> { + let (derp_map, cleanup) = run_derp_and_stun([127, 0, 0, 1].into()).await?; + + let ep1 = MagicEndpoint::builder() + .alpns(vec![TEST_ALPN.to_vec()]) + .derp_map(Some(derp_map.clone())) + .bind(0) + .await?; + + let ep2 = MagicEndpoint::builder() + .alpns(vec![TEST_ALPN.to_vec()]) + .derp_map(Some(derp_map.clone())) + .bind(0) + .await?; + + Ok((ep1, ep2, cleanup)) + } + + #[tokio::test] + async fn magic_endpoint_connect_close() { + setup_logging(); + let (ep1, ep2, cleanup) = setup_pair().await.unwrap(); + let peer_id_1 = ep1.peer_id(); + + let accept = tokio::spawn(async move { + let conn = ep1.accept().await.unwrap(); + let (_peer_id, _alpn, conn) = accept_conn(conn).await.unwrap(); + let mut stream = conn.accept_uni().await.unwrap(); + ep1.close(23u8.into(), b"badbadnotgood").await.unwrap(); + let res = conn.accept_uni().await; + assert_eq!(res.unwrap_err(), quinn::ConnectionError::LocallyClosed); + + let res = stream.read_to_end(10).await; + assert_eq!( + res.unwrap_err(), + quinn::ReadToEndError::Read(quinn::ReadError::ConnectionLost( + quinn::ConnectionError::LocallyClosed + )) + ); + }); + + let conn = ep2.connect(peer_id_1, TEST_ALPN, &[]).await.unwrap(); + // open a first stream - this does not error before we accept one stream before closing + // on the other peer + let mut stream = conn.open_uni().await.unwrap(); + // now the other peer closed the connection. + stream.write_all(b"hi").await.unwrap(); + // now the other peer closed the connection. + let expected_err = quinn::ConnectionError::ApplicationClosed(quinn::ApplicationClose { + error_code: 23u8.into(), + reason: b"badbadnotgood".to_vec().into(), + }); + let err = conn.closed().await; + assert_eq!(err, expected_err); + + let res = stream.finish().await; + assert_eq!( + res.unwrap_err(), + quinn::WriteError::ConnectionLost(expected_err.clone()) + ); + + let res = conn.open_uni().await; + assert_eq!(res.unwrap_err(), expected_err); + + accept.await.unwrap(); + cleanup().await; + } + + #[tokio::test] + async fn magic_endpoint_bidi_send_recv() { + setup_logging(); + let (ep1, ep2, cleanup) = setup_pair().await.unwrap(); + + let peer_id_1 = ep1.peer_id(); + eprintln!("peer id 1 {peer_id_1}"); + let peer_id_2 = ep2.peer_id(); + eprintln!("peer id 2 {peer_id_2}"); + + let endpoint = ep2.clone(); + let p2_connect = tokio::spawn(async move { + let conn = endpoint.connect(peer_id_1, TEST_ALPN, &[]).await.unwrap(); + let (mut send, mut recv) = conn.open_bi().await.unwrap(); + send.write_all(b"hello").await.unwrap(); + send.finish().await.unwrap(); + let m = recv.read_to_end(100).await.unwrap(); + assert_eq!(&m, b"world"); + }); + + let endpoint = ep1.clone(); + let p1_accept = tokio::spawn(async move { + let conn = endpoint.accept().await.unwrap(); + let (peer_id, alpn, conn) = accept_conn(conn).await.unwrap(); + assert_eq!(peer_id, peer_id_2); + assert_eq!(alpn.as_bytes(), TEST_ALPN); + + let (mut send, mut recv) = conn.accept_bi().await.unwrap(); + let m = recv.read_to_end(100).await.unwrap(); + assert_eq!(m, b"hello"); + + send.write_all(b"world").await.unwrap(); + send.finish().await.unwrap(); + }); + + let endpoint = ep1.clone(); + let p1_connect = tokio::spawn(async move { + let conn = endpoint.connect(peer_id_2, TEST_ALPN, &[]).await.unwrap(); + let (mut send, mut recv) = conn.open_bi().await.unwrap(); + send.write_all(b"ola").await.unwrap(); + send.finish().await.unwrap(); + let m = recv.read_to_end(100).await.unwrap(); + assert_eq!(&m, b"mundo"); + }); + + let endpoint = ep2.clone(); + let p2_accept = tokio::spawn(async move { + let conn = endpoint.accept().await.unwrap(); + let (peer_id, alpn, conn) = accept_conn(conn).await.unwrap(); + assert_eq!(peer_id, peer_id_1); + assert_eq!(alpn.as_bytes(), TEST_ALPN); + + let (mut send, mut recv) = conn.accept_bi().await.unwrap(); + let m = recv.read_to_end(100).await.unwrap(); + assert_eq!(m, b"ola"); + + send.write_all(b"mundo").await.unwrap(); + send.finish().await.unwrap(); + }); + + p1_accept.await.unwrap(); + p2_connect.await.unwrap(); + + p2_accept.await.unwrap(); + p1_connect.await.unwrap(); + + cleanup().await; + } +} diff --git a/iroh-net/src/tls.rs b/iroh-net/src/tls.rs index 3bda985fa3..d72dfaf540 100644 --- a/iroh-net/src/tls.rs +++ b/iroh-net/src/tls.rs @@ -17,7 +17,7 @@ use serde::{Deserialize, Serialize}; use ssh_key::LineEnding; /// A keypair. -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct Keypair { public: PublicKey, secret: SecretKey, diff --git a/iroh/src/commands.rs b/iroh/src/commands.rs index 4fd2592c4d..bc064dfa04 100644 --- a/iroh/src/commands.rs +++ b/iroh/src/commands.rs @@ -1,11 +1,12 @@ use std::net::{Ipv4Addr, SocketAddrV4}; +use std::sync::Arc; use std::time::Duration; use std::{net::SocketAddr, path::PathBuf}; use anyhow::{Context, Result}; use clap::{Parser, Subcommand}; use iroh_bytes::{cid::Blake3Cid, protocol::RequestToken, provider::Ticket, runtime}; -use iroh_net::{client::create_quinn_client, tls::PeerId}; +use iroh_net::tls::PeerId; use quic_rpc::transport::quinn::QuinnConnection; use quic_rpc::RpcClient; @@ -235,7 +236,7 @@ async fn make_rpc_client( ) -> anyhow::Result>> { let bind_addr = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0).into(); - let endpoint = create_quinn_client(bind_addr, None, vec![RPC_ALPN.to_vec()], false)?; + let endpoint = create_quinn_client(bind_addr, vec![RPC_ALPN.to_vec()], false)?; let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), rpc_port); let server_name = "localhost".to_string(); let connection = QuinnConnection::new(endpoint, addr, server_name); @@ -247,6 +248,23 @@ async fn make_rpc_client( Ok(client) } +pub fn create_quinn_client( + bind_addr: SocketAddr, + alpn_protocols: Vec>, + keylog: bool, +) -> Result { + let keypair = iroh_net::tls::Keypair::generate(); + let tls_client_config = + iroh_net::tls::make_client_config(&keypair, None, alpn_protocols, keylog)?; + let mut client_config = quinn::ClientConfig::new(Arc::new(tls_client_config)); + let mut endpoint = quinn::Endpoint::client(bind_addr)?; + let mut transport_config = quinn::TransportConfig::default(); + transport_config.keep_alive_interval(Some(Duration::from_secs(1))); + client_config.transport_config(Arc::new(transport_config)); + endpoint.set_default_client_config(client_config); + Ok(endpoint) +} + #[cfg(feature = "metrics")] pub fn init_metrics_collection( metrics_addr: Option, diff --git a/iroh/src/commands/doctor.rs b/iroh/src/commands/doctor.rs index 054fdce315..c6433b264d 100644 --- a/iroh/src/commands/doctor.rs +++ b/iroh/src/commands/doctor.rs @@ -2,7 +2,6 @@ //! and to test connectivity to specific other nodes. use std::{ net::SocketAddr, - sync::Arc, time::{Duration, Instant}, }; @@ -17,9 +16,9 @@ use iroh_net::{ self, derp::{DerpMap, UseIpv4, UseIpv6}, key::node::SecretKey, - magicsock, }, - tls::{self, Keypair}, + tls::{Keypair, PeerId, PublicKey}, + MagicEndpoint, }; use postcard::experimental::max_size::MaxSize; use serde::{Deserialize, Serialize}; @@ -443,12 +442,17 @@ fn configure_local_derp_map() -> DerpMap { } const DR_DERP_ALPN: [u8; 11] = *b"n0/drderp/1"; -const DEFAULT_DERP_REGION: u16 = 1; async fn make_endpoint( private_key: SecretKey, derp_map: Option, -) -> anyhow::Result<(magicsock::Conn, quinn::Endpoint)> { +) -> anyhow::Result { + tracing::info!( + "public key: {}", + hex::encode(private_key.public_key().as_bytes()) + ); + tracing::info!("derp map {:#?}", derp_map); + let (on_derp_s, mut on_derp_r) = sync::mpsc::channel(8); let on_net_info = |ni: hp::cfg::NetInfo| { tracing::info!("got net info {:#?}", ni); @@ -463,51 +467,26 @@ async fn make_endpoint( on_derp_s.try_send(()).ok(); }; - tracing::info!( - "public key: {}", - hex::encode(private_key.public_key().as_bytes()) - ); - tracing::info!("derp map {:#?}", derp_map); - let opts = magicsock::Options { - port: 0, - on_endpoints: Some(Box::new(on_endpoints)), - on_derp_active: Some(Box::new(on_derp_active)), - on_net_info: Some(Box::new(on_net_info)), - private_key, - }; - let key = opts.private_key.clone(); - let conn = magicsock::Conn::new(opts).await?; + let mut transport_config = quinn::TransportConfig::default(); + transport_config.keep_alive_interval(Some(Duration::from_secs(5))); + transport_config.max_idle_timeout(Some(Duration::from_secs(10).try_into().unwrap())); + + let endpoint = MagicEndpoint::builder() + .keypair(private_key.into()) + .alpns(vec![DR_DERP_ALPN.to_vec()]) + .derp_map(derp_map) + .transport_config(transport_config) + .on_net_info(Box::new(on_net_info)) + .on_endpoints(Box::new(on_endpoints)) + .on_derp_active(Box::new(on_derp_active)) + .bind(0) + .await?; - conn.set_derp_map(derp_map).await?; tokio::time::timeout(Duration::from_secs(10), on_derp_r.recv()) .await .context("wait for derp connection")?; - let tls_server_config = - tls::make_server_config(&key.clone().into(), vec![DR_DERP_ALPN.to_vec()], false)?; - let mut server_config = quinn::ServerConfig::with_crypto(Arc::new(tls_server_config)); - let mut transport_config = quinn::TransportConfig::default(); - transport_config.keep_alive_interval(Some(Duration::from_secs(5))); - transport_config.max_idle_timeout(Some(Duration::from_secs(10).try_into().unwrap())); - server_config.transport_config(Arc::new(transport_config)); - let mut endpoint = quinn::Endpoint::new_with_abstract_socket( - quinn::EndpointConfig::default(), - Some(server_config), - conn.clone(), - Arc::new(quinn::TokioRuntime), - )?; - - let tls_client_config = tls::make_client_config( - &key.clone().into(), - None, - vec![DR_DERP_ALPN.to_vec()], - false, - )?; - let mut client_config = quinn::ClientConfig::new(Arc::new(tls_client_config)); - let mut transport_config = quinn::TransportConfig::default(); - transport_config.max_idle_timeout(Some(Duration::from_secs(10).try_into().unwrap())); - client_config.transport_config(Arc::new(transport_config)); - endpoint.set_default_client_config(client_config); - Ok((conn, endpoint)) + + Ok(endpoint) } async fn connect( @@ -516,36 +495,24 @@ async fn connect( remote_endpoints: Vec, derp_map: Option, ) -> anyhow::Result<()> { - let (conn, endpoint) = make_endpoint(private_key.clone(), derp_map).await?; + let endpoint = make_endpoint(private_key.clone(), derp_map).await?; let bytes = hex::decode(dial)?; let bytes: [u8; 32] = bytes.try_into().ok().context("unexpected key length")?; - let key: hp::key::node::PublicKey = hp::key::node::PublicKey::from(bytes); - - let endpoints = remote_endpoints; - let addresses = endpoints.iter().map(|a| a.ip()).collect(); - conn.set_network_map(hp::netmap::NetworkMap { - peers: vec![hp::cfg::Node { - name: None, - key: key.clone(), - endpoints, - addresses, - derp: Some(DEFAULT_DERP_REGION), - }], - }) - .await?; - let addr = conn.get_mapping_addr(&key).await; - let addr = addr.context("no mapping address")?; - tracing::info!("dialing {:?} at {:?}", key, addr); - let connecting = endpoint.connect(addr, "localhost")?; - match connecting.await { + let peer_id = PeerId::from(PublicKey::from_bytes(&bytes).context("failed to parse PeerId")?); + + tracing::info!("dialing {:?}", peer_id); + let conn = endpoint + .connect(peer_id, &DR_DERP_ALPN, &remote_endpoints) + .await; + match conn { Ok(connection) => { if let Err(cause) = passive_side(connection).await { eprintln!("error handling connection: {cause}"); } } Err(cause) => { - eprintln!("unable to connect to {addr}: {cause}"); + eprintln!("unable to connect to {peer_id}: {cause}"); } } @@ -566,9 +533,9 @@ async fn accept( config: TestConfig, derp_map: Option, ) -> anyhow::Result<()> { - let (conn, endpoint) = make_endpoint(private_key.clone(), derp_map).await?; + let endpoint = make_endpoint(private_key.clone(), derp_map).await?; - let endpoints = conn.local_endpoints().await?; + let endpoints = endpoint.local_endpoints().await?; let remote_addrs = endpoints .iter() .map(|endpoint| format!("--remote-endpoint {}", format_addr(endpoint.addr))) diff --git a/iroh/src/config.rs b/iroh/src/config.rs index 64441b8f63..2ac2761024 100644 --- a/iroh/src/config.rs +++ b/iroh/src/config.rs @@ -8,7 +8,10 @@ use std::{ use anyhow::{anyhow, Result}; use config::{Environment, File, Value}; -use iroh_net::hp::derp::{DerpMap, DerpNode, DerpRegion, UseIpv4, UseIpv6}; +use iroh_net::{ + defaults::default_derp_region, + hp::derp::{DerpMap, DerpRegion}, +}; use serde::{Deserialize, Serialize}; use tracing::debug; @@ -99,27 +102,6 @@ impl Config { } } -fn default_derp_region() -> DerpRegion { - // The default derper run by number0. - let default_n0_derp = DerpNode { - name: "default-1".into(), - region_id: 1, - host_name: "https://derp.iroh.network".parse().unwrap(), - stun_only: false, - stun_port: 3478, - ipv4: UseIpv4::Some("35.175.99.113".parse().unwrap()), - ipv6: UseIpv6::None, - derp_port: 443, - stun_test_ip: None, - }; - DerpRegion { - region_id: 1, - nodes: vec![default_n0_derp], - avoid: false, - region_code: "default-1".into(), - } -} - /// Name of directory that wraps all iroh files in a given application directory const IROH_DIR: &str = "iroh"; diff --git a/iroh/src/node.rs b/iroh/src/node.rs index f2e5675adc..f79ba4fbf3 100644 --- a/iroh/src/node.rs +++ b/iroh/src/node.rs @@ -28,6 +28,7 @@ use iroh_bytes::{ use iroh_net::{ hp::{cfg::Endpoint, derp::DerpMap}, tls::{self, Keypair, PeerId}, + MagicEndpoint, }; use quic_rpc::server::RpcChannel; use quic_rpc::transport::flume::FlumeConnection; @@ -205,46 +206,27 @@ where pub async fn spawn(self) -> Result { trace!("spawning node"); let rt = self.rt.context("runtime not set")?; - let tls_server_config = tls::make_server_config( - &self.keypair, - PROTOCOLS.iter().map(|p| p.to_vec()).collect(), - self.keylog, - )?; - let mut server_config = quinn::ServerConfig::with_crypto(Arc::new(tls_server_config)); + + let (endpoints_update_s, endpoints_update_r) = flume::bounded(1); let mut transport_config = quinn::TransportConfig::default(); transport_config .max_concurrent_bidi_streams(MAX_STREAMS.try_into()?) .max_concurrent_uni_streams(0u32.into()); - server_config - .transport_config(Arc::new(transport_config)) - .concurrent_connections(MAX_CONNECTIONS); - - let (endpoints_update_s, endpoints_update_r) = flume::bounded(1); - let conn = iroh_net::hp::magicsock::Conn::new(iroh_net::hp::magicsock::Options { - port: self.bind_addr.port(), - private_key: self.keypair.secret().clone().into(), - on_endpoints: Some(Box::new(move |eps| { + let endpoint = MagicEndpoint::builder() + .keypair(self.keypair.clone()) + .alpns(PROTOCOLS.iter().map(|p| p.to_vec()).collect()) + .keylog(self.keylog) + .derp_map(self.derp_map) + .transport_config(transport_config) + .concurrent_connections(MAX_CONNECTIONS) + .on_endpoints(Box::new(move |eps| { if !endpoints_update_s.is_disconnected() && !eps.is_empty() { endpoints_update_s.send(()).ok(); } - })), - ..Default::default() - }) - .await?; - trace!("created magicsock"); - - let derp_map = self.derp_map.unwrap_or_default(); - conn.set_derp_map(Some(derp_map)) - .await - .context("setting derp map")?; - - let endpoint = quinn::Endpoint::new_with_abstract_socket( - quinn::EndpointConfig::default(), - Some(server_config), - conn.clone(), - Arc::new(quinn::TokioRuntime), - )?; + })) + .bind(self.bind_addr.port()) + .await?; trace!("created quinn endpoint"); @@ -262,7 +244,7 @@ where let rt3 = rt.clone(); let inner = Arc::new(NodeInner { db: self.db, - conn, + endpoint: endpoint.clone(), keypair: self.keypair, events, controller, @@ -305,7 +287,7 @@ where #[allow(clippy::too_many_arguments)] async fn run( - server: quinn::Endpoint, + server: MagicEndpoint, events: broadcast::Sender, handler: RpcHandler, rpc: E, @@ -316,8 +298,12 @@ where ) { let rpc = RpcServer::new(rpc); let internal_rpc = RpcServer::new(internal_rpc); - if let Ok(addr) = server.local_addr() { - debug!("listening at: {addr}"); + if let Ok((ipv4, ipv6)) = server.local_addr() { + debug!( + "listening at: {}{}", + ipv4, + ipv6.map(|addr| format!(" and {addr}")).unwrap_or_default() + ); } let cancel_token = handler.inner.cancel_token.clone(); loop { @@ -379,7 +365,10 @@ where // ConnectionError::LocallyClosed. All streams are interrupted, this is not // graceful. let error_code = Closed::ProviderTerminating; - server.close(error_code.into(), error_code.reason()); + server + .close(error_code.into(), error_code.reason()) + .await + .ok(); } } @@ -425,7 +414,7 @@ pub struct Node { #[derive(Debug)] struct NodeInner { db: Database, - conn: iroh_net::hp::magicsock::Conn, + endpoint: MagicEndpoint, keypair: Keypair, events: broadcast::Sender, cancel_token: CancellationToken, @@ -512,7 +501,7 @@ impl Node { impl NodeInner { async fn local_endpoints(&self) -> Result> { - self.conn.local_endpoints().await + self.endpoint.local_endpoints().await } async fn local_endpoint_addresses(&self) -> Result> { @@ -521,7 +510,7 @@ impl NodeInner { } fn local_address(&self) -> Result> { - let (v4, v6) = self.conn.local_addr()?; + let (v4, v6) = self.endpoint.local_addr()?; let mut addrs = vec![v4]; if let Some(v6) = v6 { addrs.push(v6); diff --git a/iroh/tests/provide.rs b/iroh/tests/provide.rs index 3a7781b1a1..17cb9b7212 100644 --- a/iroh/tests/provide.rs +++ b/iroh/tests/provide.rs @@ -9,7 +9,7 @@ use anyhow::{anyhow, bail, Context, Result}; use bytes::Bytes; use futures::{future::BoxFuture, FutureExt}; use iroh::node::{Event, Node}; -use iroh_net::client::dial_peer; +use iroh_net::MagicEndpoint; use rand::RngCore; use testdir::testdir; use tokio::{fs, io::AsyncWriteExt, sync::broadcast}; @@ -583,7 +583,8 @@ async fn test_run_fsm() { let addrs = node.local_endpoint_addresses().await.unwrap(); let peer_id = node.peer_id(); tokio::time::timeout(Duration::from_secs(10), async move { - let connection = dial_peer(&addrs, peer_id, &iroh_bytes::P2P_ALPN, true, None).await?; + let connection = + MagicEndpoint::dial_peer(peer_id, &iroh_bytes::P2P_ALPN, &addrs, None, true).await?; let request = GetRequest::all(hash).into(); let stream = get::run_connection(connection, request); let (collection, children, _) = aggregate_get_response(stream).await?; @@ -788,7 +789,7 @@ async fn test_token_passthrough() -> Result<()> { let addrs = provider.local_endpoint_addresses().await?; let peer_id = provider.peer_id(); tokio::time::timeout(Duration::from_secs(10), async move { - dial_peer(&addrs, peer_id, &iroh_bytes::P2P_ALPN, true, None).await?; + MagicEndpoint::dial_peer(peer_id, &iroh_bytes::P2P_ALPN, &addrs, None, true).await?; let request = GetRequest::all(hash).with_token(token).into(); let response = get::run( request,