diff --git a/anchor/network/src/behaviour.rs b/anchor/network/src/behaviour.rs index 23e88fe..f52459a 100644 --- a/anchor/network/src/behaviour.rs +++ b/anchor/network/src/behaviour.rs @@ -1,5 +1,5 @@ use libp2p::swarm::NetworkBehaviour; -use libp2p::{identify, ping}; +use libp2p::{gossipsub, identify, ping}; #[derive(NetworkBehaviour)] pub struct AnchorBehaviour { @@ -7,6 +7,6 @@ pub struct AnchorBehaviour { pub identify: identify::Behaviour, /// Used for connection health checks. pub ping: ping::Behaviour, - // /// The routing pub-sub mechanism for Anchor. - // pub gossipsub: gossipsub::Behaviour, + /// The routing pub-sub mechanism for Anchor. + pub gossipsub: gossipsub::Behaviour, } diff --git a/anchor/network/src/network.rs b/anchor/network/src/network.rs index 1b4192b..dc0325a 100644 --- a/anchor/network/src/network.rs +++ b/anchor/network/src/network.rs @@ -1,17 +1,22 @@ use crate::behaviour::AnchorBehaviour; +use crate::behaviour::AnchorBehaviourEvent; use crate::keypair_utils::load_private_key; use crate::transport::build_transport; use crate::Config; use futures::StreamExt; use libp2p::core::muxing::StreamMuxerBox; use libp2p::core::transport::Boxed; +use libp2p::gossipsub::{MessageAuthenticity, ValidationMode}; use libp2p::identity::Keypair; use libp2p::multiaddr::Protocol; -use libp2p::{futures, identify, ping, PeerId, Swarm, SwarmBuilder}; +use libp2p::swarm::SwarmEvent; +use libp2p::{futures, gossipsub, identify, ping, PeerId, Swarm, SwarmBuilder}; +use lighthouse_network::discv5::enr::k256::sha2::{Digest, Sha256}; use std::num::{NonZeroU8, NonZeroUsize}; use std::pin::Pin; +use std::time::Duration; use task_executor::TaskExecutor; -use tracing::info; +use tracing::{info, log}; pub struct Network { swarm: Swarm, @@ -74,8 +79,22 @@ impl Network { pub async fn run(mut self) { loop { tokio::select! { - _swarm_message = self.swarm.select_next_some() => { - // TODO handle and match swarm messages + swarm_message = self.swarm.select_next_some() => { + match swarm_message { + SwarmEvent::Behaviour(behaviour_event) => match behaviour_event { + AnchorBehaviourEvent::Gossipsub(_ge) => { + // TODO handle gossipsub events + }, + // TODO handle other behaviour events + _ => { + log::debug!("Unhandled behaviour event: {:?}", behaviour_event); + } + }, + // TODO handle other swarm events + _ => { + log::debug!("Unhandled swarm event: {:?}", swarm_message); + } + } } // TODO match input channels } @@ -84,8 +103,7 @@ impl Network { } fn build_anchor_behaviour(local_keypair: Keypair) -> AnchorBehaviour { - // setup gossipsub - // discv5 + // TODO setup discv5 let identify = { let local_public_key = local_keypair.public(); let identify_config = identify::Config::new("anchor".into(), local_public_key) @@ -94,10 +112,40 @@ fn build_anchor_behaviour(local_keypair: Keypair) -> AnchorBehaviour { identify::Behaviour::new(identify_config) }; + // TODO those values might need to be parameterized based on the network + let slots_per_epoch = 32; + let seconds_per_slot = 12; + let duplicate_cache_time = Duration::from_secs(slots_per_epoch * seconds_per_slot); // 6.4 min + + let gossip_message_id = move |message: &gossipsub::Message| { + gossipsub::MessageId::from(&Sha256::digest(&message.data)[..20]) + }; + + // TODO Add Topic Message Validator and Subscription Filter + let config = gossipsub::ConfigBuilder::default() + .duplicate_cache_time(duplicate_cache_time) + .message_id_fn(gossip_message_id) + .flood_publish(false) + .validation_mode(ValidationMode::Strict) + .mesh_n(8) //D + .mesh_n_low(6) // Dlo + .mesh_n_high(12) // Dhi + .mesh_outbound_min(4) // Dout + .heartbeat_interval(Duration::from_millis(700)) + .history_length(6) + .history_gossip(4) + .max_ihave_length(1500) + .max_ihave_messages(32) + .build() + .unwrap(); + + let gossipsub = + gossipsub::Behaviour::new(MessageAuthenticity::Signed(local_keypair), config).unwrap(); + AnchorBehaviour { identify, ping: ping::Behaviour::default(), - // gossipsub: gossipsub::Behaviour::default(), + gossipsub, } }