Skip to content

Commit

Permalink
Merge work on reusing bittorrent IP addresses on restart
Browse files Browse the repository at this point in the history
  • Loading branch information
inetic committed Jan 24, 2024
2 parents 0712b37 + c7b4e39 commit 692f82b
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 39 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ rust-version = "1.75.0"

[workspace.dependencies]
async-trait = "0.1.73"
btdht = { git = "https://github.com/equalitie/btdht.git", rev = "1d114b2" }
btdht = { git = "https://github.com/equalitie/btdht.git", rev = "e7ddf5607b20f0b82cbc3ea6259425c00bd8d16b" }
bytes = "1.5.0"
camino = "1.1.6"
clap = { version = "4.4.6", features = ["derive"] }
Expand Down
72 changes: 35 additions & 37 deletions lib/src/network/dht_discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use std::{
use tokio::{
select,
sync::{mpsc, watch},
time::{self, Duration},
time::{self, timeout, Duration},
};
use tracing::{instrument::Instrument, Span};

Expand Down Expand Up @@ -257,20 +257,17 @@ impl MonitoredDht {
contacts_store: Option<Arc<dyn DhtContactsStoreTrait>>,
) -> Self {
// TODO: load the DHT state from a previous save if it exists.
let builder = MainlineDht::builder()
let mut builder = MainlineDht::builder()
.add_routers(DHT_ROUTERS.iter().copied())
.set_read_only(false);

// TODO: The reuse of initial contacts is incorrectly implemented, once the issue
// https://github.com/equalitie/btdht/issues/8
// is fixed, this can be uncommented again.
//if let Some(contacts_store) = &contacts_store {
// let initial_contacts = Self::load_initial_contacts(is_v4, &**contacts_store).await;
if let Some(contacts_store) = &contacts_store {
let initial_contacts = Self::load_initial_contacts(is_v4, &**contacts_store).await;

// for contact in initial_contacts {
// builder = builder.add_node(contact);
// }
//}
for contact in initial_contacts {
builder = builder.add_node(contact);
}
}

let dht = builder
.start(Socket(socket))
Expand All @@ -293,7 +290,7 @@ impl MonitoredDht {
async move {
tracing::info!("bootstrap started");

if dht.bootstrapped(None).await {
if dht.bootstrapped().await {
*first_bootstrap.get() = "done";
tracing::info!("bootstrap complete");
} else {
Expand Down Expand Up @@ -401,29 +398,28 @@ impl MonitoredDht {
}
}

// TODO: See comment where this function is used (it's also commented out).
//async fn load_initial_contacts(
// is_v4: bool,
// contacts_store: &(impl DhtContactsStoreTrait + ?Sized),
//) -> HashSet<SocketAddr> {
// if is_v4 {
// match contacts_store.load_v4().await {
// Ok(contacts) => contacts.iter().cloned().map(SocketAddr::V4).collect(),
// Err(error) => {
// tracing::error!("Failed to load DHT IPv4 contacts {:?}", error);
// Default::default()
// }
// }
// } else {
// match contacts_store.load_v6().await {
// Ok(contacts) => contacts.iter().cloned().map(SocketAddr::V6).collect(),
// Err(error) => {
// tracing::error!("Failed to load DHT IPv4 contacts {:?}", error);
// Default::default()
// }
// }
// }
//}
async fn load_initial_contacts(
is_v4: bool,
contacts_store: &(impl DhtContactsStoreTrait + ?Sized),
) -> HashSet<SocketAddr> {
if is_v4 {
match contacts_store.load_v4().await {
Ok(contacts) => contacts.iter().cloned().map(SocketAddr::V4).collect(),
Err(error) => {
tracing::error!("Failed to load DHT IPv4 contacts {:?}", error);
Default::default()
}
}
} else {
match contacts_store.load_v6().await {
Ok(contacts) => contacts.iter().cloned().map(SocketAddr::V6).collect(),
Err(error) => {
tracing::error!("Failed to load DHT IPv4 contacts {:?}", error);
Default::default()
}
}
}
}
}

type Lookups = HashMap<InfoHash, Lookup>;
Expand Down Expand Up @@ -582,7 +578,9 @@ impl Lookup {

let mut peers = Box::pin(stream::iter(dhts).flat_map(|dht| {
stream::once(async {
dht.dht.bootstrapped(Some(Duration::from_secs(10))).await;
timeout(Duration::from_secs(10), dht.dht.bootstrapped())
.await
.unwrap_or(false);
dht.dht.search(info_hash, true)
})
.flatten()
Expand Down Expand Up @@ -637,7 +635,7 @@ impl btdht::SocketTrait for Socket {
Ok(())
}

async fn recv_from(&mut self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
self.0.recv_from(buf).await
}

Expand Down
2 changes: 1 addition & 1 deletion utils/btdht/src/btdht.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ async fn run_single_action(info_hash: InfoHash, command: &Single) {

async fn lookup(prefix: &str, dht: &MainlineDht, info_hash: InfoHash, announce: bool) {
println!("{prefix} Bootstrapping...");
if dht.bootstrapped(None).await {
if dht.bootstrapped().await {
let mut seen_peers = HashSet::new();

if !announce {
Expand Down

0 comments on commit 692f82b

Please sign in to comment.