From a7b3347c7b56563f1d26c43c775cf04b39cac8cb Mon Sep 17 00:00:00 2001 From: lean-apple <78718413+lean-apple@users.noreply.github.com> Date: Thu, 28 Nov 2024 13:44:25 +0100 Subject: [PATCH 1/8] refactor: split NetworkEventListenerProvider with network and peer events --- Cargo.lock | 2 + crates/e2e-test-utils/Cargo.toml | 1 + crates/e2e-test-utils/src/network.rs | 7 +- crates/net/network-api/src/events.rs | 106 +++++++++--------- crates/net/network/src/manager.rs | 24 ++-- crates/net/network/src/network.rs | 11 ++ crates/net/network/src/test_utils/testnet.rs | 30 +++-- crates/net/network/src/transactions/mod.rs | 112 +++++-------------- crates/net/network/tests/it/connect.rs | 28 +++-- crates/net/network/tests/it/session.rs | 15 ++- crates/net/network/tests/it/txgossip.rs | 11 +- examples/bsc-p2p/src/main.rs | 10 +- examples/polygon-p2p/Cargo.toml | 1 + examples/polygon-p2p/src/main.rs | 4 +- 14 files changed, 185 insertions(+), 177 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 33d60eac3f0d..36c2e771d45c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3035,6 +3035,7 @@ dependencies = [ "reth-chainspec", "reth-discv4", "reth-network", + "reth-network-api", "reth-primitives", "reth-tracing", "secp256k1", @@ -7109,6 +7110,7 @@ dependencies = [ "reth-chainspec", "reth-db", "reth-engine-local", + "reth-network-api", "reth-network-peers", "reth-node-api", "reth-node-builder", diff --git a/crates/e2e-test-utils/Cargo.toml b/crates/e2e-test-utils/Cargo.toml index 77b19085d401..bf24cfe7286d 100644 --- a/crates/e2e-test-utils/Cargo.toml +++ b/crates/e2e-test-utils/Cargo.toml @@ -16,6 +16,7 @@ reth-chainspec.workspace = true reth-tracing.workspace = true reth-db = { workspace = true, features = ["test-utils"] } reth-rpc-layer.workspace = true +reth-network-api.workspace = true reth-payload-builder = { workspace = true, features = ["test-utils"] } reth-payload-builder-primitives.workspace = true reth-payload-primitives.workspace = true diff --git a/crates/e2e-test-utils/src/network.rs b/crates/e2e-test-utils/src/network.rs index 3f25915b35b4..bf29b019c02a 100644 --- a/crates/e2e-test-utils/src/network.rs +++ b/crates/e2e-test-utils/src/network.rs @@ -1,9 +1,9 @@ use futures_util::StreamExt; use reth::network::{NetworkEvent, NetworkEventListenerProvider, PeersHandleProvider, PeersInfo}; +use reth_network_api::events::PeerEvent; use reth_network_peers::{NodeRecord, PeerId}; use reth_tokio_util::EventStream; use reth_tracing::tracing::info; - /// Helper for network operations #[derive(Debug)] pub struct NetworkTestContext { @@ -26,7 +26,7 @@ where self.network.peers_handle().add_peer(node_record.id, node_record.tcp_addr()); match self.network_events.next().await { - Some(NetworkEvent::PeerAdded(_)) => (), + Some(NetworkEvent::Peer(PeerEvent::PeerAdded(_))) => (), ev => panic!("Expected a peer added event, got: {ev:?}"), } } @@ -40,7 +40,8 @@ where pub async fn next_session_established(&mut self) -> Option { while let Some(ev) = self.network_events.next().await { match ev { - NetworkEvent::SessionEstablished { peer_id, .. } => { + NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => { + let peer_id = info.peer_id; info!("Session established with peer: {:?}", peer_id); return Some(peer_id) } diff --git a/crates/net/network-api/src/events.rs b/crates/net/network-api/src/events.rs index 624c43f5e1ba..6e811ed51f3c 100644 --- a/crates/net/network-api/src/events.rs +++ b/crates/net/network-api/src/events.rs @@ -16,23 +16,30 @@ use reth_tokio_util::EventStream; use tokio::sync::{mpsc, oneshot}; use tokio_stream::wrappers::UnboundedReceiverStream; -/// Provides event subscription for the network. -#[auto_impl::auto_impl(&, Arc)] -pub trait NetworkEventListenerProvider: Send + Sync { - /// Creates a new [`NetworkEvent`] listener channel. - fn event_listener(&self) -> EventStream; - /// Returns a new [`DiscoveryEvent`] stream. - /// - /// This stream yields [`DiscoveryEvent`]s for each peer that is discovered. - fn discovery_listener(&self) -> UnboundedReceiverStream; +/// Represents information about an established peer session. +#[derive(Debug, Clone)] +pub struct SessionInfo { + /// The identifier of the peer to which a session was established. + pub peer_id: PeerId, + /// The remote addr of the peer to which a session was established. + pub remote_addr: SocketAddr, + /// The client version of the peer to which a session was established. + pub client_version: Arc, + /// Capabilities the peer announced. + pub capabilities: Arc, + /// The status of the peer to which a session was established. + pub status: Arc, + /// Negotiated eth version of the session. + pub version: EthVersion, } -/// (Non-exhaustive) Events emitted by the network that are of interest for subscribers. +/// (Non-exhaustive) List of the different events emitted by the network that are of interest for +/// subscribers. /// /// This includes any event types that may be relevant to tasks, for metrics, keep track of peers /// etc. -#[derive(Debug)] -pub enum NetworkEvent { +#[derive(Debug, Clone)] +pub enum PeerEvent { /// Closed the peer session. SessionClosed { /// The identifier of the peer to which a session was closed. @@ -40,58 +47,57 @@ pub enum NetworkEvent { /// Why the disconnect was triggered reason: Option, }, - /// Established a new session with the given peer. - SessionEstablished { - /// The identifier of the peer to which a session was established. - peer_id: PeerId, - /// The remote addr of the peer to which a session was established. - remote_addr: SocketAddr, - /// The client version of the peer to which a session was established. - client_version: Arc, - /// Capabilities the peer announced - capabilities: Arc, - /// A request channel to the session task. - messages: PeerRequestSender, - /// The status of the peer to which a session was established. - status: Arc, - /// negotiated eth version of the session - version: EthVersion, - }, + /// Established a new session with the given peer + SessionEstablished(SessionInfo), /// Event emitted when a new peer is added PeerAdded(PeerId), /// Event emitted when a new peer is removed PeerRemoved(PeerId), } +/// (Non-exhaustive) Events that combine peer lifecycle with request handling capabilities. +#[derive(Debug)] +pub enum NetworkEvent { + /// Basic peer lifecycle event. + Peer(PeerEvent), + /// Session established with request capabilities. + RequestCapableSession { + /// Session information + info: SessionInfo, + /// A request channel to the session task. + messages: PeerRequestSender, + }, +} + impl Clone for NetworkEvent { fn clone(&self) -> Self { match self { - Self::SessionClosed { peer_id, reason } => { - Self::SessionClosed { peer_id: *peer_id, reason: *reason } + Self::Peer(event) => Self::Peer(event.clone()), + Self::RequestCapableSession { info, messages } => { + Self::RequestCapableSession { info: info.clone(), messages: messages.clone() } } - Self::SessionEstablished { - peer_id, - remote_addr, - client_version, - capabilities, - messages, - status, - version, - } => Self::SessionEstablished { - peer_id: *peer_id, - remote_addr: *remote_addr, - client_version: client_version.clone(), - capabilities: capabilities.clone(), - messages: messages.clone(), - status: status.clone(), - version: *version, - }, - Self::PeerAdded(peer) => Self::PeerAdded(*peer), - Self::PeerRemoved(peer) => Self::PeerRemoved(*peer), } } } +/// Provides peer event subscription for the network. +#[auto_impl::auto_impl(&, Arc)] +pub trait NetworkPeersEvents: Send + Sync { + /// Creates a new [`PeerEvent`] listener channel. + fn peer_events(&self) -> EventStream; +} + +/// Provides event subscription for the network. +#[auto_impl::auto_impl(&, Arc)] +pub trait NetworkEventListenerProvider: NetworkPeersEvents { + /// Creates a new [`NetworkEvent`] listener channel. + fn event_listener(&self) -> EventStream>; + /// Returns a new [`DiscoveryEvent`] stream. + /// + /// This stream yields [`DiscoveryEvent`]s for each peer that is discovered. + fn discovery_listener(&self) -> UnboundedReceiverStream; +} + /// Events produced by the `Discovery` manager. #[derive(Debug, Clone, PartialEq, Eq)] pub enum DiscoveryEvent { diff --git a/crates/net/network/src/manager.rs b/crates/net/network/src/manager.rs index c1db91773e38..172cb56d501b 100644 --- a/crates/net/network/src/manager.rs +++ b/crates/net/network/src/manager.rs @@ -44,7 +44,9 @@ use reth_eth_wire::{ use reth_fs_util::{self as fs, FsPathError}; use reth_metrics::common::mpsc::UnboundedMeteredSender; use reth_network_api::{ - test_utils::PeersHandle, EthProtocolInfo, NetworkEvent, NetworkStatus, PeerInfo, PeerRequest, + events::{PeerEvent, SessionInfo}, + test_utils::PeersHandle, + EthProtocolInfo, NetworkEvent, NetworkStatus, PeerInfo, PeerRequest, }; use reth_network_peers::{NodeRecord, PeerId}; use reth_network_types::ReputationChangeKind; @@ -254,6 +256,8 @@ impl NetworkManager { let (to_manager_tx, from_handle_rx) = mpsc::unbounded_channel(); + let peers_sender: EventSender = Default::default(); + let event_sender: EventSender>> = Default::default(); let handle = NetworkHandle::new( @@ -268,6 +272,7 @@ impl NetworkManager { tx_gossip_disabled, discv4, discv5, + peers_sender, event_sender.clone(), nat, ); @@ -709,24 +714,26 @@ impl NetworkManager { self.update_active_connection_metrics(); - self.event_sender.notify(NetworkEvent::SessionEstablished { + let session_info = SessionInfo { peer_id, remote_addr, client_version, capabilities, - version, status, - messages, - }); + version, + }; + + self.event_sender + .notify(NetworkEvent::RequestCapableSession { info: session_info, messages }); } SwarmEvent::PeerAdded(peer_id) => { trace!(target: "net", ?peer_id, "Peer added"); - self.event_sender.notify(NetworkEvent::PeerAdded(peer_id)); + self.event_sender.notify(NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id))); self.metrics.tracked_peers.set(self.swarm.state().peers().num_known_peers() as f64); } SwarmEvent::PeerRemoved(peer_id) => { trace!(target: "net", ?peer_id, "Peer dropped"); - self.event_sender.notify(NetworkEvent::PeerRemoved(peer_id)); + self.event_sender.notify(NetworkEvent::Peer(PeerEvent::PeerRemoved(peer_id))); self.metrics.tracked_peers.set(self.swarm.state().peers().num_known_peers() as f64); } SwarmEvent::SessionClosed { peer_id, remote_addr, error } => { @@ -769,7 +776,8 @@ impl NetworkManager { .saturating_sub(1) as f64, ); - self.event_sender.notify(NetworkEvent::SessionClosed { peer_id, reason }); + self.event_sender + .notify(NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, reason })); } SwarmEvent::IncomingPendingSessionClosed { remote_addr, error } => { trace!( diff --git a/crates/net/network/src/network.rs b/crates/net/network/src/network.rs index eadeccb15493..dd908e5bb268 100644 --- a/crates/net/network/src/network.rs +++ b/crates/net/network/src/network.rs @@ -13,6 +13,7 @@ use reth_eth_wire::{ }; use reth_ethereum_forks::Head; use reth_network_api::{ + events::{NetworkPeersEvents, PeerEvent}, test_utils::{PeersHandle, PeersHandleProvider}, BlockDownloaderProvider, DiscoveryEvent, NetworkError, NetworkEvent, NetworkEventListenerProvider, NetworkInfo, NetworkStatus, PeerInfo, PeerRequest, Peers, @@ -62,6 +63,7 @@ impl NetworkHandle { tx_gossip_disabled: bool, discv4: Option, discv5: Option, + peer_events_sender: EventSender, event_sender: EventSender>>, nat: Option, ) -> Self { @@ -79,6 +81,7 @@ impl NetworkHandle { tx_gossip_disabled, discv4, discv5, + peer_events_sender, event_sender, nat, }; @@ -187,6 +190,12 @@ impl NetworkHandle { // === API Implementations === +impl NetworkPeersEvents for NetworkHandle { + fn peer_events(&self) -> EventStream { + self.inner.peer_events_sender.new_listener() + } +} + impl NetworkEventListenerProvider for NetworkHandle { fn event_listener(&self) -> EventStream>> { self.inner.event_sender.new_listener() @@ -435,6 +444,8 @@ struct NetworkInner { discv4: Option, /// The instance of the discv5 service discv5: Option, + /// Sender for basic peer lifecycle events. + peer_events_sender: EventSender, /// Sender for high level network events. event_sender: EventSender>>, /// The NAT resolver diff --git a/crates/net/network/src/test_utils/testnet.rs b/crates/net/network/src/test_utils/testnet.rs index 9801ecf9293a..cb021d620d4a 100644 --- a/crates/net/network/src/test_utils/testnet.rs +++ b/crates/net/network/src/test_utils/testnet.rs @@ -13,6 +13,7 @@ use pin_project::pin_project; use reth_chainspec::{Hardforks, MAINNET}; use reth_eth_wire::{protocol::Protocol, DisconnectReason, HelloMessageWithProtocols}; use reth_network_api::{ + events::{PeerEvent, SessionInfo}, test_utils::{PeersHandle, PeersHandleProvider}, NetworkEvent, NetworkEventListenerProvider, NetworkInfo, Peers, }; @@ -625,7 +626,9 @@ impl NetworkEventStream { pub async fn next_session_closed(&mut self) -> Option<(PeerId, Option)> { while let Some(ev) = self.inner.next().await { match ev { - NetworkEvent::SessionClosed { peer_id, reason } => return Some((peer_id, reason)), + NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, reason }) => { + return Some((peer_id, reason)) + } _ => continue, } } @@ -635,9 +638,8 @@ impl NetworkEventStream { /// Awaits the next event for an established session pub async fn next_session_established(&mut self) -> Option { while let Some(ev) = self.inner.next().await { - match ev { - NetworkEvent::SessionEstablished { peer_id, .. } => return Some(peer_id), - _ => continue, + if let NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) = ev { + return Some(info.peer_id) } } None @@ -646,12 +648,14 @@ impl NetworkEventStream { /// Awaits the next `num` events for an established session pub async fn take_session_established(&mut self, mut num: usize) -> Vec { if num == 0 { - return Vec::new() + return Vec::new(); } let mut peers = Vec::with_capacity(num); while let Some(ev) = self.inner.next().await { match ev { - NetworkEvent::SessionEstablished { peer_id, .. } => { + NetworkEvent::RequestCapableSession { + info: SessionInfo { peer_id, .. }, .. + } => { peers.push(peer_id); num -= 1; if num == 0 { @@ -664,18 +668,24 @@ impl NetworkEventStream { peers } - /// Ensures that the first two events are a [`NetworkEvent::PeerAdded`] and + /// Ensures that the first two events are a [`NetworkEvent::Peer(PeerEvent::PeerAdded`] and /// [`NetworkEvent::SessionEstablished`], returning the [`PeerId`] of the established /// session. pub async fn peer_added_and_established(&mut self) -> Option { let peer_id = match self.inner.next().await { - Some(NetworkEvent::PeerAdded(peer_id)) => peer_id, + Some(NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id))) => peer_id, _ => return None, }; match self.inner.next().await { - Some(NetworkEvent::SessionEstablished { peer_id: peer_id2, .. }) => { - debug_assert_eq!(peer_id, peer_id2, "PeerAdded peer_id {peer_id} does not match SessionEstablished peer_id {peer_id2}"); + Some(NetworkEvent::RequestCapableSession { + info: SessionInfo { peer_id: peer_id2, .. }, + .. + }) => { + debug_assert_eq!( + peer_id, peer_id2, + "PeerAdded peer_id {peer_id} does not match SessionEstablished peer_id {peer_id2}" + ); Some(peer_id) } _ => None, diff --git a/crates/net/network/src/transactions/mod.rs b/crates/net/network/src/transactions/mod.rs index d533aee102b3..18c3c6df7843 100644 --- a/crates/net/network/src/transactions/mod.rs +++ b/crates/net/network/src/transactions/mod.rs @@ -40,6 +40,7 @@ use reth_eth_wire::{ }; use reth_metrics::common::mpsc::UnboundedMeteredReceiver; use reth_network_api::{ + events::{PeerEvent, SessionInfo}, NetworkEvent, NetworkEventListenerProvider, PeerRequest, PeerRequestSender, Peers, }; use reth_network_p2p::{ @@ -1076,13 +1077,14 @@ where /// Handles a received event related to common network events. fn on_network_event(&mut self, event_result: NetworkEvent) { match event_result { - NetworkEvent::SessionClosed { peer_id, .. } => { + NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, .. }) => { // remove the peer self.peers.remove(&peer_id); self.transaction_fetcher.remove_peer(&peer_id); } - NetworkEvent::SessionEstablished { - peer_id, client_version, messages, version, .. + NetworkEvent::RequestCapableSession { + info: SessionInfo { peer_id, client_version, version, .. }, + messages, } => { // Insert a new peer into the peerset. let peer = PeerMetadata::new( @@ -1969,27 +1971,12 @@ mod tests { let mut established = listener0.take(2); while let Some(ev) = established.next().await { match ev { - NetworkEvent::SessionEstablished { - peer_id, - remote_addr, - client_version, - capabilities, - messages, - status, - version, - } => { + NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => { // to insert a new peer in transactions peerset - transactions.on_network_event(NetworkEvent::SessionEstablished { - peer_id, - remote_addr, - client_version, - capabilities, - messages, - status, - version, - }) + transactions + .on_network_event(NetworkEvent::Peer(PeerEvent::SessionEstablished(info))) } - NetworkEvent::PeerAdded(_peer_id) => continue, + NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => continue, ev => { error!("unexpected event {ev:?}") } @@ -2055,27 +2042,13 @@ mod tests { let mut established = listener0.take(2); while let Some(ev) = established.next().await { match ev { - NetworkEvent::SessionEstablished { - peer_id, - remote_addr, - client_version, - capabilities, - messages, - status, - version, - } => { - // to insert a new peer in transactions peerset - transactions.on_network_event(NetworkEvent::SessionEstablished { - peer_id, - remote_addr, - client_version, - capabilities, - messages, - status, - version, - }) + NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => + // to insert a new peer in transactions peerset + { + transactions + .on_network_event(NetworkEvent::Peer(PeerEvent::SessionEstablished(info))) } - NetworkEvent::PeerAdded(_peer_id) => continue, + NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => continue, ev => { error!("unexpected event {ev:?}") } @@ -2139,27 +2112,12 @@ mod tests { let mut established = listener0.take(2); while let Some(ev) = established.next().await { match ev { - NetworkEvent::SessionEstablished { - peer_id, - remote_addr, - client_version, - capabilities, - messages, - status, - version, - } => { + NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => { // to insert a new peer in transactions peerset - transactions.on_network_event(NetworkEvent::SessionEstablished { - peer_id, - remote_addr, - client_version, - capabilities, - messages, - status, - version, - }) + transactions + .on_network_event(NetworkEvent::Peer(PeerEvent::SessionEstablished(info))) } - NetworkEvent::PeerAdded(_peer_id) => continue, + NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => continue, ev => { error!("unexpected event {ev:?}") } @@ -2230,24 +2188,9 @@ mod tests { let mut established = listener0.take(2); while let Some(ev) = established.next().await { match ev { - NetworkEvent::SessionEstablished { - peer_id, - remote_addr, - client_version, - capabilities, - messages, - status, - version, - } => transactions.on_network_event(NetworkEvent::SessionEstablished { - peer_id, - remote_addr, - client_version, - capabilities, - messages, - status, - version, - }), - NetworkEvent::PeerAdded(_peer_id) => continue, + NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => transactions + .on_network_event(NetworkEvent::Peer(PeerEvent::SessionEstablished(info))), + NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => continue, ev => { error!("unexpected event {ev:?}") } @@ -2477,17 +2420,18 @@ mod tests { network.handle().update_sync_state(SyncState::Idle); // mock a peer - let (tx, _rx) = mpsc::channel(1); - tx_manager.on_network_event(NetworkEvent::SessionEstablished { + let (tx, _rx) = mpsc::channel::(1); + let session_info = SessionInfo { peer_id, remote_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0), client_version: Arc::from(""), capabilities: Arc::new(vec![].into()), - messages: PeerRequestSender::new(peer_id, tx), status: Arc::new(Default::default()), version: EthVersion::Eth68, - }); - + }; + let messages: PeerRequestSender = PeerRequestSender::new(peer_id, tx); + tx_manager + .on_network_event(NetworkEvent::RequestCapableSession { info: session_info, messages }); let mut propagate = vec![]; let mut factory = MockTransactionFactory::default(); let eip1559_tx = Arc::new(factory.create_eip1559()); diff --git a/crates/net/network/tests/it/connect.rs b/crates/net/network/tests/it/connect.rs index 0a17cbd563ed..295785004ac5 100644 --- a/crates/net/network/tests/it/connect.rs +++ b/crates/net/network/tests/it/connect.rs @@ -15,7 +15,10 @@ use reth_network::{ BlockDownloaderProvider, NetworkConfigBuilder, NetworkEvent, NetworkEventListenerProvider, NetworkManager, PeersConfig, }; -use reth_network_api::{NetworkInfo, Peers, PeersInfo}; +use reth_network_api::{ + events::{PeerEvent, SessionInfo}, + NetworkInfo, Peers, PeersInfo, +}; use reth_network_p2p::{ headers::client::{HeadersClient, HeadersRequest}, sync::{NetworkSyncUpdater, SyncState}, @@ -59,13 +62,15 @@ async fn test_establish_connections() { let mut established = listener0.take(4); while let Some(ev) = established.next().await { match ev { - NetworkEvent::SessionClosed { .. } | NetworkEvent::PeerRemoved(_) => { + NetworkEvent::Peer(PeerEvent::SessionClosed { .. } | PeerEvent::PeerRemoved(_)) => { panic!("unexpected event") } - NetworkEvent::SessionEstablished { peer_id, .. } => { - assert!(expected_connections.remove(&peer_id)) + NetworkEvent::RequestCapableSession { info, .. } | + NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => { + let SessionInfo { peer_id, .. } = info; + assert!(expected_connections.remove(&peer_id)); } - NetworkEvent::PeerAdded(peer_id) => { + NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id)) => { assert!(expected_peers.remove(&peer_id)) } } @@ -496,11 +501,16 @@ async fn test_geth_disconnect() { handle.add_peer(geth_peer_id, geth_socket); match events.next().await { - Some(NetworkEvent::PeerAdded(peer_id)) => assert_eq!(peer_id, geth_peer_id), + Some(NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id))) => { + assert_eq!(peer_id, geth_peer_id) + } _ => panic!("Expected a peer added event"), } - if let Some(NetworkEvent::SessionEstablished { peer_id, .. }) = events.next().await { + if let Some(NetworkEvent::Peer(PeerEvent::SessionEstablished(session_info))) = + events.next().await + { + let SessionInfo { peer_id, .. } = session_info; assert_eq!(peer_id, geth_peer_id); } else { panic!("Expected a session established event"); @@ -510,7 +520,9 @@ async fn test_geth_disconnect() { handle.disconnect_peer(geth_peer_id); // wait for a disconnect from geth - if let Some(NetworkEvent::SessionClosed { peer_id, .. }) = events.next().await { + if let Some(NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, .. })) = + events.next().await + { assert_eq!(peer_id, geth_peer_id); } else { panic!("Expected a session closed event"); diff --git a/crates/net/network/tests/it/session.rs b/crates/net/network/tests/it/session.rs index 3f74db3d37f1..fdf7c967f965 100644 --- a/crates/net/network/tests/it/session.rs +++ b/crates/net/network/tests/it/session.rs @@ -6,7 +6,10 @@ use reth_network::{ test_utils::{PeerConfig, Testnet}, NetworkEvent, NetworkEventListenerProvider, }; -use reth_network_api::{NetworkInfo, Peers}; +use reth_network_api::{ + events::{PeerEvent, SessionInfo}, + NetworkInfo, Peers, +}; use reth_provider::test_utils::NoopProvider; #[tokio::test(flavor = "multi_thread")] @@ -28,10 +31,11 @@ async fn test_session_established_with_highest_version() { while let Some(event) = events.next().await { match event { - NetworkEvent::PeerAdded(peer_id) => { + NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id)) => { assert_eq!(handle1.peer_id(), &peer_id); } - NetworkEvent::SessionEstablished { peer_id, status, .. } => { + NetworkEvent::RequestCapableSession { info, .. } => { + let SessionInfo { peer_id, status, .. } = info; assert_eq!(handle1.peer_id(), &peer_id); assert_eq!(status.version, EthVersion::Eth68); } @@ -66,10 +70,11 @@ async fn test_session_established_with_different_capability() { while let Some(event) = events.next().await { match event { - NetworkEvent::PeerAdded(peer_id) => { + NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id)) => { assert_eq!(handle1.peer_id(), &peer_id); } - NetworkEvent::SessionEstablished { peer_id, status, .. } => { + NetworkEvent::RequestCapableSession { info, .. } => { + let SessionInfo { peer_id, status, .. } = info; assert_eq!(handle1.peer_id(), &peer_id); assert_eq!(status.version, EthVersion::Eth66); } diff --git a/crates/net/network/tests/it/txgossip.rs b/crates/net/network/tests/it/txgossip.rs index ebde61ef8ea1..ce6c7ef677aa 100644 --- a/crates/net/network/tests/it/txgossip.rs +++ b/crates/net/network/tests/it/txgossip.rs @@ -7,7 +7,7 @@ use alloy_primitives::{PrimitiveSignature as Signature, U256}; use futures::StreamExt; use rand::thread_rng; use reth_network::{test_utils::Testnet, NetworkEvent, NetworkEventListenerProvider}; -use reth_network_api::PeersInfo; +use reth_network_api::{events::PeerEvent, PeersInfo}; use reth_primitives::TransactionSigned; use reth_provider::test_utils::{ExtendedAccount, MockEthProvider}; use reth_transaction_pool::{test_utils::TransactionGenerator, PoolTransaction, TransactionPool}; @@ -139,16 +139,17 @@ async fn test_sending_invalid_transactions() { // await disconnect for bad tx spam if let Some(ev) = peer1_events.next().await { match ev { - NetworkEvent::SessionClosed { peer_id, .. } => { + NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, .. }) => { assert_eq!(peer_id, *peer0.peer_id()); } - NetworkEvent::SessionEstablished { .. } => { + NetworkEvent::RequestCapableSession { .. } | + NetworkEvent::Peer(PeerEvent::SessionEstablished { .. }) => { panic!("unexpected SessionEstablished event") } - NetworkEvent::PeerAdded(_) => { + NetworkEvent::Peer(PeerEvent::PeerAdded(_)) => { panic!("unexpected PeerAdded event") } - NetworkEvent::PeerRemoved(_) => { + NetworkEvent::Peer(PeerEvent::PeerRemoved(_)) => { panic!("unexpected PeerRemoved event") } } diff --git a/examples/bsc-p2p/src/main.rs b/examples/bsc-p2p/src/main.rs index 9e83f34e92f3..55aa9e2e142d 100644 --- a/examples/bsc-p2p/src/main.rs +++ b/examples/bsc-p2p/src/main.rs @@ -17,7 +17,10 @@ use reth_discv4::Discv4ConfigBuilder; use reth_network::{ EthNetworkPrimitives, NetworkConfig, NetworkEvent, NetworkEventListenerProvider, NetworkManager, }; -use reth_network_api::PeersInfo; +use reth_network_api::{ + events::{PeerEvent, SessionInfo}, + PeersInfo, +}; use reth_primitives::{ForkHash, ForkId}; use reth_tracing::{ tracing::info, tracing_subscriber::filter::LevelFilter, LayerInfo, LogFormat, RethTracer, @@ -78,10 +81,11 @@ async fn main() { // For the sake of the example we only print the session established event // with the chain specific details match evt { - NetworkEvent::SessionEstablished { status, client_version, peer_id, .. } => { + NetworkEvent::RequestCapableSession { info, .. } => { + let SessionInfo { status, client_version, peer_id, .. } = info; info!(peers=%net_handle.num_connected_peers() , %peer_id, chain = %status.chain, ?client_version, "Session established with a new peer."); } - NetworkEvent::SessionClosed { peer_id, reason } => { + NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, reason }) => { info!(peers=%net_handle.num_connected_peers() , %peer_id, ?reason, "Session closed."); } diff --git a/examples/polygon-p2p/Cargo.toml b/examples/polygon-p2p/Cargo.toml index e18f32a64737..34536ed52d71 100644 --- a/examples/polygon-p2p/Cargo.toml +++ b/examples/polygon-p2p/Cargo.toml @@ -16,6 +16,7 @@ secp256k1 = { workspace = true, features = [ tokio.workspace = true reth-network.workspace = true reth-chainspec.workspace = true +reth-network-api.workspace = true reth-primitives.workspace = true serde_json.workspace = true reth-tracing.workspace = true diff --git a/examples/polygon-p2p/src/main.rs b/examples/polygon-p2p/src/main.rs index bcc17a24f8d2..3e78465e699c 100644 --- a/examples/polygon-p2p/src/main.rs +++ b/examples/polygon-p2p/src/main.rs @@ -15,6 +15,7 @@ use reth_network::{ config::NetworkMode, EthNetworkPrimitives, NetworkConfig, NetworkEvent, NetworkEventListenerProvider, NetworkManager, }; +use reth_network_api::events::SessionInfo; use reth_tracing::{ tracing::info, tracing_subscriber::filter::LevelFilter, LayerInfo, LogFormat, RethTracer, Tracer, @@ -71,7 +72,8 @@ async fn main() { while let Some(evt) = events.next().await { // For the sake of the example we only print the session established event // with the chain specific details - if let NetworkEvent::SessionEstablished { status, client_version, .. } = evt { + if let NetworkEvent::RequestCapableSession { info, .. } = evt { + let SessionInfo { status, client_version, .. } = info; let chain = status.chain; info!(?chain, ?client_version, "Session established with a new peer."); } From bdfd6391d3d0ed04e60e2a10d019fda9c82db037 Mon Sep 17 00:00:00 2001 From: lean-apple <78718413+lean-apple@users.noreply.github.com> Date: Thu, 28 Nov 2024 16:42:27 +0100 Subject: [PATCH 2/8] style: rename variant --- crates/net/network-api/src/events.rs | 11 ++++++----- crates/net/network/src/manager.rs | 2 +- crates/net/network/src/test_utils/testnet.rs | 6 ++---- crates/net/network/src/transactions/mod.rs | 4 ++-- crates/net/network/tests/it/connect.rs | 2 +- crates/net/network/tests/it/session.rs | 4 ++-- crates/net/network/tests/it/txgossip.rs | 2 +- examples/bsc-p2p/src/main.rs | 2 +- examples/polygon-p2p/src/main.rs | 2 +- 9 files changed, 17 insertions(+), 18 deletions(-) diff --git a/crates/net/network-api/src/events.rs b/crates/net/network-api/src/events.rs index 6e811ed51f3c..e753b7e6440a 100644 --- a/crates/net/network-api/src/events.rs +++ b/crates/net/network-api/src/events.rs @@ -55,13 +55,14 @@ pub enum PeerEvent { PeerRemoved(PeerId), } -/// (Non-exhaustive) Events that combine peer lifecycle with request handling capabilities. +/// (Non-exhaustive) Network events representing peer lifecycle events and session requests. +/// sessions #[derive(Debug)] pub enum NetworkEvent { /// Basic peer lifecycle event. Peer(PeerEvent), - /// Session established with request capabilities. - RequestCapableSession { + /// Session established with requests. + ActivePeerSession { /// Session information info: SessionInfo, /// A request channel to the session task. @@ -73,8 +74,8 @@ impl Clone for NetworkEvent { fn clone(&self) -> Self { match self { Self::Peer(event) => Self::Peer(event.clone()), - Self::RequestCapableSession { info, messages } => { - Self::RequestCapableSession { info: info.clone(), messages: messages.clone() } + Self::ActivePeerSession { info, messages } => { + Self::ActivePeerSession { info: info.clone(), messages: messages.clone() } } } } diff --git a/crates/net/network/src/manager.rs b/crates/net/network/src/manager.rs index 172cb56d501b..53e7cff9fdd8 100644 --- a/crates/net/network/src/manager.rs +++ b/crates/net/network/src/manager.rs @@ -724,7 +724,7 @@ impl NetworkManager { }; self.event_sender - .notify(NetworkEvent::RequestCapableSession { info: session_info, messages }); + .notify(NetworkEvent::ActivePeerSession { info: session_info, messages }); } SwarmEvent::PeerAdded(peer_id) => { trace!(target: "net", ?peer_id, "Peer added"); diff --git a/crates/net/network/src/test_utils/testnet.rs b/crates/net/network/src/test_utils/testnet.rs index cb021d620d4a..5255e826ea8d 100644 --- a/crates/net/network/src/test_utils/testnet.rs +++ b/crates/net/network/src/test_utils/testnet.rs @@ -653,9 +653,7 @@ impl NetworkEventStream { let mut peers = Vec::with_capacity(num); while let Some(ev) = self.inner.next().await { match ev { - NetworkEvent::RequestCapableSession { - info: SessionInfo { peer_id, .. }, .. - } => { + NetworkEvent::ActivePeerSession { info: SessionInfo { peer_id, .. }, .. } => { peers.push(peer_id); num -= 1; if num == 0 { @@ -678,7 +676,7 @@ impl NetworkEventStream { }; match self.inner.next().await { - Some(NetworkEvent::RequestCapableSession { + Some(NetworkEvent::ActivePeerSession { info: SessionInfo { peer_id: peer_id2, .. }, .. }) => { diff --git a/crates/net/network/src/transactions/mod.rs b/crates/net/network/src/transactions/mod.rs index 18c3c6df7843..691c6a469048 100644 --- a/crates/net/network/src/transactions/mod.rs +++ b/crates/net/network/src/transactions/mod.rs @@ -1082,7 +1082,7 @@ where self.peers.remove(&peer_id); self.transaction_fetcher.remove_peer(&peer_id); } - NetworkEvent::RequestCapableSession { + NetworkEvent::ActivePeerSession { info: SessionInfo { peer_id, client_version, version, .. }, messages, } => { @@ -2431,7 +2431,7 @@ mod tests { }; let messages: PeerRequestSender = PeerRequestSender::new(peer_id, tx); tx_manager - .on_network_event(NetworkEvent::RequestCapableSession { info: session_info, messages }); + .on_network_event(NetworkEvent::ActivePeerSession { info: session_info, messages }); let mut propagate = vec![]; let mut factory = MockTransactionFactory::default(); let eip1559_tx = Arc::new(factory.create_eip1559()); diff --git a/crates/net/network/tests/it/connect.rs b/crates/net/network/tests/it/connect.rs index 295785004ac5..77044f4b72d2 100644 --- a/crates/net/network/tests/it/connect.rs +++ b/crates/net/network/tests/it/connect.rs @@ -65,7 +65,7 @@ async fn test_establish_connections() { NetworkEvent::Peer(PeerEvent::SessionClosed { .. } | PeerEvent::PeerRemoved(_)) => { panic!("unexpected event") } - NetworkEvent::RequestCapableSession { info, .. } | + NetworkEvent::ActivePeerSession { info, .. } | NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => { let SessionInfo { peer_id, .. } = info; assert!(expected_connections.remove(&peer_id)); diff --git a/crates/net/network/tests/it/session.rs b/crates/net/network/tests/it/session.rs index fdf7c967f965..71152c29bb83 100644 --- a/crates/net/network/tests/it/session.rs +++ b/crates/net/network/tests/it/session.rs @@ -34,7 +34,7 @@ async fn test_session_established_with_highest_version() { NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id)) => { assert_eq!(handle1.peer_id(), &peer_id); } - NetworkEvent::RequestCapableSession { info, .. } => { + NetworkEvent::ActivePeerSession { info, .. } => { let SessionInfo { peer_id, status, .. } = info; assert_eq!(handle1.peer_id(), &peer_id); assert_eq!(status.version, EthVersion::Eth68); @@ -73,7 +73,7 @@ async fn test_session_established_with_different_capability() { NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id)) => { assert_eq!(handle1.peer_id(), &peer_id); } - NetworkEvent::RequestCapableSession { info, .. } => { + NetworkEvent::ActivePeerSession { info, .. } => { let SessionInfo { peer_id, status, .. } = info; assert_eq!(handle1.peer_id(), &peer_id); assert_eq!(status.version, EthVersion::Eth66); diff --git a/crates/net/network/tests/it/txgossip.rs b/crates/net/network/tests/it/txgossip.rs index ce6c7ef677aa..c9911885ad87 100644 --- a/crates/net/network/tests/it/txgossip.rs +++ b/crates/net/network/tests/it/txgossip.rs @@ -142,7 +142,7 @@ async fn test_sending_invalid_transactions() { NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, .. }) => { assert_eq!(peer_id, *peer0.peer_id()); } - NetworkEvent::RequestCapableSession { .. } | + NetworkEvent::ActivePeerSession { .. } | NetworkEvent::Peer(PeerEvent::SessionEstablished { .. }) => { panic!("unexpected SessionEstablished event") } diff --git a/examples/bsc-p2p/src/main.rs b/examples/bsc-p2p/src/main.rs index 55aa9e2e142d..cea87918322b 100644 --- a/examples/bsc-p2p/src/main.rs +++ b/examples/bsc-p2p/src/main.rs @@ -81,7 +81,7 @@ async fn main() { // For the sake of the example we only print the session established event // with the chain specific details match evt { - NetworkEvent::RequestCapableSession { info, .. } => { + NetworkEvent::ActivePeerSession { info, .. } => { let SessionInfo { status, client_version, peer_id, .. } = info; info!(peers=%net_handle.num_connected_peers() , %peer_id, chain = %status.chain, ?client_version, "Session established with a new peer."); } diff --git a/examples/polygon-p2p/src/main.rs b/examples/polygon-p2p/src/main.rs index 3e78465e699c..bae5399d9cd6 100644 --- a/examples/polygon-p2p/src/main.rs +++ b/examples/polygon-p2p/src/main.rs @@ -72,7 +72,7 @@ async fn main() { while let Some(evt) = events.next().await { // For the sake of the example we only print the session established event // with the chain specific details - if let NetworkEvent::RequestCapableSession { info, .. } = evt { + if let NetworkEvent::ActivePeerSession { info, .. } = evt { let SessionInfo { status, client_version, .. } = info; let chain = status.chain; info!(?chain, ?client_version, "Session established with a new peer."); From 908ff228780bf3523b93e6f7e3676be743af4475 Mon Sep 17 00:00:00 2001 From: lean-apple <78718413+lean-apple@users.noreply.github.com> Date: Thu, 28 Nov 2024 17:18:22 +0100 Subject: [PATCH 3/8] chore: fmt + docs --- crates/e2e-test-utils/src/network.rs | 4 ++-- crates/net/network/src/test_utils/testnet.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/e2e-test-utils/src/network.rs b/crates/e2e-test-utils/src/network.rs index c924185906a3..995ef1ed9cff 100644 --- a/crates/e2e-test-utils/src/network.rs +++ b/crates/e2e-test-utils/src/network.rs @@ -1,7 +1,7 @@ use futures_util::StreamExt; use reth_network_api::{ - test_utils::PeersHandleProvider, NetworkEvent, NetworkEventListenerProvider, PeersInfo, - events::PeerEvent + events::PeerEvent, test_utils::PeersHandleProvider, NetworkEvent, NetworkEventListenerProvider, + PeersInfo, }; use reth_network_peers::{NodeRecord, PeerId}; use reth_tokio_util::EventStream; diff --git a/crates/net/network/src/test_utils/testnet.rs b/crates/net/network/src/test_utils/testnet.rs index 5255e826ea8d..683df13dcd8c 100644 --- a/crates/net/network/src/test_utils/testnet.rs +++ b/crates/net/network/src/test_utils/testnet.rs @@ -667,7 +667,7 @@ impl NetworkEventStream { } /// Ensures that the first two events are a [`NetworkEvent::Peer(PeerEvent::PeerAdded`] and - /// [`NetworkEvent::SessionEstablished`], returning the [`PeerId`] of the established + /// [`NetworkEvent::ActivePeerSession`], returning the [`PeerId`] of the established /// session. pub async fn peer_added_and_established(&mut self) -> Option { let peer_id = match self.inner.next().await { From c46b5fc1157d12184d1dceb4dc45e26cf74b2bc6 Mon Sep 17 00:00:00 2001 From: lean-apple <78718413+lean-apple@users.noreply.github.com> Date: Fri, 29 Nov 2024 12:36:24 +0100 Subject: [PATCH 4/8] fix: fix tests by pushing whole event for both cases --- crates/e2e-test-utils/src/network.rs | 1 + crates/net/network/src/test_utils/testnet.rs | 10 +- crates/net/network/src/transactions/mod.rs | 123 +++++++++++-------- 3 files changed, 80 insertions(+), 54 deletions(-) diff --git a/crates/e2e-test-utils/src/network.rs b/crates/e2e-test-utils/src/network.rs index 995ef1ed9cff..516cb5fb52ca 100644 --- a/crates/e2e-test-utils/src/network.rs +++ b/crates/e2e-test-utils/src/network.rs @@ -42,6 +42,7 @@ where pub async fn next_session_established(&mut self) -> Option { while let Some(ev) = self.network_events.next().await { match ev { + NetworkEvent::ActivePeerSession { info, .. } | NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => { let peer_id = info.peer_id; info!("Session established with peer: {:?}", peer_id); diff --git a/crates/net/network/src/test_utils/testnet.rs b/crates/net/network/src/test_utils/testnet.rs index 683df13dcd8c..61bfafb6c253 100644 --- a/crates/net/network/src/test_utils/testnet.rs +++ b/crates/net/network/src/test_utils/testnet.rs @@ -638,8 +638,12 @@ impl NetworkEventStream { /// Awaits the next event for an established session pub async fn next_session_established(&mut self) -> Option { while let Some(ev) = self.inner.next().await { - if let NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) = ev { - return Some(info.peer_id) + match ev { + NetworkEvent::ActivePeerSession { info, .. } | + NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => { + return Some(info.peer_id) + } + _ => continue, } } None @@ -648,7 +652,7 @@ impl NetworkEventStream { /// Awaits the next `num` events for an established session pub async fn take_session_established(&mut self, mut num: usize) -> Vec { if num == 0 { - return Vec::new(); + return Vec::new() } let mut peers = Vec::with_capacity(num); while let Some(ev) = self.inner.next().await { diff --git a/crates/net/network/src/transactions/mod.rs b/crates/net/network/src/transactions/mod.rs index 691c6a469048..e55602814d25 100644 --- a/crates/net/network/src/transactions/mod.rs +++ b/crates/net/network/src/transactions/mod.rs @@ -1074,6 +1074,54 @@ where } } + /// Handles session establishment and peer transactions initialization. + fn handle_peer_session(&mut self, info: SessionInfo, messages: PeerRequestSender) { + let SessionInfo { peer_id, client_version, version, .. } = info; + + // Insert a new peer into the peerset. + let peer = PeerMetadata::new( + messages, + version, + client_version, + self.config.max_transactions_seen_by_peer_history, + ); + let peer = match self.peers.entry(peer_id) { + Entry::Occupied(mut entry) => { + entry.insert(peer); + entry.into_mut() + } + Entry::Vacant(entry) => entry.insert(peer), + }; + + // Send a `NewPooledTransactionHashes` to the peer with up to + // `SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE` + // transactions in the pool. + if self.network.is_initially_syncing() || self.network.tx_gossip_disabled() { + trace!(target: "net::tx", ?peer_id, "Skipping transaction broadcast: node syncing or gossip disabled"); + return + } + + // Get transactions to broadcast + let pooled_txs = self.pool.pooled_transactions_max( + SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE, + ); + if pooled_txs.is_empty() { + trace!(target: "net::tx", ?peer_id, "No transactions in the pool to broadcast"); + return; + } + + // Build and send transaction hashes message + let mut msg_builder = PooledTransactionsHashesBuilder::new(version); + for pooled_tx in pooled_txs { + peer.seen_transactions.insert(*pooled_tx.hash()); + msg_builder.push_pooled(pooled_tx); + } + + debug!(target: "net::tx", ?peer_id, tx_count = msg_builder.is_empty(), "Broadcasting transaction hashes"); + let msg = msg_builder.build(); + self.network.send_transactions_hashes(peer_id, msg); + } + /// Handles a received event related to common network events. fn on_network_event(&mut self, event_result: NetworkEvent) { match event_result { @@ -1082,48 +1130,20 @@ where self.peers.remove(&peer_id); self.transaction_fetcher.remove_peer(&peer_id); } - NetworkEvent::ActivePeerSession { - info: SessionInfo { peer_id, client_version, version, .. }, - messages, - } => { - // Insert a new peer into the peerset. - let peer = PeerMetadata::new( - messages, - version, - client_version, - self.config.max_transactions_seen_by_peer_history, - ); - let peer = match self.peers.entry(peer_id) { - Entry::Occupied(mut entry) => { - entry.insert(peer); - entry.into_mut() + NetworkEvent::ActivePeerSession { info, messages } => { + self.handle_peer_session(info, messages); + } + NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => { + let peer_id = info.peer_id; + // get messages from existing peer + let messages = match self.peers.get(&peer_id) { + Some(p) => p.request_tx.clone(), + None => { + debug!(target: "net::tx", ?peer_id, "No peer request sender found"); + return; } - Entry::Vacant(entry) => entry.insert(peer), }; - - // Send a `NewPooledTransactionHashes` to the peer with up to - // `SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE` - // transactions in the pool. - if self.network.is_initially_syncing() || self.network.tx_gossip_disabled() { - return - } - - let pooled_txs = self.pool.pooled_transactions_max( - SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE, - ); - if pooled_txs.is_empty() { - // do not send a message if there are no transactions in the pool - return - } - - let mut msg_builder = PooledTransactionsHashesBuilder::new(version); - for pooled_tx in pooled_txs { - peer.seen_transactions.insert(*pooled_tx.hash()); - msg_builder.push_pooled(pooled_tx); - } - - let msg = msg_builder.build(); - self.network.send_transactions_hashes(peer_id, msg); + self.handle_peer_session(info, messages); } _ => {} } @@ -2042,14 +2062,13 @@ mod tests { let mut established = listener0.take(2); while let Some(ev) = established.next().await { match ev { - NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => - // to insert a new peer in transactions peerset - { - transactions - .on_network_event(NetworkEvent::Peer(PeerEvent::SessionEstablished(info))) + NetworkEvent::ActivePeerSession { .. } | + NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => { + // to insert a new peer in transactions peerset + transactions.on_network_event(ev); } NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => continue, - ev => { + _ => { error!("unexpected event {ev:?}") } } @@ -2112,10 +2131,10 @@ mod tests { let mut established = listener0.take(2); while let Some(ev) = established.next().await { match ev { - NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => { + NetworkEvent::ActivePeerSession { .. } | + NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => { // to insert a new peer in transactions peerset - transactions - .on_network_event(NetworkEvent::Peer(PeerEvent::SessionEstablished(info))) + transactions.on_network_event(ev); } NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => continue, ev => { @@ -2188,8 +2207,10 @@ mod tests { let mut established = listener0.take(2); while let Some(ev) = established.next().await { match ev { - NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => transactions - .on_network_event(NetworkEvent::Peer(PeerEvent::SessionEstablished(info))), + NetworkEvent::ActivePeerSession { .. } | + NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => { + transactions.on_network_event(ev); + } NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => continue, ev => { error!("unexpected event {ev:?}") From e89b7e2a58799c88fa52f9baf0de3b0c7f567ebd Mon Sep 17 00:00:00 2001 From: lean-apple <78718413+lean-apple@users.noreply.github.com> Date: Fri, 29 Nov 2024 16:30:17 +0100 Subject: [PATCH 5/8] docs: update network.md --- crates/net/network-api/src/events.rs | 2 +- crates/net/network/src/transactions/mod.rs | 1 + docs/crates/network.md | 87 +++++++++++++--------- 3 files changed, 52 insertions(+), 38 deletions(-) diff --git a/crates/net/network-api/src/events.rs b/crates/net/network-api/src/events.rs index e753b7e6440a..102a67c2d21a 100644 --- a/crates/net/network-api/src/events.rs +++ b/crates/net/network-api/src/events.rs @@ -47,7 +47,7 @@ pub enum PeerEvent { /// Why the disconnect was triggered reason: Option, }, - /// Established a new session with the given peer + /// Established a new session with the given peer. SessionEstablished(SessionInfo), /// Event emitted when a new peer is added PeerAdded(PeerId), diff --git a/crates/net/network/src/transactions/mod.rs b/crates/net/network/src/transactions/mod.rs index e55602814d25..d1ee1556f280 100644 --- a/crates/net/network/src/transactions/mod.rs +++ b/crates/net/network/src/transactions/mod.rs @@ -1131,6 +1131,7 @@ where self.transaction_fetcher.remove_peer(&peer_id); } NetworkEvent::ActivePeerSession { info, messages } => { + // process active peer session and broadcast available transaction from the pool self.handle_peer_session(info, messages); } NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => { diff --git a/docs/crates/network.md b/docs/crates/network.md index be2c7cb3b143..7e38ac5d6014 100644 --- a/docs/crates/network.md +++ b/docs/crates/network.md @@ -787,8 +787,24 @@ The `TransactionsManager.network_events` stream is the first to have all of its The events received in this channel are of type `NetworkEvent`: [File: crates/net/network/src/manager.rs](https://github.com/paradigmxyz/reth/blob/1563506aea09049a85e5cc72c2894f3f7a371581/crates/net/network/src/manager.rs) + +```rust,ignore +pub enum NetworkEvent { + /// Basic peer lifecycle event. + Peer(PeerEvent), + /// Session established with requests. + ActivePeerSession { + /// Session information + info: SessionInfo, + /// A request channel to the session task. + messages: PeerRequestSender, + }, +} +``` + +and with ```rust,ignore -pub enum NetworkEvent { +pub enum PeerEvent { /// Closed the peer session. SessionClosed { /// The identifier of the peer to which a session was closed. @@ -797,29 +813,29 @@ pub enum NetworkEvent { reason: Option, }, /// Established a new session with the given peer. - SessionEstablished { - /// The identifier of the peer to which a session was established. - peer_id: PeerId, - /// Capabilities the peer announced - capabilities: Arc, - /// A request channel to the session task. - messages: PeerRequestSender, - /// The status of the peer to which a session was established. - status: Status, - }, + SessionEstablished(SessionInfo), /// Event emitted when a new peer is added PeerAdded(PeerId), /// Event emitted when a new peer is removed PeerRemoved(PeerId), } ``` +[File: crates/net/network-api/src/events.rs](https://github.com/paradigmxyz/reth/blob/c46b5fc1157d12184d1dceb4dc45e26cf74b2bc6/crates/net/network-api/src/events.rs) -They're handled with the `on_network_event` method, which responds to the two variants of the `NetworkEvent` enum in the following ways: +They're handled with the `on_network_event` method, which processes session events through both `NetworkEvent::Peer(PeerEvent::SessionClosed)`, `NetworkEvent::Peer(PeerEvent::SessionEstablished)`, and `NetworkEvent::ActivePeerSession` for initializing peer connections and transaction broadcasting. -**`NetworkEvent::SessionClosed`** +Variants of the `PeerEvent` enum are defined in the following ways: + +**`PeerEvent::PeerAdded`** +Adds a peer to the network node via network handle + +**`PeerEvent::PeerRemoved`** Removes the peer given by `NetworkEvent::SessionClosed.peer_id` from the `TransactionsManager.peers` map. -**`NetworkEvent::SessionEstablished`** +**`PeerEvent::SessionClosed`** +Closes the peer session after disconnection + +**`PeerEvent::SessionEstablished`** Begins by inserting a `Peer` into `TransactionsManager.peers` by `peer_id`, which is a struct of the following form: [File: crates/net/network/src/transactions.rs](https://github.com/paradigmxyz/reth/blob/1563506aea09049a85e5cc72c2894f3f7a371581/crates/net/network/src/transactions.rs) @@ -840,33 +856,30 @@ After the `Peer` is added to `TransactionsManager.peers`, the hashes of all of t [File: crates/net/network/src/transactions.rs](https://github.com/paradigmxyz/reth/blob/1563506aea09049a85e5cc72c2894f3f7a371581/crates/net/network/src/transactions.rs) ```rust,ignore -fn on_network_event(&mut self, event: NetworkEvent) { - match event { - NetworkEvent::SessionClosed { peer_id, .. } => { +fn on_network_event(&mut self, event_result: NetworkEvent) { + match event_result { + NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, .. }) => { // remove the peer self.peers.remove(&peer_id); + self.transaction_fetcher.remove_peer(&peer_id); } - NetworkEvent::SessionEstablished { peer_id, messages, .. } => { - // insert a new peer - self.peers.insert( - peer_id, - Peer { - transactions: LruCache::new( - NonZeroUsize::new(PEER_TRANSACTION_CACHE_LIMIT).unwrap(), - ), - request_tx: messages, - }, - ); - - // Send a `NewPooledTransactionHashes` to the peer with _all_ transactions in the - // pool - let msg = NewPooledTransactionHashes(self.pool.pooled_transactions()); - self.network.send_message(NetworkHandleMessage::SendPooledTransactionHashes { - peer_id, - msg, - }) + NetworkEvent::ActivePeerSession { info, messages } => { + // process active peer session and broadcast available transaction from the pool + self.handle_peer_session(info, messages); + } + NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => { + let peer_id = info.peer_id; + // get messages from existing peer + let messages = match self.peers.get(&peer_id) { + Some(p) => p.request_tx.clone(), + None => { + debug!(target: "net::tx", ?peer_id, "No peer request sender found"); + return; + } + }; + self.handle_peer_session(info, messages); } - _ => {} + _ => {} } } ``` From 1507e68894bc0f27d31fff6f044949635a9d611f Mon Sep 17 00:00:00 2001 From: lean-apple <78718413+lean-apple@users.noreply.github.com> Date: Wed, 4 Dec 2024 14:47:55 +0100 Subject: [PATCH 6/8] refactor: merge peer events streams --- crates/net/network-api/src/events.rs | 51 +++++++++++++++++++--- crates/net/network/src/manager.rs | 3 -- crates/net/network/src/network.rs | 16 ++++--- crates/net/network/src/transactions/mod.rs | 8 +++- 4 files changed, 61 insertions(+), 17 deletions(-) diff --git a/crates/net/network-api/src/events.rs b/crates/net/network-api/src/events.rs index 102a67c2d21a..b365cf49aff4 100644 --- a/crates/net/network-api/src/events.rs +++ b/crates/net/network-api/src/events.rs @@ -1,7 +1,5 @@ //! API related to listening for network events. -use std::{fmt, net::SocketAddr, sync::Arc}; - use reth_eth_wire_types::{ message::RequestPair, BlockBodies, BlockHeaders, Capabilities, DisconnectReason, EthMessage, EthNetworkPrimitives, EthVersion, GetBlockBodies, GetBlockHeaders, GetNodeData, @@ -13,8 +11,42 @@ use reth_network_p2p::error::{RequestError, RequestResult}; use reth_network_peers::PeerId; use reth_network_types::PeerAddr; use reth_tokio_util::EventStream; +use std::{fmt, net::SocketAddr, pin::Pin, sync::Arc}; use tokio::sync::{mpsc, oneshot}; -use tokio_stream::wrappers::UnboundedReceiverStream; +use tokio_stream::{wrappers::UnboundedReceiverStream, Stream, StreamExt}; + +/// A boxed stream of network peer events that provides a type-erased interface. +pub struct PeerEventStream(Pin + Send + Sync>>); + +impl fmt::Debug for PeerEventStream { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("PeerEventStream").finish_non_exhaustive() + } +} + +impl PeerEventStream { + /// Create a new stream [`PeerEventStream`] by converting the provided stream's items into peer + /// events [`PeerEvent`] + pub fn new(stream: S) -> Self + where + S: Stream + Send + Sync + 'static, + T: Into + 'static, + { + let mapped_stream = stream.map(Into::into); + Self(Box::pin(mapped_stream)) + } +} + +impl Stream for PeerEventStream { + type Item = PeerEvent; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.0.as_mut().poll_next(cx) + } +} /// Represents information about an established peer session. #[derive(Debug, Clone)] @@ -81,11 +113,20 @@ impl Clone for NetworkEvent { } } +impl From> for PeerEvent { + fn from(event: NetworkEvent) -> Self { + match event { + NetworkEvent::Peer(peer_event) => peer_event, + NetworkEvent::ActivePeerSession { info, .. } => Self::SessionEstablished(info), + } + } +} + /// Provides peer event subscription for the network. #[auto_impl::auto_impl(&, Arc)] pub trait NetworkPeersEvents: Send + Sync { - /// Creates a new [`PeerEvent`] listener channel. - fn peer_events(&self) -> EventStream; + /// Creates a new peer event listener stream. + fn peer_events(&self) -> PeerEventStream; } /// Provides event subscription for the network. diff --git a/crates/net/network/src/manager.rs b/crates/net/network/src/manager.rs index 3719e003b98d..89e21b9dd2db 100644 --- a/crates/net/network/src/manager.rs +++ b/crates/net/network/src/manager.rs @@ -256,8 +256,6 @@ impl NetworkManager { let (to_manager_tx, from_handle_rx) = mpsc::unbounded_channel(); - let peers_sender: EventSender = Default::default(); - let event_sender: EventSender>> = Default::default(); let handle = NetworkHandle::new( @@ -272,7 +270,6 @@ impl NetworkManager { tx_gossip_disabled, discv4, discv5, - peers_sender, event_sender.clone(), nat, ); diff --git a/crates/net/network/src/network.rs b/crates/net/network/src/network.rs index 1ecf17369272..225b6332e0eb 100644 --- a/crates/net/network/src/network.rs +++ b/crates/net/network/src/network.rs @@ -4,6 +4,7 @@ use crate::{ }; use alloy_primitives::B256; use enr::Enr; +use futures::StreamExt; use parking_lot::Mutex; use reth_discv4::{Discv4, NatResolver}; use reth_discv5::Discv5; @@ -13,7 +14,7 @@ use reth_eth_wire::{ }; use reth_ethereum_forks::Head; use reth_network_api::{ - events::{NetworkPeersEvents, PeerEvent}, + events::{NetworkPeersEvents, PeerEvent, PeerEventStream}, test_utils::{PeersHandle, PeersHandleProvider}, BlockDownloaderProvider, DiscoveryEvent, NetworkError, NetworkEvent, NetworkEventListenerProvider, NetworkInfo, NetworkStatus, PeerInfo, PeerRequest, Peers, @@ -63,7 +64,6 @@ impl NetworkHandle { tx_gossip_disabled: bool, discv4: Option, discv5: Option, - peer_events_sender: EventSender, event_sender: EventSender>>, nat: Option, ) -> Self { @@ -81,7 +81,6 @@ impl NetworkHandle { tx_gossip_disabled, discv4, discv5, - peer_events_sender, event_sender, nat, }; @@ -196,8 +195,13 @@ impl NetworkHandle { // === API Implementations === impl NetworkPeersEvents for NetworkHandle { - fn peer_events(&self) -> EventStream { - self.inner.peer_events_sender.new_listener() + /// Returns an event stream of peer-specific network events. + fn peer_events(&self) -> PeerEventStream { + let peer_events = self.inner.event_sender.new_listener().map(|event| match event { + NetworkEvent::Peer(peer_event) => peer_event, + NetworkEvent::ActivePeerSession { info, .. } => PeerEvent::SessionEstablished(info), + }); + PeerEventStream::new(peer_events) } } @@ -449,8 +453,6 @@ struct NetworkInner { discv4: Option, /// The instance of the discv5 service discv5: Option, - /// Sender for basic peer lifecycle events. - peer_events_sender: EventSender, /// Sender for high level network events. event_sender: EventSender>>, /// The NAT resolver diff --git a/crates/net/network/src/transactions/mod.rs b/crates/net/network/src/transactions/mod.rs index cd9ff4161aa1..2e6e2f08b65c 100644 --- a/crates/net/network/src/transactions/mod.rs +++ b/crates/net/network/src/transactions/mod.rs @@ -1052,11 +1052,15 @@ where } /// Handles session establishment and peer transactions initialization. - fn handle_peer_session(&mut self, info: SessionInfo, messages: PeerRequestSender) { + fn handle_peer_session( + &mut self, + info: SessionInfo, + messages: PeerRequestSender>, + ) { let SessionInfo { peer_id, client_version, version, .. } = info; // Insert a new peer into the peerset. - let peer = PeerMetadata::new( + let peer = PeerMetadata::::new( messages, version, client_version, From 40695c105743142d9f9684b11828c431cde254f0 Mon Sep 17 00:00:00 2001 From: lean-apple <78718413+lean-apple@users.noreply.github.com> Date: Wed, 4 Dec 2024 16:12:02 +0100 Subject: [PATCH 7/8] refactor: merge peer events streams --- crates/e2e-test-utils/src/network.rs | 1 + crates/net/network-api/src/events.rs | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/e2e-test-utils/src/network.rs b/crates/e2e-test-utils/src/network.rs index 516cb5fb52ca..ce9d0b94612b 100644 --- a/crates/e2e-test-utils/src/network.rs +++ b/crates/e2e-test-utils/src/network.rs @@ -6,6 +6,7 @@ use reth_network_api::{ use reth_network_peers::{NodeRecord, PeerId}; use reth_tokio_util::EventStream; use reth_tracing::tracing::info; + /// Helper for network operations #[derive(Debug)] pub struct NetworkTestContext { diff --git a/crates/net/network-api/src/events.rs b/crates/net/network-api/src/events.rs index b365cf49aff4..45389ce00f42 100644 --- a/crates/net/network-api/src/events.rs +++ b/crates/net/network-api/src/events.rs @@ -88,7 +88,6 @@ pub enum PeerEvent { } /// (Non-exhaustive) Network events representing peer lifecycle events and session requests. -/// sessions #[derive(Debug)] pub enum NetworkEvent { /// Basic peer lifecycle event. From 855fa72cb32c741cf1b98c6e79fbff297223327c Mon Sep 17 00:00:00 2001 From: lean-apple <78718413+lean-apple@users.noreply.github.com> Date: Wed, 4 Dec 2024 16:48:57 +0100 Subject: [PATCH 8/8] style: clean imports --- crates/net/network-api/src/events.rs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/crates/net/network-api/src/events.rs b/crates/net/network-api/src/events.rs index 45389ce00f42..e17cedef11fc 100644 --- a/crates/net/network-api/src/events.rs +++ b/crates/net/network-api/src/events.rs @@ -11,7 +11,13 @@ use reth_network_p2p::error::{RequestError, RequestResult}; use reth_network_peers::PeerId; use reth_network_types::PeerAddr; use reth_tokio_util::EventStream; -use std::{fmt, net::SocketAddr, pin::Pin, sync::Arc}; +use std::{ + fmt, + net::SocketAddr, + pin::Pin, + sync::Arc, + task::{Context, Poll}, +}; use tokio::sync::{mpsc, oneshot}; use tokio_stream::{wrappers::UnboundedReceiverStream, Stream, StreamExt}; @@ -40,10 +46,7 @@ impl PeerEventStream { impl Stream for PeerEventStream { type Item = PeerEvent; - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.0.as_mut().poll_next(cx) } }