Skip to content

Commit

Permalink
refactor(iroh-gossip): make use of Endpoint::direct_addresses in iroh…
Browse files Browse the repository at this point in the history
…_gossip::net (#2731)

## Description

`iroh_gossip::net::Gossip` had a method to update the direct addresses
of the iroh_net::Endpoint. IIRC that dates back to when the Endpoint only
exposed a single, non-cloneable stream of endpoint events, which was
handled in the iroh Node and thus had to be forwarded to consumers like
Gossip. This was long ago and by now the Endpoint provides a capable API
for this. Therefore there is no need to expose this as a method (because
Gossip always contains an Endpoint).

<!-- A summary of what this pull request achieves and a rough list of
changes. -->

## Breaking Changes

`Gossip::update_direct_addresses` is removed. Updating the direct
addresses is now handled by `Gossip` automatically.

## Notes & open questions

To react to all updates to the endpoint's `NodeAddr`, I am watching both
`Endpoint::direct_addresses` and `Endpoint::watch_home_relay`. A simpler
API for this use case would be `Endpoint::watch_node_addr`.

<!-- Any notes, remarks or open questions you have to make about the PR.
-->

## Change checklist

- [x] Self-review.
- [ ] Documentation updates following the [style
guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text),
if relevant.
- [ ] Tests if relevant.
- [x] All breaking changes documented.
  • Loading branch information
Frando committed Sep 16, 2024
1 parent 28cf153 commit 9583729
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 70 deletions.
77 changes: 34 additions & 43 deletions iroh-gossip/src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use futures_util::TryFutureExt;
use iroh_metrics::inc;
use iroh_net::{
dialer::Dialer,
endpoint::{get_remote_node_id, Connection},
endpoint::{get_remote_node_id, Connection, DirectAddr},
key::PublicKey,
AddrInfo, Endpoint, NodeAddr, NodeId,
};
Expand Down Expand Up @@ -54,8 +54,6 @@ const SEND_QUEUE_CAP: usize = 64;
const TO_ACTOR_CAP: usize = 64;
/// Channel capacity for the InEvent message queue (single)
const IN_EVENT_CAP: usize = 1024;
/// Channel capacity for endpoint change message queue (single)
const ON_ENDPOINTS_CAP: usize = 64;
/// Name used for logging when new node addresses are added from gossip.
const SOURCE_NAME: &str = "gossip";

Expand Down Expand Up @@ -90,7 +88,6 @@ type ProtoMessage = proto::Message<PublicKey>;
#[derive(Debug, Clone)]
pub struct Gossip {
to_actor_tx: mpsc::Sender<ToActor>,
on_direct_addrs_tx: mpsc::Sender<Vec<iroh_net::endpoint::DirectAddr>>,
_actor_handle: Arc<AbortOnDropHandle<()>>,
max_message_size: usize,
}
Expand All @@ -108,7 +105,6 @@ impl Gossip {
);
let (to_actor_tx, to_actor_rx) = mpsc::channel(TO_ACTOR_CAP);
let (in_event_tx, in_event_rx) = mpsc::channel(IN_EVENT_CAP);
let (on_endpoints_tx, on_endpoints_rx) = mpsc::channel(ON_ENDPOINTS_CAP);

let me = endpoint.node_id().fmt_short();
let max_message_size = state.max_message_size();
Expand All @@ -119,7 +115,6 @@ impl Gossip {
to_actor_rx,
in_event_rx,
in_event_tx,
on_direct_addr_rx: on_endpoints_rx,
timers: Timers::new(),
command_rx: StreamGroup::new().keyed(),
peers: Default::default(),
Expand All @@ -138,7 +133,6 @@ impl Gossip {
);
Self {
to_actor_tx,
on_direct_addrs_tx: on_endpoints_tx,
_actor_handle: Arc::new(AbortOnDropHandle::new(actor_handle)),
max_message_size,
}
Expand Down Expand Up @@ -222,23 +216,6 @@ impl Gossip {
.try_flatten_stream()
}

/// Set info on our direct addresses.
///
/// This will be sent to peers on Neighbor and Join requests so that they can connect directly
/// to us.
///
/// This is only best effort, and will drop new events if backed up.
pub fn update_direct_addresses(
&self,
addrs: &[iroh_net::endpoint::DirectAddr],
) -> anyhow::Result<()> {
let addrs = addrs.to_vec();
self.on_direct_addrs_tx
.try_send(addrs)
.map_err(|_| anyhow!("endpoints channel dropped"))?;
Ok(())
}

async fn send(&self, event: ToActor) -> anyhow::Result<()> {
self.to_actor_tx
.send(event)
Expand Down Expand Up @@ -274,8 +251,6 @@ struct Actor {
in_event_tx: mpsc::Sender<InEvent>,
/// Input events to the state (emitted from the connection loops)
in_event_rx: mpsc::Receiver<InEvent>,
/// Updates of discovered endpoint addresses
on_direct_addr_rx: mpsc::Receiver<Vec<iroh_net::endpoint::DirectAddr>>,
/// Queued timers
timers: Timers<Timer>,
/// Map of topics to their state.
Expand All @@ -292,6 +267,21 @@ struct Actor {

impl Actor {
pub async fn run(mut self) -> anyhow::Result<()> {
// Watch for changes in direct addresses to update our peer data.
let mut direct_addresses_stream = self.endpoint.direct_addresses();
// Watch for changes of our home relay to update our peer data.
let mut home_relay_stream = self.endpoint.watch_home_relay();

// With each gossip message we provide addressing information to reach our node.
// We wait until at least one direct address is discovered.
let mut current_addresses = direct_addresses_stream
.next()
.await
.ok_or_else(|| anyhow!("Failed to discover direct addresses"))?;
let peer_data = our_peer_data(&self.endpoint, &current_addresses)?;
self.handle_in_event(InEvent::UpdatePeerData(peer_data), Instant::now())
.await?;

let mut i = 0;
loop {
i += 1;
Expand All @@ -314,24 +304,16 @@ impl Actor {
trace!(?i, "tick: command_rx");
self.handle_command(topic, key, command).await?;
},
new_endpoints = self.on_direct_addr_rx.recv() => {
Some(new_addresses) = direct_addresses_stream.next() => {
trace!(?i, "tick: new_endpoints");
match new_endpoints {
Some(endpoints) => {
inc!(Metrics, actor_tick_endpoint);
let addr = NodeAddr::from_parts(
self.endpoint.node_id(),
self.endpoint.home_relay(),
endpoints.into_iter().map(|x| x.addr).collect(),
);
let peer_data = encode_peer_data(&addr.info)?;
self.handle_in_event(InEvent::UpdatePeerData(peer_data), Instant::now()).await?;
}
None => {
debug!("endpoint change handle dropped, stopping gossip actor");
break;
}
}
inc!(Metrics, actor_tick_endpoint);
current_addresses = new_addresses;
let peer_data = our_peer_data(&self.endpoint, &current_addresses)?;
self.handle_in_event(InEvent::UpdatePeerData(peer_data), Instant::now()).await?;
}
Some(_relay_url) = home_relay_stream.next() => {
let peer_data = our_peer_data(&self.endpoint, &current_addresses)?;
self.handle_in_event(InEvent::UpdatePeerData(peer_data), Instant::now()).await?;
}
(peer_id, res) = self.dialer.next_conn() => {
trace!(?i, "tick: dialer");
Expand Down Expand Up @@ -822,6 +804,15 @@ impl Stream for TopicCommandStream {
}
}

/// Builds the encoded peer data describing how to reach our own node.
///
/// Assembles a [`NodeAddr`] from the endpoint's node id, the endpoint's
/// current home relay, and the `addr` of every entry in `direct_addresses`,
/// then passes the resulting address info to `encode_peer_data`.
///
/// # Errors
///
/// Returns whatever error `encode_peer_data` reports.
fn our_peer_data(endpoint: &Endpoint, direct_addresses: &[DirectAddr]) -> Result<PeerData> {
    let node_id = endpoint.node_id();
    let relay_url = endpoint.home_relay();
    // Only the address of each direct address entry is advertised to peers.
    let addrs = direct_addresses.iter().map(|direct| direct.addr);
    let node_addr = NodeAddr::from_parts(node_id, relay_url, addrs.collect());
    encode_peer_data(&node_addr.info)
}

#[cfg(test)]
mod test {
use std::time::Duration;
Expand Down
27 changes: 0 additions & 27 deletions iroh/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ use iroh_blobs::protocol::Closed;
use iroh_blobs::store::Store as BaoStore;
use iroh_blobs::util::local_pool::{LocalPool, LocalPoolHandle};
use iroh_docs::net::DOCS_ALPN;
use iroh_gossip::net::{Gossip, GOSSIP_ALPN};
use iroh_net::endpoint::{DirectAddrsStream, RemoteInfo};
use iroh_net::{AddrInfo, Endpoint, NodeAddr};
use protocol::BlobsProtocol;
Expand Down Expand Up @@ -288,19 +287,6 @@ impl<D: iroh_blobs::store::Store> NodeInner<D> {
let external_rpc = RpcServer::new(external_rpc);
let internal_rpc = RpcServer::new(internal_rpc);

let gossip = protocols
.get_typed::<Gossip>(GOSSIP_ALPN)
.expect("missing gossip");

// TODO(frando): I think this is not needed as we do the same in a task just below.
// forward the initial endpoints to the gossip protocol.
// it may happen the the first endpoint update callback is missed because the gossip cell
// is only initialized once the endpoint is fully bound
if let Some(direct_addresses) = self.endpoint.direct_addresses().next().await {
debug!(me = ?self.endpoint.node_id(), "gossip initial update: {direct_addresses:?}");
gossip.update_direct_addresses(&direct_addresses).ok();
}

// Spawn a task for the garbage collection.
if let GcPolicy::Interval(gc_period) = gc_policy {
let protocols = protocols.clone();
Expand Down Expand Up @@ -396,19 +382,6 @@ impl<D: iroh_blobs::store::Store> NodeInner<D> {
);
}

// Spawn a task that updates the gossip endpoints.
let inner = self.clone();
join_set.spawn(async move {
let mut stream = inner.endpoint.direct_addresses();
while let Some(eps) = stream.next().await {
if let Err(err) = gossip.update_direct_addresses(&eps) {
warn!("Failed to update direct addresses for gossip: {err:?}");
}
}
warn!("failed to retrieve local endpoints");
Ok(())
});

loop {
tokio::select! {
biased;
Expand Down

0 comments on commit 9583729

Please sign in to comment.