Skip to content

Commit

Permalink
hotfix(gossipsub): use Arc to avoid clonning messages
Browse files Browse the repository at this point in the history
  • Loading branch information
jxs committed Nov 7, 2023
1 parent 157d712 commit d28ff63
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 13 deletions.
14 changes: 8 additions & 6 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use std::{
collections::{BTreeSet, HashMap},
fmt,
net::IpAddr,
sync::Arc,
task::{Context, Poll},
time::Duration,
};
Expand Down Expand Up @@ -2920,11 +2921,7 @@ where
return Ok(vec![rpc]);
}

let new_rpc = proto::RPC {
subscriptions: Vec::new(),
publish: Vec::new(),
control: None,
};
let new_rpc = proto::RPCInner::default();

let mut rpc_list = vec![new_rpc.clone()];

Expand Down Expand Up @@ -3031,7 +3028,12 @@ where
}
}

Ok(rpc_list)
Ok(rpc_list
.into_iter()
.map(|rpc| proto::RPC {
inner: Arc::new(rpc),
})
.collect())
}

fn on_connection_established(
Expand Down
2 changes: 1 addition & 1 deletion protocols/gossipsub/src/behaviour/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ where
}

// Converts a protobuf message into a gossipsub message for reading the Gossipsub event queue.
fn proto_to_message(rpc: &proto::RPC) -> Rpc {
fn proto_to_message(rpc: &proto::RPCInner) -> Rpc {
// Store valid messages.
let mut messages = Vec::with_capacity(rpc.publish.len());
let rpc = rpc.clone();
Expand Down
39 changes: 36 additions & 3 deletions protocols/gossipsub/src/generated/gossipsub/pb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,52 @@
#![cfg_attr(rustfmt, rustfmt_skip)]


use std::ops::Deref;
use std::sync::Arc;

use quick_protobuf::{MessageInfo, MessageRead, MessageWrite, BytesReader, Writer, WriterBackend, Result};
use quick_protobuf::sizeofs::*;
use super::super::*;

#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Debug, Default, PartialEq, Clone)]
#[derive(Debug, Default, PartialEq)]
pub struct RPC {
pub(crate) inner: Arc<RPCInner>,
}

impl Clone for RPC {
fn clone(&self) -> Self {
Self { inner: self.inner.clone() }
}
}

impl Deref for RPC {
type Target=RPCInner;

fn deref(&self) -> &Self::Target {
self.inner.as_ref()
}
}

impl MessageWrite for RPC {
fn write_message<W: WriterBackend>(&self, w: &mut Writer<W>) -> Result<()> {
self.inner.write_message(w)
}

fn get_size(&self) -> usize {
self.inner.get_size()
}
}

#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Debug, Default, PartialEq, Clone)]
pub struct RPCInner {
pub subscriptions: Vec<gossipsub::pb::mod_RPC::SubOpts>,
pub publish: Vec<gossipsub::pb::Message>,
pub control: Option<gossipsub::pb::ControlMessage>,
}

impl<'a> MessageRead<'a> for RPC {
impl<'a> MessageRead<'a> for RPCInner {
fn from_reader(r: &mut BytesReader, bytes: &'a [u8]) -> Result<Self> {
let mut msg = Self::default();
while !r.is_eof() {
Expand All @@ -37,7 +70,7 @@ impl<'a> MessageRead<'a> for RPC {
}
}

impl MessageWrite for RPC {
impl MessageWrite for RPCInner {
fn get_size(&self) -> usize {
0
+ self.subscriptions.iter().map(|s| 1 + sizeof_len((s).get_size())).sum::<usize>()
Expand Down
2 changes: 1 addition & 1 deletion protocols/gossipsub/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ pub struct GossipsubCodec {
/// Determines the level of validation performed on incoming messages.
validation_mode: ValidationMode,
/// The codec to handle common encoding/decoding of protobuf messages
codec: quick_protobuf_codec::Codec<proto::RPC>,
codec: quick_protobuf_codec::Codec<proto::RPC, proto::RPCInner>,
}

impl GossipsubCodec {
Expand Down
7 changes: 5 additions & 2 deletions protocols/gossipsub/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ use libp2p_identity::PeerId;
use libp2p_swarm::ConnectionId;
use prometheus_client::encoding::EncodeLabelValue;
use quick_protobuf::MessageWrite;
use std::fmt;
use std::fmt::Debug;
use std::{fmt, sync::Arc};

use crate::rpc_proto::proto;
#[cfg(feature = "serde")]
Expand Down Expand Up @@ -341,14 +341,17 @@ impl From<Rpc> for proto::RPC {
}
}

proto::RPC {
let inner = proto::RPCInner {
subscriptions,
publish,
control: if empty_control_msg {
None
} else {
Some(control)
},
};
Self {
inner: Arc::new(inner),
}
}
}
Expand Down

0 comments on commit d28ff63

Please sign in to comment.