Skip to content

Commit

Permalink
Fix: Limit ZMQ Buffer Size for Outgoing Messages (#4051)
Browse files Browse the repository at this point in the history
* fix: limit zmq message buffer size

* chore: fix test by waiting for connection
  • Loading branch information
msgmaxim committed Sep 28, 2023
1 parent c1b7f15 commit 406f481
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 6 deletions.
14 changes: 14 additions & 0 deletions engine/src/p2p/core/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ const CONNECTION_HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(30);
/// socket
pub const DO_NOT_LINGER: i32 = 0;

/// How many messages to keep in a "resend" buffer per peer.
///
/// This value is applied to each outgoing socket via `set_sndhwm`, so it
/// bounds how many outgoing messages may be buffered for a single peer
/// (limiting the memory held for a peer that never comes back online).
const OUTGOING_MESSAGES_BUFFER_SIZE: i32 = 10;

/// Socket to be used for connecting to peer on the network
pub struct OutgoingSocket {
socket: zmq::Socket,
Expand All @@ -38,6 +41,17 @@ impl OutgoingSocket {
// Discard any pending messages when disconnecting a socket
socket.set_linger(DO_NOT_LINGER).unwrap();

// Do not buffer outgoing messages before a connection is
// established (this means the messages will be lost, but
// this prevents us from buffering messages to peers that
// are misconfigured, for example).
socket.set_immediate(true).unwrap();

// Buffer at most OUTGOING_MESSAGES_BUFFER_SIZE messages
// per peer (this minimises how much memory we might "leak"
// if they never come online again).
socket.set_sndhwm(OUTGOING_MESSAGES_BUFFER_SIZE).unwrap();

socket.set_ipv6(true).unwrap();
socket.set_reconnect_ivl(RECONNECT_INTERVAL.as_millis() as i32).unwrap();
socket.set_reconnect_ivl_max(RECONNECT_INTERVAL_MAX.as_millis() as i32).unwrap();
Expand Down
15 changes: 9 additions & 6 deletions engine/src/p2p/core/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,12 +130,6 @@ async fn connect_two_nodes() {
}

async fn send_and_receive_message(from: &Node, to: &mut Node) -> Option<(AccountId, Vec<u8>)> {
println!(
"[{:?}] Sending from {:?} to {:?}",
std::time::Instant::now(),
from.account_id,
to.account_id,
);
from.msg_sender
.send(OutgoingMultisigStageMessages::Private(vec![(
to.account_id.clone(),
Expand All @@ -159,6 +153,11 @@ async fn can_connect_after_pubkey_change() {
let mut node1 = spawn_node(&node_key1, 0, pi1.clone(), &[pi1.clone(), pi2.clone()]);
let mut node2 = spawn_node(&node_key2, 1, pi2.clone(), &[pi1.clone(), pi2.clone()]);

// Since we no longer buffer messages until nodes connect, we
// need to explicitly wait for them to connect (this might take a
// while since one of them is likely to fail on the first try)
tokio::time::sleep(std::time::Duration::from_millis(500)).await;

// Check that node 2 can communicate with node 1:
send_and_receive_message(&node2, &mut node1).await.unwrap();
send_and_receive_message(&node1, &mut node2).await.unwrap();
Expand All @@ -174,6 +173,10 @@ async fn can_connect_after_pubkey_change() {
// Node 1 learns about Node 2's new key:
node1.peer_update_sender.send(PeerUpdate::Registered(pi2.clone())).unwrap();

// Wait for Node 1 to connect (this shouldn't take long since
// Node 2 is already up and we should succeed on first try)
tokio::time::sleep(std::time::Duration::from_millis(100)).await;

// Node 2 should be able to send messages again:
send_and_receive_message(&node2b, &mut node1).await.unwrap();
send_and_receive_message(&node1, &mut node2b).await.unwrap();
Expand Down

0 comments on commit 406f481

Please sign in to comment.