diff --git a/examples/gossipsub-chat.rs b/examples/gossipsub-chat.rs index 4aafb4686f0..f5e33741544 100644 --- a/examples/gossipsub-chat.rs +++ b/examples/gossipsub-chat.rs @@ -50,15 +50,15 @@ use async_std::{io, task}; use env_logger::{Builder, Env}; use futures::prelude::*; use libp2p::gossipsub::protocol::MessageId; -use libp2p::gossipsub::{GossipsubEvent, GossipsubMessage, Topic}; -use libp2p::{ - gossipsub, identity, - PeerId, -}; +use libp2p::gossipsub::{GossipsubEvent, GossipsubMessage, MessageAuthenticity, Topic}; +use libp2p::{gossipsub, identity, PeerId}; use std::collections::hash_map::DefaultHasher; use std::hash::{Hash, Hasher}; use std::time::Duration; -use std::{error::Error, task::{Context, Poll}}; +use std::{ + error::Error, + task::{Context, Poll}, +}; fn main() -> Result<(), Box> { Builder::from_env(Env::default().default_filter_or("info")).init(); @@ -69,7 +69,7 @@ fn main() -> Result<(), Box> { println!("Local peer id: {:?}", local_peer_id); // Set up an encrypted TCP Transport over the Mplex and Yamux protocols - let transport = libp2p::build_development_transport(local_key)?; + let transport = libp2p::build_development_transport(local_key.clone())?; // Create a Gossipsub topic let topic = Topic::new("test-net".into()); @@ -83,7 +83,7 @@ fn main() -> Result<(), Box> { let message_id_fn = |message: &GossipsubMessage| { let mut s = DefaultHasher::new(); message.data.hash(&mut s); - MessageId(s.finish().to_string()) + MessageId::from(s.finish().to_string()) }; // set custom gossipsub @@ -93,7 +93,8 @@ fn main() -> Result<(), Box> { //same content will be propagated. .build(); // build a gossipsub network behaviour - let mut gossipsub = gossipsub::Gossipsub::new(local_peer_id.clone(), gossipsub_config); + let mut gossipsub = + gossipsub::Gossipsub::new(MessageAuthenticity::Signed(local_key), gossipsub_config); gossipsub.subscribe(topic.clone()); libp2p::Swarm::new(transport, gossipsub, local_peer_id) }; @@ -120,11 +121,13 @@ fn main() -> Result<(), Box> { let mut listening = false; task::block_on(future::poll_fn(move |cx: &mut Context<'_>| { loop { - match stdin.try_poll_next_unpin(cx)? { + if let Err(e) = match stdin.try_poll_next_unpin(cx)? { Poll::Ready(Some(line)) => swarm.publish(&topic, line.as_bytes()), Poll::Ready(None) => panic!("Stdin closed"), Poll::Pending => break, - }; + } { + println!("Publish error: {:?}", e); + } } loop { diff --git a/examples/ipfs-private.rs b/examples/ipfs-private.rs index f70f44049d1..08e4fdc8fe5 100644 --- a/examples/ipfs-private.rs +++ b/examples/ipfs-private.rs @@ -35,7 +35,7 @@ use async_std::{io, task}; use futures::{future, prelude::*}; use libp2p::{ core::{either::EitherTransport, transport::upgrade::Version, StreamMuxer}, - gossipsub::{self, Gossipsub, GossipsubConfigBuilder, GossipsubEvent}, + gossipsub::{self, Gossipsub, GossipsubConfigBuilder, GossipsubEvent, MessageAuthenticity}, identify::{Identify, IdentifyEvent}, identity, multiaddr::Protocol, @@ -178,18 +178,14 @@ fn main() -> Result<(), Box> { ping: Ping, } - impl NetworkBehaviourEventProcess - for MyBehaviour - { + impl NetworkBehaviourEventProcess for MyBehaviour { // Called when `identify` produces an event. fn inject_event(&mut self, event: IdentifyEvent) { println!("identify: {:?}", event); } } - impl NetworkBehaviourEventProcess - for MyBehaviour - { + impl NetworkBehaviourEventProcess for MyBehaviour { // Called when `gossipsub` produces an event. fn inject_event(&mut self, event: GossipsubEvent) { match event { @@ -204,9 +200,7 @@ fn main() -> Result<(), Box> { } } - impl NetworkBehaviourEventProcess - for MyBehaviour - { + impl NetworkBehaviourEventProcess for MyBehaviour { // Called when `ping` produces an event. fn inject_event(&mut self, event: PingEvent) { use ping::handler::{PingFailure, PingSuccess}; @@ -245,11 +239,11 @@ fn main() -> Result<(), Box> { // Create a Swarm to manage peers and events let mut swarm = { - let gossipsub_config = GossipsubConfigBuilder::default() + let gossipsub_config = GossipsubConfigBuilder::new() .max_transmit_size(262144) .build(); let mut behaviour = MyBehaviour { - gossipsub: Gossipsub::new(local_peer_id.clone(), gossipsub_config), + gossipsub: Gossipsub::new(MessageAuthenticity::Signed(local_key.clone()), gossipsub_config), identify: Identify::new( "/ipfs/0.1.0".into(), "rust-ipfs-example".into(), @@ -280,12 +274,14 @@ fn main() -> Result<(), Box> { let mut listening = false; task::block_on(future::poll_fn(move |cx: &mut Context<'_>| { loop { - match stdin.try_poll_next_unpin(cx)? { + if let Err(e) = match stdin.try_poll_next_unpin(cx)? { Poll::Ready(Some(line)) => { - swarm.gossipsub.publish(&gossipsub_topic, line.as_bytes()); + swarm.gossipsub.publish(&gossipsub_topic, line.as_bytes()) } Poll::Ready(None) => panic!("Stdin closed"), Poll::Pending => break, + } { + println!("Publish error: {:?}", e); } } loop { diff --git a/protocols/gossipsub/Cargo.toml b/protocols/gossipsub/Cargo.toml index 59ccb548e3c..1541d6f9359 100644 --- a/protocols/gossipsub/Cargo.toml +++ b/protocols/gossipsub/Cargo.toml @@ -23,10 +23,10 @@ unsigned-varint = { version = "0.4.0", features = ["futures-codec"] } log = "0.4.8" sha2 = "0.8.1" base64 = "0.11.0" -lru = "0.4.3" smallvec = "1.1.0" prost = "0.6.1" hex_fmt = "0.3.0" +lru_time_cache = "0.10.0" [dev-dependencies] async-std = "1.6.2" @@ -34,6 +34,7 @@ env_logger = "0.7.1" libp2p-plaintext = { path = "../plaintext" } libp2p-yamux = { path = "../../muxers/yamux" } quickcheck = "0.9.2" +hex = "0.4.2" [build-dependencies] prost-build = "0.6" diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 4f1ae52a8d2..7636c2c06b1 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -18,32 +18,33 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::config::GossipsubConfig; +use crate::config::{GossipsubConfig, ValidationMode}; +use crate::error::PublishError; use crate::handler::GossipsubHandler; use crate::mcache::MessageCache; use crate::protocol::{ GossipsubControlAction, GossipsubMessage, GossipsubSubscription, GossipsubSubscriptionAction, - MessageId, + MessageId, SIGNING_PREFIX, }; +use crate::rpc_proto; use crate::topic::{Topic, TopicHash}; use futures::prelude::*; -use libp2p_core::{Multiaddr, PeerId, connection::ConnectionId}; +use libp2p_core::{ + connection::ConnectionId, identity::error::SigningError, identity::Keypair, Multiaddr, PeerId, +}; use libp2p_swarm::{ - NetworkBehaviour, - NetworkBehaviourAction, - NotifyHandler, - PollParameters, - ProtocolsHandler + NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, ProtocolsHandler, }; use log::{debug, error, info, trace, warn}; -use lru::LruCache; +use lru_time_cache::LruCache; +use prost::Message; use rand; use rand::{seq::SliceRandom, thread_rng}; use std::{ - collections::hash_map::HashMap, collections::HashSet, collections::VecDeque, - iter, + collections::{hash_map::HashMap, BTreeSet}, + fmt, iter, sync::Arc, task::{Context, Poll}, }; @@ -51,8 +52,102 @@ use wasm_timer::{Instant, Interval}; mod tests; -#[derive(Debug)] +/// Determines if published messages should be signed or not. +/// +/// Without signing, a number of privacy preserving modes can be selected. +/// +/// NOTE: The default validation settings are to require signatures. The [`ValidationMode`] +/// should be updated in the [`GossipsubConfig`] to allow for unsigned messages. +#[derive(Clone)] +pub enum MessageAuthenticity { + /// Message signing is enabled. The author will be the owner of the key and the sequence number + /// will be a random number. + Signed(Keypair), + /// Message signing is disabled. + /// + /// The specified `PeerId` will be used as the author of all published messages. The sequence + /// number will be randomized. + Author(PeerId), + /// Message signing is disabled. + /// + /// A random `PeerId` will be used when publishing each message. The sequence number will be + /// randomized. + RandomAuthor, + /// Message signing is disabled. + /// + /// The author of the message and the sequence numbers are excluded from the message. + /// + /// NOTE: Excluding these fields may make these messages invalid by other nodes who + /// enforce validation of these fields. See [`ValidationMode`] in the `GossipsubConfig` + /// for how to customise this for rust-libp2p gossipsub. A custom `message_id` + /// function will need to be set to prevent all messages from a peer being filtered + /// as duplicates. + Anonymous, +} + +impl MessageAuthenticity { + /// Returns true if signing is enabled. + fn is_signing(&self) -> bool { + match self { + MessageAuthenticity::Signed(_) => true, + _ => false, + } + } + + fn is_anonymous(&self) -> bool { + match self { + MessageAuthenticity::Anonymous => true, + _ => false, + } + } +} + +/// A data structure for storing configuration for publishing messages. See [`MessageAuthenticity`] +/// for further details. +enum PublishConfig { + Signing { + keypair: Keypair, + author: PeerId, + inline_key: Option>, + }, + Author(PeerId), + RandomAuthor, + Anonymous, +} + +impl From for PublishConfig { + fn from(authenticity: MessageAuthenticity) -> Self { + match authenticity { + MessageAuthenticity::Signed(keypair) => { + let public_key = keypair.public(); + let key_enc = public_key.clone().into_protobuf_encoding(); + let key = if key_enc.len() <= 42 { + // The public key can be inlined in [`rpc_proto::Message::from`], so we don't include it + // specifically in the [`rpc_proto::Message::key`] field. + None + } else { + // Include the protobuf encoding of the public key in the message. + Some(key_enc) + }; + + PublishConfig::Signing { + keypair, + author: public_key.into_peer_id(), + inline_key: key, + } + } + MessageAuthenticity::Author(peer_id) => PublishConfig::Author(peer_id), + MessageAuthenticity::RandomAuthor => PublishConfig::RandomAuthor, + MessageAuthenticity::Anonymous => PublishConfig::Anonymous, + } + } +} + /// Network behaviour that handles the gossipsub protocol. +/// +/// NOTE: Initialisation requires a [`MessageAuthenticity`] and [`GossipsubConfig`] instance. If message signing is +/// disabled, the [`ValidationMode`] in the config should be adjusted to an appropriate level to +/// accept unsigned messages. pub struct Gossipsub { /// Configuration providing gossipsub performance parameters. config: GossipsubConfig, @@ -63,20 +158,24 @@ pub struct Gossipsub { /// Pools non-urgent control messages between heartbeats. control_pool: HashMap>, - /// Peer id of the local node. Used for the source of the messages that we publish. - local_peer_id: PeerId, + /// Information used for publishing messages. + publish_config: PublishConfig, + + /// An LRU Time cache for storing seen messages (based on their ID). This cache prevents + /// duplicates from being propagated to the application and on the network. + duplication_cache: LruCache, /// A map of all connected peers - A map of topic hash to a list of gossipsub peer Ids. - topic_peers: HashMap>, + topic_peers: HashMap>, /// A map of all connected peers to their subscribed topics. - peer_topics: HashMap>, + peer_topics: HashMap>, /// Overlay network of connected peers - Maps topics to connected gossipsub peers. - mesh: HashMap>, + mesh: HashMap>, /// Map of topics to list of peers that we publish to, but don't subscribe to. - fanout: HashMap>, + fanout: HashMap>, /// The last publish time for fanout topics. fanout_last_pub: HashMap, @@ -84,43 +183,41 @@ pub struct Gossipsub { /// Message cache for the last few heartbeats. mcache: MessageCache, - // We keep track of the messages we received (in the format `string(source ID, seq_no)`) so that - // we don't dispatch the same message twice if we receive it twice on the network. - received: LruCache, - /// Heartbeat interval stream. heartbeat: Interval, } impl Gossipsub { - /// Creates a `Gossipsub` struct given a set of parameters specified by `gs_config`. - pub fn new(local_peer_id: PeerId, gs_config: GossipsubConfig) -> Self { - let local_peer_id = if gs_config.no_source_id { - PeerId::from_bytes(crate::config::IDENTITY_SOURCE.to_vec()).expect("Valid peer id") - } else { - local_peer_id - }; + /// Creates a `Gossipsub` struct given a set of parameters specified via a `GossipsubConfig`. + pub fn new(privacy: MessageAuthenticity, config: GossipsubConfig) -> Self { + // Set up the router given the configuration settings. + + // We do not allow configurations where a published message would also be rejected if it + // were received locally. + validate_config(&privacy, &config.validation_mode); + + // Set up message publishing parameters. Gossipsub { - config: gs_config.clone(), events: VecDeque::new(), control_pool: HashMap::new(), - local_peer_id, + publish_config: privacy.into(), + duplication_cache: LruCache::with_expiry_duration(config.duplicate_cache_time), topic_peers: HashMap::new(), peer_topics: HashMap::new(), mesh: HashMap::new(), fanout: HashMap::new(), fanout_last_pub: HashMap::new(), mcache: MessageCache::new( - gs_config.history_gossip, - gs_config.history_length, - gs_config.message_id_fn, + config.history_gossip, + config.history_length, + config.message_id_fn, ), - received: LruCache::new(256), // keep track of the last 256 messages heartbeat: Interval::new_at( - Instant::now() + gs_config.heartbeat_initial_delay, - gs_config.heartbeat_interval, + Instant::now() + config.heartbeat_initial_delay, + config.heartbeat_interval, ), + config, } } @@ -135,29 +232,21 @@ impl Gossipsub { return false; } - // send subscription request to all peers in the topic - if let Some(peer_list) = self.topic_peers.get(&topic_hash) { - let mut fixed_event = None; // initialise the event once if needed - if fixed_event.is_none() { - fixed_event = Some(Arc::new(GossipsubRpc { - messages: Vec::new(), - subscriptions: vec![GossipsubSubscription { - topic_hash: topic_hash.clone(), - action: GossipsubSubscriptionAction::Subscribe, - }], - control_msgs: Vec::new(), - })); - } - - let event = fixed_event.expect("event has been initialised"); + // send subscription request to all peers + let peer_list = self.peer_topics.keys().cloned().collect::>(); + if !peer_list.is_empty() { + let event = Arc::new(GossipsubRpc { + messages: Vec::new(), + subscriptions: vec![GossipsubSubscription { + topic_hash: topic_hash.clone(), + action: GossipsubSubscriptionAction::Subscribe, + }], + control_msgs: Vec::new(), + }); for peer in peer_list { debug!("Sending SUBSCRIBE to peer: {:?}", peer); - self.events.push_back(NetworkBehaviourAction::NotifyHandler { - peer_id: peer.clone(), - handler: NotifyHandler::Any, - event: event.clone(), - }); + self.send_message(peer, event.clone()); } } @@ -181,29 +270,21 @@ impl Gossipsub { return false; } - // announce to all peers in the topic - let mut fixed_event = None; // initialise the event once if needed - if let Some(peer_list) = self.topic_peers.get(topic_hash) { - if fixed_event.is_none() { - fixed_event = Some(Arc::new(GossipsubRpc { - messages: Vec::new(), - subscriptions: vec![GossipsubSubscription { - topic_hash: topic_hash.clone(), - action: GossipsubSubscriptionAction::Unsubscribe, - }], - control_msgs: Vec::new(), - })); - } - - let event = fixed_event.expect("event has been initialised"); + // announce to all peers + let peer_list = self.peer_topics.keys().cloned().collect::>(); + if !peer_list.is_empty() { + let event = Arc::new(GossipsubRpc { + messages: Vec::new(), + subscriptions: vec![GossipsubSubscription { + topic_hash: topic_hash.clone(), + action: GossipsubSubscriptionAction::Unsubscribe, + }], + control_msgs: Vec::new(), + }); for peer in peer_list { - debug!("Sending UNSUBSCRIBE to peer: {:?}", peer); - self.events.push_back(NetworkBehaviourAction::NotifyHandler { - peer_id: peer.clone(), - event: event.clone(), - handler: NotifyHandler::Any, - }); + debug!("Sending UNSUBSCRIBE to peer: {}", peer.to_string()); + self.send_message(peer, event.clone()); } } @@ -216,41 +297,48 @@ impl Gossipsub { } /// Publishes a message to the network. - pub fn publish(&mut self, topic: &Topic, data: impl Into>) { + pub fn publish(&mut self, topic: &Topic, data: impl Into>) -> Result<(), PublishError> { self.publish_many(iter::once(topic.clone()), data) } /// Publishes a message with multiple topics to the network. pub fn publish_many( &mut self, - topic: impl IntoIterator, + topics: impl IntoIterator, data: impl Into>, - ) { - let message = GossipsubMessage { - source: self.local_peer_id.clone(), - data: data.into(), - // To be interoperable with the go-implementation this is treated as a 64-bit - // big-endian uint. - sequence_number: rand::random(), - topics: topic.into_iter().map(|t| self.topic_hash(t)).collect(), - }; + ) -> Result<(), PublishError> { + let message = self.build_message( + topics.into_iter().map(|t| self.topic_hash(t)).collect(), + data.into(), + )?; + let msg_id = (self.config.message_id_fn)(&message); - debug!( - "Publishing message: {:?}", - (self.config.message_id_fn)(&message) - ); + // Add published message to the duplicate cache. + if self.duplication_cache.insert(msg_id.clone(), ()).is_some() { + // This message has already been seen. We don't re-publish messages that have already + // been published on the network. + warn!( + "Not publishing a message that has already been published. Msg-id {}", + msg_id + ); + return Err(PublishError::Duplicate); + } + + // If the message isn't a duplicate add it to the memcache. + self.mcache.put(message.clone()); + + debug!("Publishing message: {:?}", msg_id); - // forward the message to mesh peers - let local_peer_id = self.local_peer_id.clone(); - self.forward_msg(message.clone(), &local_peer_id); + // Forward the message to mesh peers. + let mesh_peers_sent = self.forward_msg(message.clone(), None); let mut recipient_peers = HashSet::new(); for topic_hash in &message.topics { - // if not subscribed to the topic, use fanout peers + // If not subscribed to the topic, use fanout peers. if self.mesh.get(&topic_hash).is_none() { debug!("Topic: {:?} not in the mesh", topic_hash); - // build a list of peers to forward the message to - // if we have fanout peers add them to the map + // Build a list of peers to forward the message to + // if we have fanout peers add them to the map. if self.fanout.contains_key(&topic_hash) { for peer in self.fanout.get(&topic_hash).expect("Topic must exist") { recipient_peers.insert(peer.clone()); @@ -275,12 +363,9 @@ impl Gossipsub { } } - // add published message to our received caches - let msg_id = (self.config.message_id_fn)(&message); - self.mcache.put(message.clone()); - self.received.put(msg_id.clone(), ()); - - info!("Published message: {:?}", msg_id); + if recipient_peers.is_empty() && !mesh_peers_sent { + return Err(PublishError::InsufficientPeers); + } let event = Arc::new(GossipsubRpc { subscriptions: Vec::new(), @@ -290,37 +375,41 @@ impl Gossipsub { // Send to peers we know are subscribed to the topic. for peer_id in recipient_peers.iter() { debug!("Sending message to peer: {:?}", peer_id); - self.events.push_back(NetworkBehaviourAction::NotifyHandler { - peer_id: peer_id.clone(), - event: event.clone(), - handler: NotifyHandler::Any, - }); + self.send_message(peer_id.clone(), event.clone()); } + + info!("Published message: {:?}", msg_id); + Ok(()) } - /// This function should be called when `config.manual_propagation` is `true` in order to - /// propagate messages. Messages are stored in the ['Memcache'] and validation is expected to be + /// This function should be called when `config.validate_messages` is `true` in order to + /// validate and propagate messages. Messages are stored in the ['Memcache'] and validation is expected to be /// fast enough that the messages should still exist in the cache. /// /// Calling this function will propagate a message stored in the cache, if it still exists. /// If the message still exists in the cache, it will be forwarded and this function will return true, /// otherwise it will return false. - pub fn propagate_message( + /// + /// The `propagation_source` parameter indicates who the message was received by and will not + /// be forwarded back to that peer. + /// + /// This should only be called once per message. + pub fn validate_message( &mut self, message_id: &MessageId, propagation_source: &PeerId, ) -> bool { - let message = match self.mcache.get(message_id) { + let message = match self.mcache.validate(message_id) { Some(message) => message.clone(), None => { warn!( "Message not in cache. Ignoring forwarding. Message Id: {}", - message_id.0 + message_id ); return false; } }; - self.forward_msg(message, propagation_source); + self.forward_msg(message, Some(propagation_source)); true } @@ -350,9 +439,11 @@ impl Gossipsub { "JOIN: Adding {:?} peers from the fanout for topic: {:?}", add_peers, topic_hash ); - added_peers.extend_from_slice(&peers[..add_peers]); - self.mesh - .insert(topic_hash.clone(), peers[..add_peers].to_vec()); + added_peers.extend(peers.iter().cloned().take(add_peers)); + self.mesh.insert( + topic_hash.clone(), + peers.into_iter().take(add_peers).collect(), + ); // remove the last published time self.fanout_last_pub.remove(topic_hash); } @@ -364,9 +455,9 @@ impl Gossipsub { &self.topic_peers, topic_hash, self.config.mesh_n - added_peers.len(), - |_| true, + |peer| !added_peers.contains(peer), ); - added_peers.extend_from_slice(&new_peers); + added_peers.extend(new_peers.clone()); // add them to the mesh debug!( "JOIN: Inserting {:?} random peers into the mesh", @@ -375,8 +466,8 @@ impl Gossipsub { let mesh_peers = self .mesh .entry(topic_hash.clone()) - .or_insert_with(|| Vec::new()); - mesh_peers.extend_from_slice(&new_peers); + .or_insert_with(Default::default); + mesh_peers.extend(new_peers); } for peer_id in added_peers { @@ -432,7 +523,7 @@ impl Gossipsub { } for id in ids { - if !self.received.contains(&id) { + if self.mcache.get(&id).is_none() { // have not seen this message, request it iwant_ids.insert(id); } @@ -471,15 +562,14 @@ impl Gossipsub { debug!("IWANT: Sending cached messages to peer: {:?}", peer_id); // Send the messages to the peer let message_list = cached_messages.into_iter().map(|entry| entry.1).collect(); - self.events.push_back(NetworkBehaviourAction::NotifyHandler { - peer_id: peer_id.clone(), - handler: NotifyHandler::Any, - event: Arc::new(GossipsubRpc { + self.send_message( + peer_id.clone(), + GossipsubRpc { subscriptions: Vec::new(), messages: message_list, control_msgs: Vec::new(), - }), - }); + }, + ); } debug!("Completed IWANT handling for peer: {:?}", peer_id); } @@ -497,10 +587,8 @@ impl Gossipsub { "GRAFT: Mesh link added for peer: {:?} in topic: {:?}", peer_id, topic_hash ); - // ensure peer is not already added - if !peers.contains(peer_id) { - peers.push(peer_id.clone()); - } + // Duplicates are ignored + peers.insert(peer_id.clone()); } else { to_prune_topics.insert(topic_hash.clone()); } @@ -519,49 +607,58 @@ impl Gossipsub { "GRAFT: Not subscribed to topics - Sending PRUNE to peer: {:?}", peer_id ); - self.events.push_back(NetworkBehaviourAction::NotifyHandler { - peer_id: peer_id.clone(), - handler: NotifyHandler::Any, - event: Arc::new(GossipsubRpc { + self.send_message( + peer_id.clone(), + GossipsubRpc { subscriptions: Vec::new(), messages: Vec::new(), control_msgs: prune_messages, - }), - }); + }, + ); } debug!("Completed GRAFT handling for peer: {:?}", peer_id); } /// Handles PRUNE control messages. Removes peer from the mesh. fn handle_prune(&mut self, peer_id: &PeerId, topics: Vec) { - debug!("Handling PRUNE message for peer: {:?}", peer_id); + debug!("Handling PRUNE message for peer: {}", peer_id.to_string()); for topic_hash in topics { if let Some(peers) = self.mesh.get_mut(&topic_hash) { // remove the peer if it exists in the mesh - info!( - "PRUNE: Removing peer: {:?} from the mesh for topic: {:?}", - peer_id, topic_hash - ); - peers.retain(|p| p != peer_id); + if peers.remove(peer_id) { + info!( + "PRUNE: Removing peer: {} from the mesh for topic: {:?}", + peer_id.to_string(), + topic_hash + ); + } } } - debug!("Completed PRUNE handling for peer: {:?}", peer_id); + debug!("Completed PRUNE handling for peer: {}", peer_id.to_string()); } /// Handles a newly received GossipsubMessage. /// Forwards the message to all peers in the mesh. - fn handle_received_message(&mut self, msg: GossipsubMessage, propagation_source: &PeerId) { + fn handle_received_message(&mut self, mut msg: GossipsubMessage, propagation_source: &PeerId) { let msg_id = (self.config.message_id_fn)(&msg); debug!( - "Handling message: {:?} from peer: {:?}", - msg_id, propagation_source + "Handling message: {:?} from peer: {}", + msg_id, + propagation_source.to_string() ); - if self.received.put(msg_id.clone(), ()).is_some() { + + // If we are not validating messages, assume this message is validated + // This will allow the message to be gossiped without explicitly calling + // `validate_message`. + if !self.config.validate_messages { + msg.validated = true; + } + + // Add the message to the duplication cache and memcache. + if self.duplication_cache.insert(msg_id.clone(), ()).is_some() { debug!("Message already received, ignoring. Message: {:?}", msg_id); return; } - - // add to the memcache self.mcache.put(msg.clone()); // dispatch the message to the user @@ -573,9 +670,9 @@ impl Gossipsub { } // forward the message to mesh peers, if no validation is required - if !self.config.manual_propagation { + if !self.config.validate_messages { let message_id = (self.config.message_id_fn)(&msg); - self.forward_msg(msg, propagation_source); + self.forward_msg(msg, Some(propagation_source)); debug!("Completed message handling for message: {:?}", message_id); } } @@ -587,55 +684,70 @@ impl Gossipsub { propagation_source: &PeerId, ) { debug!( - "Handling subscriptions: {:?}, from source: {:?}", - subscriptions, propagation_source + "Handling subscriptions: {:?}, from source: {}", + subscriptions, + propagation_source.to_string() ); let subscribed_topics = match self.peer_topics.get_mut(propagation_source) { Some(topics) => topics, None => { - error!("Subscription by unknown peer: {:?}", &propagation_source); + error!( + "Subscription by unknown peer: {}", + propagation_source.to_string() + ); return; } }; + // Collect potential graft messages for the peer. + let mut grafts = Vec::new(); + + // Notify the application about the subscription, after the grafts are sent. + let mut application_event = Vec::new(); + for subscription in subscriptions { // get the peers from the mapping, or insert empty lists if topic doesn't exist let peer_list = self .topic_peers .entry(subscription.topic_hash.clone()) - .or_insert_with(Vec::new); + .or_insert_with(Default::default); match subscription.action { GossipsubSubscriptionAction::Subscribe => { - if !peer_list.contains(&propagation_source) { + if peer_list.insert(propagation_source.clone()) { debug!( - "SUBSCRIPTION: topic_peer: Adding gossip peer: {:?} to topic: {:?}", - propagation_source, subscription.topic_hash + "SUBSCRIPTION: Adding gossip peer: {} to topic: {:?}", + propagation_source.to_string(), + subscription.topic_hash ); - peer_list.push(propagation_source.clone()); } // add to the peer_topics mapping - if !subscribed_topics.contains(&subscription.topic_hash) { - info!( - "SUBSCRIPTION: Adding peer: {:?} to topic: {:?}", - propagation_source, subscription.topic_hash - ); - subscribed_topics.push(subscription.topic_hash.clone()); - } + subscribed_topics.insert(subscription.topic_hash.clone()); // if the mesh needs peers add the peer to the mesh if let Some(peers) = self.mesh.get_mut(&subscription.topic_hash) { if peers.len() < self.config.mesh_n_low { - debug!( - "SUBSCRIPTION: Adding peer {:?} to the mesh", - propagation_source, - ); + if peers.insert(propagation_source.clone()) { + debug!( + "SUBSCRIPTION: Adding peer {} to the mesh for topic {:?}", + propagation_source.to_string(), + subscription.topic_hash + ); + // send graft to the peer + debug!( + "Sending GRAFT to peer {} for topic {:?}", + propagation_source.to_string(), + subscription.topic_hash + ); + grafts.push(GossipsubControlAction::Graft { + topic_hash: subscription.topic_hash.clone(), + }); + } } - peers.push(propagation_source.clone()); } // generates a subscription event to be polled - self.events.push_back(NetworkBehaviourAction::GenerateEvent( + application_event.push(NetworkBehaviourAction::GenerateEvent( GossipsubEvent::Subscribed { peer_id: propagation_source.clone(), topic: subscription.topic_hash.clone(), @@ -643,27 +755,23 @@ impl Gossipsub { )); } GossipsubSubscriptionAction::Unsubscribe => { - if let Some(pos) = peer_list.iter().position(|p| p == propagation_source) { + if peer_list.remove(propagation_source) { info!( - "SUBSCRIPTION: Removing gossip peer: {:?} from topic: {:?}", - propagation_source, subscription.topic_hash + "SUBSCRIPTION: Removing gossip peer: {} from topic: {:?}", + propagation_source.to_string(), + subscription.topic_hash ); - peer_list.remove(pos); } // remove topic from the peer_topics mapping - if let Some(pos) = subscribed_topics - .iter() - .position(|t| t == &subscription.topic_hash) - { - subscribed_topics.remove(pos); - } + subscribed_topics.remove(&subscription.topic_hash); // remove the peer from the mesh if it exists if let Some(peers) = self.mesh.get_mut(&subscription.topic_hash) { - peers.retain(|peer| peer != propagation_source); + peers.remove(propagation_source); + // the peer requested the unsubscription so we don't need to send a PRUNE. } // generate an unsubscribe event to be polled - self.events.push_back(NetworkBehaviourAction::GenerateEvent( + application_event.push(NetworkBehaviourAction::GenerateEvent( GossipsubEvent::Unsubscribed { peer_id: propagation_source.clone(), topic: subscription.topic_hash.clone(), @@ -672,6 +780,25 @@ impl Gossipsub { } } } + + // If we need to send grafts to peer, do so immediately, rather than waiting for the + // heartbeat. + if !grafts.is_empty() { + self.send_message( + propagation_source.clone(), + GossipsubRpc { + subscriptions: Vec::new(), + messages: Vec::new(), + control_msgs: grafts, + }, + ); + } + + // Notify the application of the subscriptions + for event in application_event { + self.events.push_back(event); + } + trace!( "Completed handling subscriptions from source: {:?}", propagation_source @@ -690,8 +817,8 @@ impl Gossipsub { // too little peers - add some if peers.len() < self.config.mesh_n_low { debug!( - "HEARTBEAT: Mesh low. Topic: {:?} Contains: {:?} needs: {:?}", - topic_hash.clone().into_string(), + "HEARTBEAT: Mesh low. Topic: {} Contains: {} needs: {}", + topic_hash, peers.len(), self.config.mesh_n_low ); @@ -702,7 +829,7 @@ impl Gossipsub { |peer| !peers.contains(peer) }); for peer in &peer_list { - let current_topic = to_graft.entry(peer.clone()).or_insert_with(|| vec![]); + let current_topic = to_graft.entry(peer.clone()).or_insert_with(Vec::new); current_topic.push(topic_hash.clone()); } // update the mesh @@ -713,7 +840,7 @@ impl Gossipsub { // too many peers - remove some if peers.len() > self.config.mesh_n_high { debug!( - "HEARTBEAT: Mesh high. Topic: {:?} Contains: {:?} needs: {:?}", + "HEARTBEAT: Mesh high. Topic: {} Contains: {} needs: {}", topic_hash, peers.len(), self.config.mesh_n_high @@ -721,13 +848,15 @@ impl Gossipsub { let excess_peer_no = peers.len() - self.config.mesh_n; // shuffle the peers let mut rng = thread_rng(); - peers.shuffle(&mut rng); + let mut shuffled = peers.iter().cloned().collect::>(); + shuffled.shuffle(&mut rng); // remove the first excess_peer_no peers adding them to to_prune for _ in 0..excess_peer_no { - let peer = peers + let peer = shuffled .pop() .expect("There should always be enough peers to remove"); - let current_topic = to_prune.entry(peer).or_insert_with(|| vec![]); + peers.remove(&peer); + let current_topic = to_prune.entry(peer).or_insert_with(Vec::new); current_topic.push(topic_hash.clone()); } } @@ -772,7 +901,9 @@ impl Gossipsub { } } } - peers.retain(|peer| to_remove_peers.contains(&peer)); + for to_remove in to_remove_peers { + peers.remove(&to_remove); + } // not enough peers if peers.len() < self.config.mesh_n { @@ -808,7 +939,6 @@ impl Gossipsub { /// Emits gossip - Send IHAVE messages to a random set of gossip peers. This is applied to mesh /// and fanout peers fn emit_gossip(&mut self) { - debug!("Started gossip"); for (topic_hash, peers) in self.mesh.iter().chain(self.fanout.iter()) { let message_ids = self.mcache.get_gossip_ids(&topic_hash); if message_ids.is_empty() { @@ -822,6 +952,9 @@ impl Gossipsub { self.config.gossip_lazy, |peer| !peers.contains(peer), ); + + debug!("Gossiping IHAVE to {} peers.", to_msg_peers.len()); + for peer in to_msg_peers { // send an IHAVE message Self::control_pool_add( @@ -834,7 +967,6 @@ impl Gossipsub { ); } } - debug!("Completed gossip"); } /// Handles multiple GRAFT/PRUNE messages and coalesces them into chunked gossip control @@ -844,34 +976,35 @@ impl Gossipsub { to_graft: HashMap>, mut to_prune: HashMap>, ) { - // handle the grafts and overlapping prunes + // handle the grafts and overlapping prunes per peer for (peer, topics) in to_graft.iter() { - let mut grafts: Vec = topics + let mut control_msgs: Vec = topics .iter() .map(|topic_hash| GossipsubControlAction::Graft { topic_hash: topic_hash.clone(), }) .collect(); - let mut prunes: Vec = to_prune - .remove(peer) - .unwrap_or_else(|| vec![]) - .iter() - .map(|topic_hash| GossipsubControlAction::Prune { - topic_hash: topic_hash.clone(), - }) - .collect(); - grafts.append(&mut prunes); + + // If there are prunes associated with the same peer add them. + if let Some(topics) = to_prune.remove(peer) { + let mut prunes = topics + .iter() + .map(|topic_hash| GossipsubControlAction::Prune { + topic_hash: topic_hash.clone(), + }) + .collect::>(); + control_msgs.append(&mut prunes); + } // send the control messages - self.events.push_back(NetworkBehaviourAction::NotifyHandler { - peer_id: peer.clone(), - handler: NotifyHandler::Any, - event: Arc::new(GossipsubRpc { + self.send_message( + peer.clone(), + GossipsubRpc { subscriptions: Vec::new(), messages: Vec::new(), - control_msgs: grafts, - }), - }); + control_msgs, + }, + ); } // handle the remaining prunes @@ -882,20 +1015,20 @@ impl Gossipsub { topic_hash: topic_hash.clone(), }) .collect(); - self.events.push_back(NetworkBehaviourAction::NotifyHandler { - peer_id: peer.clone(), - handler: NotifyHandler::Any, - event: Arc::new(GossipsubRpc { + self.send_message( + peer.clone(), + GossipsubRpc { subscriptions: Vec::new(), messages: Vec::new(), control_msgs: remaining_prunes, - }), - }); + }, + ); } } /// Helper function which forwards a message to mesh\[topic\] peers. - fn forward_msg(&mut self, message: GossipsubMessage, source: &PeerId) { + /// Returns true if at least one peer was messaged. + fn forward_msg(&mut self, message: GossipsubMessage, source: Option<&PeerId>) -> bool { let msg_id = (self.config.message_id_fn)(&message); debug!("Forwarding message: {:?}", msg_id); let mut recipient_peers = HashSet::new(); @@ -905,7 +1038,7 @@ impl Gossipsub { // mesh if let Some(mesh_peers) = self.mesh.get(&topic) { for peer_id in mesh_peers { - if peer_id != source { + if Some(peer_id) != source { recipient_peers.insert(peer_id.clone()); } } @@ -922,24 +1055,113 @@ impl Gossipsub { for peer in recipient_peers.iter() { debug!("Sending message: {:?} to peer {:?}", msg_id, peer); - self.events.push_back(NetworkBehaviourAction::NotifyHandler { - peer_id: peer.clone(), - event: event.clone(), - handler: NotifyHandler::Any, - }); + self.send_message(peer.clone(), event.clone()); + } + debug!("Completed forwarding message"); + true + } else { + false + } + } + + /// Constructs a `GossipsubMessage` performing message signing if required. + pub(crate) fn build_message( + &self, + topics: Vec, + data: Vec, + ) -> Result { + match &self.publish_config { + PublishConfig::Signing { + ref keypair, + author, + inline_key, + } => { + // Build and sign the message + let sequence_number: u64 = rand::random(); + + let signature = { + let message = rpc_proto::Message { + from: Some(author.clone().into_bytes()), + data: Some(data.clone()), + seqno: Some(sequence_number.to_be_bytes().to_vec()), + topic_ids: topics.clone().into_iter().map(|t| t.into()).collect(), + signature: None, + key: None, + }; + + let mut buf = Vec::with_capacity(message.encoded_len()); + message + .encode(&mut buf) + .expect("Buffer has sufficient capacity"); + + // the signature is over the bytes "libp2p-pubsub:" + let mut signature_bytes = SIGNING_PREFIX.to_vec(); + signature_bytes.extend_from_slice(&buf); + Some(keypair.sign(&signature_bytes)?) + }; + + Ok(GossipsubMessage { + source: Some(author.clone()), + data, + // To be interoperable with the go-implementation this is treated as a 64-bit + // big-endian uint. + sequence_number: Some(sequence_number), + topics, + signature, + key: inline_key.clone(), + validated: true, // all published messages are valid + }) + } + PublishConfig::Author(peer_id) => { + Ok(GossipsubMessage { + source: Some(peer_id.clone()), + data, + // To be interoperable with the go-implementation this is treated as a 64-bit + // big-endian uint. + sequence_number: Some(rand::random()), + topics, + signature: None, + key: None, + validated: true, // all published messages are valid + }) + } + PublishConfig::RandomAuthor => { + Ok(GossipsubMessage { + source: Some(PeerId::random()), + data, + // To be interoperable with the go-implementation this is treated as a 64-bit + // big-endian uint. + sequence_number: Some(rand::random()), + topics, + signature: None, + key: None, + validated: true, // all published messages are valid + }) + } + PublishConfig::Anonymous => { + Ok(GossipsubMessage { + source: None, + data, + // To be interoperable with the go-implementation this is treated as a 64-bit + // big-endian uint. + sequence_number: None, + topics, + signature: None, + key: None, + validated: true, // all published messages are valid + }) } } - debug!("Completed forwarding message"); } /// Helper function to get a set of `n` random gossipsub peers for a `topic_hash` /// filtered by the function `f`. fn get_random_peers( - topic_peers: &HashMap>, + topic_peers: &HashMap>, topic_hash: &TopicHash, n: usize, mut f: impl FnMut(&PeerId) -> bool, - ) -> Vec { + ) -> BTreeSet { let mut gossip_peers = match topic_peers.get(topic_hash) { // if they exist, filter the peers by `f` Some(peer_list) => peer_list.iter().cloned().filter(|p| f(p)).collect(), @@ -949,7 +1171,7 @@ impl Gossipsub { // if we have less than needed, return them if gossip_peers.len() <= n { debug!("RANDOM PEERS: Got {:?} peers", gossip_peers.len()); - return gossip_peers.to_vec(); + return gossip_peers.into_iter().collect(); } // we have more peers than needed, shuffle them and return n of them @@ -958,7 +1180,7 @@ impl Gossipsub { debug!("RANDOM PEERS: Got {:?} peers", n); - gossip_peers[..n].to_vec() + gossip_peers.into_iter().take(n).collect() } // adds a control action to control_pool @@ -984,18 +1206,28 @@ impl Gossipsub { /// Takes each control action mapping and turns it into a message fn flush_control_pool(&mut self) { - for (peer, controls) in self.control_pool.drain() { - self.events.push_back(NetworkBehaviourAction::NotifyHandler { - peer_id: peer, - handler: NotifyHandler::Any, - event: Arc::new(GossipsubRpc { + for (peer, controls) in self.control_pool.drain().collect::>() { + self.send_message( + peer, + GossipsubRpc { subscriptions: Vec::new(), messages: Vec::new(), control_msgs: controls, - }), - }); + }, + ); } } + + /// Send a GossipsubRpc message to a peer. This will wrap the message in an arc if it + /// is not already an arc. + fn send_message(&mut self, peer_id: PeerId, message: impl Into>) { + self.events + .push_back(NetworkBehaviourAction::NotifyHandler { + peer_id, + event: message.into(), + handler: NotifyHandler::Any, + }) + } } impl NetworkBehaviour for Gossipsub { @@ -1006,6 +1238,7 @@ impl NetworkBehaviour for Gossipsub { GossipsubHandler::new( self.config.protocol_id.clone(), self.config.max_transmit_size, + self.config.validation_mode.clone(), ) } @@ -1026,19 +1259,18 @@ impl NetworkBehaviour for Gossipsub { if !subscriptions.is_empty() { // send our subscriptions to the peer - self.events.push_back(NetworkBehaviourAction::NotifyHandler { - peer_id: id.clone(), - handler: NotifyHandler::Any, - event: Arc::new(GossipsubRpc { + self.send_message( + id.clone(), + GossipsubRpc { messages: Vec::new(), subscriptions, control_msgs: Vec::new(), - }), - }); + }, + ); } // For the time being assume all gossipsub peers - self.peer_topics.insert(id.clone(), Vec::new()); + self.peer_topics.insert(id.clone(), Default::default()); } fn inject_disconnected(&mut self, id: &PeerId) { @@ -1058,18 +1290,13 @@ impl NetworkBehaviour for Gossipsub { // check the mesh for the topic if let Some(mesh_peers) = self.mesh.get_mut(&topic) { // check if the peer is in the mesh and remove it - if let Some(pos) = mesh_peers.iter().position(|p| p == id) { - mesh_peers.remove(pos); - } + mesh_peers.remove(id); } // remove from topic_peers if let Some(peer_list) = self.topic_peers.get_mut(&topic) { - if let Some(pos) = peer_list.iter().position(|p| p == id) { - peer_list.remove(pos); - } - // debugging purposes - else { + if !peer_list.remove(id) { + // debugging purposes warn!("Disconnected node: {:?} not in topic_peers peer list", &id); } } else { @@ -1080,9 +1307,7 @@ impl NetworkBehaviour for Gossipsub { } // remove from fanout - self.fanout - .get_mut(&topic) - .map(|peers| peers.retain(|p| p != id)); + self.fanout.get_mut(&topic).map(|peers| peers.remove(id)); } } @@ -1094,7 +1319,9 @@ impl NetworkBehaviour for Gossipsub { fn inject_event(&mut self, propagation_source: PeerId, _: ConnectionId, event: GossipsubRpc) { // Handle subscriptions // Update connected peers topics - self.handle_received_subscriptions(&event.subscriptions, &propagation_source); + if !event.subscriptions.is_empty() { + self.handle_received_subscriptions(&event.subscriptions, &propagation_source); + } // Handle messages for message in event.messages { @@ -1143,35 +1370,33 @@ impl NetworkBehaviour for Gossipsub { >, > { if let Some(event) = self.events.pop_front() { - // clone send event reference if others references are present - match event { + return Poll::Ready(match event { NetworkBehaviourAction::NotifyHandler { - peer_id, handler, event: send_event, - } => match Arc::try_unwrap(send_event) { - Ok(event) => { - return Poll::Ready(NetworkBehaviourAction::NotifyHandler { - peer_id, event, handler - }); - } - Err(event) => { - return Poll::Ready(NetworkBehaviourAction::NotifyHandler { - peer_id, event: (*event).clone(), handler - }); + peer_id, + handler, + event: send_event, + } => { + // clone send event reference if others references are present + let event = Arc::try_unwrap(send_event).unwrap_or_else(|e| (*e).clone()); + NetworkBehaviourAction::NotifyHandler { + peer_id, + event, + handler, } - }, + } NetworkBehaviourAction::GenerateEvent(e) => { - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(e)); + NetworkBehaviourAction::GenerateEvent(e) } NetworkBehaviourAction::DialAddress { address } => { - return Poll::Ready(NetworkBehaviourAction::DialAddress { address }); + NetworkBehaviourAction::DialAddress { address } } NetworkBehaviourAction::DialPeer { peer_id, condition } => { - return Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id, condition }); + NetworkBehaviourAction::DialPeer { peer_id, condition } } NetworkBehaviourAction::ReportObservedAddr { address } => { - return Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address }); + NetworkBehaviourAction::ReportObservedAddr { address } } - } + }); } while let Poll::Ready(Some(())) = self.heartbeat.poll_next_unpin(cx) { @@ -1183,7 +1408,7 @@ impl NetworkBehaviour for Gossipsub { } /// An RPC received/sent. -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Clone, PartialEq, Eq, Hash)] pub struct GossipsubRpc { /// List of messages that were part of this RPC query. pub messages: Vec, @@ -1193,6 +1418,22 @@ pub struct GossipsubRpc { pub control_msgs: Vec, } +impl fmt::Debug for GossipsubRpc { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let mut b = f.debug_struct("GossipsubRpc"); + if !self.messages.is_empty() { + b.field("messages", &self.messages); + } + if !self.subscriptions.is_empty() { + b.field("subscriptions", &self.subscriptions); + } + if !self.control_msgs.is_empty() { + b.field("control_msgs", &self.control_msgs); + } + b.finish() + } +} + /// Event that can happen on the gossipsub behaviour. #[derive(Debug)] pub enum GossipsubEvent { @@ -1217,3 +1458,61 @@ pub enum GossipsubEvent { topic: TopicHash, }, } + +/// Validates the combination of signing, privacy and message validation to ensure the +/// configuration will not reject published messages. +fn validate_config(authenticity: &MessageAuthenticity, validation_mode: &ValidationMode) { + match validation_mode { + ValidationMode::Anonymous => { + if authenticity.is_signing() { + panic!("Cannot enable message signing with an Anonymous validation mode. Consider changing either the ValidationMode or MessageAuthenticity"); + } + + if !authenticity.is_anonymous() { + panic!("Published messages contain an author but incoming messages with an author will be rejected. Consider adjusting the validation or privacy settings in the config"); + } + } + ValidationMode::Strict => { + if !authenticity.is_signing() { + panic!( + "Messages will be + published unsigned and incoming unsigned messages will be rejected. Consider adjusting + the validation or privacy settings in the config" + ); + } + } + _ => {} + } +} + + + +impl fmt::Debug for Gossipsub { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Gossipsub") + .field("config", &self.config) + .field("events", &self.events) + .field("control_pool", &self.control_pool) + .field("publish_config", &self.publish_config) + .field("topic_peers", &self.topic_peers) + .field("peer_topics", &self.peer_topics) + .field("mesh", &self.mesh) + .field("fanout", &self.fanout) + .field("fanout_last_pub", &self.fanout_last_pub) + .field("mcache", &self.mcache) + .field("heartbeat", &self.heartbeat) + .finish() + } +} + +impl fmt::Debug for PublishConfig { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + PublishConfig::Signing { author, .. } => f.write_fmt(format_args!("PublishConfig::Signing({})", author)), + PublishConfig::Author(author) => f.write_fmt(format_args!("PublishConfig::Author({})", author)), + PublishConfig::RandomAuthor => f.write_fmt(format_args!("PublishConfig::RandomAuthor")), + PublishConfig::Anonymous => f.write_fmt(format_args!("PublishConfig::Anonymous")), + } + } +} + diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index c933659434b..73a6a0b2904 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -33,10 +33,11 @@ mod tests { topics: Vec, to_subscribe: bool, ) -> (Gossipsub, Vec, Vec) { - // generate a default GossipsubConfig + let keypair = libp2p_core::identity::Keypair::generate_secp256k1(); + // generate a default GossipsubConfig with signing let gs_config = GossipsubConfig::default(); // create a gossipsub struct - let mut gs: Gossipsub = Gossipsub::new(PeerId::random(), gs_config); + let mut gs: Gossipsub = Gossipsub::new(MessageAuthenticity::Signed(keypair), gs_config); let mut topic_hashes = vec![]; @@ -53,10 +54,7 @@ mod tests { for _ in 0..peer_no { let peer = PeerId::random(); peers.push(peer.clone()); - ::inject_connected( - &mut gs, - &peer, - ); + ::inject_connected(&mut gs, &peer); if to_subscribe { gs.handle_received_subscriptions( &topic_hashes @@ -230,21 +228,23 @@ mod tests { "Should have added 6 nodes to the mesh" ); - // there should be mesh_n GRAFT messages. - let graft_messages = - gs.control_pool - .iter() - .fold(vec![], |mut collected_grafts, (_, controls)| { - for c in controls.iter() { - match c { - GossipsubControlAction::Graft { topic_hash: _ } => { - collected_grafts.push(c.clone()) - } - _ => {} - } + fn collect_grafts( + mut collected_grafts: Vec, + (_, controls): (&PeerId, &Vec), + ) -> Vec { + for c in controls.iter() { + match c { + GossipsubControlAction::Graft { topic_hash: _ } => { + collected_grafts.push(c.clone()) } - collected_grafts - }); + _ => {} + } + } + collected_grafts + } + + // there should be mesh_n GRAFT messages. + let graft_messages = gs.control_pool.iter().fold(vec![], collect_grafts); assert_eq!( graft_messages.len(), @@ -254,11 +254,12 @@ mod tests { // verify fanout nodes // add 3 random peers to the fanout[topic1] - gs.fanout.insert(topic_hashes[1].clone(), vec![]); - let new_peers = vec![]; + gs.fanout + .insert(topic_hashes[1].clone(), Default::default()); + let new_peers: Vec = vec![]; for _ in 0..3 { let fanout_peers = gs.fanout.get_mut(&topic_hashes[1]).unwrap(); - fanout_peers.push(PeerId::random()); + fanout_peers.insert(PeerId::random()); } // subscribe to topic1 @@ -272,26 +273,13 @@ mod tests { let mesh_peers = gs.mesh.get(&topic_hashes[1]).unwrap(); for new_peer in new_peers { assert!( - mesh_peers.contains(new_peer), + mesh_peers.contains(&new_peer), "Fanout peer should be included in the mesh" ); } // there should now be 12 graft messages to be sent - let graft_messages = - gs.control_pool - .iter() - .fold(vec![], |mut collected_grafts, (_, controls)| { - for c in controls.iter() { - match c { - GossipsubControlAction::Graft { topic_hash: _ } => { - collected_grafts.push(c.clone()) - } - _ => {} - } - } - collected_grafts - }); + let graft_messages = gs.control_pool.iter().fold(vec![], collect_grafts); assert!( graft_messages.len() == 12, @@ -315,9 +303,17 @@ mod tests { "Subscribe should add a new entry to the mesh[topic] hashmap" ); + // all peers should be subscribed to the topic + assert_eq!( + gs.topic_peers.get(&topic_hashes[0]).map(|p| p.len()), + Some(20), + "Peers should be subscribed to the topic" + ); + // publish on topic let publish_data = vec![0; 42]; - gs.publish(&Topic::new(publish_topic), publish_data); + gs.publish(&Topic::new(publish_topic), publish_data) + .unwrap(); // Collect all publish messages let publishes = gs @@ -336,8 +332,10 @@ mod tests { let msg_id = (gs.config.message_id_fn)(&publishes.first().expect("Should contain > 0 entries")); - assert!( - publishes.len() == 20, + let config = GossipsubConfig::default(); + assert_eq!( + publishes.len(), + config.mesh_n_low, "Should send a publish message to all known peers" ); @@ -345,10 +343,6 @@ mod tests { gs.mcache.get(&msg_id).is_some(), "Message cache should contain published message" ); - assert!( - gs.received.get(&msg_id).is_some(), - "Received cache should contain published message" - ); } /// Test local node publish to unsubscribed topic @@ -374,7 +368,8 @@ mod tests { // Publish on unsubscribed topic let publish_data = vec![0; 42]; - gs.publish(&Topic::new(fanout_topic.clone()), publish_data); + gs.publish(&Topic::new(fanout_topic.clone()), publish_data) + .unwrap(); assert_eq!( gs.fanout @@ -412,10 +407,6 @@ mod tests { gs.mcache.get(&msg_id).is_some(), "Message cache should contain published message" ); - assert!( - gs.received.get(&msg_id).is_some(), - "Received cache should contain published message" - ); } #[test] @@ -433,7 +424,9 @@ mod tests { .events .iter() .filter(|e| match e { - NetworkBehaviourAction::NotifyHandler { .. } => true, + NetworkBehaviourAction::NotifyHandler { event, .. } => { + !event.subscriptions.is_empty() + } _ => false, }) .collect(); @@ -461,7 +454,7 @@ mod tests { for peer in peers { let known_topics = gs.peer_topics.get(&peer).unwrap(); assert!( - known_topics == &topic_hashes, + known_topics == &topic_hashes.iter().cloned().collect(), "The topics for each node should all topics" ); } @@ -508,12 +501,12 @@ mod tests { let peer_topics = gs.peer_topics.get(&peers[0]).unwrap().clone(); assert!( - peer_topics == topic_hashes[..3].to_vec(), + peer_topics == topic_hashes.iter().take(3).cloned().collect(), "First peer should be subscribed to three topics" ); let peer_topics = gs.peer_topics.get(&peers[1]).unwrap().clone(); assert!( - peer_topics == topic_hashes[..3].to_vec(), + peer_topics == topic_hashes.iter().take(3).cloned().collect(), "Second peer should be subscribed to three topics" ); @@ -525,7 +518,7 @@ mod tests { for topic_hash in topic_hashes[..3].iter() { let topic_peers = gs.topic_peers.get(topic_hash).unwrap().clone(); assert!( - topic_peers == peers[..2].to_vec(), + topic_peers == peers[..2].into_iter().cloned().collect(), "Two peers should be added to the first three topics" ); } @@ -542,13 +535,13 @@ mod tests { let peer_topics = gs.peer_topics.get(&peers[0]).unwrap().clone(); assert!( - peer_topics == topic_hashes[1..3].to_vec(), + peer_topics == topic_hashes[1..3].into_iter().cloned().collect(), "Peer should be subscribed to two topics" ); let topic_peers = gs.topic_peers.get(&topic_hashes[0]).unwrap().clone(); // only gossipsub at the moment assert!( - topic_peers == peers[1..2].to_vec(), + topic_peers == peers[1..2].into_iter().cloned().collect(), "Only the second peers should be in the first topic" ); } @@ -557,9 +550,10 @@ mod tests { /// Test Gossipsub.get_random_peers() function fn test_get_random_peers() { // generate a default GossipsubConfig - let gs_config = GossipsubConfig::default(); + let mut gs_config = GossipsubConfig::default(); + gs_config.validation_mode = ValidationMode::Anonymous; // create a gossipsub struct - let mut gs: Gossipsub = Gossipsub::new(PeerId::random(), gs_config); + let mut gs: Gossipsub = Gossipsub::new(MessageAuthenticity::Anonymous, gs_config); // create a topic and fill it with some peers let topic_hash = Topic::new("Test".into()).no_hash().clone(); @@ -568,30 +562,31 @@ mod tests { peers.push(PeerId::random()) } - gs.topic_peers.insert(topic_hash.clone(), peers.clone()); + gs.topic_peers + .insert(topic_hash.clone(), peers.iter().cloned().collect()); - let random_peers = - Gossipsub::get_random_peers(&gs.topic_peers, &topic_hash, 5, |_| true); - assert!(random_peers.len() == 5, "Expected 5 peers to be returned"); - let random_peers = - Gossipsub::get_random_peers(&gs.topic_peers, &topic_hash, 30, |_| true); + let random_peers = Gossipsub::get_random_peers(&gs.topic_peers, &topic_hash, 5, |_| true); + assert_eq!(random_peers.len(), 5, "Expected 5 peers to be returned"); + let random_peers = Gossipsub::get_random_peers(&gs.topic_peers, &topic_hash, 30, |_| true); assert!(random_peers.len() == 20, "Expected 20 peers to be returned"); - assert!(random_peers == peers, "Expected no shuffling"); - let random_peers = - Gossipsub::get_random_peers(&gs.topic_peers, &topic_hash, 20, |_| true); + assert!( + random_peers == peers.iter().cloned().collect(), + "Expected no shuffling" + ); + let random_peers = Gossipsub::get_random_peers(&gs.topic_peers, &topic_hash, 20, |_| true); assert!(random_peers.len() == 20, "Expected 20 peers to be returned"); - assert!(random_peers == peers, "Expected no shuffling"); - let random_peers = - Gossipsub::get_random_peers(&gs.topic_peers, &topic_hash, 0, |_| true); + assert!( + random_peers == peers.iter().cloned().collect(), + "Expected no shuffling" + ); + let random_peers = Gossipsub::get_random_peers(&gs.topic_peers, &topic_hash, 0, |_| true); assert!(random_peers.len() == 0, "Expected 0 peers to be returned"); // test the filter - let random_peers = - Gossipsub::get_random_peers(&gs.topic_peers, &topic_hash, 5, |_| false); + let random_peers = Gossipsub::get_random_peers(&gs.topic_peers, &topic_hash, 5, |_| false); assert!(random_peers.len() == 0, "Expected 0 peers to be returned"); - let random_peers = - Gossipsub::get_random_peers(&gs.topic_peers, &topic_hash, 10, { - |peer| peers.contains(peer) - }); + let random_peers = Gossipsub::get_random_peers(&gs.topic_peers, &topic_hash, 10, { + |peer| peers.contains(peer) + }); assert!(random_peers.len() == 10, "Expected 10 peers to be returned"); } @@ -603,10 +598,13 @@ mod tests { let id = gs.config.message_id_fn; let message = GossipsubMessage { - source: peers[11].clone(), + source: Some(peers[11].clone()), data: vec![1, 2, 3, 4], - sequence_number: 1u64, + sequence_number: Some(1u64), topics: Vec::new(), + signature: None, + key: None, + validated: true, }; let msg_id = id(&message); gs.mcache.put(message.clone()); @@ -642,10 +640,13 @@ mod tests { // perform 10 memshifts and check that it leaves the cache for shift in 1..10 { let message = GossipsubMessage { - source: peers[11].clone(), + source: Some(peers[11].clone()), data: vec![1, 2, 3, 4], - sequence_number: shift, + sequence_number: Some(shift), topics: Vec::new(), + signature: None, + key: None, + validated: true, }; let msg_id = id(&message); gs.mcache.put(message.clone()); @@ -683,7 +684,7 @@ mod tests { let (mut gs, peers, _) = build_and_inject_nodes(20, Vec::new(), true); let events_before = gs.events.len(); - gs.handle_iwant(&peers[7], vec![MessageId(String::from("unknown id"))]); + gs.handle_iwant(&peers[7], vec![MessageId::new(b"unknown id")]); let events_after = gs.events.len(); assert_eq!( @@ -700,10 +701,7 @@ mod tests { gs.handle_ihave( &peers[7], - vec![( - topic_hashes[0].clone(), - vec![MessageId(String::from("unknown id"))], - )], + vec![(topic_hashes[0].clone(), vec![MessageId::new(b"unknown id")])], ); // check that we sent an IWANT request for `unknown id` @@ -711,7 +709,7 @@ mod tests { Some(controls) => controls.iter().any(|c| match c { GossipsubControlAction::IWant { message_ids } => message_ids .iter() - .any(|m| *m.0 == String::from("unknown id")), + .any(|m| *m == MessageId::new(b"unknown id")), _ => false, }), _ => false, @@ -730,8 +728,7 @@ mod tests { let (mut gs, peers, topic_hashes) = build_and_inject_nodes(20, vec![String::from("topic1")], true); - let msg_id = MessageId(String::from("known id")); - gs.received.put(msg_id.clone(), ()); + let msg_id = MessageId::new(b"known id"); let events_before = gs.events.len(); gs.handle_ihave(&peers[7], vec![(topic_hashes[0].clone(), vec![msg_id])]); @@ -754,7 +751,7 @@ mod tests { &peers[7], vec![( TopicHash::from_raw(String::from("unsubscribed topic")), - vec![MessageId(String::from("irrelevant id"))], + vec![MessageId::new(b"irrelevant id")], )], ); let events_after = gs.events.len(); @@ -793,7 +790,7 @@ mod tests { ); assert!( - gs.mesh.get(&topic_hashes[0]).unwrap().contains(&peers[7]), + !gs.mesh.get(&topic_hashes[0]).unwrap().contains(&peers[7]), "Expected peer to have been added to mesh" ); } @@ -836,7 +833,8 @@ mod tests { build_and_inject_nodes(20, vec![String::from("topic1")], true); // insert peer into our mesh for 'topic1' - gs.mesh.insert(topic_hashes[0].clone(), peers.clone()); + gs.mesh + .insert(topic_hashes[0].clone(), peers.iter().cloned().collect()); assert!( gs.mesh.get(&topic_hashes[0]).unwrap().contains(&peers[7]), "Expected peer to be in mesh" @@ -848,4 +846,53 @@ mod tests { "Expected peer to be removed from mesh" ); } + + #[test] + // Tests the mesh maintenance addition + fn test_mesh_addition() { + let config = GossipsubConfig::default(); + + // Adds mesh_low peers and PRUNE 2 giving us a deficit. + let (mut gs, peers, topics) = + build_and_inject_nodes(config.mesh_n + 1, vec!["test".into()], true); + + let to_remove_peers = config.mesh_n + 1 - config.mesh_n_low - 1; + + for index in 0..to_remove_peers { + gs.handle_prune(&peers[index], topics.clone()); + } + + // Verify the pruned peers are removed from the mesh. + assert_eq!( + gs.mesh.get(&topics[0]).unwrap().len(), + config.mesh_n_low - 1 + ); + + // run a heartbeat + gs.heartbeat(); + + // Peers should be added to reach mesh_n + assert_eq!(gs.mesh.get(&topics[0]).unwrap().len(), config.mesh_n); + } + + #[test] + // Tests the mesh maintenance subtraction + fn test_mesh_subtraction() { + let config = GossipsubConfig::default(); + + // Adds mesh_low peers and PRUNE 2 giving us a deficit. + let (mut gs, peers, topics) = + build_and_inject_nodes(config.mesh_n_high + 10, vec!["test".into()], true); + + // graft all the peers + for peer in peers { + gs.handle_graft(&peer, topics.clone()); + } + + // run a heartbeat + gs.heartbeat(); + + // Peers should be removed to reach mesh_n + assert_eq!(gs.mesh.get(&topics[0]).unwrap().len(), config.mesh_n); + } } diff --git a/protocols/gossipsub/src/config.rs b/protocols/gossipsub/src/config.rs index b00070a9022..330788ac5f7 100644 --- a/protocols/gossipsub/src/config.rs +++ b/protocols/gossipsub/src/config.rs @@ -19,12 +19,31 @@ // DEALINGS IN THE SOFTWARE. use crate::protocol::{GossipsubMessage, MessageId}; +use libp2p_core::PeerId; use std::borrow::Cow; use std::time::Duration; -/// If the `no_source_id` flag is set, the IDENTITY_SOURCE value is used as the source of the -/// packet. -pub const IDENTITY_SOURCE: [u8; 3] = [0, 1, 0]; +/// The types of message validation that can be employed by gossipsub. +#[derive(Debug, Clone)] +pub enum ValidationMode { + /// This is the default setting. This requires the message author to be a valid `PeerId` and to + /// be present as well as the sequence number. All messages must have valid signatures. + /// + /// NOTE: This setting will reject messages from nodes using `PrivacyMode::Anonymous` and + /// all messages that do not have signatures. + Strict, + /// This setting permits messages that have no author, sequence number or signature. If any of + /// these fields exist in the message these are validated. + Permissive, + /// This setting requires the author, sequence number and signature fields of a message to be + /// empty. Any message that contains these fields is considered invalid. + Anonymous, + /// This setting does not check the author, sequence number or signature fields of incoming + /// messages. If these fields contain data, they are simply ignored. + /// + /// NOTE: This setting will consider messages with invalid signatures as valid messages. + None, +} /// Configuration parameters that define the performance of the gossipsub network. #[derive(Clone)] @@ -42,7 +61,7 @@ pub struct GossipsubConfig { /// Target number of peers for the mesh network (D in the spec, default is 6). pub mesh_n: usize, - /// Minimum number of peers in mesh network before adding more (D_lo in the spec, default is 4). + /// Minimum number of peers in mesh network before adding more (D_lo in the spec, default is 5). pub mesh_n_low: usize, /// Maximum number of peers in mesh network before removing some (D_high in the spec, default @@ -64,17 +83,26 @@ pub struct GossipsubConfig { /// The maximum byte size for each gossip (default is 2048 bytes). pub max_transmit_size: usize, + /// Duplicates are prevented by storing message id's of known messages in an LRU time cache. + /// This settings sets the time period that messages are stored in the cache. Duplicates can be + /// received if duplicate messages are sent at a time greater than this setting apart. The + /// default is 1 minute. + pub duplicate_cache_time: Duration, + /// Flag determining if gossipsub topics are hashed or sent as plain strings (default is false). pub hash_topics: bool, - /// When set, all published messages will have a 0 source `PeerId` (default is false). - pub no_source_id: bool, - /// When set to `true`, prevents automatic forwarding of all received messages. This setting /// allows a user to validate the messages before propagating them to their peers. If set to - /// true, the user must manually call `propagate_message()` on the behaviour to forward message - /// once validated (default is false). - pub manual_propagation: bool, + /// true, the user must manually call `validate_message()` on the behaviour to forward message + /// once validated (default is `false`). Furthermore, the application may optionally call + /// `invalidate_message()` on the behaviour to remove the message from the memcache. The + /// default is false. + pub validate_messages: bool, + + /// Determines the level of validation used when receiving messages. See [`ValidationMode`] + /// for the available types. The default is ValidationMode::Strict. + pub validation_mode: ValidationMode, /// A user-defined function allowing the user to specify the message id of a gossipsub message. /// The default value is to concatenate the source peer id with a sequence number. Setting this @@ -94,26 +122,35 @@ impl Default for GossipsubConfig { history_length: 5, history_gossip: 3, mesh_n: 6, - mesh_n_low: 4, + mesh_n_low: 5, mesh_n_high: 12, gossip_lazy: 6, // default to mesh_n heartbeat_initial_delay: Duration::from_secs(5), heartbeat_interval: Duration::from_secs(1), fanout_ttl: Duration::from_secs(60), max_transmit_size: 2048, + duplicate_cache_time: Duration::from_secs(60), hash_topics: false, // default compatibility with floodsub - no_source_id: false, - manual_propagation: false, + validate_messages: false, + validation_mode: ValidationMode::Strict, message_id_fn: |message| { // default message id is: source + sequence number - let mut source_string = message.source.to_base58(); - source_string.push_str(&message.sequence_number.to_string()); - MessageId(source_string) + // NOTE: If either the peer_id or source is not provided, we set to 0; + let mut source_string = if let Some(peer_id) = message.source.as_ref() { + peer_id.to_base58() + } else { + PeerId::from_bytes(vec![0, 1, 0]) + .expect("Valid peer id") + .to_base58() + }; + source_string.push_str(&message.sequence_number.unwrap_or_default().to_string()); + MessageId::from(source_string) }, } } } +/// The builder struct for constructing a gossipsub configuration. pub struct GossipsubConfigBuilder { config: GossipsubConfig, } @@ -129,14 +166,18 @@ impl Default for GossipsubConfigBuilder { impl GossipsubConfigBuilder { // set default values pub fn new() -> GossipsubConfigBuilder { - GossipsubConfigBuilder::default() + GossipsubConfigBuilder { + config: GossipsubConfig::default(), + } } + /// The protocol id to negotiate this protocol (default is `/meshsub/1.0.0`). pub fn protocol_id(&mut self, protocol_id: impl Into>) -> &mut Self { self.config.protocol_id = protocol_id.into(); self } + /// Number of heartbeats to keep in the `memcache` (default is 5). pub fn history_length(&mut self, history_length: usize) -> &mut Self { assert!( history_length >= self.config.history_gossip, @@ -146,6 +187,7 @@ impl GossipsubConfigBuilder { self } + /// Number of past heartbeats to gossip about (default is 3). pub fn history_gossip(&mut self, history_gossip: usize) -> &mut Self { assert!( self.config.history_length >= history_gossip, @@ -155,6 +197,7 @@ impl GossipsubConfigBuilder { self } + /// Target number of peers for the mesh network (D in the spec, default is 6). pub fn mesh_n(&mut self, mesh_n: usize) -> &mut Self { assert!( self.config.mesh_n_low <= mesh_n && mesh_n <= self.config.mesh_n_high, @@ -164,6 +207,7 @@ impl GossipsubConfigBuilder { self } + /// Minimum number of peers in mesh network before adding more (D_lo in the spec, default is 4). pub fn mesh_n_low(&mut self, mesh_n_low: usize) -> &mut Self { assert!( mesh_n_low <= self.config.mesh_n && self.config.mesh_n <= self.config.mesh_n_high, @@ -173,6 +217,8 @@ impl GossipsubConfigBuilder { self } + /// Maximum number of peers in mesh network before removing some (D_high in the spec, default + /// is 12). pub fn mesh_n_high(&mut self, mesh_n_high: usize) -> &mut Self { assert!( self.config.mesh_n_low <= self.config.mesh_n && self.config.mesh_n <= mesh_n_high, @@ -182,48 +228,81 @@ impl GossipsubConfigBuilder { self } + /// Number of peers to emit gossip to during a heartbeat (D_lazy in the spec, default is 6). pub fn gossip_lazy(&mut self, gossip_lazy: usize) -> &mut Self { self.config.gossip_lazy = gossip_lazy; self } + /// Initial delay in each heartbeat (default is 5 seconds). pub fn heartbeat_initial_delay(&mut self, heartbeat_initial_delay: Duration) -> &mut Self { self.config.heartbeat_initial_delay = heartbeat_initial_delay; self } + + /// Time between each heartbeat (default is 1 second). pub fn heartbeat_interval(&mut self, heartbeat_interval: Duration) -> &mut Self { self.config.heartbeat_interval = heartbeat_interval; self } + + /// Time to live for fanout peers (default is 60 seconds). pub fn fanout_ttl(&mut self, fanout_ttl: Duration) -> &mut Self { self.config.fanout_ttl = fanout_ttl; self } + + /// The maximum byte size for each gossip (default is 2048 bytes). pub fn max_transmit_size(&mut self, max_transmit_size: usize) -> &mut Self { self.config.max_transmit_size = max_transmit_size; self } + /// Duplicates are prevented by storing message id's of known messages in an LRU time cache. + /// This settings sets the time period that messages are stored in the cache. Duplicates can be + /// received if duplicate messages are sent at a time greater than this setting apart. The + /// default is 1 minute. + pub fn duplicate_cache_time(&mut self, cache_size: Duration) -> &mut Self { + self.config.duplicate_cache_time = cache_size; + self + } + + /// When set, gossipsub topics are hashed instead of being sent as plain strings. pub fn hash_topics(&mut self) -> &mut Self { self.config.hash_topics = true; self } - pub fn no_source_id(&mut self) -> &mut Self { - self.config.no_source_id = true; + /// When set, prevents automatic forwarding of all received messages. This setting + /// allows a user to validate the messages before propagating them to their peers. If set, + /// the user must manually call `validate_message()` on the behaviour to forward a message + /// once validated. + pub fn validate_messages(&mut self) -> &mut Self { + self.config.validate_messages = true; self } - pub fn manual_propagation(&mut self) -> &mut Self { - self.config.manual_propagation = true; + /// Determines the level of validation used when receiving messages. See [`ValidationMode`] + /// for the available types. The default is ValidationMode::Strict. + pub fn validation_mode(&mut self, validation_mode: ValidationMode) -> &mut Self { + self.config.validation_mode = validation_mode; self } + /// A user-defined function allowing the user to specify the message id of a gossipsub message. + /// The default value is to concatenate the source peer id with a sequence number. Setting this + /// parameter allows the user to address packets arbitrarily. One example is content based + /// addressing, where this function may be set to `hash(message)`. This would prevent messages + /// of the same content from being duplicated. + /// + /// The function takes a `GossipsubMessage` as input and outputs a String to be interpreted as + /// the message id. pub fn message_id_fn(&mut self, id_fn: fn(&GossipsubMessage) -> MessageId) -> &mut Self { self.config.message_id_fn = id_fn; self } + /// Constructs a `GossipsubConfig` from the given configuration. pub fn build(&self) -> GossipsubConfig { self.config.clone() } @@ -247,9 +326,9 @@ impl std::fmt::Debug for GossipsubConfig { let _ = builder.field("heartbeat_interval", &self.heartbeat_interval); let _ = builder.field("fanout_ttl", &self.fanout_ttl); let _ = builder.field("max_transmit_size", &self.max_transmit_size); + let _ = builder.field("duplicate_cache_time", &self.duplicate_cache_time); let _ = builder.field("hash_topics", &self.hash_topics); - let _ = builder.field("no_source_id", &self.no_source_id); - let _ = builder.field("manual_propagation", &self.manual_propagation); + let _ = builder.field("validate_messages", &self.validate_messages); builder.finish() } } diff --git a/protocols/gossipsub/src/error.rs b/protocols/gossipsub/src/error.rs new file mode 100644 index 00000000000..6f774607117 --- /dev/null +++ b/protocols/gossipsub/src/error.rs @@ -0,0 +1,40 @@ +// Copyright 2020 Sigma Prime Pty Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! Error types that can result from gossipsub. + +use libp2p_core::identity::error::SigningError; + +/// Error associated with publishing a gossipsub message. +#[derive(Debug)] +pub enum PublishError { + /// This message has already been published. + Duplicate, + /// An error occurred whilst signing the message. + SigningError(SigningError), + /// There were no peers to send this message to. + InsufficientPeers, +} + +impl From for PublishError { + fn from(error: SigningError) -> Self { + PublishError::SigningError(error) + } +} diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index 41f146d1be6..6daca114545 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -19,6 +19,7 @@ // DEALINGS IN THE SOFTWARE. use crate::behaviour::GossipsubRpc; +use crate::config::ValidationMode; use crate::protocol::{GossipsubCodec, ProtocolConfig}; use futures::prelude::*; use futures_codec::Framed; @@ -50,6 +51,10 @@ pub struct GossipsubHandler { /// Queue of values that we want to send to the remote. send_queue: SmallVec<[GossipsubRpc; 16]>, + /// Flag indicating that an outbound substream is being established to prevent duplicate + /// requests. + outbound_substream_establishing: bool, + /// Flag determining whether to maintain the connection to the peer. keep_alive: KeepAlive, } @@ -80,26 +85,20 @@ enum OutboundSubstreamState { impl GossipsubHandler { /// Builds a new `GossipsubHandler`. - pub fn new(protocol_id: impl Into>, max_transmit_size: usize) -> Self { + pub fn new( + protocol_id: impl Into>, + max_transmit_size: usize, + validation_mode: ValidationMode, + ) -> Self { GossipsubHandler { listen_protocol: SubstreamProtocol::new(ProtocolConfig::new( protocol_id, max_transmit_size, + validation_mode, )), inbound_substream: None, outbound_substream: None, - send_queue: SmallVec::new(), - keep_alive: KeepAlive::Yes, - } - } -} - -impl Default for GossipsubHandler { - fn default() -> Self { - GossipsubHandler { - listen_protocol: SubstreamProtocol::new(ProtocolConfig::default()), - inbound_substream: None, - outbound_substream: None, + outbound_substream_establishing: false, send_queue: SmallVec::new(), keep_alive: KeepAlive::Yes, } @@ -132,6 +131,7 @@ impl ProtocolsHandler for GossipsubHandler { substream: >::Output, message: Self::OutboundOpenInfo, ) { + self.outbound_substream_establishing = false; // Should never establish a new outbound substream if one already exists. // If this happens, an outbound message is not sent. if self.outbound_substream.is_some() { @@ -154,6 +154,7 @@ impl ProtocolsHandler for GossipsubHandler { >::Error, >, ) { + self.outbound_substream_establishing = false; // Ignore upgrade errors for now. // If a peer doesn't support this protocol, this will just ignore them, but not disconnect // them. @@ -175,9 +176,13 @@ impl ProtocolsHandler for GossipsubHandler { >, > { // determine if we need to create the stream - if !self.send_queue.is_empty() && self.outbound_substream.is_none() { + if !self.send_queue.is_empty() + && self.outbound_substream.is_none() + && !self.outbound_substream_establishing + { let message = self.send_queue.remove(0); self.send_queue.shrink_to_fit(); + self.outbound_substream_establishing = true; return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { protocol: self.listen_protocol.clone(), info: message, @@ -198,9 +203,21 @@ impl ProtocolsHandler for GossipsubHandler { return Poll::Ready(ProtocolsHandlerEvent::Custom(message)); } Poll::Ready(Some(Err(e))) => { - debug!("Inbound substream error while awaiting input: {:?}", e); - self.inbound_substream = - Some(InboundSubstreamState::Closing(substream)); + match e.kind() { + std::io::ErrorKind::InvalidData => { + // Invalid message, ignore it and reset to waiting + warn!("Invalid message received. Error: {}", e); + self.inbound_substream = + Some(InboundSubstreamState::WaitingInput(substream)); + } + _ => { + // More serious errors, close this side of the stream. If the + // peer is still around, they will re-establish their + // connection + self.inbound_substream = + Some(InboundSubstreamState::Closing(substream)); + } + } } // peer closed the stream Poll::Ready(None) => { @@ -242,7 +259,7 @@ impl ProtocolsHandler for GossipsubHandler { break; } Some(InboundSubstreamState::Poisoned) => { - panic!("Error occurred during inbound stream processing") + unreachable!("Error occurred during inbound stream processing") } } } @@ -338,7 +355,7 @@ impl ProtocolsHandler for GossipsubHandler { break; } Some(OutboundSubstreamState::Poisoned) => { - panic!("Error occurred during outbound stream processing") + unreachable!("Error occurred during outbound stream processing") } } } diff --git a/protocols/gossipsub/src/lib.rs b/protocols/gossipsub/src/lib.rs index e0efa955714..a81614eed17 100644 --- a/protocols/gossipsub/src/lib.rs +++ b/protocols/gossipsub/src/lib.rs @@ -135,6 +135,7 @@ //! println!("Listening on {:?}", addr); //! ``` +pub mod error; pub mod protocol; mod behaviour; @@ -147,7 +148,7 @@ mod rpc_proto { include!(concat!(env!("OUT_DIR"), "/gossipsub.pb.rs")); } -pub use self::behaviour::{Gossipsub, GossipsubEvent, GossipsubRpc}; -pub use self::config::{GossipsubConfig, GossipsubConfigBuilder}; +pub use self::behaviour::{Gossipsub, GossipsubEvent, GossipsubRpc, MessageAuthenticity}; +pub use self::config::{GossipsubConfig, GossipsubConfigBuilder, ValidationMode}; pub use self::protocol::{GossipsubMessage, MessageId}; pub use self::topic::{Topic, TopicHash}; diff --git a/protocols/gossipsub/src/mcache.rs b/protocols/gossipsub/src/mcache.rs index 52c0be08f4d..05a49551aed 100644 --- a/protocols/gossipsub/src/mcache.rs +++ b/protocols/gossipsub/src/mcache.rs @@ -65,34 +65,22 @@ impl MessageCache { } } - /// Creates a `MessageCache` with a default message id function. - #[allow(dead_code)] - pub fn new_default(gossip: usize, history_capacity: usize) -> MessageCache { - let default_id = |message: &GossipsubMessage| { - // default message id is: source + sequence number - let mut source_string = message.source.to_base58(); - source_string.push_str(&message.sequence_number.to_string()); - MessageId(source_string) - }; - MessageCache { - gossip, - msgs: HashMap::default(), - history: vec![Vec::new(); history_capacity], - msg_id: default_id, - } - } - - /// Put a message into the memory cache - pub fn put(&mut self, msg: GossipsubMessage) { + /// Put a message into the memory cache. + /// + /// Returns the message if it already exists. + pub fn put(&mut self, msg: GossipsubMessage) -> Option { let message_id = (self.msg_id)(&msg); let cache_entry = CacheEntry { mid: message_id.clone(), topics: msg.topics.clone(), }; - self.msgs.insert(message_id, msg); - - self.history[0].push(cache_entry); + let seen_message = self.msgs.insert(message_id, msg); + if seen_message.is_none() { + // Don't add duplicate entries to the cache. + self.history[0].push(cache_entry); + } + seen_message } /// Get a message with `message_id` @@ -100,6 +88,14 @@ impl MessageCache { self.msgs.get(message_id) } + /// Gets and validates a message with `message_id`. + pub fn validate(&mut self, message_id: &MessageId) -> Option<&GossipsubMessage> { + self.msgs.get_mut(message_id).map(|message| { + message.validated = true; + &*message + }) + } + /// Get a list of GossipIds for a given topic pub fn get_gossip_ids(&self, topic: &TopicHash) -> Vec { self.history[..self.gossip] @@ -110,7 +106,13 @@ impl MessageCache { .iter() .filter_map(|entry| { if entry.topics.iter().any(|t| t == topic) { - Some(entry.mid.clone()) + let mid = &entry.mid; + // Only gossip validated messages + if let Some(true) = self.msgs.get(mid).map(|msg| msg.validated) { + Some(mid.clone()) + } else { + None + } } else { None } @@ -143,30 +145,38 @@ mod tests { fn gen_testm(x: u64, topics: Vec) -> GossipsubMessage { let u8x: u8 = x as u8; - let source = PeerId::random(); + let source = Some(PeerId::random()); let data: Vec = vec![u8x]; - let sequence_number = x; + let sequence_number = Some(x); let m = GossipsubMessage { source, data, sequence_number, topics, + signature: None, + key: None, + validated: true, }; m } - #[test] - /// Test that the message cache can be created. - fn test_new_cache() { + fn new_cache(gossip_size: usize, history: usize) -> MessageCache { let default_id = |message: &GossipsubMessage| { // default message id is: source + sequence number - let mut source_string = message.source.to_base58(); - source_string.push_str(&message.sequence_number.to_string()); - MessageId(source_string) + let mut source_string = message.source.as_ref().unwrap().to_base58(); + source_string.push_str(&message.sequence_number.unwrap().to_string()); + MessageId::from(source_string) }; + + MessageCache::new(gossip_size, history, default_id) + } + + #[test] + /// Test that the message cache can be created. + fn test_new_cache() { let x: usize = 3; - let mc = MessageCache::new(x, 5, default_id); + let mc = new_cache(x, 5); assert_eq!(mc.gossip, x); } @@ -174,7 +184,7 @@ mod tests { #[test] /// Test you can put one message and get one. fn test_put_get_one() { - let mut mc = MessageCache::new_default(10, 15); + let mut mc = new_cache(10, 15); let topic1_hash = Topic::new("topic1".into()).no_hash().clone(); let topic2_hash = Topic::new("topic2".into()).no_hash().clone(); @@ -200,7 +210,7 @@ mod tests { #[test] /// Test attempting to 'get' with a wrong id. fn test_get_wrong() { - let mut mc = MessageCache::new_default(10, 15); + let mut mc = new_cache(10, 15); let topic1_hash = Topic::new("topic1".into()).no_hash().clone(); let topic2_hash = Topic::new("topic2".into()).no_hash().clone(); @@ -210,7 +220,7 @@ mod tests { mc.put(m.clone()); // Try to get an incorrect ID - let wrong_id = MessageId(String::from("wrongid")); + let wrong_id = MessageId::new(b"wrongid"); let fetched = mc.get(&wrong_id); assert_eq!(fetched.is_none(), true); } @@ -218,10 +228,10 @@ mod tests { #[test] /// Test attempting to 'get' empty message cache. fn test_get_empty() { - let mc = MessageCache::new_default(10, 15); + let mc = new_cache(10, 15); // Try to get an incorrect ID - let wrong_string = MessageId(String::from("imempty")); + let wrong_string = MessageId::new(b"imempty"); let fetched = mc.get(&wrong_string); assert_eq!(fetched.is_none(), true); } @@ -229,7 +239,7 @@ mod tests { #[test] /// Test adding a message with no topics. fn test_no_topic_put() { - let mut mc = MessageCache::new_default(3, 5); + let mut mc = new_cache(3, 5); // Build the message let m = gen_testm(1, vec![]); @@ -247,7 +257,7 @@ mod tests { #[test] /// Test shift mechanism. fn test_shift() { - let mut mc = MessageCache::new_default(1, 5); + let mut mc = new_cache(1, 5); let topic1_hash = Topic::new("topic1".into()).no_hash().clone(); let topic2_hash = Topic::new("topic2".into()).no_hash().clone(); @@ -271,7 +281,7 @@ mod tests { #[test] /// Test Shift with no additions. fn test_empty_shift() { - let mut mc = MessageCache::new_default(1, 5); + let mut mc = new_cache(1, 5); let topic1_hash = Topic::new("topic1".into()).no_hash().clone(); let topic2_hash = Topic::new("topic2".into()).no_hash().clone(); @@ -297,7 +307,7 @@ mod tests { #[test] /// Test shift to see if the last history messages are removed. fn test_remove_last_from_shift() { - let mut mc = MessageCache::new_default(4, 5); + let mut mc = new_cache(4, 5); let topic1_hash = Topic::new("topic1".into()).no_hash().clone(); let topic2_hash = Topic::new("topic2".into()).no_hash().clone(); diff --git a/protocols/gossipsub/src/protocol.rs b/protocols/gossipsub/src/protocol.rs index 53a7506372d..b1785513c5d 100644 --- a/protocols/gossipsub/src/protocol.rs +++ b/protocols/gossipsub/src/protocol.rs @@ -19,6 +19,7 @@ // DEALINGS IN THE SOFTWARE. use crate::behaviour::GossipsubRpc; +use crate::config::ValidationMode; use crate::rpc_proto; use crate::topic::TopicHash; use byteorder::{BigEndian, ByteOrder}; @@ -27,25 +28,23 @@ use bytes::BytesMut; use futures::future; use futures::prelude::*; use futures_codec::{Decoder, Encoder, Framed}; -use libp2p_core::{InboundUpgrade, OutboundUpgrade, PeerId, UpgradeInfo}; +use libp2p_core::{identity::PublicKey, InboundUpgrade, OutboundUpgrade, PeerId, UpgradeInfo}; +use log::{debug, warn}; use prost::Message as ProtobufMessage; use std::{borrow::Cow, fmt, io, iter, pin::Pin}; use unsigned_varint::codec; +pub const SIGNING_PREFIX: &'static [u8] = b"libp2p-pubsub:"; + /// Implementation of the `ConnectionUpgrade` for the Gossipsub protocol. -#[derive(Debug, Clone)] +#[derive(Clone)] pub struct ProtocolConfig { + /// The gossipsub protocol id to listen on. protocol_id: Cow<'static, [u8]>, + /// The maximum transmit size for a packet. max_transmit_size: usize, -} - -impl Default for ProtocolConfig { - fn default() -> Self { - Self { - protocol_id: Cow::Borrowed(b"/meshsub/1.0.0"), - max_transmit_size: 2048, - } - } + /// Determines the level of validation to be done on incoming messages. + validation_mode: ValidationMode, } impl ProtocolConfig { @@ -54,10 +53,12 @@ impl ProtocolConfig { pub fn new( protocol_id: impl Into>, max_transmit_size: usize, + validation_mode: ValidationMode, ) -> ProtocolConfig { ProtocolConfig { protocol_id: protocol_id.into(), max_transmit_size, + validation_mode, } } } @@ -84,7 +85,7 @@ where length_codec.set_max_len(self.max_transmit_size); Box::pin(future::ok(Framed::new( socket, - GossipsubCodec { length_codec }, + GossipsubCodec::new(length_codec, self.validation_mode), ))) } } @@ -102,7 +103,7 @@ where length_codec.set_max_len(self.max_transmit_size); Box::pin(future::ok(Framed::new( socket, - GossipsubCodec { length_codec }, + GossipsubCodec::new(length_codec, self.validation_mode), ))) } } @@ -112,6 +113,81 @@ where pub struct GossipsubCodec { /// Codec to encode/decode the Unsigned varint length prefix of the frames. length_codec: codec::UviBytes, + /// Determines the level of validation performed on incoming messages. + validation_mode: ValidationMode, +} + +impl GossipsubCodec { + pub fn new(length_codec: codec::UviBytes, validation_mode: ValidationMode) -> Self { + GossipsubCodec { + length_codec, + validation_mode, + } + } + + /// Verifies a gossipsub message. This returns either a success or failure. All errors + /// are logged, which prevents error handling in the codec and handler. We simply drop invalid + /// messages and log warnings, rather than propagating errors through the codec. + fn verify_signature(message: &rpc_proto::Message) -> bool { + let from = match message.from.as_ref() { + Some(v) => v, + None => { + debug!("Signature verification failed: No source id given"); + return false; + } + }; + + let source = match PeerId::from_bytes(from.clone()) { + Ok(v) => v, + Err(_) => { + debug!("Signature verification failed: Invalid Peer Id"); + return false; + } + }; + + let signature = match message.signature.as_ref() { + Some(v) => v, + None => { + debug!("Signature verification failed: No signature provided"); + return false; + } + }; + + // If there is a key value in the protobuf, use that key otherwise the key must be + // obtained from the inlined source peer_id. + let public_key = match message + .key + .as_ref() + .map(|key| PublicKey::from_protobuf_encoding(&key)) + { + Some(Ok(key)) => key, + _ => match PublicKey::from_protobuf_encoding(&source.as_bytes()[2..]) { + Ok(v) => v, + Err(_) => { + warn!("Signature verification failed: No valid public key supplied"); + return false; + } + }, + }; + + // The key must match the peer_id + if source != public_key.clone().into_peer_id() { + warn!("Signature verification failed: Public key doesn't match source peer id"); + return false; + } + + // Construct the signature bytes + let mut message_sig = message.clone(); + message_sig.signature = None; + message_sig.key = None; + let mut buf = Vec::with_capacity(message_sig.encoded_len()); + message_sig + .encode(&mut buf) + .expect("Buffer has sufficient capacity"); + let mut signature_bytes = SIGNING_PREFIX.to_vec(); + signature_bytes.extend_from_slice(&buf); + public_key.verify(&signature_bytes, signature) + } } impl Encoder for GossipsubCodec { @@ -119,21 +195,20 @@ impl Encoder for GossipsubCodec { type Error = io::Error; fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { - // messages - let publish = item - .messages - .into_iter() - .map(|message| rpc_proto::Message { - from: Some(message.source.into_bytes()), + // Messages + let mut publish = Vec::new(); + + for message in item.messages.into_iter() { + let message = rpc_proto::Message { + from: message.source.map(|m| m.into_bytes()), data: Some(message.data), - seqno: Some(message.sequence_number.to_be_bytes().to_vec()), - topic_ids: message - .topics - .into_iter() - .map(TopicHash::into_string) - .collect(), - }) - .collect::>(); + seqno: message.sequence_number.map(|s| s.to_be_bytes().to_vec()), + topic_ids: message.topics.into_iter().map(TopicHash::into).collect(), + signature: message.signature, + key: message.key, + }; + publish.push(message); + } // subscriptions let subscriptions = item @@ -141,7 +216,7 @@ impl Encoder for GossipsubCodec { .into_iter() .map(|sub| rpc_proto::rpc::SubOpts { subscribe: Some(sub.action == GossipsubSubscriptionAction::Subscribe), - topic_id: Some(sub.topic_hash.into_string()), + topic_id: Some(sub.topic_hash.into()), }) .collect::>(); @@ -163,7 +238,7 @@ impl Encoder for GossipsubCodec { message_ids, } => { let rpc_ihave = rpc_proto::ControlIHave { - topic_id: Some(topic_hash.into_string()), + topic_id: Some(topic_hash.into()), message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(), }; control.ihave.push(rpc_ihave); @@ -176,13 +251,13 @@ impl Encoder for GossipsubCodec { } GossipsubControlAction::Graft { topic_hash } => { let rpc_graft = rpc_proto::ControlGraft { - topic_id: Some(topic_hash.into_string()), + topic_id: Some(topic_hash.into()), }; control.graft.push(rpc_graft); } GossipsubControlAction::Prune { topic_hash } => { let rpc_prune = rpc_proto::ControlPrune { - topic_id: Some(topic_hash.into_string()), + topic_id: Some(topic_hash.into()), }; control.prune.push(rpc_prune); } @@ -222,30 +297,101 @@ impl Decoder for GossipsubCodec { let rpc = rpc_proto::Rpc::decode(&packet[..])?; let mut messages = Vec::with_capacity(rpc.publish.len()); - for publish in rpc.publish.into_iter() { + for message in rpc.publish.into_iter() { + let mut verify_signature = false; + let mut verify_sequence_no = false; + let mut verify_source = false; + + match self.validation_mode { + ValidationMode::Strict => { + // Validate everything + verify_signature = true; + verify_sequence_no = true; + verify_source = true; + } + ValidationMode::Permissive => { + // If the fields exist, validate them + if message.signature.is_some() { + verify_signature = true; + } + if message.seqno.is_some() { + verify_sequence_no = true; + } + if message.from.is_some() { + verify_source = true; + } + } + ValidationMode::Anonymous => { + if message.signature.is_some() { + warn!("Message dropped. Signature field was non-empty and anonymous validation mode is set"); + return Ok(None); + } + if message.seqno.is_some() { + warn!("Message dropped. Sequence number was non-empty and anonymous validation mode is set"); + return Ok(None); + } + if message.from.is_some() { + warn!("Message dropped. Message source was non-empty and anonymous validation mode is set"); + return Ok(None); + } + } + ValidationMode::None => {} + } + + // verify message signatures if required + if verify_signature { + // If a single message is unsigned, we will drop all of them + // Most implementations should not have a list of mixed signed/not-signed messages in a single RPC + // NOTE: Invalid messages are simply dropped with a warning log. We don't throw an + // error to avoid extra logic to deal with these errors in the handler. + if !GossipsubCodec::verify_signature(&message) { + warn!("Message dropped. Invalid signature"); + // Drop the message + return Ok(None); + } + } + // ensure the sequence number is a u64 - let seq_no = publish.seqno.ok_or_else(|| { - io::Error::new( - io::ErrorKind::InvalidData, - "sequence number was not provided", + let sequence_number = if verify_sequence_no { + let seq_no = message.seqno.ok_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidData, + "sequence number was not provided", + ) + })?; + if seq_no.len() != 8 { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "sequence number has an incorrect size", + )); + } + Some(BigEndian::read_u64(&seq_no)) + } else { + None + }; + + let source = if verify_source { + Some( + PeerId::from_bytes(message.from.unwrap_or_default()).map_err(|_| { + io::Error::new(io::ErrorKind::InvalidData, "Invalid Peer Id") + })?, ) - })?; - if seq_no.len() != 8 { - return Err(io::Error::new( - io::ErrorKind::InvalidData, - "sequence number has an incorrect size", - )); - } + } else { + None + }; + messages.push(GossipsubMessage { - source: PeerId::from_bytes(publish.from.unwrap_or_default()) - .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Invalid Peer Id"))?, - data: publish.data.unwrap_or_default(), - sequence_number: BigEndian::read_u64(&seq_no), - topics: publish + source, + data: message.data.unwrap_or_default(), + sequence_number, + topics: message .topic_ids .into_iter() .map(TopicHash::from_raw) .collect(), + signature: message.signature, + key: message.key, + validated: false, }); } @@ -261,7 +407,7 @@ impl Decoder for GossipsubCodec { message_ids: ihave .message_ids .into_iter() - .map(|x| MessageId(x)) + .map(MessageId::from) .collect::>(), }) .collect(); @@ -273,7 +419,7 @@ impl Decoder for GossipsubCodec { message_ids: iwant .message_ids .into_iter() - .map(|x| MessageId(x)) + .map(MessageId::from) .collect::>(), }) .collect(); @@ -320,18 +466,30 @@ impl Decoder for GossipsubCodec { } /// A type for gossipsub message ids. -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct MessageId(pub String); +#[derive(Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] +pub struct MessageId(Vec); + +impl MessageId { + pub fn new(value: &[u8]) -> Self { + Self(value.to_vec()) + } +} + +impl>> From for MessageId { + fn from(value: T) -> Self { + Self(value.into()) + } +} impl std::fmt::Display for MessageId { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.0) + write!(f, "{}", hex_fmt::HexFmt(&self.0)) } } -impl Into for MessageId { - fn into(self) -> String { - self.0.into() +impl std::fmt::Debug for MessageId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "MessageId({})", hex_fmt::HexFmt(&self.0)) } } @@ -339,18 +497,27 @@ impl Into for MessageId { #[derive(Clone, PartialEq, Eq, Hash)] pub struct GossipsubMessage { /// Id of the peer that published this message. - pub source: PeerId, + pub source: Option, /// Content of the message. Its meaning is out of scope of this library. pub data: Vec, /// A random sequence number. - pub sequence_number: u64, + pub sequence_number: Option, /// List of topics this message belongs to. /// /// Each message can belong to multiple topics at once. pub topics: Vec, + + /// The signature of the message if it's signed. + pub signature: Option>, + + /// The public key of the message if it is signed and the source `PeerId` cannot be inlined. + pub key: Option>, + + /// Flag indicating if this message has been validated by the application or not. + pub validated: bool, } impl fmt::Debug for GossipsubMessage { @@ -408,3 +575,96 @@ pub enum GossipsubControlAction { topic_hash: TopicHash, }, } + +#[cfg(test)] +mod tests { + use super::*; + use crate::topic::Topic; + use crate::{Gossipsub, GossipsubConfig}; + use libp2p_core::identity::Keypair; + use quickcheck::*; + use rand::Rng; + + #[derive(Clone, Debug)] + struct Message(GossipsubMessage); + + impl Arbitrary for Message { + fn arbitrary(g: &mut G) -> Self { + let keypair = TestKeypair::arbitrary(g); + + // generate an arbitrary GossipsubMessage using the behaviour signing functionality + let config = GossipsubConfig::default(); + let gs = Gossipsub::new( + crate::MessageAuthenticity::Signed(keypair.0.clone()), + config, + ); + let data = (0..g.gen_range(1, 1024)).map(|_| g.gen()).collect(); + let topics = Vec::arbitrary(g) + .into_iter() + .map(|id: TopicId| id.0) + .collect(); + Message(gs.build_message(topics, data).unwrap()) + } + } + + #[derive(Clone, Debug)] + struct TopicId(TopicHash); + + impl Arbitrary for TopicId { + fn arbitrary(g: &mut G) -> Self { + TopicId( + Topic::new((0..g.gen_range(0, 1024)).map(|_| g.gen::()).collect()) + .sha256_hash(), + ) + } + } + + #[derive(Clone)] + struct TestKeypair(Keypair); + + impl Arbitrary for TestKeypair { + fn arbitrary(g: &mut G) -> Self { + let keypair = if g.gen() { + // Small enough to be inlined. + Keypair::generate_secp256k1() + } else { + // Too large to be inlined. + let mut rsa_key = hex::decode("308204bd020100300d06092a864886f70d0101010500048204a7308204a30201000282010100ef930f41a71288b643c1cbecbf5f72ab53992249e2b00835bf07390b6745419f3848cbcc5b030faa127bc88cdcda1c1d6f3ff699f0524c15ab9d2c9d8015f5d4bd09881069aad4e9f91b8b0d2964d215cdbbae83ddd31a7622a8228acee07079f6e501aea95508fa26c6122816ef7b00ac526d422bd12aed347c37fff6c1c307f3ba57bb28a7f28609e0bdcc839da4eedca39f5d2fa855ba4b0f9c763e9764937db929a1839054642175312a3de2d3405c9d27bdf6505ef471ce85c5e015eee85bf7874b3d512f715de58d0794fd8afe021c197fbd385bb88a930342fac8da31c27166e2edab00fa55dc1c3814448ba38363077f4e8fe2bdea1c081f85f1aa6f02030100010282010028ff427a1aac1a470e7b4879601a6656193d3857ea79f33db74df61e14730e92bf9ffd78200efb0c40937c3356cbe049cd32e5f15be5c96d5febcaa9bd3484d7fded76a25062d282a3856a1b3b7d2c525cdd8434beae147628e21adf241dd64198d5819f310d033743915ba40ea0b6acdbd0533022ad6daa1ff42de51885f9e8bab2306c6ef1181902d1cd7709006eba1ab0587842b724e0519f295c24f6d848907f772ae9a0953fc931f4af16a07df450fb8bfa94572562437056613647818c238a6ff3f606cffa0533e4b8755da33418dfbc64a85110b1a036623c947400a536bb8df65e5ebe46f2dfd0cfc86e7aeeddd7574c253e8fbf755562b3669525d902818100f9fff30c6677b78dd31ec7a634361438457e80be7a7faf390903067ea8355faa78a1204a82b6e99cb7d9058d23c1ecf6cfe4a900137a00cecc0113fd68c5931602980267ea9a95d182d48ba0a6b4d5dd32fdac685cb2e5d8b42509b2eb59c9579ea6a67ccc7547427e2bd1fb1f23b0ccb4dd6ba7d206c8dd93253d70a451701302818100f5530dfef678d73ce6a401ae47043af10a2e3f224c71ae933035ecd68ccbc4df52d72bc6ca2b17e8faf3e548b483a2506c0369ab80df3b137b54d53fac98f95547c2bc245b416e650ce617e0d29db36066f1335a9ba02ad3e0edf9dc3d58fd835835042663edebce81803972696c789012847cb1f854ab2ac0a1bd3867ac7fb502818029c53010d456105f2bf52a9a8482bca2224a5eac74bf3cc1a4d5d291fafcdffd15a6a6448cce8efdd661f6617ca5fc37c8c885cc3374e109ac6049bcbf72b37eabf44602a2da2d4a1237fd145c863e6d75059976de762d9d258c42b0984e2a2befa01c95217c3ee9c736ff209c355466ff99375194eff943bc402ea1d172a1ed02818027175bf493bbbfb8719c12b47d967bf9eac061c90a5b5711172e9095c38bb8cc493c063abffe4bea110b0a2f22ac9311b3947ba31b7ef6bfecf8209eebd6d86c316a2366bbafda7279b2b47d5bb24b6202254f249205dcad347b574433f6593733b806f84316276c1990a016ce1bbdbe5f650325acc7791aefe515ecc60063bd02818100b6a2077f4adcf15a17092d9c4a346d6022ac48f3861b73cf714f84c440a07419a7ce75a73b9cbff4597c53c128bf81e87b272d70428a272d99f90cd9b9ea1033298e108f919c6477400145a102df3fb5601ffc4588203cf710002517bfa24e6ad32f4d09c6b1a995fa28a3104131bedd9072f3b4fb4a5c2056232643d310453f").unwrap(); + Keypair::rsa_from_pkcs8(&mut rsa_key).unwrap() + }; + TestKeypair(keypair) + } + } + + impl std::fmt::Debug for TestKeypair { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("TestKeypair") + .field("public", &self.0.public()) + .finish() + } + } + + #[test] + fn encode_decode() { + fn prop(message: Message) { + let message = message.0; + + let rpc = GossipsubRpc { + messages: vec![message], + subscriptions: vec![], + control_msgs: vec![], + }; + + let mut codec = GossipsubCodec::new(codec::UviBytes::default(), ValidationMode::Strict); + let mut buf = BytesMut::new(); + codec.encode(rpc.clone(), &mut buf).unwrap(); + let mut decoded_rpc = codec.decode(&mut buf).unwrap().unwrap(); + // mark as validated as its a published message + decoded_rpc.messages[0].validated = true; + + assert_eq!(rpc, decoded_rpc); + } + + QuickCheck::new().quickcheck(prop as fn(_) -> _) + } +} diff --git a/protocols/gossipsub/src/rpc.proto b/protocols/gossipsub/src/rpc.proto index 1aa19430aa2..499b3b43af8 100644 --- a/protocols/gossipsub/src/rpc.proto +++ b/protocols/gossipsub/src/rpc.proto @@ -19,6 +19,8 @@ message Message { optional bytes data = 2; optional bytes seqno = 3; repeated string topic_ids = 4; + optional bytes signature = 5; + optional bytes key = 6; } message ControlMessage { @@ -30,11 +32,11 @@ message ControlMessage { message ControlIHave { optional string topic_id = 1; - repeated string message_ids = 2; + repeated bytes message_ids = 2; } message ControlIWant { - repeated string message_ids= 1; + repeated bytes message_ids= 1; } message ControlGraft { diff --git a/protocols/gossipsub/src/topic.rs b/protocols/gossipsub/src/topic.rs index 09d3f578247..6c1527d8ce5 100644 --- a/protocols/gossipsub/src/topic.rs +++ b/protocols/gossipsub/src/topic.rs @@ -24,7 +24,7 @@ use prost::Message; use sha2::{Digest, Sha256}; use std::fmt; -#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] pub struct TopicHash { /// The topic hash. Stored as a string to align with the protobuf API. hash: String, @@ -35,17 +35,13 @@ impl TopicHash { TopicHash { hash: hash.into() } } - pub fn into_string(self) -> String { - self.hash - } - pub fn as_str(&self) -> &str { &self.hash } } /// A gossipsub topic. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] pub struct Topic { topic: String, } @@ -80,6 +76,12 @@ impl Topic { } } +impl Into for TopicHash { + fn into(self) -> String { + self.hash + } +} + impl fmt::Display for Topic { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "{}", self.topic) diff --git a/protocols/gossipsub/tests/smoke.rs b/protocols/gossipsub/tests/smoke.rs index 136481f39dc..25e6d64d5b8 100644 --- a/protocols/gossipsub/tests/smoke.rs +++ b/protocols/gossipsub/tests/smoke.rs @@ -30,15 +30,12 @@ use std::{ }; use libp2p_core::{ - Multiaddr, - Transport, - identity, - multiaddr::Protocol, - muxing::StreamMuxerBox, - transport::MemoryTransport, - upgrade, + identity, multiaddr::Protocol, muxing::StreamMuxerBox, transport::MemoryTransport, upgrade, + Multiaddr, Transport, +}; +use libp2p_gossipsub::{ + Gossipsub, GossipsubConfigBuilder, GossipsubEvent, MessageAuthenticity, Topic, ValidationMode, }; -use libp2p_gossipsub::{Gossipsub, GossipsubConfig, GossipsubEvent, Topic}; use libp2p_plaintext::PlainText2Config; use libp2p_swarm::Swarm; use libp2p_yamux as yamux; @@ -133,6 +130,25 @@ impl Graph { futures::executor::block_on(fut).unwrap() } + + /// Polls the graph until Poll::Pending is obtained, completing the underlying polls. + fn drain_poll(self) -> Self { + // The future below should return self. Given that it is a FnMut and not a FnOnce, one needs + // to wrap `self` in an Option, leaving a `None` behind after the final `Poll::Ready`. + let mut this = Some(self); + + let fut = futures::future::poll_fn(move |cx| match &mut this { + Some(graph) => loop { + match graph.poll_unpin(cx) { + Poll::Ready(_) => {} + Poll::Pending => return Poll::Ready(this.take().unwrap()), + } + }, + None => panic!("future called after final return"), + }); + let fut = async_std::future::timeout(Duration::from_secs(10), fut); + futures::executor::block_on(fut).unwrap() + } } fn build_node() -> (Multiaddr, Swarm) { @@ -150,7 +166,20 @@ fn build_node() -> (Multiaddr, Swarm) { .boxed(); let peer_id = public_key.clone().into_peer_id(); - let behaviour = Gossipsub::new(peer_id.clone(), GossipsubConfig::default()); + + // NOTE: The graph of created nodes can be disconnected from the mesh point of view as nodes + // can reach their d_lo value and not add other nodes to their mesh. To speed up this test, we + // reduce the default values of the heartbeat, so that all nodes will receive gossip in a + // timely fashion. + + let config = GossipsubConfigBuilder::new() + .heartbeat_initial_delay(Duration::from_millis(100)) + .heartbeat_interval(Duration::from_millis(200)) + .history_length(10) + .history_gossip(10) + .validation_mode(ValidationMode::Permissive) + .build(); + let behaviour = Gossipsub::new(MessageAuthenticity::Author(peer_id.clone()), config); let mut swarm = Swarm::new(transport, behaviour, peer_id); let port = 1 + random::(); @@ -168,14 +197,14 @@ fn build_node() -> (Multiaddr, Swarm) { fn multi_hop_propagation() { let _ = env_logger::try_init(); - fn prop(num_nodes: usize, seed: u64) -> TestResult { + fn prop(num_nodes: u8, seed: u64) -> TestResult { if num_nodes < 2 || num_nodes > 100 { return TestResult::discard(); } debug!("number nodes: {:?}, seed: {:?}", num_nodes, seed); - let mut graph = Graph::new_connected(num_nodes, seed); + let mut graph = Graph::new_connected(num_nodes as usize, seed); let number_nodes = graph.nodes.len(); // Subscribe each node to the same topic. @@ -197,8 +226,12 @@ fn multi_hop_propagation() { false }); + // It can happen that the publish occurs before all grafts have completed causing this test + // to fail. We drain all the poll messages before publishing. + graph = graph.drain_poll(); + // Publish a single message. - graph.nodes[0].1.publish(&topic, vec![1, 2, 3]); + graph.nodes[0].1.publish(&topic, vec![1, 2, 3]).unwrap(); // Wait for all nodes to receive the published message. let mut received_msgs = 0; @@ -218,5 +251,5 @@ fn multi_hop_propagation() { QuickCheck::new() .max_tests(10) - .quickcheck(prop as fn(usize, u64) -> TestResult) + .quickcheck(prop as fn(u8, u64) -> TestResult) }