Skip to content

Commit

Permalink
refactor: split NetworkEventListenerProvider (#12972)
Browse files Browse the repository at this point in the history
  • Loading branch information
lean-apple authored Dec 4, 2024
1 parent 0daa456 commit fbd2d6e
Show file tree
Hide file tree
Showing 15 changed files with 346 additions and 254 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/e2e-test-utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ workspace = true
reth-chainspec.workspace = true
reth-tracing.workspace = true
reth-db = { workspace = true, features = ["test-utils"] }
reth-network-api.workspace = true
reth-rpc-layer.workspace = true
reth-rpc-server-types.workspace = true
reth-rpc-eth-api.workspace = true
Expand All @@ -23,7 +24,6 @@ reth-payload-builder-primitives.workspace = true
reth-payload-primitives.workspace = true
reth-primitives.workspace = true
reth-provider.workspace = true
reth-network-api.workspace = true
reth-network.workspace = true
reth-node-api.workspace = true
reth-node-core.workspace = true
Expand Down
9 changes: 6 additions & 3 deletions crates/e2e-test-utils/src/network.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use futures_util::StreamExt;
use reth_network_api::{
test_utils::PeersHandleProvider, NetworkEvent, NetworkEventListenerProvider, PeersInfo,
events::PeerEvent, test_utils::PeersHandleProvider, NetworkEvent, NetworkEventListenerProvider,
PeersInfo,
};
use reth_network_peers::{NodeRecord, PeerId};
use reth_tokio_util::EventStream;
Expand Down Expand Up @@ -28,7 +29,7 @@ where
self.network.peers_handle().add_peer(node_record.id, node_record.tcp_addr());

match self.network_events.next().await {
Some(NetworkEvent::PeerAdded(_)) => (),
Some(NetworkEvent::Peer(PeerEvent::PeerAdded(_))) => (),
ev => panic!("Expected a peer added event, got: {ev:?}"),
}
}
Expand All @@ -42,7 +43,9 @@ where
pub async fn next_session_established(&mut self) -> Option<PeerId> {
while let Some(ev) = self.network_events.next().await {
match ev {
NetworkEvent::SessionEstablished { peer_id, .. } => {
NetworkEvent::ActivePeerSession { info, .. } |
NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
let peer_id = info.peer_id;
info!("Session established with peer: {:?}", peer_id);
return Some(peer_id)
}
Expand Down
154 changes: 102 additions & 52 deletions crates/net/network-api/src/events.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
//! API related to listening for network events.
use std::{fmt, net::SocketAddr, sync::Arc};

use reth_eth_wire_types::{
message::RequestPair, BlockBodies, BlockHeaders, Capabilities, DisconnectReason, EthMessage,
EthNetworkPrimitives, EthVersion, GetBlockBodies, GetBlockHeaders, GetNodeData,
Expand All @@ -13,26 +11,70 @@ use reth_network_p2p::error::{RequestError, RequestResult};
use reth_network_peers::PeerId;
use reth_network_types::PeerAddr;
use reth_tokio_util::EventStream;
use std::{
fmt,
net::SocketAddr,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use tokio::sync::{mpsc, oneshot};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tokio_stream::{wrappers::UnboundedReceiverStream, Stream, StreamExt};

/// Provides event subscription for the network.
#[auto_impl::auto_impl(&, Arc)]
pub trait NetworkEventListenerProvider: Send + Sync {
/// Creates a new [`NetworkEvent`] listener channel.
fn event_listener(&self) -> EventStream<NetworkEvent>;
/// Returns a new [`DiscoveryEvent`] stream.
///
/// This stream yields [`DiscoveryEvent`]s for each peer that is discovered.
fn discovery_listener(&self) -> UnboundedReceiverStream<DiscoveryEvent>;
/// A boxed stream of network peer events that provides a type-erased interface.
pub struct PeerEventStream(Pin<Box<dyn Stream<Item = PeerEvent> + Send + Sync>>);

impl fmt::Debug for PeerEventStream {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("PeerEventStream").finish_non_exhaustive()
}
}

impl PeerEventStream {
/// Create a new stream [`PeerEventStream`] by converting the provided stream's items into peer
/// events [`PeerEvent`]
pub fn new<S, T>(stream: S) -> Self
where
S: Stream<Item = T> + Send + Sync + 'static,
T: Into<PeerEvent> + 'static,
{
let mapped_stream = stream.map(Into::into);
Self(Box::pin(mapped_stream))
}
}

/// (Non-exhaustive) Events emitted by the network that are of interest for subscribers.
impl Stream for PeerEventStream {
type Item = PeerEvent;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.0.as_mut().poll_next(cx)
}
}

/// Represents information about an established peer session.
#[derive(Debug, Clone)]
pub struct SessionInfo {
/// The identifier of the peer to which a session was established.
pub peer_id: PeerId,
/// The remote addr of the peer to which a session was established.
pub remote_addr: SocketAddr,
/// The client version of the peer to which a session was established.
pub client_version: Arc<str>,
/// Capabilities the peer announced.
pub capabilities: Arc<Capabilities>,
/// The status of the peer to which a session was established.
pub status: Arc<Status>,
/// Negotiated eth version of the session.
pub version: EthVersion,
}

/// (Non-exhaustive) List of the different events emitted by the network that are of interest for
/// subscribers.
///
/// This includes any event types that may be relevant to tasks, for metrics, keep track of peers
/// etc.
#[derive(Debug)]
pub enum NetworkEvent<R = PeerRequest> {
#[derive(Debug, Clone)]
pub enum PeerEvent {
/// Closed the peer session.
SessionClosed {
/// The identifier of the peer to which a session was closed.
Expand All @@ -41,57 +83,65 @@ pub enum NetworkEvent<R = PeerRequest> {
reason: Option<DisconnectReason>,
},
/// Established a new session with the given peer.
SessionEstablished {
/// The identifier of the peer to which a session was established.
peer_id: PeerId,
/// The remote addr of the peer to which a session was established.
remote_addr: SocketAddr,
/// The client version of the peer to which a session was established.
client_version: Arc<str>,
/// Capabilities the peer announced
capabilities: Arc<Capabilities>,
/// A request channel to the session task.
messages: PeerRequestSender<R>,
/// The status of the peer to which a session was established.
status: Arc<Status>,
/// negotiated eth version of the session
version: EthVersion,
},
SessionEstablished(SessionInfo),
/// Event emitted when a new peer is added
PeerAdded(PeerId),
/// Event emitted when a new peer is removed
PeerRemoved(PeerId),
}

/// (Non-exhaustive) Network events representing peer lifecycle events and session requests.
#[derive(Debug)]
pub enum NetworkEvent<R = PeerRequest> {
/// Basic peer lifecycle event.
Peer(PeerEvent),
/// Session established with requests.
ActivePeerSession {
/// Session information
info: SessionInfo,
/// A request channel to the session task.
messages: PeerRequestSender<R>,
},
}

impl<R> Clone for NetworkEvent<R> {
fn clone(&self) -> Self {
match self {
Self::SessionClosed { peer_id, reason } => {
Self::SessionClosed { peer_id: *peer_id, reason: *reason }
Self::Peer(event) => Self::Peer(event.clone()),
Self::ActivePeerSession { info, messages } => {
Self::ActivePeerSession { info: info.clone(), messages: messages.clone() }
}
Self::SessionEstablished {
peer_id,
remote_addr,
client_version,
capabilities,
messages,
status,
version,
} => Self::SessionEstablished {
peer_id: *peer_id,
remote_addr: *remote_addr,
client_version: client_version.clone(),
capabilities: capabilities.clone(),
messages: messages.clone(),
status: status.clone(),
version: *version,
},
Self::PeerAdded(peer) => Self::PeerAdded(*peer),
Self::PeerRemoved(peer) => Self::PeerRemoved(*peer),
}
}
}

impl<R> From<NetworkEvent<R>> for PeerEvent {
fn from(event: NetworkEvent<R>) -> Self {
match event {
NetworkEvent::Peer(peer_event) => peer_event,
NetworkEvent::ActivePeerSession { info, .. } => Self::SessionEstablished(info),
}
}
}

/// Provides peer event subscription for the network.
#[auto_impl::auto_impl(&, Arc)]
pub trait NetworkPeersEvents: Send + Sync {
/// Creates a new peer event listener stream.
fn peer_events(&self) -> PeerEventStream;
}

/// Provides event subscription for the network.
#[auto_impl::auto_impl(&, Arc)]
pub trait NetworkEventListenerProvider<R = PeerRequest>: NetworkPeersEvents {
/// Creates a new [`NetworkEvent`] listener channel.
fn event_listener(&self) -> EventStream<NetworkEvent<R>>;
/// Returns a new [`DiscoveryEvent`] stream.
///
/// This stream yields [`DiscoveryEvent`]s for each peer that is discovered.
fn discovery_listener(&self) -> UnboundedReceiverStream<DiscoveryEvent>;
}

/// Events produced by the `Discovery` manager.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DiscoveryEvent {
Expand Down
21 changes: 13 additions & 8 deletions crates/net/network/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ use reth_eth_wire::{
use reth_fs_util::{self as fs, FsPathError};
use reth_metrics::common::mpsc::UnboundedMeteredSender;
use reth_network_api::{
test_utils::PeersHandle, EthProtocolInfo, NetworkEvent, NetworkStatus, PeerInfo, PeerRequest,
events::{PeerEvent, SessionInfo},
test_utils::PeersHandle,
EthProtocolInfo, NetworkEvent, NetworkStatus, PeerInfo, PeerRequest,
};
use reth_network_peers::{NodeRecord, PeerId};
use reth_network_types::ReputationChangeKind;
Expand Down Expand Up @@ -712,24 +714,26 @@ impl<N: NetworkPrimitives> NetworkManager<N> {

self.update_active_connection_metrics();

self.event_sender.notify(NetworkEvent::SessionEstablished {
let session_info = SessionInfo {
peer_id,
remote_addr,
client_version,
capabilities,
version,
status,
messages,
});
version,
};

self.event_sender
.notify(NetworkEvent::ActivePeerSession { info: session_info, messages });
}
SwarmEvent::PeerAdded(peer_id) => {
trace!(target: "net", ?peer_id, "Peer added");
self.event_sender.notify(NetworkEvent::PeerAdded(peer_id));
self.event_sender.notify(NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id)));
self.metrics.tracked_peers.set(self.swarm.state().peers().num_known_peers() as f64);
}
SwarmEvent::PeerRemoved(peer_id) => {
trace!(target: "net", ?peer_id, "Peer dropped");
self.event_sender.notify(NetworkEvent::PeerRemoved(peer_id));
self.event_sender.notify(NetworkEvent::Peer(PeerEvent::PeerRemoved(peer_id)));
self.metrics.tracked_peers.set(self.swarm.state().peers().num_known_peers() as f64);
}
SwarmEvent::SessionClosed { peer_id, remote_addr, error } => {
Expand Down Expand Up @@ -772,7 +776,8 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
.saturating_sub(1)
as f64,
);
self.event_sender.notify(NetworkEvent::SessionClosed { peer_id, reason });
self.event_sender
.notify(NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, reason }));
}
SwarmEvent::IncomingPendingSessionClosed { remote_addr, error } => {
trace!(
Expand Down
13 changes: 13 additions & 0 deletions crates/net/network/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::{
};
use alloy_primitives::B256;
use enr::Enr;
use futures::StreamExt;
use parking_lot::Mutex;
use reth_discv4::{Discv4, NatResolver};
use reth_discv5::Discv5;
Expand All @@ -13,6 +14,7 @@ use reth_eth_wire::{
};
use reth_ethereum_forks::Head;
use reth_network_api::{
events::{NetworkPeersEvents, PeerEvent, PeerEventStream},
test_utils::{PeersHandle, PeersHandleProvider},
BlockDownloaderProvider, DiscoveryEvent, NetworkError, NetworkEvent,
NetworkEventListenerProvider, NetworkInfo, NetworkStatus, PeerInfo, PeerRequest, Peers,
Expand Down Expand Up @@ -192,6 +194,17 @@ impl<N: NetworkPrimitives> NetworkHandle<N> {

// === API Implementations ===

impl<N: NetworkPrimitives> NetworkPeersEvents for NetworkHandle<N> {
/// Returns an event stream of peer-specific network events.
fn peer_events(&self) -> PeerEventStream {
let peer_events = self.inner.event_sender.new_listener().map(|event| match event {
NetworkEvent::Peer(peer_event) => peer_event,
NetworkEvent::ActivePeerSession { info, .. } => PeerEvent::SessionEstablished(info),
});
PeerEventStream::new(peer_events)
}
}

impl NetworkEventListenerProvider for NetworkHandle<EthNetworkPrimitives> {
fn event_listener(&self) -> EventStream<NetworkEvent<PeerRequest<EthNetworkPrimitives>>> {
self.inner.event_sender.new_listener()
Expand Down
Loading

0 comments on commit fbd2d6e

Please sign in to comment.