feat: p2p stale connections (#4189)
* feat: p2p stale connections

* chore: fix clippy

* chore: address review

* feat: increase send buffer to 100

* chore: remove stale (and broken) test
msgmaxim authored Nov 3, 2023
1 parent 9cd112f commit e10d14a
Showing 3 changed files with 95 additions and 50 deletions.
99 changes: 75 additions & 24 deletions engine/src/p2p/core.rs
@@ -5,6 +5,7 @@ mod socket;
mod tests;

use std::{
cell::Cell,
collections::{BTreeMap, HashMap},
net::Ipv6Addr,
sync::Arc,
@@ -17,6 +18,7 @@ use state_chain_runtime::AccountId;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tracing::{debug, error, info, info_span, trace, warn, Instrument};
use utilities::{
make_periodic_tick,
metrics::{
P2P_ACTIVE_CONNECTIONS, P2P_BAD_MSG, P2P_MSG_RECEIVED, P2P_MSG_SENT, P2P_RECONNECT_PEERS,
},
@@ -36,6 +38,12 @@ use super::{EdPublicKey, P2PKey, XPublicKey};
/// this somewhat short to mitigate some attacks where clients
/// can use system resources without authenticating.
const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(3);
/// How long a connection can go without activity (defined as a need to
/// send a message) before we deem it "stale" (the state in which we
/// drop the socket and stop actively trying to reconnect)
pub const MAX_INACTIVITY_THRESHOLD: Duration = Duration::from_secs(60 * 60);
/// How often to check for "stale" connections
pub const ACTIVITY_CHECK_INTERVAL: Duration = Duration::from_secs(60);

#[derive(Clone)]
pub struct X25519KeyPair {
@@ -175,10 +183,17 @@ enum ConnectionState {
// want ZMQ's default behavior yet), but we have arranged
// for a ZMQ socket to be created again in the future.
ReconnectionScheduled,
// There hasn't been recent interaction with the node, so we
// don't maintain an active connection with it. We will connect
// to it lazily if needed.
Stale,
}

struct ConnectionStateInfo {
state: ConnectionState,
// Last time we received an instruction to send a message
// to this node
last_activity: Cell<tokio::time::Instant>,
info: PeerInfo,
}

@@ -252,10 +267,6 @@ pub(super) async fn start(

zmq_context.set_max_sockets(65536).expect("should update socket limit");

// TODO: consider keeping track of "last activity" on any outgoing
// socket connection and disconnecting inactive peers (see proxy_expire_idle_peers
// in OxenMQ)

let authenticator = auth::start_authentication_thread(zmq_context.clone());

let (reconnect_sender, reconnect_receiver) = tokio::sync::mpsc::unbounded_channel();
@@ -307,6 +318,8 @@ impl P2PContext {
mut monitor_event_receiver: UnboundedReceiver<MonitorEvent>,
mut reconnect_receiver: UnboundedReceiver<AccountId>,
) {
let mut check_activity_interval = make_periodic_tick(ACTIVITY_CHECK_INTERVAL, false);

loop {
tokio::select! {
Some(messages) = outgoing_message_receiver.recv() => {
@@ -326,11 +339,14 @@
Some(account_id) = reconnect_receiver.recv() => {
self.reconnect_to_peer(&account_id);
}
_ = check_activity_interval.tick() => {
self.check_activity();
}
}
}
}

fn send_messages(&self, messages: OutgoingMultisigStageMessages) {
fn send_messages(&mut self, messages: OutgoingMultisigStageMessages) {
match messages {
OutgoingMultisigStageMessages::Broadcast(account_ids, payload) => {
trace!("Broadcasting a message to all {} peers", account_ids.len());
@@ -347,8 +363,10 @@
}
}

fn send_message(&self, account_id: AccountId, payload: Vec<u8>) {
fn send_message(&mut self, account_id: AccountId, payload: Vec<u8>) {
if let Some(peer) = self.active_connections.get(&account_id) {
peer.last_activity.set(tokio::time::Instant::now());

match &peer.state {
ConnectionState::Connected(socket) => {
socket.send(payload);
@@ -360,6 +378,16 @@
"Failed to send message. Peer is scheduled for reconnection: {account_id}"
);
},
ConnectionState::Stale => {
// Connect and try again (there is no risk of infinite recursion
// here since the state will be `Connected` after this)

// This is guaranteed by construction of `active_connections`:
assert_eq!(peer.info.account_id, account_id);

self.connect_to_peer(peer.info.clone());
self.send_message(account_id, payload);
},
}
} else {
warn!("Failed to send message. Peer not registered: {account_id}")
@@ -400,7 +428,6 @@ impl P2PContext {
// already connected to our listening ZMQ socket, we can only
// prevent future connections from being established and rely
// on the peer disconnecting from the "client side".
// TODO: ensure that stale/inactive connections are terminated

if account_id == self.our_account_id {
warn!("Received peer info deregistration of our own node!");
@@ -415,6 +442,9 @@
ConnectionState::ReconnectionScheduled => {
self.reconnect_context.reset(&account_id);
},
ConnectionState::Stale => {
// Nothing to do
},
}

self.clean_up_for_peer_pubkey(&peer.info.pubkey);
@@ -448,7 +478,7 @@
if let Some(peer) = self.active_connections.remove(account_id) {
match peer.state {
ConnectionState::ReconnectionScheduled => {
info!("Reconnecting to peer: {}", account_id);
info!("Reconnecting to peer: {account_id}");
self.connect_to_peer(peer.info.clone());
},
ConnectionState::Connected(_) => {
@@ -464,6 +494,12 @@
account_id
);
},
ConnectionState::Stale => {
debug!(
"Reconnection attempt to {} cancelled: connection is stale.",
account_id
);
},
}
} else {
debug!("Will not reconnect to now deregistered peer: {}", account_id);
Expand All @@ -479,22 +515,21 @@ impl P2PContext {

let connected_socket = socket.connect(peer.clone());

if self
.active_connections
.insert(
account_id.clone(),
ConnectionStateInfo {
state: ConnectionState::Connected(connected_socket),
info: peer,
},
)
.is_some()
{
// This should not happen because we always remove existing connection/socket
// prior to connecting, but even if it does, it should be OK to replace the
// connection (this doesn't break any invariants and the new peer info is
// likely to be more up-to-date).
error!("Unexpected existing connection while connecting to {account_id}");
if let Some(connection) = self.active_connections.insert(
account_id.clone(),
ConnectionStateInfo {
state: ConnectionState::Connected(connected_socket),
info: peer,
last_activity: Cell::new(tokio::time::Instant::now()),
},
) {
if !matches!(connection.state, ConnectionState::Stale) {
// This should not happen for non-stale sockets because we always remove
// existing connection/socket prior to connecting, but even if it does,
// it should be OK to replace the connection (this doesn't break any
// invariants and the new peer info is likely to be more up-to-date).
error!("Unexpected existing connection while connecting to {account_id}");
}
}
}

@@ -518,6 +553,9 @@
ConnectionState::ReconnectionScheduled => {
self.reconnect_context.reset(&peer.account_id);
},
ConnectionState::Stale => {
// Nothing to do
},
}
// Remove any state from previous peer info in case of update:
self.clean_up_for_peer_pubkey(&existing_peer_state.info.pubkey);
@@ -588,6 +626,19 @@ impl P2PContext {

incoming_message_receiver
}

fn check_activity(&mut self) {
for (account_id, state) in &mut self.active_connections.map {
if !matches!(state.state, ConnectionState::Stale) &&
state.last_activity.get().elapsed() > MAX_INACTIVITY_THRESHOLD
{
debug!("Peer connection is deemed stale due to inactivity: {}", account_id);
self.reconnect_context.reset(account_id);
// ZMQ socket is dropped here
state.state = ConnectionState::Stale;
}
}
}
}

/// Unlike recv_multipart available on zmq::Socket, this collects
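Taken together, the core.rs changes implement a simple idle-tracking scheme: every send refreshes a per-peer timestamp, a periodic tick drops sockets that have sat idle past MAX_INACTIVITY_THRESHOLD, and a send to a stale peer reconnects on demand. The following is a minimal, self-contained sketch of that scheme; the types (ToySocket, Connection, Connections) are illustrative stand-ins, not the engine's actual types.

use std::collections::HashMap;
use std::time::{Duration, Instant};

const MAX_INACTIVITY_THRESHOLD: Duration = Duration::from_secs(60 * 60);

// Toy stand-ins for the engine's ZMQ socket and connection state.
struct ToySocket;

struct Connection {
    socket: Option<ToySocket>, // `None` models the `Stale` state
    last_activity: Instant,
}

struct Connections(HashMap<String, Connection>);

impl Connections {
    // Sending refreshes the activity timestamp and lazily
    // re-establishes a connection that has gone stale.
    fn send(&mut self, peer: &str, _payload: &[u8]) {
        if let Some(conn) = self.0.get_mut(peer) {
            conn.last_activity = Instant::now();
            let _socket = conn.socket.get_or_insert_with(|| {
                // Reconnect lazily; in the engine this is `connect_to_peer`.
                ToySocket
            });
            // ... send `_payload` over `_socket` here ...
        }
    }

    // Runs on a periodic tick: drop sockets that saw no sends recently.
    fn check_activity(&mut self) {
        for conn in self.0.values_mut() {
            if conn.socket.is_some() &&
                conn.last_activity.elapsed() > MAX_INACTIVITY_THRESHOLD
            {
                conn.socket = None; // dropping the socket closes the connection
            }
        }
    }
}

fn main() {
    let mut conns = Connections(HashMap::from([(
        "node-1".to_string(),
        Connection { socket: Some(ToySocket), last_activity: Instant::now() },
    )]));
    conns.check_activity(); // nothing is stale yet
    conns.send("node-1", b"hi"); // refreshes `last_activity`
}
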
8 changes: 1 addition & 7 deletions engine/src/p2p/core/socket.rs
@@ -27,7 +27,7 @@ const CONNECTION_HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(30);
pub const DO_NOT_LINGER: i32 = 0;

/// How many messages to keep in a "resend" buffer per peer
const OUTGOING_MESSAGES_BUFFER_SIZE: i32 = 10;
const OUTGOING_MESSAGES_BUFFER_SIZE: i32 = 100;

/// Socket to be used for connecting to peer on the network
pub struct OutgoingSocket {
@@ -41,12 +41,6 @@ impl OutgoingSocket {
// Discard any pending messages when disconnecting a socket
socket.set_linger(DO_NOT_LINGER).unwrap();

// Do not buffer outgoing messages before a connection is
// established (this means the messages will be lost, but
// this prevents us from buffering messages to peers that
// are misconfigured, for example).
socket.set_immediate(true).unwrap();

// Buffer at most OUTGOING_MESSAGES_BUFFER_SIZE messages
// per peer (this minimises how much memory we might "leak"
// if they never come online again).
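Two things change in socket.rs: the per-peer send buffer grows from 10 to 100 messages, and the set_immediate(true) call is removed, so ZMQ may now queue outgoing messages while a connection is down instead of dropping them. That queuing is what lets the lazy reconnect in core.rs hand a message to a freshly re-created socket. Below is a hedged sketch of the resulting option set using the rust-zmq crate; the DEALER socket type and the set_sndhwm call are assumptions, since both sit outside this hunk.

// Sketch of an outgoing socket configured the way the diff implies.
// Assumes the `zmq` crate; `unwrap` mirrors the error handling in the diff.
fn configure_outgoing(ctx: &zmq::Context) -> zmq::Socket {
    let socket = ctx.socket(zmq::DEALER).unwrap();

    // Discard any pending messages when disconnecting (ZMQ_LINGER = 0).
    socket.set_linger(0).unwrap();

    // With `set_immediate` no longer called, ZMQ buffers messages sent
    // before or while the connection is down instead of dropping them.

    // Keep at most 100 queued outgoing messages per peer (ZMQ_SNDHWM),
    // bounding memory use if a peer never comes back online.
    socket.set_sndhwm(100).unwrap();

    socket
}
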
38 changes: 19 additions & 19 deletions engine/src/p2p/core/tests.rs
@@ -1,5 +1,8 @@
use super::{PeerInfo, PeerUpdate};
use crate::p2p::{OutgoingMultisigStageMessages, P2PKey};
use crate::p2p::{
core::{ACTIVITY_CHECK_INTERVAL, MAX_INACTIVITY_THRESHOLD},
OutgoingMultisigStageMessages, P2PKey,
};
use sp_core::ed25519::Public;
use state_chain_runtime::AccountId;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
@@ -196,30 +199,27 @@ async fn can_connect_after_pubkey_change() {
send_and_receive_message(&node1, &mut node2b).await.unwrap();
}

/// Test the behaviour around receiving own registration: at first, if our node
/// is not registered, we delay connecting to other nodes; once we receive our
/// own registration, we connect to other registered nodes.
#[tokio::test]
async fn connects_after_registration() {
#[tokio::test(start_paused = true)]
async fn stale_connections() {
let node_key1 = create_keypair();
let node_key2 = create_keypair();

let pi1 = create_node_info(AccountId::new([1; 32]), &node_key1, 8092);
let pi2 = create_node_info(AccountId::new([2; 32]), &node_key2, 8093);
let pi1 = create_node_info(AccountId::new([1; 32]), &node_key1, 8094);
let pi2 = create_node_info(AccountId::new([2; 32]), &node_key2, 8095);

// Node 1 doesn't get its own peer info at first and will wait for registration
let node1 = spawn_node(&node_key1, 0, pi1.clone(), &[pi2.clone()]);
let mut node1 = spawn_node(&node_key1, 0, pi1.clone(), &[pi1.clone(), pi2.clone()]);
let mut node2 = spawn_node(&node_key2, 1, pi2.clone(), &[pi1.clone(), pi2.clone()]);

// For sanity, check that node 1 can't yet communicate with node 2:
assert!(send_and_receive_message(&node1, &mut node2).await.is_none());
// Sleep long enough for nodes to deem connections "stale" (due to inactivity)
tokio::time::sleep(
ACTIVITY_CHECK_INTERVAL + MAX_INACTIVITY_THRESHOLD + std::time::Duration::from_secs(1),
)
.await;

// Update node 1 with its own peer info
node1.peer_update_sender.send(PeerUpdate::Registered(pi1.clone())).unwrap();
// Resuming is necessary for timeouts to work correctly
tokio::time::resume();

// Allow some time for the above command to propagate through the channel
tokio::time::sleep(std::time::Duration::from_millis(100)).await;

// It should now be able to communicate with node 2:
assert!(send_and_receive_message(&node1, &mut node2).await.is_some());
// Ensure that we can re-activate stale connections when needed
send_and_receive_message(&node1, &mut node2).await.unwrap();
send_and_receive_message(&node2, &mut node1).await.unwrap();
}
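
The replacement test leans on tokio's virtual clock (the test-util feature): under #[tokio::test(start_paused = true)], tokio::time::sleep advances virtual time instantly instead of waiting an hour of wall-clock time, and tokio::time::resume() switches back to real time so that genuine network timeouts behave normally for the rest of the test. A minimal sketch of the pattern, independent of this codebase:

use std::time::Duration;

// Requires tokio with the `test-util` and `macros` features enabled.
#[tokio::test(start_paused = true)]
async fn paused_clock_sketch() {
    let started = tokio::time::Instant::now();

    // With the clock paused, this returns almost immediately in real time:
    // tokio advances its virtual clock straight to the sleep deadline.
    tokio::time::sleep(Duration::from_secs(60 * 60)).await;
    assert!(started.elapsed() >= Duration::from_secs(60 * 60));

    // From here on, timers run against the real clock again, which
    // matters once real I/O (such as the ZMQ sockets above) is involved.
    tokio::time::resume();
}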
