Skip to content

Commit

Permalink
remove support for multi-topic messages (#73)
Browse files Browse the repository at this point in the history
* remove support for multi-topic messages

* make message topic required
  • Loading branch information
blacktemplar authored Oct 20, 2020
1 parent c2c4a26 commit 201d533
Show file tree
Hide file tree
Showing 12 changed files with 232 additions and 248 deletions.
2 changes: 1 addition & 1 deletion protocols/gossipsub/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@
// DEALINGS IN THE SOFTWARE.

fn main() {
prost_build::compile_protos(&["src/rpc.proto"], &["src"]).unwrap();
prost_build::compile_protos(&["src/rpc.proto", "src/compat.proto"], &["src"]).unwrap();
}
85 changes: 33 additions & 52 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::{
collections::HashSet,
collections::VecDeque,
collections::{BTreeSet, HashMap},
fmt, iter,
fmt,
iter::FromIterator,
net::IpAddr,
sync::Arc,
Expand Down Expand Up @@ -496,23 +496,13 @@ where
Ok(true)
}

/// Publishes a message to the network.
/// Publishes a message with multiple topics to the network.
pub fn publish<H: Hasher>(
&mut self,
topic: Topic<H>,
data: impl Into<T>,
) -> Result<MessageId, PublishError> {
self.publish_many(iter::once(topic), data)
}

/// Publishes a message with multiple topics to the network.
pub fn publish_many<H: Hasher>(
&mut self,
topics: impl IntoIterator<Item = Topic<H>>,
data: impl Into<T>,
) -> Result<MessageId, PublishError> {
let message =
self.build_message(topics.into_iter().map(|t| t.hash()).collect(), data.into())?;
let message = self.build_message(topic.into(), data.into())?;
let msg_id = self.config.message_id(&message);

let event = Arc::new(
Expand Down Expand Up @@ -562,21 +552,19 @@ where
!self.config.flood_publish() && self.forward_msg(message.clone(), None)?;

let mut recipient_peers = HashSet::new();
for topic_hash in &message.topics {
if let Some(set) = self.topic_peers.get(&topic_hash) {
if self.config.flood_publish() {
// Forward to all peers above score and all explicit peers
recipient_peers.extend(
set.iter()
.filter(|p| {
self.explicit_peers.contains(*p)
|| !self.score_below_threshold(*p, |ts| ts.publish_threshold).0
})
.map(|p| p.clone()),
);
continue;
}

let topic_hash = &message.topic;
if let Some(set) = self.topic_peers.get(&topic_hash) {
if self.config.flood_publish() {
// Forward to all peers above score and all explicit peers
recipient_peers.extend(
set.iter()
.filter(|p| {
self.explicit_peers.contains(*p)
|| !self.score_below_threshold(*p, |ts| ts.publish_threshold).0
})
.map(|p| p.clone()),
);
} else {
// Explicit peers
for peer in &self.explicit_peers {
if set.contains(peer) {
Expand Down Expand Up @@ -1520,7 +1508,7 @@ where
self.mcache.put(msg.clone());

// Dispatch the message to the user if we are subscribed to any of the topics
if self.mesh.keys().any(|t| msg.topics.iter().any(|u| t == u)) {
if self.mesh.contains_key(&msg.topic) {
debug!("Sending received message to user");
self.events.push_back(NetworkBehaviourAction::GenerateEvent(
GenericGossipsubEvent::Message {
Expand All @@ -1531,8 +1519,8 @@ where
));
} else {
debug!(
"Received message on a topic we are not subscribed to. Topics {:?}",
msg.topics.iter().collect::<Vec<_>>()
"Received message on a topic we are not subscribed to: {:?}",
msg.topic
);
return;
}
Expand Down Expand Up @@ -2265,15 +2253,12 @@ where
let mut recipient_peers = HashSet::new();

// add mesh peers
for topic in &message.topics {
// mesh
if let Some(mesh_peers) = self.mesh.get(&topic) {
for peer_id in mesh_peers {
if Some(peer_id) != propagation_source
&& Some(peer_id) != message.source.as_ref()
{
recipient_peers.insert(peer_id.clone());
}
let topic = &message.topic;
// mesh
if let Some(mesh_peers) = self.mesh.get(&topic) {
for peer_id in mesh_peers {
if Some(peer_id) != propagation_source && Some(peer_id) != message.source.as_ref() {
recipient_peers.insert(peer_id.clone());
}
}
}
Expand All @@ -2283,7 +2268,7 @@ where
if let Some(topics) = self.peer_topics.get(p) {
if Some(p) != propagation_source
&& Some(p) != message.source.as_ref()
&& message.topics.iter().any(|t| topics.contains(t))
&& topics.contains(&message.topic)
{
recipient_peers.insert(p.clone());
}
Expand Down Expand Up @@ -2315,7 +2300,7 @@ where
/// Constructs a `GenericGossipsubMessage` performing message signing if required.
pub(crate) fn build_message(
&self,
topics: Vec<TopicHash>,
topic: TopicHash,
data: T,
) -> Result<GenericGossipsubMessage<T>, SigningError> {
match &self.publish_config {
Expand All @@ -2332,11 +2317,7 @@ where
from: Some(author.clone().into_bytes()),
data: Some(data.clone().into()),
seqno: Some(sequence_number.to_be_bytes().to_vec()),
topic_ids: topics
.clone()
.into_iter()
.map(TopicHash::into_string)
.collect(),
topic: topic.clone().into_string(),
signature: None,
key: None,
};
Expand All @@ -2358,7 +2339,7 @@ where
// To be interoperable with the go-implementation this is treated as a 64-bit
// big-endian uint.
sequence_number: Some(sequence_number),
topics,
topic,
signature,
key: inline_key.clone(),
validated: true, // all published messages are valid
Expand All @@ -2371,7 +2352,7 @@ where
// To be interoperable with the go-implementation this is treated as a 64-bit
// big-endian uint.
sequence_number: Some(rand::random()),
topics,
topic,
signature: None,
key: None,
validated: true, // all published messages are valid
Expand All @@ -2384,7 +2365,7 @@ where
// To be interoperable with the go-implementation this is treated as a 64-bit
// big-endian uint.
sequence_number: Some(rand::random()),
topics,
topic,
signature: None,
key: None,
validated: true, // all published messages are valid
Expand All @@ -2397,7 +2378,7 @@ where
// To be interoperable with the go-implementation this is treated as a 64-bit
// big-endian uint.
sequence_number: None,
topics,
topic,
signature: None,
key: None,
validated: true, // all published messages are valid
Expand Down Expand Up @@ -3135,7 +3116,7 @@ mod local_test {
source: Some(PeerId::random()),
data: vec![0; 100],
sequence_number: None,
topics: vec![],
topic: TopicHash::from_raw("test_topic"),
signature: None,
key: None,
validated: false,
Expand Down
47 changes: 18 additions & 29 deletions protocols/gossipsub/src/behaviour/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,11 +238,7 @@ mod tests {
source: message.from.map(|x| PeerId::from_bytes(x).unwrap()),
data: message.data.unwrap_or_default(),
sequence_number: message.seqno.map(|x| BigEndian::read_u64(&x)), // don't inform the application
topics: message
.topic_ids
.into_iter()
.map(TopicHash::from_raw)
.collect(),
topic: TopicHash::from_raw(message.topic),
signature: message.signature, // don't inform the application
key: None,
validated: false,
Expand Down Expand Up @@ -938,7 +934,7 @@ mod tests {
source: Some(peers[11].clone()),
data: vec![1, 2, 3, 4],
sequence_number: Some(1u64),
topics: Vec::new(),
topic: TopicHash::from_raw("topic"),
signature: None,
key: None,
validated: true,
Expand Down Expand Up @@ -987,7 +983,7 @@ mod tests {
source: Some(peers[11].clone()),
data: vec![1, 2, 3, 4],
sequence_number: Some(shift),
topics: Vec::new(),
topic: TopicHash::from_raw("topic"),
signature: None,
key: None,
validated: true,
Expand Down Expand Up @@ -1465,7 +1461,7 @@ mod tests {
source: Some(peers[1].clone()),
data: vec![12],
sequence_number: Some(0),
topics: vec![topic_hashes[0].clone()],
topic: topic_hashes[0].clone(),
signature: None,
key: None,
validated: true,
Expand Down Expand Up @@ -1623,7 +1619,7 @@ mod tests {
source: Some(peers[1].clone()),
data: vec![],
sequence_number: Some(0),
topics: vec![topic_hashes[0].clone()],
topic: topic_hashes[0].clone(),
signature: None,
key: None,
validated: true,
Expand Down Expand Up @@ -1980,16 +1976,9 @@ mod tests {
.to_subscribe(true)
.create_network();

let other_topic = Topic::new("test2");

// subscribe an additional new peer to test2
gs.subscribe(&other_topic).unwrap();
add_peer(&mut gs, &vec![other_topic.hash()], false, false);

//publish message
let publish_data = vec![0; 42];
gs.publish_many(vec![Topic::new(topic), other_topic.clone()], publish_data)
.unwrap();
gs.publish(Topic::new(topic), publish_data).unwrap();

// Collect all publish messages
let publishes = gs
Expand All @@ -2013,7 +2002,7 @@ mod tests {
let config = GossipsubConfig::default();
assert_eq!(
publishes.len(),
config.mesh_n_high() + 10 + 1,
config.mesh_n_high() + 10,
"Should send a publish message to all known peers"
);

Expand All @@ -2040,7 +2029,7 @@ mod tests {
source: Some(PeerId::random()),
data: vec![],
sequence_number: Some(0),
topics: vec![topic_hashes[0].clone()],
topic: topic_hashes[0].clone(),
signature: None,
key: None,
validated: true,
Expand Down Expand Up @@ -2082,7 +2071,7 @@ mod tests {
source: Some(PeerId::random()),
data: vec![],
sequence_number: Some(0),
topics: vec![topic_hashes[0].clone()],
topic: topic_hashes[0].clone(),
signature: None,
key: None,
validated: true,
Expand Down Expand Up @@ -2450,7 +2439,7 @@ mod tests {
source: Some(PeerId::random()),
data: vec![],
sequence_number: Some(0),
topics: vec![topics[0].clone()],
topic: topics[0].clone(),
signature: None,
key: None,
validated: true,
Expand Down Expand Up @@ -2521,7 +2510,7 @@ mod tests {
source: Some(PeerId::random()),
data: vec![],
sequence_number: Some(0),
topics: vec![topics[0].clone()],
topic: topics[0].clone(),
signature: None,
key: None,
validated: true,
Expand Down Expand Up @@ -2598,7 +2587,7 @@ mod tests {
source: Some(PeerId::random()),
data: vec![],
sequence_number: Some(0),
topics: vec![topics[0].clone()],
topic: topics[0].clone(),
signature: None,
key: None,
validated: true,
Expand Down Expand Up @@ -2771,7 +2760,7 @@ mod tests {
source: Some(PeerId::random()),
data: vec![1, 2, 3, 4],
sequence_number: Some(1u64),
topics: topics.clone(),
topic: topics[0].clone(),
signature: None,
key: None,
validated: true,
Expand All @@ -2781,7 +2770,7 @@ mod tests {
source: Some(PeerId::random()),
data: vec![1, 2, 3, 4, 5],
sequence_number: Some(2u64),
topics: topics.clone(),
topic: topics[0].clone(),
signature: None,
key: None,
validated: true,
Expand All @@ -2791,7 +2780,7 @@ mod tests {
source: Some(PeerId::random()),
data: vec![1, 2, 3, 4, 5, 6],
sequence_number: Some(3u64),
topics: topics.clone(),
topic: topics[0].clone(),
signature: None,
key: None,
validated: true,
Expand All @@ -2801,7 +2790,7 @@ mod tests {
source: Some(PeerId::random()),
data: vec![1, 2, 3, 4, 5, 6, 7],
sequence_number: Some(4u64),
topics: topics.clone(),
topic: topics[0].clone(),
signature: None,
key: None,
validated: true,
Expand Down Expand Up @@ -3071,7 +3060,7 @@ mod tests {
.map(|_| rng.gen())
.collect(),
sequence_number: Some(*seq),
topics: topics.clone(),
topic: topics[rng.gen_range(0, topics.len())].clone(),
signature: None,
key: None,
validated: true,
Expand Down Expand Up @@ -4969,7 +4958,7 @@ mod tests {
source: None,
data: counters_address.to_be_bytes().to_vec(),
sequence_number: None,
topics: vec![topic_hashes[0].clone()],
topic: topic_hashes[0].clone(),
signature: None,
key: None,
validated: true,
Expand Down
12 changes: 12 additions & 0 deletions protocols/gossipsub/src/compat.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
syntax = "proto2";

package compat.pb;

message Message {
optional bytes from = 1;
optional bytes data = 2;
optional bytes seqno = 3;
repeated string topic_ids = 4;
optional bytes signature = 5;
optional bytes key = 6;
}
4 changes: 1 addition & 3 deletions protocols/gossipsub/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,7 @@ mod types;
#[macro_use]
extern crate derive_builder;

mod rpc_proto {
include!(concat!(env!("OUT_DIR"), "/gossipsub.pb.rs"));
}
mod rpc_proto;

pub use self::behaviour::{
GenericGossipsub, GenericGossipsubEvent, Gossipsub, GossipsubEvent, MessageAuthenticity,
Expand Down
Loading

0 comments on commit 201d533

Please sign in to comment.