Skip to content

Commit

Permalink
Gossipsub v1.1 various improvements (libp2p#49)
Browse files Browse the repository at this point in the history
* remove dbg! calls and add debug logging for peer scoring

* export MessageAcceptance and rename validate_message to report_message_validation_result to also signal that this message should get called in case of invalid messages

* fix double reject_message call

* gossip promises are fulfilled already on receiving the message without validation

* derive debug for MessageAcceptance

* add helper method to get config builder from existing config

* allow adding/changing TopicScoreParams during runtime

* more debug output for messages from self

* fixes incompatibility with anonymous PeerId in lighthouse

* cargo fmt

* more debug output for broken promises
  • Loading branch information
blacktemplar authored Aug 17, 2020
1 parent 83280db commit 5d93c0a
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 61 deletions.
78 changes: 36 additions & 42 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,14 @@ use crate::handler::{GossipsubHandler, HandlerEvent};
use crate::mcache::MessageCache;
use crate::peer_score::{PeerScore, PeerScoreParams, PeerScoreThresholds, RejectReason};
use crate::protocol::SIGNING_PREFIX;
use crate::rpc_proto;
use crate::time_cache::DuplicateCache;
use crate::topic::{Hasher, Topic, TopicHash};
use crate::types::{
GossipsubControlAction, GossipsubMessage, GossipsubSubscription, GossipsubSubscriptionAction,
MessageId, PeerInfo,
};
use crate::types::{GossipsubRpc, PeerKind};
use crate::{rpc_proto, TopicScoreParams};
use std::cmp::Ordering::Equal;

mod tests;
Expand Down Expand Up @@ -344,6 +344,7 @@ impl BackoffStorage {
}
}

#[derive(Debug)]
pub enum MessageAcceptance {
/// The message is considered valid, and it should be delivered and forwarded to the network
Accept,
Expand Down Expand Up @@ -686,9 +687,10 @@ impl Gossipsub {
}

/// This function should be called when `config.validate_messages()` is `true` after the
/// message got validated by the caller. Messages are stored in the ['Memcache'] and validation
/// is expected to be fast enough that the messages should still exist in the cache.There are
/// three possible validation outcomes and the outcome is given in acceptance.
/// message got validated by the caller. Messages are stored in the
/// ['Memcache'] and validation is expected to be fast enough that the messages should still
/// exist in the cache. There are three possible validation outcomes and the outcome is given
/// in acceptance.
///
/// If acceptance = Accept the message will get propagated to the network. The
/// `propagation_source` parameter indicates who the message was received by and will not
Expand All @@ -704,7 +706,7 @@ impl Gossipsub {
/// in the cache anymore.
///
/// This should only be called once per message.
pub fn validate_message(
pub fn report_message_validation_result(
&mut self,
message_id: &MessageId,
propagation_source: &PeerId,
Expand All @@ -730,14 +732,7 @@ impl Gossipsub {
};

if let Some(message) = self.mcache.remove(message_id) {
//tell peer_score and gossip promises about reject
Self::reject_message(
&mut self.peer_score,
propagation_source,
&message,
message_id,
reject_reason,
);
//tell peer_score about reject
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.reject_message(propagation_source, &message, reject_reason);
}
Expand Down Expand Up @@ -782,6 +777,12 @@ impl Gossipsub {
Ok(())
}

pub fn set_topic_params(&mut self, topic_hash: TopicHash, params: TopicScoreParams) {
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.set_topic_params(topic_hash, params);
}
}

/// Sets the application specific score for a peer. Returns true if scoring is active and
/// the peer is connected or if the score of the peer is not yet expired, false otherwise.
pub fn set_application_score(&mut self, peer_id: &PeerId, new_score: f64) -> bool {
Expand Down Expand Up @@ -1315,20 +1316,6 @@ impl Gossipsub {
}
}

/// informs peer score and gossip_promises about a rejected message
fn reject_message(
peer_score: &mut Option<(PeerScore, PeerScoreThresholds, Interval, GossipPromises)>,
from: &PeerId,
msg: &GossipsubMessage,
id: &MessageId,
reason: RejectReason,
) {
if let Some((peer_score, _, _, gossip_promises)) = peer_score {
peer_score.reject_message(from, &msg, reason);
gossip_promises.reject_message(id, &reason);
}
}

/// Handles a newly received GossipsubMessage.
/// Forwards the message to all peers in the mesh.
fn handle_received_message(&mut self, mut msg: GossipsubMessage, propagation_source: &PeerId) {
Expand All @@ -1348,18 +1335,21 @@ impl Gossipsub {

// reject messages claiming to be from ourselves but not locally published
if let Some(own_id) = self.publish_config.get_own_id() {
if own_id != propagation_source && msg.source.as_ref().map_or(false, |s| s == own_id) {
//TODO remove this "hack" as soon as lighthouse uses Anonymous instead of this fixed
// PeerId.
let lighthouse_anonymous_id = PeerId::from_bytes(vec![0, 1, 0]).expect("Valid peer id");
if own_id != &lighthouse_anonymous_id
&& own_id != propagation_source
&& msg.source.as_ref().map_or(false, |s| s == own_id)
{
debug!(
"Dropping message claiming to be from self but forwarded from {:?}",
propagation_source
);
Self::reject_message(
&mut self.peer_score,
propagation_source,
&msg,
&msg_id,
RejectReason::SelfOrigin,
"Dropping message {:?} claiming to be from self but forwarded from {:?}",
msg_id, propagation_source
);
if let Some((peer_score, _, _, gossip_promises)) = &mut self.peer_score {
peer_score.reject_message(propagation_source, &msg, RejectReason::SelfOrigin);
gossip_promises.reject_message(&msg_id, &RejectReason::SelfOrigin);
}
return;
}
}
Expand All @@ -1374,8 +1364,10 @@ impl Gossipsub {
}

//tells score that message arrived (but is maybe not fully validated yet)
if let Some((peer_score, ..)) = &mut self.peer_score {
//Consider message as delivered for gossip promises
if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score {
peer_score.validate_message(propagation_source, &msg);
gossip_promises.deliver_message(&msg_id);
}

self.mcache.put(msg.clone());
Expand Down Expand Up @@ -1885,6 +1877,7 @@ impl Gossipsub {
self.mcache.shift();

debug!("Completed Heartbeat");
debug!("peer_scores: {:?}", scores);
}

/// Emits gossip - Send IHAVE messages to a random set of gossip peers. This is applied to mesh
Expand Down Expand Up @@ -2032,12 +2025,11 @@ impl Gossipsub {
fn forward_msg(&mut self, message: GossipsubMessage, source: Option<&PeerId>) -> bool {
let msg_id = (self.config.message_id_fn())(&message);

//message is fully validated, inform peer_score and gossip promises
//message is fully validated inform peer_score
if let Some((peer_score, _, _, gossip_promises)) = &mut self.peer_score {
if let Some(peer) = source {
peer_score.deliver_message(peer, &message);
}
gossip_promises.deliver_message(&msg_id);
}

debug!("Forwarding message: {:?}", msg_id);
Expand All @@ -2058,7 +2050,7 @@ impl Gossipsub {
//add explicit peers
for p in &self.explicit_peers {
if let Some(topics) = self.peer_topics.get(p) {
if message.topics.iter().any(|t| topics.contains(t)) {
if Some(p) != source && message.topics.iter().any(|t| topics.contains(t)) {
recipient_peers.insert(p.clone());
}
}
Expand Down Expand Up @@ -2475,10 +2467,12 @@ impl NetworkBehaviour for Gossipsub {
invalid_messages,
} => {
// Handle any invalid messages from this peer
if let Some((peer_score, ..)) = &mut self.peer_score {
if let Some((peer_score, .., gossip_promises)) = &mut self.peer_score {
let mut id_fn = self.config.message_id_fn();
for (_message, validation_error) in invalid_messages {
let reason = RejectReason::ProtocolValidationError(validation_error);
peer_score.reject_message(&propagation_source, &_message, reason);
gossip_promises.reject_message(&id_fn(&_message), &reason);
}
}

Expand Down
16 changes: 8 additions & 8 deletions protocols/gossipsub/src/behaviour/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3186,7 +3186,7 @@ mod tests {
assert_eq!(gs.peer_score.as_ref().unwrap().0.score(&peers[0]), 0.0);

//message m1 gets validated
gs.validate_message(&id(&m1), &peers[0], MessageAcceptance::Accept);
gs.report_message_validation_result(&id(&m1), &peers[0], MessageAcceptance::Accept);

assert_eq!(gs.peer_score.as_ref().unwrap().0.score(&peers[0]), 0.0);
}
Expand Down Expand Up @@ -3348,7 +3348,7 @@ mod tests {
assert_eq!(gs.peer_score.as_ref().unwrap().0.score(&peers[0]), 0.0);

//message m1 gets ignored
gs.validate_message(
gs.report_message_validation_result(
&(config.message_id_fn())(&m1),
&peers[0],
MessageAcceptance::Ignore,
Expand Down Expand Up @@ -3404,7 +3404,7 @@ mod tests {
assert_eq!(gs.peer_score.as_ref().unwrap().0.score(&peers[0]), 0.0);

//message m1 gets rejected
gs.validate_message(
gs.report_message_validation_result(
&(config.message_id_fn())(&m1),
&peers[0],
MessageAcceptance::Reject,
Expand Down Expand Up @@ -3467,7 +3467,7 @@ mod tests {
assert_eq!(gs.peer_score.as_ref().unwrap().0.score(&peers[1]), 0.0);

//message m1 gets rejected
gs.validate_message(
gs.report_message_validation_result(
&(config.message_id_fn())(&m1),
&peers[0],
MessageAcceptance::Reject,
Expand Down Expand Up @@ -3534,17 +3534,17 @@ mod tests {
assert_eq!(gs.peer_score.as_ref().unwrap().0.score(&peers[0]), 0.0);

//messages gets rejected
gs.validate_message(
gs.report_message_validation_result(
&(config.message_id_fn())(&m1),
&peers[0],
MessageAcceptance::Reject,
);
gs.validate_message(
gs.report_message_validation_result(
&(config.message_id_fn())(&m2),
&peers[0],
MessageAcceptance::Reject,
);
gs.validate_message(
gs.report_message_validation_result(
&(config.message_id_fn())(&m3),
&peers[0],
MessageAcceptance::Reject,
Expand Down Expand Up @@ -3604,7 +3604,7 @@ mod tests {
assert_eq!(gs.peer_score.as_ref().unwrap().0.score(&peers[0]), 0.0);

//message m1 gets rejected
gs.validate_message(
gs.report_message_validation_result(
&(config.message_id_fn())(&m1),
&peers[0],
MessageAcceptance::Reject,
Expand Down
6 changes: 6 additions & 0 deletions protocols/gossipsub/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,12 @@ impl Default for GossipsubConfigBuilder {
}
}

impl From<GossipsubConfig> for GossipsubConfigBuilder {
fn from(config: GossipsubConfig) -> Self {
GossipsubConfigBuilder { config }
}
}

impl GossipsubConfigBuilder {
// set default values
pub fn new() -> GossipsubConfigBuilder {
Expand Down
7 changes: 6 additions & 1 deletion protocols/gossipsub/src/gossip_promises.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::error::ValidationError;
use crate::peer_score::RejectReason;
use crate::MessageId;
use libp2p_core::PeerId;
use log::debug;
use rand::seq::SliceRandom;
use rand::thread_rng;
use std::collections::HashMap;
Expand Down Expand Up @@ -56,11 +57,15 @@ impl GossipPromises {
pub fn get_broken_promises(&mut self) -> HashMap<PeerId, usize> {
let now = Instant::now();
let mut result = HashMap::new();
self.promises.retain(|_, peers| {
self.promises.retain(|msg, peers| {
peers.retain(|peer_id, expires| {
if *expires < now {
let count = result.entry(peer_id.clone()).or_insert(0);
*count += 1;
debug!(
"The peer {} broke the promise to deliver message {} in time!",
peer_id, msg
);
false
} else {
true
Expand Down
2 changes: 1 addition & 1 deletion protocols/gossipsub/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ mod rpc_proto {
include!(concat!(env!("OUT_DIR"), "/gossipsub.pb.rs"));
}

pub use self::behaviour::{Gossipsub, GossipsubEvent, MessageAuthenticity};
pub use self::behaviour::{Gossipsub, GossipsubEvent, MessageAcceptance, MessageAuthenticity};
pub use self::config::{GossipsubConfig, GossipsubConfigBuilder, ValidationMode};
pub use self::peer_score::{
score_parameter_decay, score_parameter_decay_with_base, PeerScoreParams, PeerScoreThresholds,
Expand Down
Loading

0 comments on commit 5d93c0a

Please sign in to comment.