diff --git a/engine/src/p2p/core/socket.rs b/engine/src/p2p/core/socket.rs index 90b344c8d6..16c21e40a9 100644 --- a/engine/src/p2p/core/socket.rs +++ b/engine/src/p2p/core/socket.rs @@ -26,6 +26,9 @@ const CONNECTION_HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(30); /// socket pub const DO_NOT_LINGER: i32 = 0; +/// How many messages to keep in a "resend" buffer per peer +const OUTGOING_MESSAGES_BUFFER_SIZE: i32 = 10; + /// Socket to be used for connecting to peer on the network pub struct OutgoingSocket { socket: zmq::Socket, @@ -38,6 +41,17 @@ impl OutgoingSocket { // Discard any pending messages when disconnecting a socket socket.set_linger(DO_NOT_LINGER).unwrap(); + // Do not buffer outgoing messages before a connection is + // established (this means the messages will be lost, but + // this prevents us from buffering messages to peers that + // are misconfigured, for example). + socket.set_immediate(true).unwrap(); + + // Buffer at most OUTGOING_MESSAGES_BUFFER_SIZE messages + // per peer (this minimises how much memory we might "leak" + // if they never come online again). + socket.set_sndhwm(OUTGOING_MESSAGES_BUFFER_SIZE).unwrap(); + socket.set_ipv6(true).unwrap(); socket.set_reconnect_ivl(RECONNECT_INTERVAL.as_millis() as i32).unwrap(); socket.set_reconnect_ivl_max(RECONNECT_INTERVAL_MAX.as_millis() as i32).unwrap(); diff --git a/engine/src/p2p/core/tests.rs b/engine/src/p2p/core/tests.rs index 756ec560f2..540662d622 100644 --- a/engine/src/p2p/core/tests.rs +++ b/engine/src/p2p/core/tests.rs @@ -130,12 +130,6 @@ async fn connect_two_nodes() { } async fn send_and_receive_message(from: &Node, to: &mut Node) -> Option<(AccountId, Vec)> { - println!( - "[{:?}] Sending from {:?} to {:?}", - std::time::Instant::now(), - from.account_id, - to.account_id, - ); from.msg_sender .send(OutgoingMultisigStageMessages::Private(vec![( to.account_id.clone(), @@ -159,6 +153,11 @@ async fn can_connect_after_pubkey_change() { let mut node1 = spawn_node(&node_key1, 0, pi1.clone(), &[pi1.clone(), pi2.clone()]); let mut node2 = spawn_node(&node_key2, 1, pi2.clone(), &[pi1.clone(), pi2.clone()]); + // Since we no longer buffer messages until nodes connect, we + // need to explicitly wait for them to connect (this might take a + // while since one of them is likely to fail on the first try) + tokio::time::sleep(std::time::Duration::from_millis(500)).await; + // Check that node 2 can communicate with node 1: send_and_receive_message(&node2, &mut node1).await.unwrap(); send_and_receive_message(&node1, &mut node2).await.unwrap(); @@ -174,6 +173,10 @@ async fn can_connect_after_pubkey_change() { // Node 1 learn about Node 2's new key: node1.peer_update_sender.send(PeerUpdate::Registered(pi2.clone())).unwrap(); + // Wait for Node 1 to connect (this shouldn't take long since + // Node 2 is already up and we should succeed on first try) + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + // Node 2 should be able to send messages again: send_and_receive_message(&node2b, &mut node1).await.unwrap(); send_and_receive_message(&node1, &mut node2b).await.unwrap();