From 9583729420a74dfd80cbcc88a9e23f4ddf7662d3 Mon Sep 17 00:00:00 2001 From: Franz Heinzmann Date: Mon, 16 Sep 2024 15:12:12 +0200 Subject: [PATCH] refactor(iroh-gossip): make use of Endpoint::direct_addresses in iroh_gossip::net (#2731) ## Description `iroh_gossip::net::Gossip` had a method to update the direct addresses of the iroh_net::Endpoint. IIRC that dates back when the Endpoint only exposed a single, non-cloneable stream of endpoint events, which was handled in the iroh Node and thus had to be forwarded to consumers like Gossip. This was long ago and by now the Endpoint provides a capable API for this. Therefore there is no need to expose this as a method (because Gossip always contains an Endpoint). ## Breaking Changes `Gossip::update_direct_addresses` is removed. Updating the direct addresses is now handled by `Gossip` automatically. ## Notes & open questions To react to all updates to the endpoint's `NodeAddr`, I am watching both `Endpoint::direct_addresses` and `Endpoint::watch_home_relay`. A simpler API for this usecase would be `Endpoint::watch_node_addr`. ## Change checklist - [x] Self-review. - [ ] Documentation updates following the [style guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text), if relevant. - [ ] Tests if relevant. - [x] All breaking changes documented. --- iroh-gossip/src/net.rs | 77 +++++++++++++++++++----------------------- iroh/src/node.rs | 27 --------------- 2 files changed, 34 insertions(+), 70 deletions(-) diff --git a/iroh-gossip/src/net.rs b/iroh-gossip/src/net.rs index a7d0b08d01..88009f8904 100644 --- a/iroh-gossip/src/net.rs +++ b/iroh-gossip/src/net.rs @@ -11,7 +11,7 @@ use futures_util::TryFutureExt; use iroh_metrics::inc; use iroh_net::{ dialer::Dialer, - endpoint::{get_remote_node_id, Connection}, + endpoint::{get_remote_node_id, Connection, DirectAddr}, key::PublicKey, AddrInfo, Endpoint, NodeAddr, NodeId, }; @@ -54,8 +54,6 @@ const SEND_QUEUE_CAP: usize = 64; const TO_ACTOR_CAP: usize = 64; /// Channel capacity for the InEvent message queue (single) const IN_EVENT_CAP: usize = 1024; -/// Channel capacity for endpoint change message queue (single) -const ON_ENDPOINTS_CAP: usize = 64; /// Name used for logging when new node addresses are added from gossip. const SOURCE_NAME: &str = "gossip"; @@ -90,7 +88,6 @@ type ProtoMessage = proto::Message; #[derive(Debug, Clone)] pub struct Gossip { to_actor_tx: mpsc::Sender, - on_direct_addrs_tx: mpsc::Sender>, _actor_handle: Arc>, max_message_size: usize, } @@ -108,7 +105,6 @@ impl Gossip { ); let (to_actor_tx, to_actor_rx) = mpsc::channel(TO_ACTOR_CAP); let (in_event_tx, in_event_rx) = mpsc::channel(IN_EVENT_CAP); - let (on_endpoints_tx, on_endpoints_rx) = mpsc::channel(ON_ENDPOINTS_CAP); let me = endpoint.node_id().fmt_short(); let max_message_size = state.max_message_size(); @@ -119,7 +115,6 @@ impl Gossip { to_actor_rx, in_event_rx, in_event_tx, - on_direct_addr_rx: on_endpoints_rx, timers: Timers::new(), command_rx: StreamGroup::new().keyed(), peers: Default::default(), @@ -138,7 +133,6 @@ impl Gossip { ); Self { to_actor_tx, - on_direct_addrs_tx: on_endpoints_tx, _actor_handle: Arc::new(AbortOnDropHandle::new(actor_handle)), max_message_size, } @@ -222,23 +216,6 @@ impl Gossip { .try_flatten_stream() } - /// Set info on our direct addresses. - /// - /// This will be sent to peers on Neighbor and Join requests so that they can connect directly - /// to us. - /// - /// This is only best effort, and will drop new events if backed up. - pub fn update_direct_addresses( - &self, - addrs: &[iroh_net::endpoint::DirectAddr], - ) -> anyhow::Result<()> { - let addrs = addrs.to_vec(); - self.on_direct_addrs_tx - .try_send(addrs) - .map_err(|_| anyhow!("endpoints channel dropped"))?; - Ok(()) - } - async fn send(&self, event: ToActor) -> anyhow::Result<()> { self.to_actor_tx .send(event) @@ -274,8 +251,6 @@ struct Actor { in_event_tx: mpsc::Sender, /// Input events to the state (emitted from the connection loops) in_event_rx: mpsc::Receiver, - /// Updates of discovered endpoint addresses - on_direct_addr_rx: mpsc::Receiver>, /// Queued timers timers: Timers, /// Map of topics to their state. @@ -292,6 +267,21 @@ struct Actor { impl Actor { pub async fn run(mut self) -> anyhow::Result<()> { + // Watch for changes in direct addresses to update our peer data. + let mut direct_addresses_stream = self.endpoint.direct_addresses(); + // Watch for changes of our home relay to update our peer data. + let mut home_relay_stream = self.endpoint.watch_home_relay(); + + // With each gossip message we provide addressing information to reach our node. + // We wait until at least one direct address is discovered. + let mut current_addresses = direct_addresses_stream + .next() + .await + .ok_or_else(|| anyhow!("Failed to discover direct addresses"))?; + let peer_data = our_peer_data(&self.endpoint, ¤t_addresses)?; + self.handle_in_event(InEvent::UpdatePeerData(peer_data), Instant::now()) + .await?; + let mut i = 0; loop { i += 1; @@ -314,24 +304,16 @@ impl Actor { trace!(?i, "tick: command_rx"); self.handle_command(topic, key, command).await?; }, - new_endpoints = self.on_direct_addr_rx.recv() => { + Some(new_addresses) = direct_addresses_stream.next() => { trace!(?i, "tick: new_endpoints"); - match new_endpoints { - Some(endpoints) => { - inc!(Metrics, actor_tick_endpoint); - let addr = NodeAddr::from_parts( - self.endpoint.node_id(), - self.endpoint.home_relay(), - endpoints.into_iter().map(|x| x.addr).collect(), - ); - let peer_data = encode_peer_data(&addr.info)?; - self.handle_in_event(InEvent::UpdatePeerData(peer_data), Instant::now()).await?; - } - None => { - debug!("endpoint change handle dropped, stopping gossip actor"); - break; - } - } + inc!(Metrics, actor_tick_endpoint); + current_addresses = new_addresses; + let peer_data = our_peer_data(&self.endpoint, ¤t_addresses)?; + self.handle_in_event(InEvent::UpdatePeerData(peer_data), Instant::now()).await?; + } + Some(_relay_url) = home_relay_stream.next() => { + let peer_data = our_peer_data(&self.endpoint, ¤t_addresses)?; + self.handle_in_event(InEvent::UpdatePeerData(peer_data), Instant::now()).await?; } (peer_id, res) = self.dialer.next_conn() => { trace!(?i, "tick: dialer"); @@ -822,6 +804,15 @@ impl Stream for TopicCommandStream { } } +fn our_peer_data(endpoint: &Endpoint, direct_addresses: &[DirectAddr]) -> Result { + let addr = NodeAddr::from_parts( + endpoint.node_id(), + endpoint.home_relay(), + direct_addresses.iter().map(|x| x.addr).collect(), + ); + encode_peer_data(&addr.info) +} + #[cfg(test)] mod test { use std::time::Duration; diff --git a/iroh/src/node.rs b/iroh/src/node.rs index 94163cb745..06b85e89bf 100644 --- a/iroh/src/node.rs +++ b/iroh/src/node.rs @@ -52,7 +52,6 @@ use iroh_blobs::protocol::Closed; use iroh_blobs::store::Store as BaoStore; use iroh_blobs::util::local_pool::{LocalPool, LocalPoolHandle}; use iroh_docs::net::DOCS_ALPN; -use iroh_gossip::net::{Gossip, GOSSIP_ALPN}; use iroh_net::endpoint::{DirectAddrsStream, RemoteInfo}; use iroh_net::{AddrInfo, Endpoint, NodeAddr}; use protocol::BlobsProtocol; @@ -288,19 +287,6 @@ impl NodeInner { let external_rpc = RpcServer::new(external_rpc); let internal_rpc = RpcServer::new(internal_rpc); - let gossip = protocols - .get_typed::(GOSSIP_ALPN) - .expect("missing gossip"); - - // TODO(frando): I think this is not needed as we do the same in a task just below. - // forward the initial endpoints to the gossip protocol. - // it may happen the the first endpoint update callback is missed because the gossip cell - // is only initialized once the endpoint is fully bound - if let Some(direct_addresses) = self.endpoint.direct_addresses().next().await { - debug!(me = ?self.endpoint.node_id(), "gossip initial update: {direct_addresses:?}"); - gossip.update_direct_addresses(&direct_addresses).ok(); - } - // Spawn a task for the garbage collection. if let GcPolicy::Interval(gc_period) = gc_policy { let protocols = protocols.clone(); @@ -396,19 +382,6 @@ impl NodeInner { ); } - // Spawn a task that updates the gossip endpoints. - let inner = self.clone(); - join_set.spawn(async move { - let mut stream = inner.endpoint.direct_addresses(); - while let Some(eps) = stream.next().await { - if let Err(err) = gossip.update_direct_addresses(&eps) { - warn!("Failed to update direct addresses for gossip: {err:?}"); - } - } - warn!("failed to retrieve local endpoints"); - Ok(()) - }); - loop { tokio::select! { biased;