Skip to content

Commit

Permalink
Fix peer count metrics (#5404)
Browse files Browse the repository at this point in the history
* Set the peers_per_client metrics directly, rather than using increment/decrement

* Move PEERS_CONNECTED related update to the same place

* Move PEERS_CONNECTED_MULTI related update to the same place

* Rename

* Remove unused variables
  • Loading branch information
ackintosh committed Mar 20, 2024
1 parent aa59285 commit 7117772
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 68 deletions.
84 changes: 68 additions & 16 deletions beacon_node/lighthouse_network/src/peer_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,17 @@ pub use libp2p::identity::Keypair;
#[allow(clippy::mutable_key_type)] // PeerId in hashmaps are no longer permitted by clippy
pub mod peerdb;

use crate::peer_manager::peerdb::client::ClientKind;
use libp2p::multiaddr;
pub use peerdb::peer_info::{
ConnectionDirection, PeerConnectionStatus, PeerConnectionStatus::*, PeerInfo,
};
use peerdb::score::{PeerAction, ReportSource};
pub use peerdb::sync_status::{SyncInfo, SyncStatus};
use std::collections::{hash_map::Entry, HashMap};
use std::net::IpAddr;
use strum::IntoEnumIterator;

pub mod config;
mod network_behaviour;

Expand Down Expand Up @@ -464,19 +468,6 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
"observed_address" => ?info.observed_addr,
"protocols" => ?info.protocols
);

// update the peer client kind metric if the peer is connected
if matches!(
peer_info.connection_status(),
PeerConnectionStatus::Connected { .. }
| PeerConnectionStatus::Disconnecting { .. }
) {
metrics::inc_gauge_vec(
&metrics::PEERS_PER_CLIENT,
&[peer_info.client().kind.as_ref()],
);
metrics::dec_gauge_vec(&metrics::PEERS_PER_CLIENT, &[previous_kind.as_ref()]);
}
}
} else {
error!(self.log, "Received an Identify response from an unknown peer"; "peer_id" => peer_id.to_string());
Expand Down Expand Up @@ -812,11 +803,8 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
// start a ping and status timer for the peer
self.status_peers.insert(*peer_id);

let connected_peers = self.network_globals.connected_peers() as i64;

// increment prometheus metrics
metrics::inc_counter(&metrics::PEER_CONNECT_EVENT_COUNT);
metrics::set_gauge(&metrics::PEERS_CONNECTED, connected_peers);

true
}
Expand Down Expand Up @@ -1267,6 +1255,70 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
);
}
}

/// Recomputes every peer-count gauge from the peers currently reported as connected.
///
/// The gauges are written with absolute values derived from a single pass over
/// `connected_peers()`, rather than being incremented/decremented per event:
///
/// * `PEERS_CONNECTED` - total number of connected peers.
/// * `PEERS_PER_CLIENT` - connected peers per `ClientKind`.
/// * `PEERS_CONNECTED_MULTI` - connected peers per `(direction, transport)` pair.
///
/// Every label value is written on each call, so label combinations that currently
/// have no peers are explicitly set to zero.
fn update_peer_count_metrics(&self) {
    const DIRECTIONS: [&str; 3] = ["inbound", "outbound", "none"];
    const TRANSPORTS: [&str; 3] = ["quic", "tcp", "unknown"];

    let mut total_connected: i64 = 0;
    let mut peers_per_client: HashMap<String, i32> = HashMap::new();
    let mut peers_connected_multi: HashMap<(&str, &str), i32> = HashMap::new();

    // The read guard on the peer DB is a temporary of the `for` expression, so it is
    // released as soon as the tally loop finishes and before any gauge is written.
    for (_, peer_info) in self.network_globals.peers.read().connected_peers() {
        total_connected += 1;

        let client_label = peer_info.client().kind.to_string();
        *peers_per_client.entry(client_label).or_default() += 1;

        let direction = peer_info
            .connection_direction()
            .map_or("none", |dir| match dir {
                ConnectionDirection::Incoming => "inbound",
                ConnectionDirection::Outgoing => "outbound",
            });

        // Note: the `transport` is set to `unknown` if the `listening_addresses` list is
        // empty. This situation occurs when the peer is initially registered in PeerDB, but
        // the peer info has not yet been updated at `PeerManager::identify`.
        // The first QUIC or TCP component found, scanning the addresses in order, wins.
        let transport = peer_info
            .listening_addresses()
            .iter()
            .flat_map(|addr| addr.iter())
            .find_map(|protocol| match protocol {
                multiaddr::Protocol::Tcp(_) => Some("tcp"),
                multiaddr::Protocol::QuicV1 => Some("quic"),
                _ => None,
            })
            .unwrap_or("unknown");

        *peers_connected_multi
            .entry((direction, transport))
            .or_default() += 1;
    }

    // PEERS_CONNECTED
    metrics::set_gauge(&metrics::PEERS_CONNECTED, total_connected);

    // PEERS_PER_CLIENT: walk every known client kind so that kinds without any
    // connected peer are reset to zero.
    for client_kind in ClientKind::iter() {
        let count = peers_per_client
            .get(&client_kind.to_string())
            .copied()
            .unwrap_or(0);
        metrics::set_gauge_vec(
            &metrics::PEERS_PER_CLIENT,
            &[client_kind.as_ref()],
            i64::from(count),
        );
    }

    // PEERS_CONNECTED_MULTI: write the full direction x transport matrix, zeros included.
    for direction in DIRECTIONS {
        for transport in TRANSPORTS {
            let count = peers_connected_multi
                .get(&(direction, transport))
                .copied()
                .unwrap_or(0);
            metrics::set_gauge_vec(
                &metrics::PEERS_CONNECTED_MULTI,
                &[direction, transport],
                i64::from(count),
            );
        }
    }
}
}

enum ConnectingType {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::net::IpAddr;
use std::task::{Context, Poll};

use futures::StreamExt;
use libp2p::core::{multiaddr, ConnectedPoint};
use libp2p::core::ConnectedPoint;
use libp2p::identity::PeerId;
use libp2p::swarm::behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm};
use libp2p::swarm::dial_opts::{DialOpts, PeerCondition};
Expand Down Expand Up @@ -243,35 +243,11 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
self.events.push(PeerManagerEvent::MetaData(peer_id));
}

// increment prometheus metrics
// Update the prometheus metrics
if self.metrics_enabled {
let remote_addr = endpoint.get_remote_address();
let direction = if endpoint.is_dialer() {
"outbound"
} else {
"inbound"
};

match remote_addr.iter().find(|proto| {
matches!(
proto,
multiaddr::Protocol::QuicV1 | multiaddr::Protocol::Tcp(_)
)
}) {
Some(multiaddr::Protocol::QuicV1) => {
metrics::inc_gauge_vec(&metrics::PEERS_CONNECTED_MULTI, &[direction, "quic"]);
}
Some(multiaddr::Protocol::Tcp(_)) => {
metrics::inc_gauge_vec(&metrics::PEERS_CONNECTED_MULTI, &[direction, "tcp"]);
}
Some(_) => unreachable!(),
None => {
error!(self.log, "Connection established via unknown transport"; "addr" => %remote_addr)
}
};

metrics::inc_gauge(&metrics::PEERS_CONNECTED);
metrics::inc_counter(&metrics::PEER_CONNECT_EVENT_COUNT);

self.update_peer_count_metrics();
}

// Count dialing peers in the limit if the peer dialed us.
Expand Down Expand Up @@ -309,7 +285,7 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
fn on_connection_closed(
&mut self,
peer_id: PeerId,
endpoint: &ConnectedPoint,
_endpoint: &ConnectedPoint,
remaining_established: usize,
) {
if remaining_established > 0 {
Expand Down Expand Up @@ -337,33 +313,12 @@ impl<TSpec: EthSpec> PeerManager<TSpec> {
// reference so that peer manager can track this peer.
self.inject_disconnect(&peer_id);

let remote_addr = endpoint.get_remote_address();
// Update the prometheus metrics
if self.metrics_enabled {
let direction = if endpoint.is_dialer() {
"outbound"
} else {
"inbound"
};

match remote_addr.iter().find(|proto| {
matches!(
proto,
multiaddr::Protocol::QuicV1 | multiaddr::Protocol::Tcp(_)
)
}) {
Some(multiaddr::Protocol::QuicV1) => {
metrics::dec_gauge_vec(&metrics::PEERS_CONNECTED_MULTI, &[direction, "quic"]);
}
Some(multiaddr::Protocol::Tcp(_)) => {
metrics::dec_gauge_vec(&metrics::PEERS_CONNECTED_MULTI, &[direction, "tcp"]);
}
// If it's an unknown protocol we already logged when connection was established.
_ => {}
};
// Legacy standard metrics.
metrics::dec_gauge(&metrics::PEERS_CONNECTED);
metrics::inc_counter(&metrics::PEER_DISCONNECT_EVENT_COUNT);

self.update_peer_count_metrics();
}
}

Expand Down

0 comments on commit 7117772

Please sign in to comment.