Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: split NetworkEventListenerProvider #12972

Merged
218 changes: 109 additions & 109 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion crates/e2e-test-utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ workspace = true
reth-chainspec.workspace = true
reth-tracing.workspace = true
reth-db = { workspace = true, features = ["test-utils"] }
reth-network-api.workspace = true
reth-rpc-layer.workspace = true
reth-rpc-server-types.workspace = true
reth-rpc-eth-api.workspace = true
Expand All @@ -23,7 +24,6 @@ reth-payload-builder-primitives.workspace = true
reth-payload-primitives.workspace = true
reth-primitives.workspace = true
reth-provider.workspace = true
reth-network-api.workspace = true
reth-network.workspace = true
reth-node-api.workspace = true
reth-node-core.workspace = true
Expand Down
10 changes: 6 additions & 4 deletions crates/e2e-test-utils/src/network.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use futures_util::StreamExt;
use reth_network_api::{
test_utils::PeersHandleProvider, NetworkEvent, NetworkEventListenerProvider, PeersInfo,
events::PeerEvent, test_utils::PeersHandleProvider, NetworkEvent, NetworkEventListenerProvider,
PeersInfo,
};
use reth_network_peers::{NodeRecord, PeerId};
use reth_tokio_util::EventStream;
use reth_tracing::tracing::info;

/// Helper for network operations
#[derive(Debug)]
pub struct NetworkTestContext<Network> {
Expand All @@ -28,7 +28,7 @@ where
self.network.peers_handle().add_peer(node_record.id, node_record.tcp_addr());

match self.network_events.next().await {
Some(NetworkEvent::PeerAdded(_)) => (),
Some(NetworkEvent::Peer(PeerEvent::PeerAdded(_))) => (),
ev => panic!("Expected a peer added event, got: {ev:?}"),
}
}
Expand All @@ -42,7 +42,9 @@ where
pub async fn next_session_established(&mut self) -> Option<PeerId> {
while let Some(ev) = self.network_events.next().await {
match ev {
NetworkEvent::SessionEstablished { peer_id, .. } => {
NetworkEvent::ActivePeerSession { info, .. } |
NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
let peer_id = info.peer_id;
info!("Session established with peer: {:?}", peer_id);
return Some(peer_id)
}
Expand Down
152 changes: 100 additions & 52 deletions crates/net/network-api/src/events.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
//! API related to listening for network events.

use std::{fmt, net::SocketAddr, sync::Arc};

use reth_eth_wire_types::{
message::RequestPair, BlockBodies, BlockHeaders, Capabilities, DisconnectReason, EthMessage,
EthNetworkPrimitives, EthVersion, GetBlockBodies, GetBlockHeaders, GetNodeData,
Expand All @@ -13,26 +11,67 @@ use reth_network_p2p::error::{RequestError, RequestResult};
use reth_network_peers::PeerId;
use reth_network_types::PeerAddr;
use reth_tokio_util::EventStream;
use std::{fmt, net::SocketAddr, pin::Pin, sync::Arc};
use tokio::sync::{mpsc, oneshot};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tokio_stream::{wrappers::UnboundedReceiverStream, Stream, StreamExt};

/// Provides event subscription for the network.
#[auto_impl::auto_impl(&, Arc)]
pub trait NetworkEventListenerProvider: Send + Sync {
/// Creates a new [`NetworkEvent`] listener channel.
fn event_listener(&self) -> EventStream<NetworkEvent>;
/// Returns a new [`DiscoveryEvent`] stream.
///
/// This stream yields [`DiscoveryEvent`]s for each peer that is discovered.
fn discovery_listener(&self) -> UnboundedReceiverStream<DiscoveryEvent>;
/// A boxed stream of network peer events that provides a type-erased interface.
pub struct PeerEventStream(Pin<Box<dyn Stream<Item = PeerEvent> + Send + Sync>>);

impl fmt::Debug for PeerEventStream {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("PeerEventStream").finish_non_exhaustive()
}
}

/// (Non-exhaustive) Events emitted by the network that are of interest for subscribers.
impl PeerEventStream {
/// Create a new stream [`PeerEventStream`] by converting the provided stream's items into peer
/// events [`PeerEvent`]
pub fn new<S, T>(stream: S) -> Self
where
S: Stream<Item = T> + Send + Sync + 'static,
T: Into<PeerEvent> + 'static,
{
let mapped_stream = stream.map(Into::into);
Self(Box::pin(mapped_stream))
}
}

impl Stream for PeerEventStream {
type Item = PeerEvent;

fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
self.0.as_mut().poll_next(cx)
}
}

/// Represents information about an established peer session.
#[derive(Debug, Clone)]
pub struct SessionInfo {
/// The identifier of the peer to which a session was established.
pub peer_id: PeerId,
/// The remote addr of the peer to which a session was established.
pub remote_addr: SocketAddr,
/// The client version of the peer to which a session was established.
pub client_version: Arc<str>,
/// Capabilities the peer announced.
pub capabilities: Arc<Capabilities>,
/// The status of the peer to which a session was established.
pub status: Arc<Status>,
/// Negotiated eth version of the session.
pub version: EthVersion,
}

/// (Non-exhaustive) List of the different events emitted by the network that are of interest for
/// subscribers.
///
/// This includes any event types that may be relevant to tasks, for metrics, keep track of peers
/// etc.
#[derive(Debug)]
pub enum NetworkEvent<R = PeerRequest> {
#[derive(Debug, Clone)]
pub enum PeerEvent {
/// Closed the peer session.
SessionClosed {
/// The identifier of the peer to which a session was closed.
Expand All @@ -41,57 +80,66 @@ pub enum NetworkEvent<R = PeerRequest> {
reason: Option<DisconnectReason>,
},
/// Established a new session with the given peer.
SessionEstablished {
/// The identifier of the peer to which a session was established.
peer_id: PeerId,
/// The remote addr of the peer to which a session was established.
remote_addr: SocketAddr,
/// The client version of the peer to which a session was established.
client_version: Arc<str>,
/// Capabilities the peer announced
capabilities: Arc<Capabilities>,
/// A request channel to the session task.
messages: PeerRequestSender<R>,
/// The status of the peer to which a session was established.
status: Arc<Status>,
/// negotiated eth version of the session
version: EthVersion,
},
SessionEstablished(SessionInfo),
/// Event emitted when a new peer is added
PeerAdded(PeerId),
/// Event emitted when a new peer is removed
PeerRemoved(PeerId),
}

/// (Non-exhaustive) Network events representing peer lifecycle events and session requests.
/// sessions
#[derive(Debug)]
pub enum NetworkEvent<R = PeerRequest> {
/// Basic peer lifecycle event.
Peer(PeerEvent),
/// Session established with requests.
ActivePeerSession {
/// Session information
info: SessionInfo,
/// A request channel to the session task.
messages: PeerRequestSender<R>,
},
}

impl<R> Clone for NetworkEvent<R> {
fn clone(&self) -> Self {
match self {
Self::SessionClosed { peer_id, reason } => {
Self::SessionClosed { peer_id: *peer_id, reason: *reason }
Self::Peer(event) => Self::Peer(event.clone()),
Self::ActivePeerSession { info, messages } => {
Self::ActivePeerSession { info: info.clone(), messages: messages.clone() }
}
Self::SessionEstablished {
peer_id,
remote_addr,
client_version,
capabilities,
messages,
status,
version,
} => Self::SessionEstablished {
peer_id: *peer_id,
remote_addr: *remote_addr,
client_version: client_version.clone(),
capabilities: capabilities.clone(),
messages: messages.clone(),
status: status.clone(),
version: *version,
},
Self::PeerAdded(peer) => Self::PeerAdded(*peer),
Self::PeerRemoved(peer) => Self::PeerRemoved(*peer),
}
}
}

impl<R> From<NetworkEvent<R>> for PeerEvent {
fn from(event: NetworkEvent<R>) -> Self {
match event {
NetworkEvent::Peer(peer_event) => peer_event,
NetworkEvent::ActivePeerSession { info, .. } => Self::SessionEstablished(info),
}
}
}

/// Provides peer event subscription for the network.
#[auto_impl::auto_impl(&, Arc)]
pub trait NetworkPeersEvents: Send + Sync {
/// Creates a new peer event listener stream.
fn peer_events(&self) -> PeerEventStream;
}

/// Provides event subscription for the network.
#[auto_impl::auto_impl(&, Arc)]
pub trait NetworkEventListenerProvider<R = PeerRequest>: NetworkPeersEvents {
/// Creates a new [`NetworkEvent`] listener channel.
fn event_listener(&self) -> EventStream<NetworkEvent<R>>;
/// Returns a new [`DiscoveryEvent`] stream.
///
/// This stream yields [`DiscoveryEvent`]s for each peer that is discovered.
fn discovery_listener(&self) -> UnboundedReceiverStream<DiscoveryEvent>;
}

/// Events produced by the `Discovery` manager.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DiscoveryEvent {
Expand Down
21 changes: 13 additions & 8 deletions crates/net/network/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ use reth_eth_wire::{
use reth_fs_util::{self as fs, FsPathError};
use reth_metrics::common::mpsc::UnboundedMeteredSender;
use reth_network_api::{
test_utils::PeersHandle, EthProtocolInfo, NetworkEvent, NetworkStatus, PeerInfo, PeerRequest,
events::{PeerEvent, SessionInfo},
test_utils::PeersHandle,
EthProtocolInfo, NetworkEvent, NetworkStatus, PeerInfo, PeerRequest,
};
use reth_network_peers::{NodeRecord, PeerId};
use reth_network_types::ReputationChangeKind;
Expand Down Expand Up @@ -712,24 +714,26 @@ impl<N: NetworkPrimitives> NetworkManager<N> {

self.update_active_connection_metrics();

self.event_sender.notify(NetworkEvent::SessionEstablished {
let session_info = SessionInfo {
peer_id,
remote_addr,
client_version,
capabilities,
version,
status,
messages,
});
version,
};

self.event_sender
.notify(NetworkEvent::ActivePeerSession { info: session_info, messages });
}
SwarmEvent::PeerAdded(peer_id) => {
trace!(target: "net", ?peer_id, "Peer added");
self.event_sender.notify(NetworkEvent::PeerAdded(peer_id));
self.event_sender.notify(NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id)));
self.metrics.tracked_peers.set(self.swarm.state().peers().num_known_peers() as f64);
}
SwarmEvent::PeerRemoved(peer_id) => {
trace!(target: "net", ?peer_id, "Peer dropped");
self.event_sender.notify(NetworkEvent::PeerRemoved(peer_id));
self.event_sender.notify(NetworkEvent::Peer(PeerEvent::PeerRemoved(peer_id)));
self.metrics.tracked_peers.set(self.swarm.state().peers().num_known_peers() as f64);
}
SwarmEvent::SessionClosed { peer_id, remote_addr, error } => {
Expand Down Expand Up @@ -772,7 +776,8 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
.saturating_sub(1)
as f64,
);
self.event_sender.notify(NetworkEvent::SessionClosed { peer_id, reason });
self.event_sender
.notify(NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, reason }));
}
SwarmEvent::IncomingPendingSessionClosed { remote_addr, error } => {
trace!(
Expand Down
13 changes: 13 additions & 0 deletions crates/net/network/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::{
};
use alloy_primitives::B256;
use enr::Enr;
use futures::StreamExt;
use parking_lot::Mutex;
use reth_discv4::{Discv4, NatResolver};
use reth_discv5::Discv5;
Expand All @@ -13,6 +14,7 @@ use reth_eth_wire::{
};
use reth_ethereum_forks::Head;
use reth_network_api::{
events::{NetworkPeersEvents, PeerEvent, PeerEventStream},
test_utils::{PeersHandle, PeersHandleProvider},
BlockDownloaderProvider, DiscoveryEvent, NetworkError, NetworkEvent,
NetworkEventListenerProvider, NetworkInfo, NetworkStatus, PeerInfo, PeerRequest, Peers,
Expand Down Expand Up @@ -192,6 +194,17 @@ impl<N: NetworkPrimitives> NetworkHandle<N> {

// === API Implementations ===

impl<N: NetworkPrimitives> NetworkPeersEvents for NetworkHandle<N> {
/// Returns an event stream of peer-specific network events.
fn peer_events(&self) -> PeerEventStream {
let peer_events = self.inner.event_sender.new_listener().map(|event| match event {
NetworkEvent::Peer(peer_event) => peer_event,
NetworkEvent::ActivePeerSession { info, .. } => PeerEvent::SessionEstablished(info),
});
PeerEventStream::new(peer_events)
}
}

impl NetworkEventListenerProvider for NetworkHandle<EthNetworkPrimitives> {
fn event_listener(&self) -> EventStream<NetworkEvent<PeerRequest<EthNetworkPrimitives>>> {
self.inner.event_sender.new_listener()
Expand Down
Loading
Loading