diff --git a/Cargo.toml b/Cargo.toml index 14f858190..936ba6ee3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,7 +31,7 @@ rust-version = "1.75.0" [workspace.dependencies] async-trait = "0.1.73" -btdht = { git = "https://github.com/equalitie/btdht.git", rev = "1d114b2" } +btdht = { git = "https://github.com/equalitie/btdht.git", rev = "e7ddf5607b20f0b82cbc3ea6259425c00bd8d16b" } bytes = "1.5.0" camino = "1.1.6" clap = { version = "4.4.6", features = ["derive"] } diff --git a/lib/src/network/dht_discovery.rs b/lib/src/network/dht_discovery.rs index 31f730e30..41acd9fa6 100644 --- a/lib/src/network/dht_discovery.rs +++ b/lib/src/network/dht_discovery.rs @@ -25,7 +25,7 @@ use std::{ use tokio::{ select, sync::{mpsc, watch}, - time::{self, Duration}, + time::{self, timeout, Duration}, }; use tracing::{instrument::Instrument, Span}; @@ -257,20 +257,17 @@ impl MonitoredDht { contacts_store: Option>, ) -> Self { // TODO: load the DHT state from a previous save if it exists. - let builder = MainlineDht::builder() + let mut builder = MainlineDht::builder() .add_routers(DHT_ROUTERS.iter().copied()) .set_read_only(false); - // TODO: The reuse of initial contacts is incorrectly implemented, once the issue - // https://github.com/equalitie/btdht/issues/8 - // is fixed, this can be uncommented again. - //if let Some(contacts_store) = &contacts_store { - // let initial_contacts = Self::load_initial_contacts(is_v4, &**contacts_store).await; + if let Some(contacts_store) = &contacts_store { + let initial_contacts = Self::load_initial_contacts(is_v4, &**contacts_store).await; - // for contact in initial_contacts { - // builder = builder.add_node(contact); - // } - //} + for contact in initial_contacts { + builder = builder.add_node(contact); + } + } let dht = builder .start(Socket(socket)) @@ -293,7 +290,7 @@ impl MonitoredDht { async move { tracing::info!("bootstrap started"); - if dht.bootstrapped(None).await { + if dht.bootstrapped().await { *first_bootstrap.get() = "done"; tracing::info!("bootstrap complete"); } else { @@ -401,29 +398,28 @@ impl MonitoredDht { } } - // TODO: See comment where this function is used (it's also commented out). - //async fn load_initial_contacts( - // is_v4: bool, - // contacts_store: &(impl DhtContactsStoreTrait + ?Sized), - //) -> HashSet { - // if is_v4 { - // match contacts_store.load_v4().await { - // Ok(contacts) => contacts.iter().cloned().map(SocketAddr::V4).collect(), - // Err(error) => { - // tracing::error!("Failed to load DHT IPv4 contacts {:?}", error); - // Default::default() - // } - // } - // } else { - // match contacts_store.load_v6().await { - // Ok(contacts) => contacts.iter().cloned().map(SocketAddr::V6).collect(), - // Err(error) => { - // tracing::error!("Failed to load DHT IPv4 contacts {:?}", error); - // Default::default() - // } - // } - // } - //} + async fn load_initial_contacts( + is_v4: bool, + contacts_store: &(impl DhtContactsStoreTrait + ?Sized), + ) -> HashSet { + if is_v4 { + match contacts_store.load_v4().await { + Ok(contacts) => contacts.iter().cloned().map(SocketAddr::V4).collect(), + Err(error) => { + tracing::error!("Failed to load DHT IPv4 contacts {:?}", error); + Default::default() + } + } + } else { + match contacts_store.load_v6().await { + Ok(contacts) => contacts.iter().cloned().map(SocketAddr::V6).collect(), + Err(error) => { + tracing::error!("Failed to load DHT IPv4 contacts {:?}", error); + Default::default() + } + } + } + } } type Lookups = HashMap; @@ -582,7 +578,9 @@ impl Lookup { let mut peers = Box::pin(stream::iter(dhts).flat_map(|dht| { stream::once(async { - dht.dht.bootstrapped(Some(Duration::from_secs(10))).await; + timeout(Duration::from_secs(10), dht.dht.bootstrapped()) + .await + .unwrap_or(false); dht.dht.search(info_hash, true) }) .flatten() @@ -637,7 +635,7 @@ impl btdht::SocketTrait for Socket { Ok(()) } - async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { + async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { self.0.recv_from(buf).await } diff --git a/utils/btdht/src/btdht.rs b/utils/btdht/src/btdht.rs index 465c2ff5b..06c4056b2 100644 --- a/utils/btdht/src/btdht.rs +++ b/utils/btdht/src/btdht.rs @@ -208,7 +208,7 @@ async fn run_single_action(info_hash: InfoHash, command: &Single) { async fn lookup(prefix: &str, dht: &MainlineDht, info_hash: InfoHash, announce: bool) { println!("{prefix} Bootstrapping..."); - if dht.bootstrapped(None).await { + if dht.bootstrapped().await { let mut seen_peers = HashSet::new(); if !announce {