Skip to content

Commit

Permalink
ping and removal of connections
Browse files Browse the repository at this point in the history
  • Loading branch information
ddeguglielmo committed Nov 23, 2023
1 parent 2b8785d commit 719ee76
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 29 deletions.
2 changes: 1 addition & 1 deletion dht-cache/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ async-trait = "0.1.68"
futures = "0.3.21"
futures-util = "0.3.29"
jsonpath_lib = "0.3.0"
libp2p = { version = "0.53.0", features = ["tokio", "mdns", "gossipsub", "noise", "yamux", "pnet", "rsa", "tcp", "macros"] }
libp2p = { version="0.53.1", features = ["tokio", "mdns", "gossipsub", "noise", "ping", "yamux", "pnet", "rsa", "tcp", "macros"] }
log = "0.4.17"
rand = "0.8"
sea-query = "0.28.3"
Expand Down
31 changes: 11 additions & 20 deletions dht-cache/src/domocache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::utils;
use futures::prelude::*;
use libp2p::gossipsub::IdentTopic as Topic;
use libp2p::identity::Keypair;
use libp2p::{mdns, PeerId};
use libp2p::{mdns, ping};
use libp2p::swarm::{SwarmEvent};
use rsa::pkcs8::EncodePrivateKey;
use rsa::RsaPrivateKey;
Expand All @@ -20,7 +20,6 @@ use time::OffsetDateTime;
use tokio::sync::mpsc;
use tokio::sync::mpsc::{Receiver, Sender};
use crate::utils::get_epoch_ms;
use std::str::FromStr;
use libp2p::swarm::dial_opts::DialOpts;

fn generate_rsa_key() -> (Vec<u8>, Vec<u8>) {
Expand Down Expand Up @@ -384,19 +383,6 @@ impl DomoCache {
// }
}

pub fn remove_connections_of_peers(&mut self) {
for (peer_id, last_mdns_rec_timestamp) in self.mdns_peers_cache.iter() {

if last_mdns_rec_timestamp.to_owned() < (utils::get_epoch_ms() - 1000 * 10 as u128) {
if let Ok(peer_id) = PeerId::from_str(peer_id) {
if let Ok(_res) = self.swarm.disconnect_peer_id(peer_id) {
println!("DISCONNECTING LOCAL CONNECTIONS TO {peer_id}");
}
}
}
}

}

pub fn print_peers_cache(&self) {
for (peer_id, peer_data) in self.peers_caches_state.iter() {
Expand Down Expand Up @@ -467,10 +453,16 @@ impl DomoCache {
}
}
}
SwarmEvent::Behaviour(crate::domolibp2p::OutEvent::Mdns(
mdns::Event::Expired(_list),
SwarmEvent::Behaviour(crate::domolibp2p::OutEvent::Ping(
ping::Event { peer, connection, result}
)) => {
println!("MDNS TTL Expired");

if let Ok(_res) = result {
println!("PING OK {} {}", peer.to_string(), connection);
} else {
println!("PING FAILED {} {}, CLOSE CONNECTION", peer.to_string(), connection);
self.swarm.close_connection(connection);
}
}
SwarmEvent::Behaviour(crate::domolibp2p::OutEvent::Mdns(
mdns::Event::Discovered(list),
Expand All @@ -487,7 +479,7 @@ impl DomoCache {

let dial_opts = DialOpts::from(peer_id);
let _res = self.swarm.dial(dial_opts);
println!("INSERT INTO MDNS CACHE {} {} ", peer_id.to_string(), get_epoch_ms());
println!("INSERT INTO MDNS CACHE {} {} ", peer_id.to_string(), get_epoch_ms()/1000);

self.mdns_peers_cache.insert(peer_id.to_string(), get_epoch_ms());
println!("{:?}", self.mdns_peers_cache);
Expand Down Expand Up @@ -517,7 +509,6 @@ impl DomoCache {
self.send_cache_state_timer = tokio::time::Instant::now()
+ Duration::from_secs(u64::from(SEND_CACHE_HASH_PERIOD));
self.send_cache_state().await;
self.remove_connections_of_peers();
}
PersistentData(data) => {
return self.handle_persistent_message_data(&data).await;
Expand Down
25 changes: 17 additions & 8 deletions dht-cache/src/domolibp2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use libp2p::yamux;
//use libp2p::tcp::TcpConfig;
use libp2p::Transport;

use libp2p::{identity, mdns, swarm::NetworkBehaviour, PeerId, Swarm};
use libp2p::{identity, mdns, swarm::NetworkBehaviour, PeerId, Swarm, ping};

use std::error::Error;
use std::io;
Expand Down Expand Up @@ -70,9 +70,6 @@ pub async fn start(
listen_addr: String
) -> Result<Swarm<DomoBehaviour>, Box<dyn Error>> {
let local_peer_id = PeerId::from(local_key_pair.public());



let arr = parse_hex_key(&shared_key)?;
let psk = PreSharedKey::new(arr);

Expand All @@ -93,8 +90,8 @@ pub async fn start(
.with_behaviour(|key| {

let mdnsconf = mdns::Config {
ttl: Duration::from_secs(10),
query_interval: Duration::from_secs(5),
ttl: Duration::from_secs(30),
query_interval: Duration::from_secs(10),
enable_ipv6: false
};

Expand All @@ -121,7 +118,11 @@ pub async fn start(
gossipsub_config,
)?;

let behaviour = DomoBehaviour { mdns, gossipsub };
let ping_config = ping::Config::new().with_interval(Duration::from_secs(5)).with_timeout(Duration::from_secs(1));

let ping = ping::Behaviour::new(ping_config);

let behaviour = DomoBehaviour { mdns, gossipsub , ping };

Ok(behaviour)

Expand Down Expand Up @@ -150,19 +151,21 @@ pub async fn start(
Ok(swarm)
}

// We create a custom network behaviour that combines mDNS and gossipsub.
// We create a custom network behaviour that combines mDNS and gossipsub and ping.
#[derive(NetworkBehaviour)]
#[behaviour(to_swarm = "OutEvent")]
pub struct DomoBehaviour {
pub mdns: mdns::tokio::Behaviour,
pub gossipsub: gossipsub::Behaviour,
pub ping: ping::Behaviour
}

#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum OutEvent {
Gossipsub(gossipsub::Event),
Mdns(mdns::Event),
Ping(ping::Event)
}

impl From<mdns::Event> for OutEvent {
Expand All @@ -176,3 +179,9 @@ impl From<gossipsub::Event> for OutEvent {
Self::Gossipsub(v)
}
}

impl From<ping::Event> for OutEvent {
fn from(v: ping::Event) -> Self {
Self::Ping(v)
}
}

0 comments on commit 719ee76

Please sign in to comment.