Skip to content

Commit

Permalink
Handle forwarding of signed messages and reviewers comments
Browse files Browse the repository at this point in the history
  • Loading branch information
AgeManning committed May 25, 2020
1 parent d2babd4 commit fb51926
Show file tree
Hide file tree
Showing 6 changed files with 203 additions and 178 deletions.
24 changes: 13 additions & 11 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,10 @@ impl Gossipsub {
keypair.public().into_peer_id()
};

let keypair = if gs_config.sign_messages {
Some(keypair)
} else {
let keypair = if gs_config.disable_message_signing {
None
} else {
Some(keypair)
};

Gossipsub {
Expand Down Expand Up @@ -235,13 +235,15 @@ impl Gossipsub {
// big-endian uint.
sequence_number: rand::random(),
topics: topic.into_iter().map(|t| self.topic_hash(t)).collect(),
signature: None, // signature will get created when being published
key: None,
};

let msg_id = (self.config.message_id_fn)(&message);
// add published message to our received caches
// Add published message to our received caches
if self.mcache.put(message.clone()).is_some() {
// this message has already been seen. We don't re-publish messages that have already
// been published on the network
// This message has already been seen. We don't re-publish messages that have already
// been published on the network.
warn!(
"Not publishing a message that has already been published. Msg-id {}",
msg_id
Expand All @@ -254,17 +256,17 @@ impl Gossipsub {
(self.config.message_id_fn)(&message)
);

// forward the message to mesh peers
// Forward the message to mesh peers
let message_source = &self.message_source_id.clone();
self.forward_msg(message.clone(), message_source);

let mut recipient_peers = HashSet::new();
for topic_hash in &message.topics {
// if not subscribed to the topic, use fanout peers
// If not subscribed to the topic, use fanout peers
if self.mesh.get(&topic_hash).is_none() {
debug!("Topic: {:?} not in the mesh", topic_hash);
// build a list of peers to forward the message to
// if we have fanout peers add them to the map
// Build a list of peers to forward the message to
// if we have fanout peers add them to the map.
if self.fanout.contains_key(&topic_hash) {
for peer in self.fanout.get(&topic_hash).expect("Topic must exist") {
recipient_peers.insert(peer.clone());
Expand Down Expand Up @@ -1018,9 +1020,9 @@ impl NetworkBehaviour for Gossipsub {
fn new_handler(&mut self) -> Self::ProtocolsHandler {
GossipsubHandler::new(
self.config.protocol_id.clone(),
self.message_source_id.clone(),
self.config.max_transmit_size,
self.keypair.clone(),
self.config.allow_unsigned_messages,
)
}

Expand Down
4 changes: 4 additions & 0 deletions protocols/gossipsub/src/behaviour/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,8 @@ mod tests {
data: vec![1, 2, 3, 4],
sequence_number: 1u64,
topics: Vec::new(),
signature: None,
key: None,
};
let msg_id = id(&message);
gs.mcache.put(message.clone());
Expand Down Expand Up @@ -635,6 +637,8 @@ mod tests {
data: vec![1, 2, 3, 4],
sequence_number: shift,
topics: Vec::new(),
signature: None,
key: None,
};
let msg_id = id(&message);
gs.mcache.put(message.clone());
Expand Down
55 changes: 24 additions & 31 deletions protocols/gossipsub/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,9 @@ pub struct GossipsubConfig {
/// once validated (default is `false`).
pub manual_propagation: bool,

/// When set to `true` all published messages are signed by the libp2p key (default is `true`).
pub sign_messages: bool,

/// Determines whether unsigned messages will be accepted. If set to false, unsigned messages
/// will be dropped. Default value is `true`.
pub allow_unsigned_messages: bool,
/// Message signing is on by default. When this parameter is set,
/// published messages are not signed by the libp2p key.
pub disable_message_signing: bool,

/// A user-defined function allowing the user to specify the message id of a gossipsub message.
/// The default value is to concatenate the source peer id with a sequence number. Setting this
Expand Down Expand Up @@ -111,8 +108,7 @@ impl Default for GossipsubConfig {
hash_topics: false, // default compatibility with floodsub
no_source_id: false,
manual_propagation: false,
sign_messages: true,
allow_unsigned_messages: true,
disable_message_signing: false,
message_id_fn: |message| {
// default message id is: source + sequence number
let mut source_string = message.source.to_base58();
Expand Down Expand Up @@ -229,37 +225,34 @@ impl GossipsubConfigBuilder {
self
}

/// Flag determining if gossipsub topics are hashed or sent as plain strings (default is false).
pub fn hash_topics(&mut self, value: bool) -> &mut Self {
self.config.hash_topics = value;
self
}

/// When set, all published messages will have a 0 source `PeerId` (default is false).
pub fn no_source_id(&mut self, value: bool) -> &mut Self {
self.config.no_source_id = value;
/// When set, gossipsub topics are hashed instead of being sent as plain strings.
pub fn hash_topics(&mut self) -> &mut Self {
self.config.hash_topics = true;
self
}

/// When set to `true`, prevents automatic forwarding of all received messages. This setting
/// allows a user to validate the messages before propagating them to their peers. If set to
/// true, the user must manually call `propagate_message()` on the behaviour to forward message
/// once validated (default is `false`).
pub fn manual_propagation(&mut self, value: bool) -> &mut Self {
self.config.manual_propagation = value;
/// When set, all published messages will have a 0 source `PeerId`
pub fn no_source_id(&mut self) -> &mut Self {
assert!(
self.config.disable_message_signing,
"Message signing must be disabled in order to mask the source peer id. Cannot sign for the 0 peer_id"
);
self.config.no_source_id = true;
self
}

/// When set to `true` all published messages are signed by the libp2p key (default is `true`).
pub fn sign_messages(&mut self, value: bool) -> &mut Self {
self.config.sign_messages = value;
/// When set, prevents automatic forwarding of all received messages. This setting
/// allows a user to validate the messages before propagating them to their peers. If set,
/// the user must manually call `propagate_message()` on the behaviour to forward a message
/// once validated.
pub fn manual_propagation(&mut self) -> &mut Self {
self.config.manual_propagation = true;
self
}

/// Determines whether unsigned messages will be accepted. If set to false, unsigned messages
/// will be dropped. Default value is `true`.
pub fn allow_unsigned_messages(&mut self, value: bool) -> &mut Self {
self.config.allow_unsigned_messages = value;
/// Disables message signing for all published messages.
pub fn disable_message_signing(&mut self) -> &mut Self {
self.config.disable_message_signing = true;
self
}

Expand Down Expand Up @@ -299,7 +292,7 @@ impl std::fmt::Debug for GossipsubConfig {
let _ = builder.field("hash_topics", &self.hash_topics);
let _ = builder.field("no_source_id", &self.no_source_id);
let _ = builder.field("manual_propagation", &self.manual_propagation);
let _ = builder.field("sign_messages", &self.sign_messages);
let _ = builder.field("disable_message_signing", &self.disable_message_signing);
builder.finish()
}
}
17 changes: 3 additions & 14 deletions protocols/gossipsub/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use futures::prelude::*;
use futures_codec::Framed;
use libp2p_core::identity::Keypair;
use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade};
use libp2p_core::PeerId;
use libp2p_swarm::protocols_handler::{
KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol,
};
Expand Down Expand Up @@ -83,16 +84,16 @@ impl GossipsubHandler {
/// Builds a new `GossipsubHandler`.
pub fn new(
protocol_id: impl Into<Cow<'static, [u8]>>,
local_peer_id: PeerId,
max_transmit_size: usize,
keypair: Option<Keypair>,
allow_unsigned: bool,
) -> Self {
GossipsubHandler {
listen_protocol: SubstreamProtocol::new(ProtocolConfig::new(
protocol_id,
local_peer_id,
max_transmit_size,
keypair,
allow_unsigned,
)),
inbound_substream: None,
outbound_substream: None,
Expand All @@ -102,18 +103,6 @@ impl GossipsubHandler {
}
}

impl Default for GossipsubHandler {
fn default() -> Self {
GossipsubHandler {
listen_protocol: SubstreamProtocol::new(ProtocolConfig::default()),
inbound_substream: None,
outbound_substream: None,
send_queue: SmallVec::new(),
keep_alive: KeepAlive::Yes,
}
}
}

impl ProtocolsHandler for GossipsubHandler {
type InEvent = GossipsubRpc;
type OutEvent = GossipsubRpc;
Expand Down
4 changes: 3 additions & 1 deletion protocols/gossipsub/src/mcache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ impl MessageCache {

let seen_message = self.msgs.insert(message_id, msg);
if seen_message.is_none() {
// don't add duplicates entries to the cache
// Don't add duplicate entries to the cache
self.history[0].push(cache_entry);
}
seen_message
Expand Down Expand Up @@ -147,6 +147,8 @@ mod tests {
data,
sequence_number,
topics,
signature: None,
key: None,
};
m
}
Expand Down
Loading

0 comments on commit fb51926

Please sign in to comment.