Skip to content

Commit

Permalink
protocols/gossipsub: Add Gossipsub v1.1 support
Browse files Browse the repository at this point in the history
This commit upgrades the current gossipsub implementation to support the [v1.1
spec](https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.1.md).

It adds a number of features, bug fixes and performance improvements. 

Besides support for all new 1.1 features, other improvements that are of particular note: 

- Improved duplicate LRU-time cache (this was previously a severe bottleneck for
  large message throughput topics)
- Extended message validation configuration options
- Arbitrary topics (users can now implement their own hashing schemes)
- Improved message validation handling - Invalid messages are no longer dropped
  but sent to the behaviour for application-level processing (including scoring)
- Support for floodsub, gossipsub v1 and gossipsub v2
- Protobuf encoding has been shifted into the behaviour. This has permitted two
  improvements:
     1. Message size verification during publishing (report to the user if the
        message is too large before attempting to send).
     2. Message fragmentation. If an RPC is too large it is fragmented into its
        sub components and sent in smaller chunks.

Additional Notes

The peer eXchange protocol defined in the v1.1 spec is inactive in its current
form. The current implementation permits sending `PeerId` in `PRUNE` messages,
however a `PeerId` is not sufficient to form a new connection to a peer. A
`Signed Address Record` is required to safely transmit peer identity
information. Once these are confirmed (libp2p/specs#217)
a future PR will implement these and make PX usable.

Co-authored-by: Max Inden <mail@max-inden.de>
Co-authored-by: Rüdiger Klaehn <rklaehn@protonmail.com>
Co-authored-by: blacktemplar <blacktemplar@a1.net>
Co-authored-by: Rüdiger Klaehn <rklaehn@gmail.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Roman S. Borschel <roman@parity.io>
Co-authored-by: Roman Borschel <romanb@users.noreply.github.com>
Co-authored-by: David Craven <david@craven.ch>
  • Loading branch information
9 people authored Jan 7, 2021
1 parent d918e9a commit df7e73e
Show file tree
Hide file tree
Showing 26 changed files with 12,064 additions and 1,378 deletions.
53 changes: 36 additions & 17 deletions examples/gossipsub-chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
//! cargo run --example gossipsub-chat
//! ```
//!
//! It will print the PeerId and the listening address, e.g. `Listening on
//! It will print the [`PeerId`] and the listening address, e.g. `Listening on
//! "/ip4/0.0.0.0/tcp/24915"`
//!
//! In the second terminal window, start a new instance of the example with:
Expand All @@ -49,8 +49,10 @@
use async_std::{io, task};
use env_logger::{Builder, Env};
use futures::prelude::*;
use libp2p::gossipsub::protocol::MessageId;
use libp2p::gossipsub::{GossipsubEvent, GossipsubMessage, MessageAuthenticity, Topic};
use libp2p::gossipsub::MessageId;
use libp2p::gossipsub::{
GossipsubEvent, GossipsubMessage, IdentTopic as Topic, MessageAuthenticity, ValidationMode,
};
use libp2p::{gossipsub, identity, PeerId};
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
Expand All @@ -72,30 +74,43 @@ fn main() -> Result<(), Box<dyn Error>> {
let transport = libp2p::build_development_transport(local_key.clone())?;

// Create a Gossipsub topic
let topic = Topic::new("test-net".into());
let topic = Topic::new("test-net");

// Create a Swarm to manage peers and events
let mut swarm = {
// to set default parameters for gossipsub use:
// let gossipsub_config = gossipsub::GossipsubConfig::default();

// To content-address message, we can take the hash of message and use it as an ID.
let message_id_fn = |message: &GossipsubMessage| {
let mut s = DefaultHasher::new();
message.data.hash(&mut s);
MessageId::from(s.finish().to_string())
};

// set custom gossipsub
let gossipsub_config = gossipsub::GossipsubConfigBuilder::new()
.heartbeat_interval(Duration::from_secs(10))
// Set a custom gossipsub
let gossipsub_config = gossipsub::GossipsubConfigBuilder::default()
.heartbeat_interval(Duration::from_secs(10)) // This is set to aid debugging by not cluttering the log space
.validation_mode(ValidationMode::Strict) // This sets the kind of message validation. The default is Strict (enforce message signing)
.message_id_fn(message_id_fn) // content-address messages. No two messages of the
//same content will be propagated.
.build();
// same content will be propagated.
.build()
.expect("Valid config");
// build a gossipsub network behaviour
let mut gossipsub =
gossipsub::Gossipsub::new(MessageAuthenticity::Signed(local_key), gossipsub_config);
gossipsub.subscribe(topic.clone());
let mut gossipsub: gossipsub::Gossipsub =
gossipsub::Gossipsub::new(MessageAuthenticity::Signed(local_key), gossipsub_config)
.expect("Correct configuration");

// subscribes to our topic
gossipsub.subscribe(&topic).unwrap();

// add an explicit peer if one was provided
if let Some(explicit) = std::env::args().nth(2) {
let explicit = explicit.clone();
match explicit.parse() {
Ok(id) => gossipsub.add_explicit_peer(&id),
Err(err) => println!("Failed to parse explicit peer id: {:?}", err),
}
}

// build the swarm
libp2p::Swarm::new(transport, gossipsub, local_peer_id)
};

Expand All @@ -122,7 +137,7 @@ fn main() -> Result<(), Box<dyn Error>> {
task::block_on(future::poll_fn(move |cx: &mut Context<'_>| {
loop {
if let Err(e) = match stdin.try_poll_next_unpin(cx)? {
Poll::Ready(Some(line)) => swarm.publish(&topic, line.as_bytes()),
Poll::Ready(Some(line)) => swarm.publish(topic.clone(), line.as_bytes()),
Poll::Ready(None) => panic!("Stdin closed"),
Poll::Pending => break,
} {
Expand All @@ -133,7 +148,11 @@ fn main() -> Result<(), Box<dyn Error>> {
loop {
match swarm.poll_next_unpin(cx) {
Poll::Ready(Some(gossip_event)) => match gossip_event {
GossipsubEvent::Message(peer_id, id, message) => println!(
GossipsubEvent::Message {
propagation_source: peer_id,
message_id: id,
message,
} => println!(
"Got message: {} with id: {} from peer: {:?}",
String::from_utf8_lossy(&message.data),
id,
Expand Down
40 changes: 26 additions & 14 deletions examples/ipfs-private.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,16 @@
use async_std::{io, task};
use futures::{future, prelude::*};
use libp2p::{
core::{either::EitherTransport, transport, transport::upgrade::Version, muxing::StreamMuxerBox},
core::{
either::EitherTransport, muxing::StreamMuxerBox, transport, transport::upgrade::Version,
},
gossipsub::{self, Gossipsub, GossipsubConfigBuilder, GossipsubEvent, MessageAuthenticity},
identify::{Identify, IdentifyEvent},
identity,
multiaddr::Protocol,
noise,
ping::{self, Ping, PingConfig, PingEvent},
pnet::{PnetConfig, PreSharedKey},
noise,
swarm::NetworkBehaviourEventProcess,
tcp::TcpConfig,
yamux::YamuxConfig,
Expand All @@ -61,9 +63,10 @@ use std::{
pub fn build_transport(
key_pair: identity::Keypair,
psk: Option<PreSharedKey>,
) -> transport::Boxed<(PeerId, StreamMuxerBox)>
{
let noise_keys = noise::Keypair::<noise::X25519Spec>::new().into_authentic(&key_pair).unwrap();
) -> transport::Boxed<(PeerId, StreamMuxerBox)> {
let noise_keys = noise::Keypair::<noise::X25519Spec>::new()
.into_authentic(&key_pair)
.unwrap();
let noise_config = noise::NoiseConfig::xx(noise_keys).into_authenticated();
let yamux_config = YamuxConfig::default();

Expand Down Expand Up @@ -157,7 +160,7 @@ fn main() -> Result<(), Box<dyn Error>> {
let transport = build_transport(local_key.clone(), psk);

// Create a Gosspipsub topic
let gossipsub_topic = gossipsub::Topic::new("chat".into());
let gossipsub_topic = gossipsub::IdentTopic::new("chat");

// We create a custom network behaviour that combines gossipsub, ping and identify.
#[derive(NetworkBehaviour)]
Expand All @@ -178,7 +181,11 @@ fn main() -> Result<(), Box<dyn Error>> {
// Called when `gossipsub` produces an event.
fn inject_event(&mut self, event: GossipsubEvent) {
match event {
GossipsubEvent::Message(peer_id, id, message) => println!(
GossipsubEvent::Message {
propagation_source: peer_id,
message_id: id,
message,
} => println!(
"Got message: {} with id: {} from peer: {:?}",
String::from_utf8_lossy(&message.data),
id,
Expand Down Expand Up @@ -228,11 +235,16 @@ fn main() -> Result<(), Box<dyn Error>> {

// Create a Swarm to manage peers and events
let mut swarm = {
let gossipsub_config = GossipsubConfigBuilder::new()
let gossipsub_config = GossipsubConfigBuilder::default()
.max_transmit_size(262144)
.build();
.build()
.expect("valid config");
let mut behaviour = MyBehaviour {
gossipsub: Gossipsub::new(MessageAuthenticity::Signed(local_key.clone()), gossipsub_config),
gossipsub: Gossipsub::new(
MessageAuthenticity::Signed(local_key.clone()),
gossipsub_config,
)
.expect("Valid configuration"),
identify: Identify::new(
"/ipfs/0.1.0".into(),
"rust-ipfs-example".into(),
Expand All @@ -242,7 +254,7 @@ fn main() -> Result<(), Box<dyn Error>> {
};

println!("Subscribing to {:?}", gossipsub_topic);
behaviour.gossipsub.subscribe(gossipsub_topic.clone());
behaviour.gossipsub.subscribe(&gossipsub_topic).unwrap();
Swarm::new(transport, behaviour, local_peer_id.clone())
};

Expand All @@ -264,9 +276,9 @@ fn main() -> Result<(), Box<dyn Error>> {
task::block_on(future::poll_fn(move |cx: &mut Context<'_>| {
loop {
if let Err(e) = match stdin.try_poll_next_unpin(cx)? {
Poll::Ready(Some(line)) => {
swarm.gossipsub.publish(&gossipsub_topic, line.as_bytes())
}
Poll::Ready(Some(line)) => swarm
.gossipsub
.publish(gossipsub_topic.clone(), line.as_bytes()),
Poll::Ready(None) => panic!("Stdin closed"),
Poll::Pending => break,
} {
Expand Down
25 changes: 14 additions & 11 deletions protocols/gossipsub/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,29 +12,32 @@ categories = ["network-programming", "asynchronous"]
[dependencies]
libp2p-swarm = { version = "0.26.0", path = "../../swarm" }
libp2p-core = { version = "0.26.0", path = "../../core" }
bytes = "0.5.4"
byteorder = "1.3.2"
fnv = "1.0.6"
futures = "0.3.1"
bytes = "0.5.6"
byteorder = "1.3.4"
fnv = "1.0.7"
futures = "0.3.5"
rand = "0.7.3"
futures_codec = "0.4.0"
futures_codec = "0.4.1"
wasm-timer = "0.2.4"
unsigned-varint = { version = "0.5", features = ["futures-codec"] }
log = "0.4.8"
unsigned-varint = { version = "0.5.0", features = ["futures-codec"] }
log = "0.4.11"
sha2 = "0.9.1"
base64 = "0.13.0"
smallvec = "1.1.0"
smallvec = "1.4.2"
prost = "0.6.1"
hex_fmt = "0.3.0"
lru_time_cache = "0.11.0"
regex = "1.4.0"

[dev-dependencies]
async-std = "1.6.2"
async-std = "1.6.3"
env_logger = "0.8.1"
libp2p-plaintext = { path = "../plaintext" }
libp2p-yamux = { path = "../../muxers/yamux" }
libp2p-mplex = { path = "../../muxers/mplex" }
libp2p-noise = { path = "../../protocols/noise" }
quickcheck = "0.9.2"
hex = "0.4.2"
derive_builder = "0.9.0"

[build-dependencies]
prost-build = "0.6"
prost-build = "0.6.1"
2 changes: 1 addition & 1 deletion protocols/gossipsub/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@
// DEALINGS IN THE SOFTWARE.

fn main() {
prost_build::compile_protos(&["src/rpc.proto"], &["src"]).unwrap();
prost_build::compile_protos(&["src/rpc.proto", "src/compat.proto"], &["src"]).unwrap();
}
Loading

0 comments on commit df7e73e

Please sign in to comment.