# refactor!(iroh): move protocol specific fields in NodeInner into protocols (#2723)

- refactor(iroh): store gossip in protocolmap
- refactor(iroh): avoid duplicate SecretKey storage
- refactor(iroh): store downloader as part of the blobs
- refactor(iroh): store DocsEngine in protocols
- refactor(iroh): move blobs store into BlobsProtocol
- refactor(iroh): move BlobsBatches into BlobsProtocol
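
Taken together, these changes mean the protocol handlers now live in the node's `ProtocolMap` and are looked up by ALPN instead of being kept as dedicated fields on `NodeInner`. A condensed fragment of that lookup pattern, lifted from the `iroh/src/node.rs` diff below (not a standalone program; `protocols` and the generic store type `D` come from the surrounding `NodeInner` code):

```rust
// Fetch the handlers from the ProtocolMap by ALPN (condensed from the diff below).
let gossip = protocols
    .get_typed::<Gossip>(GOSSIP_ALPN)
    .expect("missing gossip");
let blobs = protocols
    .get_typed::<BlobsProtocol<D>>(iroh_blobs::protocol::ALPN)
    .expect("missing blobs");
// Docs is optional, so this lookup simply returns None when docs are disabled.
let docs = protocols.get_typed::<DocsEngine>(DOCS_ALPN);
```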

## Breaking Changes

- removed
  - `iroh::node::ProtocolBuilder::downloader`
  - `iroh::node::ProtocolBuilder::blobs_db`
  - `iroh::node::ProtocolBuilder::gossip`
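
For code that used these accessors, the remaining `ProtocolBuilder::get_protocol` lookup (described in the `builder.rs` diff below as downcasting the handler registered for an ALPN) is the natural replacement. A minimal sketch, assuming `get_protocol` takes the ALPN and returns the typed handler as an `Option`:

```rust
// Sketch only: `builder` is a ProtocolBuilder<D> with the core iroh protocols
// already registered. The Option-returning shape of get_protocol is an
// assumption based on its doc comment; adjust to the real signature.
let gossip = builder
    .get_protocol::<Gossip>(GOSSIP_ALPN)
    .expect("gossip not registered");
let blobs = builder
    .get_protocol::<BlobsProtocol<D>>(iroh_blobs::protocol::ALPN)
    .expect("blobs not registered");
// The blobs store (and the downloader) now live inside BlobsProtocol, e.g.
// blobs.store() as used in the gc task below, so there is no separate blobs_db().
let _store = blobs.store();
```
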
dignifiedquire committed Sep 10, 2024
1 parent 8a4bb09 commit 3d75ded
Showing 5 changed files with 413 additions and 368 deletions.
118 changes: 27 additions & 91 deletions iroh/src/node.rs
@@ -35,8 +35,9 @@
//! well, without going through [`client`](crate::client::Iroh))
//!
//! To shut down the node, call [`Node::shutdown`].
use std::collections::{BTreeMap, BTreeSet};
use std::collections::BTreeSet;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::sync::Arc;
@@ -47,14 +47,14 @@ use futures_lite::StreamExt;
use futures_util::future::MapErr;
use futures_util::future::Shared;
use iroh_base::key::PublicKey;
use iroh_blobs::protocol::Closed;
use iroh_blobs::store::Store as BaoStore;
use iroh_blobs::util::local_pool::{LocalPool, LocalPoolHandle};
use iroh_blobs::{downloader::Downloader, protocol::Closed};
use iroh_blobs::{HashAndFormat, TempTag};
use iroh_gossip::net::Gossip;
use iroh_docs::net::DOCS_ALPN;
use iroh_gossip::net::{Gossip, GOSSIP_ALPN};
use iroh_net::endpoint::{DirectAddrsStream, RemoteInfo};
use iroh_net::key::SecretKey;
use iroh_net::{AddrInfo, Endpoint, NodeAddr};
use protocol::BlobsProtocol;
use quic_rpc::transport::ServerEndpoint as _;
use quic_rpc::RpcServer;
use tokio::task::{JoinError, JoinSet};
@@ -64,7 +65,6 @@ use tracing::{debug, error, info, info_span, trace, warn, Instrument};

use crate::node::nodes_storage::store_node_addrs;
use crate::node::{docs::DocsEngine, protocol::ProtocolMap};
use crate::rpc_protocol::blobs::BatchId;

mod builder;
mod docs;
@@ -118,70 +118,14 @@ pub(crate) type JoinErrToStr = Box<dyn Fn(JoinError) -> String + Send + Sync + '

#[derive(derive_more::Debug)]
struct NodeInner<D> {
db: D,
db: PhantomData<D>,
rpc_addr: Option<SocketAddr>,
docs: Option<DocsEngine>,
endpoint: Endpoint,
gossip: Gossip,
secret_key: SecretKey,
cancel_token: CancellationToken,
client: crate::client::Iroh,
downloader: Downloader,
blob_batches: tokio::sync::Mutex<BlobBatches>,
local_pool_handle: LocalPoolHandle,
}

/// Keeps track of all the currently active batch operations of the blobs api.
#[derive(Debug, Default)]
struct BlobBatches {
/// Currently active batches
batches: BTreeMap<BatchId, BlobBatch>,
/// Used to generate new batch ids.
max: u64,
}

/// A single batch of blob operations
#[derive(Debug, Default)]
struct BlobBatch {
/// The tags in this batch.
tags: BTreeMap<HashAndFormat, Vec<TempTag>>,
}

impl BlobBatches {
/// Create a new unique batch id.
fn create(&mut self) -> BatchId {
let id = self.max;
self.max += 1;
BatchId(id)
}

/// Store a temp tag in a batch identified by a batch id.
fn store(&mut self, batch: BatchId, tt: TempTag) {
let entry = self.batches.entry(batch).or_default();
entry.tags.entry(tt.hash_and_format()).or_default().push(tt);
}

/// Remove a tag from a batch.
fn remove_one(&mut self, batch: BatchId, content: &HashAndFormat) -> Result<()> {
if let Some(batch) = self.batches.get_mut(&batch) {
if let Some(tags) = batch.tags.get_mut(content) {
tags.pop();
if tags.is_empty() {
batch.tags.remove(content);
}
return Ok(());
}
}
// this can happen if we try to upgrade a tag from an expired batch
anyhow::bail!("tag not found in batch");
}

/// Remove an entire batch.
fn remove(&mut self, batch: BatchId) {
self.batches.remove(&batch);
}
}

/// In memory node.
pub type MemNode = Node<iroh_blobs::store::mem::Store>;

@@ -245,7 +189,7 @@ impl<D: BaoStore> Node<D> {

/// Returns the [`PublicKey`] of the node.
pub fn node_id(&self) -> PublicKey {
self.inner.secret_key.public()
self.inner.endpoint.secret_key().public()
}

/// Return a client to control this node over an in-memory channel.
@@ -344,32 +288,40 @@ impl<D: iroh_blobs::store::Store> NodeInner<D> {
let external_rpc = RpcServer::new(external_rpc);
let internal_rpc = RpcServer::new(internal_rpc);

let gossip = protocols
.get_typed::<Gossip>(GOSSIP_ALPN)
.expect("missing gossip");

// TODO(frando): I think this is not needed as we do the same in a task just below.
// forward the initial endpoints to the gossip protocol.
// it may happen that the first endpoint update callback is missed because the gossip cell
// is only initialized once the endpoint is fully bound
if let Some(direct_addresses) = self.endpoint.direct_addresses().next().await {
debug!(me = ?self.endpoint.node_id(), "gossip initial update: {direct_addresses:?}");
self.gossip.update_direct_addresses(&direct_addresses).ok();
gossip.update_direct_addresses(&direct_addresses).ok();
}

// Spawn a task for the garbage collection.
if let GcPolicy::Interval(gc_period) = gc_policy {
let inner = self.clone();
let protocols = protocols.clone();
let handle = local_pool.spawn(move || async move {
let inner2 = inner.clone();
inner
.db
let docs_engine = protocols.get_typed::<DocsEngine>(DOCS_ALPN);
let blobs = protocols
.get_typed::<BlobsProtocol<D>>(iroh_blobs::protocol::ALPN)
.expect("missing blobs");

blobs
.store()
.gc_run(
iroh_blobs::store::GcConfig {
period: gc_period,
done_callback: gc_done_callback,
},
move || {
let inner2 = inner2.clone();
let docs_engine = docs_engine.clone();
async move {
let mut live = BTreeSet::default();
if let Some(docs) = &inner2.docs {
if let Some(docs) = docs_engine {
let doc_hashes = match docs.sync.content_hashes().await {
Ok(hashes) => hashes,
Err(err) => {
@@ -449,7 +401,7 @@ impl<D: iroh_blobs::store::Store> NodeInner<D> {
join_set.spawn(async move {
let mut stream = inner.endpoint.direct_addresses();
while let Some(eps) = stream.next().await {
if let Err(err) = inner.gossip.update_direct_addresses(&eps) {
if let Err(err) = gossip.update_direct_addresses(&eps) {
warn!("Failed to update direct addresses for gossip: {err:?}");
}
}
@@ -468,7 +420,7 @@ impl<D: iroh_blobs::store::Store> NodeInner<D> {
request = external_rpc.accept() => {
match request {
Ok(accepting) => {
rpc::Handler::spawn_rpc_request(self.clone(), &mut join_set, accepting);
rpc::Handler::spawn_rpc_request(self.clone(), &mut join_set, accepting, protocols.clone());
}
Err(e) => {
info!("rpc request error: {:?}", e);
@@ -479,7 +431,7 @@ impl<D: iroh_blobs::store::Store> NodeInner<D> {
request = internal_rpc.accept() => {
match request {
Ok(accepting) => {
rpc::Handler::spawn_rpc_request(self.clone(), &mut join_set, accepting);
rpc::Handler::spawn_rpc_request(self.clone(), &mut join_set, accepting, protocols.clone());
}
Err(e) => {
info!("internal rpc request error: {:?}", e);
@@ -533,18 +485,6 @@ impl<D: iroh_blobs::store::Store> NodeInner<D> {
async fn shutdown(&self, protocols: Arc<ProtocolMap>) {
let error_code = Closed::ProviderTerminating;

// Shutdown future for the docs engine, if enabled.
let docs_shutdown = {
let docs = self.docs.clone();
async move {
if let Some(docs) = docs {
docs.shutdown().await
} else {
Ok(())
}
}
};

// We ignore all errors during shutdown.
let _ = tokio::join!(
// Close the endpoint.
Expand All @@ -554,10 +494,6 @@ impl<D: iroh_blobs::store::Store> NodeInner<D> {
self.endpoint
.clone()
.close(error_code.into(), error_code.reason()),
// Shutdown docs engine.
docs_shutdown,
// Shutdown blobs store engine.
self.db.shutdown(),
// Shutdown protocol handlers.
protocols.shutdown(),
);
@@ -636,7 +572,7 @@ mod tests {
use bytes::Bytes;
use iroh_base::node_addr::AddrInfoOptions;
use iroh_blobs::{provider::AddProgress, util::SetTagOption, BlobFormat};
use iroh_net::{relay::RelayMode, test_utils::DnsPkarrServer, NodeAddr};
use iroh_net::{key::SecretKey, relay::RelayMode, test_utils::DnsPkarrServer, NodeAddr};

use crate::client::blobs::{AddOutcome, WrapOption};

45 changes: 19 additions & 26 deletions iroh/src/node/builder.rs
@@ -647,16 +647,11 @@

let inner = Arc::new(NodeInner {
rpc_addr: self.rpc_addr,
db: self.blobs_store,
docs,
db: Default::default(),
endpoint,
secret_key: self.secret_key,
client,
cancel_token: CancellationToken::new(),
downloader,
gossip,
local_pool_handle: lp.handle().clone(),
blob_batches: Default::default(),
});

let protocol_builder = ProtocolBuilder {
@@ -670,7 +665,13 @@
local_pool: lp,
};

let protocol_builder = protocol_builder.register_iroh_protocols(self.blob_events);
let protocol_builder = protocol_builder.register_iroh_protocols(
self.blob_events,
self.blobs_store,
gossip,
downloader,
docs,
);

Ok(protocol_builder)
}
@@ -764,26 +765,11 @@ impl<D: iroh_blobs::store::Store> ProtocolBuilder<D> {
&self.inner.endpoint
}

/// Returns the [`crate::blobs::store::Store`] used by the node.
pub fn blobs_db(&self) -> &D {
&self.inner.db
}

/// Returns a reference to the used [`LocalPoolHandle`].
pub fn local_pool_handle(&self) -> &LocalPoolHandle {
self.local_pool.handle()
}

/// Returns a reference to the [`Downloader`] used by the node.
pub fn downloader(&self) -> &Downloader {
&self.inner.downloader
}

/// Returns a reference to the [`Gossip`] handle used by the node.
pub fn gossip(&self) -> &Gossip {
&self.inner.gossip
}

/// Returns a protocol handler for an ALPN.
///
/// This downcasts to the concrete type and returns `None` if the handler registered for `alpn`
@@ -793,21 +779,28 @@ impl<D: iroh_blobs::store::Store> ProtocolBuilder<D> {
}

/// Registers the core iroh protocols (blobs, gossip, docs).
fn register_iroh_protocols(mut self, blob_events: EventSender) -> Self {
fn register_iroh_protocols(
mut self,
blob_events: EventSender,
store: D,
gossip: Gossip,
downloader: Downloader,
docs: Option<DocsEngine>,
) -> Self {
// Register blobs.
let blobs_proto = BlobsProtocol::new_with_events(
self.blobs_db().clone(),
store,
self.local_pool_handle().clone(),
blob_events,
downloader,
);
self = self.accept(iroh_blobs::protocol::ALPN.to_vec(), Arc::new(blobs_proto));

// Register gossip.
let gossip = self.gossip().clone();
self = self.accept(GOSSIP_ALPN.to_vec(), Arc::new(gossip));

// Register docs, if enabled.
if let Some(docs) = self.inner.docs.clone() {
if let Some(docs) = docs {
self = self.accept(DOCS_ALPN.to_vec(), Arc::new(docs));
}

9 changes: 9 additions & 0 deletions iroh/src/node/docs.rs
@@ -52,4 +52,13 @@ impl ProtocolHandler for DocsEngine {
fn accept(self: Arc<Self>, conn: Connecting) -> BoxedFuture<Result<()>> {
Box::pin(async move { self.handle_connection(conn).await })
}

fn shutdown(self: Arc<Self>) -> BoxedFuture<()> {
Box::pin(async move {
let this: &Self = &self;
if let Err(err) = this.shutdown().await {
tracing::warn!("shutdown error: {:?}", err);
}
})
}
}