diff --git a/Cargo.lock b/Cargo.lock index b10c4d55d1c8..ea4ffec0dbcc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3067,6 +3067,7 @@ dependencies = [ "reth-chainspec", "reth-discv4", "reth-network", + "reth-network-api", "reth-primitives", "reth-tracing", "secp256k1", @@ -9560,9 +9561,9 @@ dependencies = [ [[package]] name = "revm-inspectors" -version = "0.12.0" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41bbeb6004cc4ed48d27756f0479011df91a6f5642a3abab9309eda5ce67c4ad" +checksum = "0b7f5f8a2deafb3c76f357bbf9e71b73bddb915c4994bbbe3208fbfbe8fc7f8e" dependencies = [ "alloy-primitives", "alloy-rpc-types-eth", diff --git a/crates/e2e-test-utils/Cargo.toml b/crates/e2e-test-utils/Cargo.toml index bedacbecd759..7cb8516816b8 100644 --- a/crates/e2e-test-utils/Cargo.toml +++ b/crates/e2e-test-utils/Cargo.toml @@ -14,6 +14,7 @@ workspace = true reth-chainspec.workspace = true reth-tracing.workspace = true reth-db = { workspace = true, features = ["test-utils"] } +reth-network-api.workspace = true reth-rpc-layer.workspace = true reth-rpc-server-types.workspace = true reth-rpc-eth-api.workspace = true @@ -23,7 +24,6 @@ reth-payload-builder-primitives.workspace = true reth-payload-primitives.workspace = true reth-primitives.workspace = true reth-provider.workspace = true -reth-network-api.workspace = true reth-network.workspace = true reth-node-api.workspace = true reth-node-core.workspace = true diff --git a/crates/e2e-test-utils/src/network.rs b/crates/e2e-test-utils/src/network.rs index 2efc8d47f2d7..ce9d0b94612b 100644 --- a/crates/e2e-test-utils/src/network.rs +++ b/crates/e2e-test-utils/src/network.rs @@ -1,6 +1,7 @@ use futures_util::StreamExt; use reth_network_api::{ - test_utils::PeersHandleProvider, NetworkEvent, NetworkEventListenerProvider, PeersInfo, + events::PeerEvent, test_utils::PeersHandleProvider, NetworkEvent, NetworkEventListenerProvider, + PeersInfo, }; use reth_network_peers::{NodeRecord, PeerId}; use reth_tokio_util::EventStream; @@ -28,7 +29,7 @@ where self.network.peers_handle().add_peer(node_record.id, node_record.tcp_addr()); match self.network_events.next().await { - Some(NetworkEvent::PeerAdded(_)) => (), + Some(NetworkEvent::Peer(PeerEvent::PeerAdded(_))) => (), ev => panic!("Expected a peer added event, got: {ev:?}"), } } @@ -42,7 +43,9 @@ where pub async fn next_session_established(&mut self) -> Option { while let Some(ev) = self.network_events.next().await { match ev { - NetworkEvent::SessionEstablished { peer_id, .. } => { + NetworkEvent::ActivePeerSession { info, .. } | + NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => { + let peer_id = info.peer_id; info!("Session established with peer: {:?}", peer_id); return Some(peer_id) } diff --git a/crates/net/network-api/src/events.rs b/crates/net/network-api/src/events.rs index 624c43f5e1ba..e17cedef11fc 100644 --- a/crates/net/network-api/src/events.rs +++ b/crates/net/network-api/src/events.rs @@ -1,7 +1,5 @@ //! API related to listening for network events. -use std::{fmt, net::SocketAddr, sync::Arc}; - use reth_eth_wire_types::{ message::RequestPair, BlockBodies, BlockHeaders, Capabilities, DisconnectReason, EthMessage, EthNetworkPrimitives, EthVersion, GetBlockBodies, GetBlockHeaders, GetNodeData, @@ -13,26 +11,70 @@ use reth_network_p2p::error::{RequestError, RequestResult}; use reth_network_peers::PeerId; use reth_network_types::PeerAddr; use reth_tokio_util::EventStream; +use std::{ + fmt, + net::SocketAddr, + pin::Pin, + sync::Arc, + task::{Context, Poll}, +}; use tokio::sync::{mpsc, oneshot}; -use tokio_stream::wrappers::UnboundedReceiverStream; +use tokio_stream::{wrappers::UnboundedReceiverStream, Stream, StreamExt}; -/// Provides event subscription for the network. -#[auto_impl::auto_impl(&, Arc)] -pub trait NetworkEventListenerProvider: Send + Sync { - /// Creates a new [`NetworkEvent`] listener channel. - fn event_listener(&self) -> EventStream; - /// Returns a new [`DiscoveryEvent`] stream. - /// - /// This stream yields [`DiscoveryEvent`]s for each peer that is discovered. - fn discovery_listener(&self) -> UnboundedReceiverStream; +/// A boxed stream of network peer events that provides a type-erased interface. +pub struct PeerEventStream(Pin + Send + Sync>>); + +impl fmt::Debug for PeerEventStream { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("PeerEventStream").finish_non_exhaustive() + } +} + +impl PeerEventStream { + /// Create a new stream [`PeerEventStream`] by converting the provided stream's items into peer + /// events [`PeerEvent`] + pub fn new(stream: S) -> Self + where + S: Stream + Send + Sync + 'static, + T: Into + 'static, + { + let mapped_stream = stream.map(Into::into); + Self(Box::pin(mapped_stream)) + } } -/// (Non-exhaustive) Events emitted by the network that are of interest for subscribers. +impl Stream for PeerEventStream { + type Item = PeerEvent; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.0.as_mut().poll_next(cx) + } +} + +/// Represents information about an established peer session. +#[derive(Debug, Clone)] +pub struct SessionInfo { + /// The identifier of the peer to which a session was established. + pub peer_id: PeerId, + /// The remote addr of the peer to which a session was established. + pub remote_addr: SocketAddr, + /// The client version of the peer to which a session was established. + pub client_version: Arc, + /// Capabilities the peer announced. + pub capabilities: Arc, + /// The status of the peer to which a session was established. + pub status: Arc, + /// Negotiated eth version of the session. + pub version: EthVersion, +} + +/// (Non-exhaustive) List of the different events emitted by the network that are of interest for +/// subscribers. /// /// This includes any event types that may be relevant to tasks, for metrics, keep track of peers /// etc. -#[derive(Debug)] -pub enum NetworkEvent { +#[derive(Debug, Clone)] +pub enum PeerEvent { /// Closed the peer session. SessionClosed { /// The identifier of the peer to which a session was closed. @@ -41,57 +83,65 @@ pub enum NetworkEvent { reason: Option, }, /// Established a new session with the given peer. - SessionEstablished { - /// The identifier of the peer to which a session was established. - peer_id: PeerId, - /// The remote addr of the peer to which a session was established. - remote_addr: SocketAddr, - /// The client version of the peer to which a session was established. - client_version: Arc, - /// Capabilities the peer announced - capabilities: Arc, - /// A request channel to the session task. - messages: PeerRequestSender, - /// The status of the peer to which a session was established. - status: Arc, - /// negotiated eth version of the session - version: EthVersion, - }, + SessionEstablished(SessionInfo), /// Event emitted when a new peer is added PeerAdded(PeerId), /// Event emitted when a new peer is removed PeerRemoved(PeerId), } +/// (Non-exhaustive) Network events representing peer lifecycle events and session requests. +#[derive(Debug)] +pub enum NetworkEvent { + /// Basic peer lifecycle event. + Peer(PeerEvent), + /// Session established with requests. + ActivePeerSession { + /// Session information + info: SessionInfo, + /// A request channel to the session task. + messages: PeerRequestSender, + }, +} + impl Clone for NetworkEvent { fn clone(&self) -> Self { match self { - Self::SessionClosed { peer_id, reason } => { - Self::SessionClosed { peer_id: *peer_id, reason: *reason } + Self::Peer(event) => Self::Peer(event.clone()), + Self::ActivePeerSession { info, messages } => { + Self::ActivePeerSession { info: info.clone(), messages: messages.clone() } } - Self::SessionEstablished { - peer_id, - remote_addr, - client_version, - capabilities, - messages, - status, - version, - } => Self::SessionEstablished { - peer_id: *peer_id, - remote_addr: *remote_addr, - client_version: client_version.clone(), - capabilities: capabilities.clone(), - messages: messages.clone(), - status: status.clone(), - version: *version, - }, - Self::PeerAdded(peer) => Self::PeerAdded(*peer), - Self::PeerRemoved(peer) => Self::PeerRemoved(*peer), } } } +impl From> for PeerEvent { + fn from(event: NetworkEvent) -> Self { + match event { + NetworkEvent::Peer(peer_event) => peer_event, + NetworkEvent::ActivePeerSession { info, .. } => Self::SessionEstablished(info), + } + } +} + +/// Provides peer event subscription for the network. +#[auto_impl::auto_impl(&, Arc)] +pub trait NetworkPeersEvents: Send + Sync { + /// Creates a new peer event listener stream. + fn peer_events(&self) -> PeerEventStream; +} + +/// Provides event subscription for the network. +#[auto_impl::auto_impl(&, Arc)] +pub trait NetworkEventListenerProvider: NetworkPeersEvents { + /// Creates a new [`NetworkEvent`] listener channel. + fn event_listener(&self) -> EventStream>; + /// Returns a new [`DiscoveryEvent`] stream. + /// + /// This stream yields [`DiscoveryEvent`]s for each peer that is discovered. + fn discovery_listener(&self) -> UnboundedReceiverStream; +} + /// Events produced by the `Discovery` manager. #[derive(Debug, Clone, PartialEq, Eq)] pub enum DiscoveryEvent { diff --git a/crates/net/network/src/manager.rs b/crates/net/network/src/manager.rs index bad6ecba5fad..89e21b9dd2db 100644 --- a/crates/net/network/src/manager.rs +++ b/crates/net/network/src/manager.rs @@ -44,7 +44,9 @@ use reth_eth_wire::{ use reth_fs_util::{self as fs, FsPathError}; use reth_metrics::common::mpsc::UnboundedMeteredSender; use reth_network_api::{ - test_utils::PeersHandle, EthProtocolInfo, NetworkEvent, NetworkStatus, PeerInfo, PeerRequest, + events::{PeerEvent, SessionInfo}, + test_utils::PeersHandle, + EthProtocolInfo, NetworkEvent, NetworkStatus, PeerInfo, PeerRequest, }; use reth_network_peers::{NodeRecord, PeerId}; use reth_network_types::ReputationChangeKind; @@ -712,24 +714,26 @@ impl NetworkManager { self.update_active_connection_metrics(); - self.event_sender.notify(NetworkEvent::SessionEstablished { + let session_info = SessionInfo { peer_id, remote_addr, client_version, capabilities, - version, status, - messages, - }); + version, + }; + + self.event_sender + .notify(NetworkEvent::ActivePeerSession { info: session_info, messages }); } SwarmEvent::PeerAdded(peer_id) => { trace!(target: "net", ?peer_id, "Peer added"); - self.event_sender.notify(NetworkEvent::PeerAdded(peer_id)); + self.event_sender.notify(NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id))); self.metrics.tracked_peers.set(self.swarm.state().peers().num_known_peers() as f64); } SwarmEvent::PeerRemoved(peer_id) => { trace!(target: "net", ?peer_id, "Peer dropped"); - self.event_sender.notify(NetworkEvent::PeerRemoved(peer_id)); + self.event_sender.notify(NetworkEvent::Peer(PeerEvent::PeerRemoved(peer_id))); self.metrics.tracked_peers.set(self.swarm.state().peers().num_known_peers() as f64); } SwarmEvent::SessionClosed { peer_id, remote_addr, error } => { @@ -772,7 +776,8 @@ impl NetworkManager { .saturating_sub(1) as f64, ); - self.event_sender.notify(NetworkEvent::SessionClosed { peer_id, reason }); + self.event_sender + .notify(NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, reason })); } SwarmEvent::IncomingPendingSessionClosed { remote_addr, error } => { trace!( diff --git a/crates/net/network/src/network.rs b/crates/net/network/src/network.rs index 7e0b000cf343..225b6332e0eb 100644 --- a/crates/net/network/src/network.rs +++ b/crates/net/network/src/network.rs @@ -4,6 +4,7 @@ use crate::{ }; use alloy_primitives::B256; use enr::Enr; +use futures::StreamExt; use parking_lot::Mutex; use reth_discv4::{Discv4, NatResolver}; use reth_discv5::Discv5; @@ -13,6 +14,7 @@ use reth_eth_wire::{ }; use reth_ethereum_forks::Head; use reth_network_api::{ + events::{NetworkPeersEvents, PeerEvent, PeerEventStream}, test_utils::{PeersHandle, PeersHandleProvider}, BlockDownloaderProvider, DiscoveryEvent, NetworkError, NetworkEvent, NetworkEventListenerProvider, NetworkInfo, NetworkStatus, PeerInfo, PeerRequest, Peers, @@ -192,6 +194,17 @@ impl NetworkHandle { // === API Implementations === +impl NetworkPeersEvents for NetworkHandle { + /// Returns an event stream of peer-specific network events. + fn peer_events(&self) -> PeerEventStream { + let peer_events = self.inner.event_sender.new_listener().map(|event| match event { + NetworkEvent::Peer(peer_event) => peer_event, + NetworkEvent::ActivePeerSession { info, .. } => PeerEvent::SessionEstablished(info), + }); + PeerEventStream::new(peer_events) + } +} + impl NetworkEventListenerProvider for NetworkHandle { fn event_listener(&self) -> EventStream>> { self.inner.event_sender.new_listener() diff --git a/crates/net/network/src/test_utils/testnet.rs b/crates/net/network/src/test_utils/testnet.rs index 08bf24b88532..a27df7e7202a 100644 --- a/crates/net/network/src/test_utils/testnet.rs +++ b/crates/net/network/src/test_utils/testnet.rs @@ -13,6 +13,7 @@ use pin_project::pin_project; use reth_chainspec::{Hardforks, MAINNET}; use reth_eth_wire::{protocol::Protocol, DisconnectReason, HelloMessageWithProtocols}; use reth_network_api::{ + events::{PeerEvent, SessionInfo}, test_utils::{PeersHandle, PeersHandleProvider}, NetworkEvent, NetworkEventListenerProvider, NetworkInfo, Peers, }; @@ -641,7 +642,9 @@ impl NetworkEventStream { pub async fn next_session_closed(&mut self) -> Option<(PeerId, Option)> { while let Some(ev) = self.inner.next().await { match ev { - NetworkEvent::SessionClosed { peer_id, reason } => return Some((peer_id, reason)), + NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, reason }) => { + return Some((peer_id, reason)) + } _ => continue, } } @@ -652,7 +655,10 @@ impl NetworkEventStream { pub async fn next_session_established(&mut self) -> Option { while let Some(ev) = self.inner.next().await { match ev { - NetworkEvent::SessionEstablished { peer_id, .. } => return Some(peer_id), + NetworkEvent::ActivePeerSession { info, .. } | + NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => { + return Some(info.peer_id) + } _ => continue, } } @@ -667,7 +673,7 @@ impl NetworkEventStream { let mut peers = Vec::with_capacity(num); while let Some(ev) = self.inner.next().await { match ev { - NetworkEvent::SessionEstablished { peer_id, .. } => { + NetworkEvent::ActivePeerSession { info: SessionInfo { peer_id, .. }, .. } => { peers.push(peer_id); num -= 1; if num == 0 { @@ -680,18 +686,24 @@ impl NetworkEventStream { peers } - /// Ensures that the first two events are a [`NetworkEvent::PeerAdded`] and - /// [`NetworkEvent::SessionEstablished`], returning the [`PeerId`] of the established + /// Ensures that the first two events are a [`NetworkEvent::Peer(PeerEvent::PeerAdded`] and + /// [`NetworkEvent::ActivePeerSession`], returning the [`PeerId`] of the established /// session. pub async fn peer_added_and_established(&mut self) -> Option { let peer_id = match self.inner.next().await { - Some(NetworkEvent::PeerAdded(peer_id)) => peer_id, + Some(NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id))) => peer_id, _ => return None, }; match self.inner.next().await { - Some(NetworkEvent::SessionEstablished { peer_id: peer_id2, .. }) => { - debug_assert_eq!(peer_id, peer_id2, "PeerAdded peer_id {peer_id} does not match SessionEstablished peer_id {peer_id2}"); + Some(NetworkEvent::ActivePeerSession { + info: SessionInfo { peer_id: peer_id2, .. }, + .. + }) => { + debug_assert_eq!( + peer_id, peer_id2, + "PeerAdded peer_id {peer_id} does not match SessionEstablished peer_id {peer_id2}" + ); Some(peer_id) } _ => None, diff --git a/crates/net/network/src/transactions/mod.rs b/crates/net/network/src/transactions/mod.rs index a1097dacf550..2e6e2f08b65c 100644 --- a/crates/net/network/src/transactions/mod.rs +++ b/crates/net/network/src/transactions/mod.rs @@ -40,6 +40,7 @@ use reth_eth_wire::{ }; use reth_metrics::common::mpsc::UnboundedMeteredReceiver; use reth_network_api::{ + events::{PeerEvent, SessionInfo}, NetworkEvent, NetworkEventListenerProvider, PeerRequest, PeerRequestSender, Peers, }; use reth_network_p2p::{ @@ -1050,55 +1051,81 @@ where } } + /// Handles session establishment and peer transactions initialization. + fn handle_peer_session( + &mut self, + info: SessionInfo, + messages: PeerRequestSender>, + ) { + let SessionInfo { peer_id, client_version, version, .. } = info; + + // Insert a new peer into the peerset. + let peer = PeerMetadata::::new( + messages, + version, + client_version, + self.config.max_transactions_seen_by_peer_history, + ); + let peer = match self.peers.entry(peer_id) { + Entry::Occupied(mut entry) => { + entry.insert(peer); + entry.into_mut() + } + Entry::Vacant(entry) => entry.insert(peer), + }; + + // Send a `NewPooledTransactionHashes` to the peer with up to + // `SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE` + // transactions in the pool. + if self.network.is_initially_syncing() || self.network.tx_gossip_disabled() { + trace!(target: "net::tx", ?peer_id, "Skipping transaction broadcast: node syncing or gossip disabled"); + return + } + + // Get transactions to broadcast + let pooled_txs = self.pool.pooled_transactions_max( + SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE, + ); + if pooled_txs.is_empty() { + trace!(target: "net::tx", ?peer_id, "No transactions in the pool to broadcast"); + return; + } + + // Build and send transaction hashes message + let mut msg_builder = PooledTransactionsHashesBuilder::new(version); + for pooled_tx in pooled_txs { + peer.seen_transactions.insert(*pooled_tx.hash()); + msg_builder.push_pooled(pooled_tx); + } + + debug!(target: "net::tx", ?peer_id, tx_count = msg_builder.is_empty(), "Broadcasting transaction hashes"); + let msg = msg_builder.build(); + self.network.send_transactions_hashes(peer_id, msg); + } + /// Handles a received event related to common network events. fn on_network_event(&mut self, event_result: NetworkEvent>) { match event_result { - NetworkEvent::SessionClosed { peer_id, .. } => { + NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, .. }) => { // remove the peer self.peers.remove(&peer_id); self.transaction_fetcher.remove_peer(&peer_id); } - NetworkEvent::SessionEstablished { - peer_id, client_version, messages, version, .. - } => { - // Insert a new peer into the peerset. - let peer = PeerMetadata::new( - messages, - version, - client_version, - self.config.max_transactions_seen_by_peer_history, - ); - let peer = match self.peers.entry(peer_id) { - Entry::Occupied(mut entry) => { - entry.insert(peer); - entry.into_mut() + NetworkEvent::ActivePeerSession { info, messages } => { + // process active peer session and broadcast available transaction from the pool + self.handle_peer_session(info, messages); + } + NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => { + let peer_id = info.peer_id; + // get messages from existing peer + let messages = match self.peers.get(&peer_id) { + Some(p) => p.request_tx.clone(), + None => { + debug!(target: "net::tx", ?peer_id, "No peer request sender found"); + return; } - Entry::Vacant(entry) => entry.insert(peer), }; - - // Send a `NewPooledTransactionHashes` to the peer with up to - // `SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE` - // transactions in the pool. - if self.network.is_initially_syncing() || self.network.tx_gossip_disabled() { - return - } - - let pooled_txs = self.pool.pooled_transactions_max( - SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE, - ); - if pooled_txs.is_empty() { - // do not send a message if there are no transactions in the pool - return - } - - let mut msg_builder = PooledTransactionsHashesBuilder::new(version); - for pooled_tx in pooled_txs { - peer.seen_transactions.insert(*pooled_tx.hash()); - msg_builder.push_pooled(pooled_tx); - } - - let msg = msg_builder.build(); - self.network.send_transactions_hashes(peer_id, msg); + self.handle_peer_session(info, messages); } _ => {} } @@ -1987,27 +2014,12 @@ mod tests { let mut established = listener0.take(2); while let Some(ev) = established.next().await { match ev { - NetworkEvent::SessionEstablished { - peer_id, - remote_addr, - client_version, - capabilities, - messages, - status, - version, - } => { + NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => { // to insert a new peer in transactions peerset - transactions.on_network_event(NetworkEvent::SessionEstablished { - peer_id, - remote_addr, - client_version, - capabilities, - messages, - status, - version, - }) + transactions + .on_network_event(NetworkEvent::Peer(PeerEvent::SessionEstablished(info))) } - NetworkEvent::PeerAdded(_peer_id) => continue, + NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => continue, ev => { error!("unexpected event {ev:?}") } @@ -2073,28 +2085,13 @@ mod tests { let mut established = listener0.take(2); while let Some(ev) = established.next().await { match ev { - NetworkEvent::SessionEstablished { - peer_id, - remote_addr, - client_version, - capabilities, - messages, - status, - version, - } => { + NetworkEvent::ActivePeerSession { .. } | + NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => { // to insert a new peer in transactions peerset - transactions.on_network_event(NetworkEvent::SessionEstablished { - peer_id, - remote_addr, - client_version, - capabilities, - messages, - status, - version, - }) + transactions.on_network_event(ev); } - NetworkEvent::PeerAdded(_peer_id) => continue, - ev => { + NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => continue, + _ => { error!("unexpected event {ev:?}") } } @@ -2157,27 +2154,12 @@ mod tests { let mut established = listener0.take(2); while let Some(ev) = established.next().await { match ev { - NetworkEvent::SessionEstablished { - peer_id, - remote_addr, - client_version, - capabilities, - messages, - status, - version, - } => { + NetworkEvent::ActivePeerSession { .. } | + NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => { // to insert a new peer in transactions peerset - transactions.on_network_event(NetworkEvent::SessionEstablished { - peer_id, - remote_addr, - client_version, - capabilities, - messages, - status, - version, - }) + transactions.on_network_event(ev); } - NetworkEvent::PeerAdded(_peer_id) => continue, + NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => continue, ev => { error!("unexpected event {ev:?}") } @@ -2248,24 +2230,11 @@ mod tests { let mut established = listener0.take(2); while let Some(ev) = established.next().await { match ev { - NetworkEvent::SessionEstablished { - peer_id, - remote_addr, - client_version, - capabilities, - messages, - status, - version, - } => transactions.on_network_event(NetworkEvent::SessionEstablished { - peer_id, - remote_addr, - client_version, - capabilities, - messages, - status, - version, - }), - NetworkEvent::PeerAdded(_peer_id) => continue, + NetworkEvent::ActivePeerSession { .. } | + NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => { + transactions.on_network_event(ev); + } + NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => continue, ev => { error!("unexpected event {ev:?}") } @@ -2495,17 +2464,18 @@ mod tests { network.handle().update_sync_state(SyncState::Idle); // mock a peer - let (tx, _rx) = mpsc::channel(1); - tx_manager.on_network_event(NetworkEvent::SessionEstablished { + let (tx, _rx) = mpsc::channel::(1); + let session_info = SessionInfo { peer_id, remote_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0), client_version: Arc::from(""), capabilities: Arc::new(vec![].into()), - messages: PeerRequestSender::new(peer_id, tx), status: Arc::new(Default::default()), version: EthVersion::Eth68, - }); - + }; + let messages: PeerRequestSender = PeerRequestSender::new(peer_id, tx); + tx_manager + .on_network_event(NetworkEvent::ActivePeerSession { info: session_info, messages }); let mut propagate = vec![]; let mut factory = MockTransactionFactory::default(); let eip1559_tx = Arc::new(factory.create_eip1559()); diff --git a/crates/net/network/tests/it/connect.rs b/crates/net/network/tests/it/connect.rs index 0a17cbd563ed..77044f4b72d2 100644 --- a/crates/net/network/tests/it/connect.rs +++ b/crates/net/network/tests/it/connect.rs @@ -15,7 +15,10 @@ use reth_network::{ BlockDownloaderProvider, NetworkConfigBuilder, NetworkEvent, NetworkEventListenerProvider, NetworkManager, PeersConfig, }; -use reth_network_api::{NetworkInfo, Peers, PeersInfo}; +use reth_network_api::{ + events::{PeerEvent, SessionInfo}, + NetworkInfo, Peers, PeersInfo, +}; use reth_network_p2p::{ headers::client::{HeadersClient, HeadersRequest}, sync::{NetworkSyncUpdater, SyncState}, @@ -59,13 +62,15 @@ async fn test_establish_connections() { let mut established = listener0.take(4); while let Some(ev) = established.next().await { match ev { - NetworkEvent::SessionClosed { .. } | NetworkEvent::PeerRemoved(_) => { + NetworkEvent::Peer(PeerEvent::SessionClosed { .. } | PeerEvent::PeerRemoved(_)) => { panic!("unexpected event") } - NetworkEvent::SessionEstablished { peer_id, .. } => { - assert!(expected_connections.remove(&peer_id)) + NetworkEvent::ActivePeerSession { info, .. } | + NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => { + let SessionInfo { peer_id, .. } = info; + assert!(expected_connections.remove(&peer_id)); } - NetworkEvent::PeerAdded(peer_id) => { + NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id)) => { assert!(expected_peers.remove(&peer_id)) } } @@ -496,11 +501,16 @@ async fn test_geth_disconnect() { handle.add_peer(geth_peer_id, geth_socket); match events.next().await { - Some(NetworkEvent::PeerAdded(peer_id)) => assert_eq!(peer_id, geth_peer_id), + Some(NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id))) => { + assert_eq!(peer_id, geth_peer_id) + } _ => panic!("Expected a peer added event"), } - if let Some(NetworkEvent::SessionEstablished { peer_id, .. }) = events.next().await { + if let Some(NetworkEvent::Peer(PeerEvent::SessionEstablished(session_info))) = + events.next().await + { + let SessionInfo { peer_id, .. } = session_info; assert_eq!(peer_id, geth_peer_id); } else { panic!("Expected a session established event"); @@ -510,7 +520,9 @@ async fn test_geth_disconnect() { handle.disconnect_peer(geth_peer_id); // wait for a disconnect from geth - if let Some(NetworkEvent::SessionClosed { peer_id, .. }) = events.next().await { + if let Some(NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, .. })) = + events.next().await + { assert_eq!(peer_id, geth_peer_id); } else { panic!("Expected a session closed event"); diff --git a/crates/net/network/tests/it/session.rs b/crates/net/network/tests/it/session.rs index 3f74db3d37f1..71152c29bb83 100644 --- a/crates/net/network/tests/it/session.rs +++ b/crates/net/network/tests/it/session.rs @@ -6,7 +6,10 @@ use reth_network::{ test_utils::{PeerConfig, Testnet}, NetworkEvent, NetworkEventListenerProvider, }; -use reth_network_api::{NetworkInfo, Peers}; +use reth_network_api::{ + events::{PeerEvent, SessionInfo}, + NetworkInfo, Peers, +}; use reth_provider::test_utils::NoopProvider; #[tokio::test(flavor = "multi_thread")] @@ -28,10 +31,11 @@ async fn test_session_established_with_highest_version() { while let Some(event) = events.next().await { match event { - NetworkEvent::PeerAdded(peer_id) => { + NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id)) => { assert_eq!(handle1.peer_id(), &peer_id); } - NetworkEvent::SessionEstablished { peer_id, status, .. } => { + NetworkEvent::ActivePeerSession { info, .. } => { + let SessionInfo { peer_id, status, .. } = info; assert_eq!(handle1.peer_id(), &peer_id); assert_eq!(status.version, EthVersion::Eth68); } @@ -66,10 +70,11 @@ async fn test_session_established_with_different_capability() { while let Some(event) = events.next().await { match event { - NetworkEvent::PeerAdded(peer_id) => { + NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id)) => { assert_eq!(handle1.peer_id(), &peer_id); } - NetworkEvent::SessionEstablished { peer_id, status, .. } => { + NetworkEvent::ActivePeerSession { info, .. } => { + let SessionInfo { peer_id, status, .. } = info; assert_eq!(handle1.peer_id(), &peer_id); assert_eq!(status.version, EthVersion::Eth66); } diff --git a/crates/net/network/tests/it/txgossip.rs b/crates/net/network/tests/it/txgossip.rs index ebde61ef8ea1..c9911885ad87 100644 --- a/crates/net/network/tests/it/txgossip.rs +++ b/crates/net/network/tests/it/txgossip.rs @@ -7,7 +7,7 @@ use alloy_primitives::{PrimitiveSignature as Signature, U256}; use futures::StreamExt; use rand::thread_rng; use reth_network::{test_utils::Testnet, NetworkEvent, NetworkEventListenerProvider}; -use reth_network_api::PeersInfo; +use reth_network_api::{events::PeerEvent, PeersInfo}; use reth_primitives::TransactionSigned; use reth_provider::test_utils::{ExtendedAccount, MockEthProvider}; use reth_transaction_pool::{test_utils::TransactionGenerator, PoolTransaction, TransactionPool}; @@ -139,16 +139,17 @@ async fn test_sending_invalid_transactions() { // await disconnect for bad tx spam if let Some(ev) = peer1_events.next().await { match ev { - NetworkEvent::SessionClosed { peer_id, .. } => { + NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, .. }) => { assert_eq!(peer_id, *peer0.peer_id()); } - NetworkEvent::SessionEstablished { .. } => { + NetworkEvent::ActivePeerSession { .. } | + NetworkEvent::Peer(PeerEvent::SessionEstablished { .. }) => { panic!("unexpected SessionEstablished event") } - NetworkEvent::PeerAdded(_) => { + NetworkEvent::Peer(PeerEvent::PeerAdded(_)) => { panic!("unexpected PeerAdded event") } - NetworkEvent::PeerRemoved(_) => { + NetworkEvent::Peer(PeerEvent::PeerRemoved(_)) => { panic!("unexpected PeerRemoved event") } } diff --git a/docs/crates/network.md b/docs/crates/network.md index be2c7cb3b143..7e38ac5d6014 100644 --- a/docs/crates/network.md +++ b/docs/crates/network.md @@ -787,8 +787,24 @@ The `TransactionsManager.network_events` stream is the first to have all of its The events received in this channel are of type `NetworkEvent`: [File: crates/net/network/src/manager.rs](https://github.com/paradigmxyz/reth/blob/1563506aea09049a85e5cc72c2894f3f7a371581/crates/net/network/src/manager.rs) + +```rust,ignore +pub enum NetworkEvent { + /// Basic peer lifecycle event. + Peer(PeerEvent), + /// Session established with requests. + ActivePeerSession { + /// Session information + info: SessionInfo, + /// A request channel to the session task. + messages: PeerRequestSender, + }, +} +``` + +and with ```rust,ignore -pub enum NetworkEvent { +pub enum PeerEvent { /// Closed the peer session. SessionClosed { /// The identifier of the peer to which a session was closed. @@ -797,29 +813,29 @@ pub enum NetworkEvent { reason: Option, }, /// Established a new session with the given peer. - SessionEstablished { - /// The identifier of the peer to which a session was established. - peer_id: PeerId, - /// Capabilities the peer announced - capabilities: Arc, - /// A request channel to the session task. - messages: PeerRequestSender, - /// The status of the peer to which a session was established. - status: Status, - }, + SessionEstablished(SessionInfo), /// Event emitted when a new peer is added PeerAdded(PeerId), /// Event emitted when a new peer is removed PeerRemoved(PeerId), } ``` +[File: crates/net/network-api/src/events.rs](https://github.com/paradigmxyz/reth/blob/c46b5fc1157d12184d1dceb4dc45e26cf74b2bc6/crates/net/network-api/src/events.rs) -They're handled with the `on_network_event` method, which responds to the two variants of the `NetworkEvent` enum in the following ways: +They're handled with the `on_network_event` method, which processes session events through both `NetworkEvent::Peer(PeerEvent::SessionClosed)`, `NetworkEvent::Peer(PeerEvent::SessionEstablished)`, and `NetworkEvent::ActivePeerSession` for initializing peer connections and transaction broadcasting. -**`NetworkEvent::SessionClosed`** +Variants of the `PeerEvent` enum are defined in the following ways: + +**`PeerEvent::PeerAdded`** +Adds a peer to the network node via network handle + +**`PeerEvent::PeerRemoved`** Removes the peer given by `NetworkEvent::SessionClosed.peer_id` from the `TransactionsManager.peers` map. -**`NetworkEvent::SessionEstablished`** +**`PeerEvent::SessionClosed`** +Closes the peer session after disconnection + +**`PeerEvent::SessionEstablished`** Begins by inserting a `Peer` into `TransactionsManager.peers` by `peer_id`, which is a struct of the following form: [File: crates/net/network/src/transactions.rs](https://github.com/paradigmxyz/reth/blob/1563506aea09049a85e5cc72c2894f3f7a371581/crates/net/network/src/transactions.rs) @@ -840,33 +856,30 @@ After the `Peer` is added to `TransactionsManager.peers`, the hashes of all of t [File: crates/net/network/src/transactions.rs](https://github.com/paradigmxyz/reth/blob/1563506aea09049a85e5cc72c2894f3f7a371581/crates/net/network/src/transactions.rs) ```rust,ignore -fn on_network_event(&mut self, event: NetworkEvent) { - match event { - NetworkEvent::SessionClosed { peer_id, .. } => { +fn on_network_event(&mut self, event_result: NetworkEvent) { + match event_result { + NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, .. }) => { // remove the peer self.peers.remove(&peer_id); + self.transaction_fetcher.remove_peer(&peer_id); } - NetworkEvent::SessionEstablished { peer_id, messages, .. } => { - // insert a new peer - self.peers.insert( - peer_id, - Peer { - transactions: LruCache::new( - NonZeroUsize::new(PEER_TRANSACTION_CACHE_LIMIT).unwrap(), - ), - request_tx: messages, - }, - ); - - // Send a `NewPooledTransactionHashes` to the peer with _all_ transactions in the - // pool - let msg = NewPooledTransactionHashes(self.pool.pooled_transactions()); - self.network.send_message(NetworkHandleMessage::SendPooledTransactionHashes { - peer_id, - msg, - }) + NetworkEvent::ActivePeerSession { info, messages } => { + // process active peer session and broadcast available transaction from the pool + self.handle_peer_session(info, messages); + } + NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => { + let peer_id = info.peer_id; + // get messages from existing peer + let messages = match self.peers.get(&peer_id) { + Some(p) => p.request_tx.clone(), + None => { + debug!(target: "net::tx", ?peer_id, "No peer request sender found"); + return; + } + }; + self.handle_peer_session(info, messages); } - _ => {} + _ => {} } } ``` diff --git a/examples/bsc-p2p/src/main.rs b/examples/bsc-p2p/src/main.rs index 9e83f34e92f3..cea87918322b 100644 --- a/examples/bsc-p2p/src/main.rs +++ b/examples/bsc-p2p/src/main.rs @@ -17,7 +17,10 @@ use reth_discv4::Discv4ConfigBuilder; use reth_network::{ EthNetworkPrimitives, NetworkConfig, NetworkEvent, NetworkEventListenerProvider, NetworkManager, }; -use reth_network_api::PeersInfo; +use reth_network_api::{ + events::{PeerEvent, SessionInfo}, + PeersInfo, +}; use reth_primitives::{ForkHash, ForkId}; use reth_tracing::{ tracing::info, tracing_subscriber::filter::LevelFilter, LayerInfo, LogFormat, RethTracer, @@ -78,10 +81,11 @@ async fn main() { // For the sake of the example we only print the session established event // with the chain specific details match evt { - NetworkEvent::SessionEstablished { status, client_version, peer_id, .. } => { + NetworkEvent::ActivePeerSession { info, .. } => { + let SessionInfo { status, client_version, peer_id, .. } = info; info!(peers=%net_handle.num_connected_peers() , %peer_id, chain = %status.chain, ?client_version, "Session established with a new peer."); } - NetworkEvent::SessionClosed { peer_id, reason } => { + NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, reason }) => { info!(peers=%net_handle.num_connected_peers() , %peer_id, ?reason, "Session closed."); } diff --git a/examples/polygon-p2p/Cargo.toml b/examples/polygon-p2p/Cargo.toml index e18f32a64737..34536ed52d71 100644 --- a/examples/polygon-p2p/Cargo.toml +++ b/examples/polygon-p2p/Cargo.toml @@ -16,6 +16,7 @@ secp256k1 = { workspace = true, features = [ tokio.workspace = true reth-network.workspace = true reth-chainspec.workspace = true +reth-network-api.workspace = true reth-primitives.workspace = true serde_json.workspace = true reth-tracing.workspace = true diff --git a/examples/polygon-p2p/src/main.rs b/examples/polygon-p2p/src/main.rs index bcc17a24f8d2..bae5399d9cd6 100644 --- a/examples/polygon-p2p/src/main.rs +++ b/examples/polygon-p2p/src/main.rs @@ -15,6 +15,7 @@ use reth_network::{ config::NetworkMode, EthNetworkPrimitives, NetworkConfig, NetworkEvent, NetworkEventListenerProvider, NetworkManager, }; +use reth_network_api::events::SessionInfo; use reth_tracing::{ tracing::info, tracing_subscriber::filter::LevelFilter, LayerInfo, LogFormat, RethTracer, Tracer, @@ -71,7 +72,8 @@ async fn main() { while let Some(evt) = events.next().await { // For the sake of the example we only print the session established event // with the chain specific details - if let NetworkEvent::SessionEstablished { status, client_version, .. } = evt { + if let NetworkEvent::ActivePeerSession { info, .. } = evt { + let SessionInfo { status, client_version, .. } = info; let chain = status.chain; info!(?chain, ?client_version, "Session established with a new peer."); }