Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: split NetworkEventListenerProvider #12972

Merged
218 changes: 109 additions & 109 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion crates/e2e-test-utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ workspace = true
reth-chainspec.workspace = true
reth-tracing.workspace = true
reth-db = { workspace = true, features = ["test-utils"] }
reth-network-api.workspace = true
reth-rpc-layer.workspace = true
reth-rpc-server-types.workspace = true
reth-rpc-eth-api.workspace = true
Expand All @@ -23,7 +24,6 @@ reth-payload-builder-primitives.workspace = true
reth-payload-primitives.workspace = true
reth-primitives.workspace = true
reth-provider.workspace = true
reth-network-api.workspace = true
reth-network.workspace = true
reth-node-api.workspace = true
reth-node-core.workspace = true
Expand Down
9 changes: 6 additions & 3 deletions crates/e2e-test-utils/src/network.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use futures_util::StreamExt;
use reth_network_api::{
test_utils::PeersHandleProvider, NetworkEvent, NetworkEventListenerProvider, PeersInfo,
events::PeerEvent, test_utils::PeersHandleProvider, NetworkEvent, NetworkEventListenerProvider,
PeersInfo,
};
use reth_network_peers::{NodeRecord, PeerId};
use reth_tokio_util::EventStream;
Expand Down Expand Up @@ -28,7 +29,7 @@ where
self.network.peers_handle().add_peer(node_record.id, node_record.tcp_addr());

match self.network_events.next().await {
Some(NetworkEvent::PeerAdded(_)) => (),
Some(NetworkEvent::Peer(PeerEvent::PeerAdded(_))) => (),
ev => panic!("Expected a peer added event, got: {ev:?}"),
}
}
Expand All @@ -42,7 +43,9 @@ where
pub async fn next_session_established(&mut self) -> Option<PeerId> {
while let Some(ev) = self.network_events.next().await {
match ev {
NetworkEvent::SessionEstablished { peer_id, .. } => {
NetworkEvent::ActivePeerSession { info, .. } |
NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
let peer_id = info.peer_id;
info!("Session established with peer: {:?}", peer_id);
return Some(peer_id)
}
Expand Down
154 changes: 102 additions & 52 deletions crates/net/network-api/src/events.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
//! API related to listening for network events.

use std::{fmt, net::SocketAddr, sync::Arc};

use reth_eth_wire_types::{
message::RequestPair, BlockBodies, BlockHeaders, Capabilities, DisconnectReason, EthMessage,
EthNetworkPrimitives, EthVersion, GetBlockBodies, GetBlockHeaders, GetNodeData,
Expand All @@ -13,26 +11,70 @@ use reth_network_p2p::error::{RequestError, RequestResult};
use reth_network_peers::PeerId;
use reth_network_types::PeerAddr;
use reth_tokio_util::EventStream;
use std::{
fmt,
net::SocketAddr,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use tokio::sync::{mpsc, oneshot};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tokio_stream::{wrappers::UnboundedReceiverStream, Stream, StreamExt};

/// Provides event subscription for the network.
#[auto_impl::auto_impl(&, Arc)]
pub trait NetworkEventListenerProvider: Send + Sync {
/// Creates a new [`NetworkEvent`] listener channel.
fn event_listener(&self) -> EventStream<NetworkEvent>;
/// Returns a new [`DiscoveryEvent`] stream.
///
/// This stream yields [`DiscoveryEvent`]s for each peer that is discovered.
fn discovery_listener(&self) -> UnboundedReceiverStream<DiscoveryEvent>;
/// A boxed stream of network peer events that provides a type-erased interface.
pub struct PeerEventStream(Pin<Box<dyn Stream<Item = PeerEvent> + Send + Sync>>);

impl fmt::Debug for PeerEventStream {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("PeerEventStream").finish_non_exhaustive()
}
}

impl PeerEventStream {
/// Create a new stream [`PeerEventStream`] by converting the provided stream's items into peer
/// events [`PeerEvent`]
pub fn new<S, T>(stream: S) -> Self
where
S: Stream<Item = T> + Send + Sync + 'static,
T: Into<PeerEvent> + 'static,
{
let mapped_stream = stream.map(Into::into);
Self(Box::pin(mapped_stream))
}
}

/// (Non-exhaustive) Events emitted by the network that are of interest for subscribers.
impl Stream for PeerEventStream {
type Item = PeerEvent;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.0.as_mut().poll_next(cx)
}
}

/// Represents information about an established peer session.
#[derive(Debug, Clone)]
pub struct SessionInfo {
/// The identifier of the peer to which a session was established.
pub peer_id: PeerId,
/// The remote addr of the peer to which a session was established.
pub remote_addr: SocketAddr,
/// The client version of the peer to which a session was established.
pub client_version: Arc<str>,
/// Capabilities the peer announced.
pub capabilities: Arc<Capabilities>,
/// The status of the peer to which a session was established.
pub status: Arc<Status>,
/// Negotiated eth version of the session.
pub version: EthVersion,
}

/// (Non-exhaustive) List of the different events emitted by the network that are of interest for
/// subscribers.
///
/// This includes any event types that may be relevant to tasks, for metrics, keep track of peers
/// etc.
#[derive(Debug)]
pub enum NetworkEvent<R = PeerRequest> {
#[derive(Debug, Clone)]
pub enum PeerEvent {
/// Closed the peer session.
SessionClosed {
/// The identifier of the peer to which a session was closed.
Expand All @@ -41,57 +83,65 @@ pub enum NetworkEvent<R = PeerRequest> {
reason: Option<DisconnectReason>,
},
/// Established a new session with the given peer.
SessionEstablished {
/// The identifier of the peer to which a session was established.
peer_id: PeerId,
/// The remote addr of the peer to which a session was established.
remote_addr: SocketAddr,
/// The client version of the peer to which a session was established.
client_version: Arc<str>,
/// Capabilities the peer announced
capabilities: Arc<Capabilities>,
/// A request channel to the session task.
messages: PeerRequestSender<R>,
/// The status of the peer to which a session was established.
status: Arc<Status>,
/// negotiated eth version of the session
version: EthVersion,
},
SessionEstablished(SessionInfo),
/// Event emitted when a new peer is added
PeerAdded(PeerId),
/// Event emitted when a new peer is removed
PeerRemoved(PeerId),
}

/// (Non-exhaustive) Network events representing peer lifecycle events and session requests.
#[derive(Debug)]
pub enum NetworkEvent<R = PeerRequest> {
/// Basic peer lifecycle event.
Peer(PeerEvent),
/// Session established with requests.
ActivePeerSession {
/// Session information
info: SessionInfo,
/// A request channel to the session task.
messages: PeerRequestSender<R>,
},
}

impl<R> Clone for NetworkEvent<R> {
fn clone(&self) -> Self {
match self {
Self::SessionClosed { peer_id, reason } => {
Self::SessionClosed { peer_id: *peer_id, reason: *reason }
Self::Peer(event) => Self::Peer(event.clone()),
Self::ActivePeerSession { info, messages } => {
Self::ActivePeerSession { info: info.clone(), messages: messages.clone() }
}
Self::SessionEstablished {
peer_id,
remote_addr,
client_version,
capabilities,
messages,
status,
version,
} => Self::SessionEstablished {
peer_id: *peer_id,
remote_addr: *remote_addr,
client_version: client_version.clone(),
capabilities: capabilities.clone(),
messages: messages.clone(),
status: status.clone(),
version: *version,
},
Self::PeerAdded(peer) => Self::PeerAdded(*peer),
Self::PeerRemoved(peer) => Self::PeerRemoved(*peer),
}
}
}

impl<R> From<NetworkEvent<R>> for PeerEvent {
fn from(event: NetworkEvent<R>) -> Self {
match event {
NetworkEvent::Peer(peer_event) => peer_event,
NetworkEvent::ActivePeerSession { info, .. } => Self::SessionEstablished(info),
}
}
}

/// Provides peer event subscription for the network.
#[auto_impl::auto_impl(&, Arc)]
pub trait NetworkPeersEvents: Send + Sync {
/// Creates a new peer event listener stream.
fn peer_events(&self) -> PeerEventStream;
}

/// Provides event subscription for the network.
#[auto_impl::auto_impl(&, Arc)]
pub trait NetworkEventListenerProvider<R = PeerRequest>: NetworkPeersEvents {
/// Creates a new [`NetworkEvent`] listener channel.
fn event_listener(&self) -> EventStream<NetworkEvent<R>>;
/// Returns a new [`DiscoveryEvent`] stream.
///
/// This stream yields [`DiscoveryEvent`]s for each peer that is discovered.
fn discovery_listener(&self) -> UnboundedReceiverStream<DiscoveryEvent>;
}

/// Events produced by the `Discovery` manager.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DiscoveryEvent {
Expand Down
21 changes: 13 additions & 8 deletions crates/net/network/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ use reth_eth_wire::{
use reth_fs_util::{self as fs, FsPathError};
use reth_metrics::common::mpsc::UnboundedMeteredSender;
use reth_network_api::{
test_utils::PeersHandle, EthProtocolInfo, NetworkEvent, NetworkStatus, PeerInfo, PeerRequest,
events::{PeerEvent, SessionInfo},
test_utils::PeersHandle,
EthProtocolInfo, NetworkEvent, NetworkStatus, PeerInfo, PeerRequest,
};
use reth_network_peers::{NodeRecord, PeerId};
use reth_network_types::ReputationChangeKind;
Expand Down Expand Up @@ -712,24 +714,26 @@ impl<N: NetworkPrimitives> NetworkManager<N> {

self.update_active_connection_metrics();

self.event_sender.notify(NetworkEvent::SessionEstablished {
let session_info = SessionInfo {
peer_id,
remote_addr,
client_version,
capabilities,
version,
status,
messages,
});
version,
};

self.event_sender
.notify(NetworkEvent::ActivePeerSession { info: session_info, messages });
}
SwarmEvent::PeerAdded(peer_id) => {
trace!(target: "net", ?peer_id, "Peer added");
self.event_sender.notify(NetworkEvent::PeerAdded(peer_id));
self.event_sender.notify(NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id)));
self.metrics.tracked_peers.set(self.swarm.state().peers().num_known_peers() as f64);
}
SwarmEvent::PeerRemoved(peer_id) => {
trace!(target: "net", ?peer_id, "Peer dropped");
self.event_sender.notify(NetworkEvent::PeerRemoved(peer_id));
self.event_sender.notify(NetworkEvent::Peer(PeerEvent::PeerRemoved(peer_id)));
self.metrics.tracked_peers.set(self.swarm.state().peers().num_known_peers() as f64);
}
SwarmEvent::SessionClosed { peer_id, remote_addr, error } => {
Expand Down Expand Up @@ -772,7 +776,8 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
.saturating_sub(1)
as f64,
);
self.event_sender.notify(NetworkEvent::SessionClosed { peer_id, reason });
self.event_sender
.notify(NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, reason }));
}
SwarmEvent::IncomingPendingSessionClosed { remote_addr, error } => {
trace!(
Expand Down
13 changes: 13 additions & 0 deletions crates/net/network/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::{
};
use alloy_primitives::B256;
use enr::Enr;
use futures::StreamExt;
use parking_lot::Mutex;
use reth_discv4::{Discv4, NatResolver};
use reth_discv5::Discv5;
Expand All @@ -13,6 +14,7 @@ use reth_eth_wire::{
};
use reth_ethereum_forks::Head;
use reth_network_api::{
events::{NetworkPeersEvents, PeerEvent, PeerEventStream},
test_utils::{PeersHandle, PeersHandleProvider},
BlockDownloaderProvider, DiscoveryEvent, NetworkError, NetworkEvent,
NetworkEventListenerProvider, NetworkInfo, NetworkStatus, PeerInfo, PeerRequest, Peers,
Expand Down Expand Up @@ -192,6 +194,17 @@ impl<N: NetworkPrimitives> NetworkHandle<N> {

// === API Implementations ===

impl<N: NetworkPrimitives> NetworkPeersEvents for NetworkHandle<N> {
/// Returns an event stream of peer-specific network events.
fn peer_events(&self) -> PeerEventStream {
let peer_events = self.inner.event_sender.new_listener().map(|event| match event {
NetworkEvent::Peer(peer_event) => peer_event,
NetworkEvent::ActivePeerSession { info, .. } => PeerEvent::SessionEstablished(info),
});
PeerEventStream::new(peer_events)
}
}

impl NetworkEventListenerProvider for NetworkHandle<EthNetworkPrimitives> {
fn event_listener(&self) -> EventStream<NetworkEvent<PeerRequest<EthNetworkPrimitives>>> {
self.inner.event_sender.new_listener()
Expand Down
28 changes: 20 additions & 8 deletions crates/net/network/src/test_utils/testnet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use pin_project::pin_project;
use reth_chainspec::{Hardforks, MAINNET};
use reth_eth_wire::{protocol::Protocol, DisconnectReason, HelloMessageWithProtocols};
use reth_network_api::{
events::{PeerEvent, SessionInfo},
test_utils::{PeersHandle, PeersHandleProvider},
NetworkEvent, NetworkEventListenerProvider, NetworkInfo, Peers,
};
Expand Down Expand Up @@ -641,7 +642,9 @@ impl NetworkEventStream {
pub async fn next_session_closed(&mut self) -> Option<(PeerId, Option<DisconnectReason>)> {
while let Some(ev) = self.inner.next().await {
match ev {
NetworkEvent::SessionClosed { peer_id, reason } => return Some((peer_id, reason)),
NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, reason }) => {
return Some((peer_id, reason))
}
_ => continue,
}
}
Expand All @@ -652,7 +655,10 @@ impl NetworkEventStream {
pub async fn next_session_established(&mut self) -> Option<PeerId> {
while let Some(ev) = self.inner.next().await {
match ev {
NetworkEvent::SessionEstablished { peer_id, .. } => return Some(peer_id),
NetworkEvent::ActivePeerSession { info, .. } |
NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
return Some(info.peer_id)
}
_ => continue,
}
}
Expand All @@ -667,7 +673,7 @@ impl NetworkEventStream {
let mut peers = Vec::with_capacity(num);
while let Some(ev) = self.inner.next().await {
match ev {
NetworkEvent::SessionEstablished { peer_id, .. } => {
NetworkEvent::ActivePeerSession { info: SessionInfo { peer_id, .. }, .. } => {
peers.push(peer_id);
num -= 1;
if num == 0 {
Expand All @@ -680,18 +686,24 @@ impl NetworkEventStream {
peers
}

/// Ensures that the first two events are a [`NetworkEvent::PeerAdded`] and
/// [`NetworkEvent::SessionEstablished`], returning the [`PeerId`] of the established
/// Ensures that the first two events are a [`NetworkEvent::Peer(PeerEvent::PeerAdded`] and
/// [`NetworkEvent::ActivePeerSession`], returning the [`PeerId`] of the established
/// session.
pub async fn peer_added_and_established(&mut self) -> Option<PeerId> {
let peer_id = match self.inner.next().await {
Some(NetworkEvent::PeerAdded(peer_id)) => peer_id,
Some(NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id))) => peer_id,
_ => return None,
};

match self.inner.next().await {
Some(NetworkEvent::SessionEstablished { peer_id: peer_id2, .. }) => {
debug_assert_eq!(peer_id, peer_id2, "PeerAdded peer_id {peer_id} does not match SessionEstablished peer_id {peer_id2}");
Some(NetworkEvent::ActivePeerSession {
info: SessionInfo { peer_id: peer_id2, .. },
..
}) => {
debug_assert_eq!(
peer_id, peer_id2,
"PeerAdded peer_id {peer_id} does not match SessionEstablished peer_id {peer_id2}"
);
Some(peer_id)
}
_ => None,
Expand Down
Loading
Loading