Skip to content

Commit

Permalink
Merge pull request #1186 from mintlayer/use_bounded_channels_for_peer…
Browse files Browse the repository at this point in the history
…_events

Use bounded channels for peer events
  • Loading branch information
ImplOfAnImpl authored Sep 20, 2023
2 parents 694aa1c + b2e9122 commit b7f0b1d
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 68 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ testing_logger = "0.1"
thiserror = "1.0"
tokio = { version = "1.27", default-features = false }
tokio-socks = "0.5"
tokio-stream = "0.1"
tokio-util = { version = "0.7", default-features = false }
toml = "0.8"
tower = "0.4"
Expand Down
3 changes: 3 additions & 0 deletions chainstate/tx-verifier/src/transaction_verifier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,9 @@ where
accounting: A,
verifier_config: TransactionVerifierConfig,
) -> Self {
// TODO: both "expect"s in this function may fire when exiting the node-gui app;
// get rid of them and return a proper Result.
// See https://github.com/mintlayer/mintlayer-core/issues/1221
let best_block = storage
.get_best_block_for_utxos()
.expect("Database error while reading utxos best block");
Expand Down
1 change: 1 addition & 0 deletions p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ sscanf.workspace = true
tap.workspace = true
thiserror.workspace = true
tokio = { workspace = true, default-features = false, features = ["io-util", "macros", "net", "rt", "rt-multi-thread", "sync", "time"] }
tokio-stream.workspace = true
tokio-socks.workspace = true
tokio-util = { workspace = true, default-features = false, features = ["codec"] }

Expand Down
46 changes: 26 additions & 20 deletions p2p/src/net/default_backend/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@

use std::{collections::HashMap, sync::Arc};

use futures::{future::BoxFuture, never::Never, stream::FuturesUnordered, FutureExt, StreamExt};
use futures::{future::BoxFuture, never::Never, stream::FuturesUnordered, FutureExt};
use p2p_types::socket_address::SocketAddress;
use tokio::{
sync::{mpsc, oneshot},
time::timeout,
};
use tokio_stream::{wrappers::ReceiverStream, StreamExt, StreamMap};

use common::{
chain::ChainConfig,
Expand Down Expand Up @@ -57,10 +58,19 @@ use super::{
types::{HandshakeNonce, Message, P2pTimestamp},
};

/// Buffer size of the channel to the SyncManager peer task.
/// How many unprocessed messages can be sent before the peer's event loop is blocked.
// TODO: Decide what the optimal value is (for example, by comparing the initial block download time)
const SYNC_CHAN_BUF_SIZE: usize = 20;
/// Buffer sizes for the channels used by Peer to send peer messages to other parts of p2p.
///
/// If the number of unprocessed messages exceeds this limit, the peer's event loop will be
/// blocked; this is needed to prevent DoS attacks where a peer would overload the node with
/// requests, which may lead to memory exhaustion.
/// Note: the values were chosen pretty much arbitrarily; the sync messages channel has a lower
/// limit because it's used to send blocks, so its messages can be up to 1Mb in size; peer events,
/// on the other hand, are small.
/// Also note that basic tests of initial block download time showed that there is no real
/// difference between 20 and 10000 for both of the limits here. These results, of course, depend
/// on the hardware and internet connection, so we've chosen larger limits.
const SYNC_MSG_CHAN_BUF_SIZE: usize = 100;
const PEER_EVENT_CHAN_BUF_SIZE: usize = 1000;

/// Active peer data
struct PeerContext {
Expand Down Expand Up @@ -123,12 +133,8 @@ pub struct Backend<T: TransportSocket> {
/// Pending connections
pending: HashMap<PeerId, PendingPeerContext>,

/// Channel sender for sending events from Peers to Backend; this will be passed to each
/// Peer upon its creation.
peer_event_tx: mpsc::UnboundedSender<(PeerId, PeerEvent)>,

/// Channel receiver for receiving events from Peers
peer_event_rx: mpsc::UnboundedReceiver<(PeerId, PeerEvent)>,
/// Map of streams for receiving events from peers.
peer_event_stream_map: StreamMap<PeerId, ReceiverStream<PeerEvent>>,

/// Channel sender for sending connectivity events to the frontend
conn_event_tx: mpsc::UnboundedSender<ConnectivityEvent>,
Expand Down Expand Up @@ -171,7 +177,6 @@ where
subscribers_receiver: mpsc::UnboundedReceiver<P2pEventHandler>,
node_protocol_version: ProtocolVersion,
) -> Self {
let (peer_event_tx, peer_event_rx) = mpsc::unbounded_channel();
Self {
transport,
socket,
Expand All @@ -183,8 +188,7 @@ where
syncing_event_tx,
peers: HashMap::new(),
pending: HashMap::new(),
peer_event_tx,
peer_event_rx,
peer_event_stream_map: StreamMap::new(),
command_queue: FuturesUnordered::new(),
shutdown,
shutdown_receiver,
Expand Down Expand Up @@ -234,7 +238,7 @@ where
.get_mut(&peer_id)
.ok_or(P2pError::PeerError(PeerError::PeerDoesntExist))?;

let (sync_msg_tx, sync_msg_rx) = mpsc::channel(SYNC_CHAN_BUF_SIZE);
let (sync_msg_tx, sync_msg_rx) = mpsc::channel(SYNC_MSG_CHAN_BUF_SIZE);
peer.backend_event_tx.send(BackendEvent::Accepted { sync_msg_tx })?;

let old_value = peer.was_accepted.test_and_set();
Expand Down Expand Up @@ -292,13 +296,12 @@ where
self.handle_command(command.ok_or(P2pError::ChannelClosed)?);
},
// Process pending commands
callback = self.command_queue.select_next_some(), if !self.command_queue.is_empty() => {
Some(callback) = self.command_queue.next() => {
callback(&mut self)?;
},
// Handle peer events.
event = self.peer_event_rx.recv() => {
let (peer, event) = event.ok_or(P2pError::ChannelClosed)?;
self.handle_peer_event(peer, event)?;
Some((peer_id, event)) = self.peer_event_stream_map.next() => {
self.handle_peer_event(peer_id, event)?;
},
// Accept a new peer connection.
res = self.socket.accept() => {
Expand Down Expand Up @@ -350,7 +353,10 @@ where
Some(address.as_peer_address())
};

let peer_event_tx = self.peer_event_tx.clone();
let (peer_event_tx, peer_event_rx) = mpsc::channel(PEER_EVENT_CHAN_BUF_SIZE);
let peer_event_stream = ReceiverStream::new(peer_event_rx);

self.peer_event_stream_map.insert(remote_peer_id, peer_event_stream);

let peer = peer::Peer::<T>::new(
remote_peer_id,
Expand Down
102 changes: 54 additions & 48 deletions p2p/src/net/default_backend/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@
use std::{sync::Arc, time::Duration};

use p2p_types::services::Services;
use tokio::{
sync::mpsc::{self, Sender},
time::timeout,
};
use tokio::{sync::mpsc, time::timeout};

use common::chain::ChainConfig;
use logging::log;
Expand Down Expand Up @@ -85,7 +82,7 @@ pub struct Peer<T: TransportSocket> {
receiver_address: Option<PeerAddress>,

/// Channel sender for sending events to Backend
peer_event_tx: mpsc::UnboundedSender<(PeerId, PeerEvent)>,
peer_event_tx: mpsc::Sender<PeerEvent>,

/// Channel receiver for receiving events from Backend.
backend_event_rx: mpsc::UnboundedReceiver<BackendEvent>,
Expand All @@ -108,7 +105,7 @@ where
p2p_config: Arc<P2pConfig>,
socket: T::Stream,
receiver_address: Option<PeerAddress>,
peer_event_tx: mpsc::UnboundedSender<(PeerId, PeerEvent)>,
peer_event_tx: mpsc::Sender<PeerEvent>,
backend_event_rx: mpsc::UnboundedReceiver<BackendEvent>,
node_protocol_version: ProtocolVersion,
) -> Self {
Expand Down Expand Up @@ -178,18 +175,17 @@ where
// Send PeerInfoReceived before sending handshake to remote peer!
// Backend is expected to receive PeerInfoReceived before outgoing connection has chance to complete handshake,
// It's required to reliably detect self-connects.
self.peer_event_tx.send((
self.peer_id,
PeerEvent::PeerInfoReceived {
self.peer_event_tx
.send(PeerEvent::PeerInfoReceived {
protocol_version: common_protocol_version,
network,
common_services,
user_agent,
software_version,
receiver_address,
handshake_nonce,
},
))?;
})
.await?;

self.socket
.send(Message::Handshake(HandshakeMessage::HelloAck {
Expand Down Expand Up @@ -247,60 +243,60 @@ where
let common_protocol_version =
choose_common_protocol_version(protocol_version, self.node_protocol_version)?;

self.peer_event_tx.send((
self.peer_id,
PeerEvent::PeerInfoReceived {
self.peer_event_tx
.send(PeerEvent::PeerInfoReceived {
protocol_version: common_protocol_version,
network,
common_services,
user_agent,
software_version,
receiver_address,
handshake_nonce,
},
))?;
})
.await?;
}
}

Ok(())
}

// Note: the channels used by this function to propagate messages to other parts of p2p
// must be bounded; this is important to prevent DoS attacks.
async fn handle_socket_msg(
peer: PeerId,
peer_id: PeerId,
msg: Message,
peer_event_tx: &mut mpsc::UnboundedSender<(PeerId, PeerEvent)>,
sync_msg_tx: &mut Sender<SyncMessage>,
peer_event_tx: &mut mpsc::Sender<PeerEvent>,
sync_msg_tx: &mut mpsc::Sender<SyncMessage>,
) -> crate::Result<()> {
// TODO: Use a bounded channel to send messages to the peer manager
match msg.categorize() {
CategorizedMessage::Handshake(_) => {
// TODO: this must be reported to the peer manager, so that it can adjust
// the peer's ban score. (We may add a separate PeerEvent for this and Backend
// can then use the now unused ConnectivityEvent::Misbehaved to forward the error
// to the peer manager.)
log::error!("Peer {peer} sent unexpected handshake message");
log::error!("Peer {peer_id} sent unexpected handshake message");
}
CategorizedMessage::PeerManagerMessage(msg) => {
peer_event_tx.send((peer, PeerEvent::MessageReceived { message: msg }))?
peer_event_tx.send(PeerEvent::MessageReceived { message: msg }).await?
}
CategorizedMessage::SyncMessage(msg) => sync_msg_tx.send(msg).await?,
}

Ok(())
}

pub async fn run(mut self, local_time: P2pTimestamp) -> crate::Result<()> {
async fn run_impl(&mut self, local_time: P2pTimestamp) -> crate::Result<()> {
// handshake with remote peer and send peer's info to backend
let handshake_res = timeout(PEER_HANDSHAKE_TIMEOUT, self.handshake(local_time)).await;
match handshake_res {
Ok(Ok(())) => {}
Ok(Err(err)) => {
log::debug!("handshake failed for peer {}: {err}", self.peer_id);

let send_result = self.peer_event_tx.send((
self.peer_id,
PeerEvent::HandshakeFailed { error: err.clone() },
));
let send_result = self
.peer_event_tx
.send(PeerEvent::HandshakeFailed { error: err.clone() })
.await;
if let Err(send_error) = send_result {
log::error!(
"Cannot send PeerEvent::HandshakeFailed to peer {}: {}",
Expand Down Expand Up @@ -348,11 +344,22 @@ where
}
}
}
}

impl<T: TransportSocket> Drop for Peer<T> {
fn drop(&mut self) {
let _ = self.peer_event_tx.send((self.peer_id, PeerEvent::ConnectionClosed));
pub async fn run(mut self, local_time: P2pTimestamp) -> crate::Result<()> {
let run_result = self.run_impl(local_time).await;
let send_result = self.peer_event_tx.send(PeerEvent::ConnectionClosed).await;

if let Err(send_error) = send_result {
// Note: this situation is likely to happen if the connection is already closed,
// so it's not really an error.
log::debug!(
"Unable to send PeerEvent::ConnectionClosed to Backend for peer {}: {}",
self.peer_id,
send_error
);
}

run_result
}
}

Expand All @@ -375,6 +382,8 @@ mod tests {
use chainstate::Locator;
use futures::FutureExt;

const TEST_CHAN_BUF_SIZE: usize = 100;

async fn handshake_inbound<A, T>()
where
A: TestTransportMaker<Transport = T>,
Expand All @@ -383,7 +392,7 @@ mod tests {
let (socket1, socket2) = get_two_connected_sockets::<A, T>().await;
let chain_config = Arc::new(common::chain::config::create_mainnet());
let p2p_config = Arc::new(test_p2p_config());
let (tx1, mut rx1) = mpsc::unbounded_channel();
let (tx1, mut rx1) = mpsc::channel(TEST_CHAN_BUF_SIZE);
let (_tx2, rx2) = mpsc::unbounded_channel();
let peer_id2 = PeerId::new();

Expand Down Expand Up @@ -422,7 +431,7 @@ mod tests {

let _peer = handle.await.unwrap();
assert_eq!(
rx1.try_recv().unwrap().1,
rx1.try_recv().unwrap(),
PeerEvent::PeerInfoReceived {
protocol_version: TEST_PROTOCOL_VERSION,
network: *chain_config.magic_bytes(),
Expand Down Expand Up @@ -458,7 +467,7 @@ mod tests {
let (socket1, socket2) = get_two_connected_sockets::<A, T>().await;
let chain_config = Arc::new(common::chain::config::create_mainnet());
let p2p_config = Arc::new(test_p2p_config());
let (tx1, mut rx1) = mpsc::unbounded_channel();
let (tx1, mut rx1) = mpsc::channel(TEST_CHAN_BUF_SIZE);
let (_tx2, rx2) = mpsc::unbounded_channel();
let peer_id3 = PeerId::new();

Expand Down Expand Up @@ -500,18 +509,15 @@ mod tests {
let _peer = handle.await.unwrap();
assert_eq!(
rx1.try_recv(),
Ok((
peer_id3,
PeerEvent::PeerInfoReceived {
protocol_version: TEST_PROTOCOL_VERSION,
network: *chain_config.magic_bytes(),
common_services: [Service::Blocks, Service::Transactions].as_slice().into(),
user_agent: p2p_config.user_agent.clone(),
software_version: *chain_config.software_version(),
receiver_address: None,
handshake_nonce: 1,
}
))
Ok(PeerEvent::PeerInfoReceived {
protocol_version: TEST_PROTOCOL_VERSION,
network: *chain_config.magic_bytes(),
common_services: [Service::Blocks, Service::Transactions].as_slice().into(),
user_agent: p2p_config.user_agent.clone(),
software_version: *chain_config.software_version(),
receiver_address: None,
handshake_nonce: 1,
})
);
}

Expand All @@ -538,7 +544,7 @@ mod tests {
let (socket1, socket2) = get_two_connected_sockets::<A, T>().await;
let chain_config = Arc::new(common::chain::config::create_mainnet());
let p2p_config = Arc::new(test_p2p_config());
let (tx1, _rx1) = mpsc::unbounded_channel();
let (tx1, _rx1) = mpsc::channel(TEST_CHAN_BUF_SIZE);
let (_tx2, rx2) = mpsc::unbounded_channel();
let peer_id3 = PeerId::new();

Expand Down Expand Up @@ -599,7 +605,7 @@ mod tests {
let (socket1, socket2) = get_two_connected_sockets::<A, T>().await;
let chain_config = Arc::new(common::chain::config::create_mainnet());
let p2p_config = Arc::new(test_p2p_config());
let (tx1, _rx1) = mpsc::unbounded_channel();
let (tx1, _rx1) = mpsc::channel(TEST_CHAN_BUF_SIZE);
let (_tx2, rx2) = mpsc::unbounded_channel();
let peer_id2 = PeerId::new();

Expand Down

0 comments on commit b7f0b1d

Please sign in to comment.