Skip to content

Commit

Permalink
feature: expand nym-node prometheus metrics (#5298)
Browse files Browse the repository at this point in the history
* fixed bearer auth for prometheus route

* basic prometheus metrics

* added rates on global values

* improved structure on the prometheus metrics

* added additional metrics for ingress websockets and egress mixnet connections

* some channel business metrics

* fixed metrics registration and added additional variants

* added counter for number of disk persisted packets

* counter for pending egress packets

* counter for pending egress forward packets

* clippy
  • Loading branch information
jstuczyn authored Dec 20, 2024
1 parent 4f283f5 commit 7d5e3ef
Show file tree
Hide file tree
Showing 38 changed files with 1,731 additions and 247 deletions.
4 changes: 3 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion common/client-libs/mixnet-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
dashmap = { workspace = true }
futures = { workspace = true }
tracing = { workspace = true }
tokio = { workspace = true, features = ["time"] }
tokio = { workspace = true, features = ["time", "sync"] }
tokio-util = { workspace = true, features = ["codec"], optional = true }
tokio-stream = { workspace = true }

# internal
nym-sphinx = { path = "../../nymsphinx" }
Expand Down
217 changes: 136 additions & 81 deletions common/client-libs/mixnet-client/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,24 @@
// Copyright 2021 - Nym Technologies SA <contact@nymtech.net>
// Copyright 2021-2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0

use futures::channel::mpsc;
use dashmap::DashMap;
use futures::StreamExt;
use nym_sphinx::addressing::nodes::NymNodeRoutingAddress;
use nym_sphinx::framing::codec::NymCodec;
use nym_sphinx::framing::packet::FramedNymPacket;
use nym_sphinx::params::PacketType;
use nym_sphinx::NymPacket;
use std::collections::HashMap;
use std::io;
use std::net::SocketAddr;
use std::sync::atomic::{AtomicU32, Ordering};
use std::ops::Deref;
use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::net::TcpStream;
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError;
use tokio::time::sleep;
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::codec::Framed;
use tracing::*;

Expand Down Expand Up @@ -55,11 +58,37 @@ pub trait SendWithoutResponse {
}

pub struct Client {
conn_new: HashMap<NymNodeRoutingAddress, ConnectionSender>,
active_connections: ActiveConnections,
connections_count: Arc<AtomicUsize>,
config: Config,
}

struct ConnectionSender {
#[derive(Default, Clone)]
pub struct ActiveConnections {
inner: Arc<DashMap<NymNodeRoutingAddress, ConnectionSender>>,
}

impl ActiveConnections {
pub fn pending_packets(&self) -> usize {
self.inner
.iter()
.map(|sender| {
let max_capacity = sender.channel.max_capacity();
let capacity = sender.channel.capacity();
max_capacity - capacity
})
.sum()
}
}

impl Deref for ActiveConnections {
type Target = DashMap<NymNodeRoutingAddress, ConnectionSender>;
fn deref(&self) -> &Self::Target {
&self.inner
}
}

pub struct ConnectionSender {
channel: mpsc::Sender<FramedNymPacket>,
current_reconnection_attempt: Arc<AtomicU32>,
}
Expand All @@ -73,62 +102,82 @@ impl ConnectionSender {
}
}

impl Client {
pub fn new(config: Config) -> Client {
Client {
conn_new: HashMap::new(),
config,
}
}
struct ManagedConnection {
address: SocketAddr,
message_receiver: ReceiverStream<FramedNymPacket>,
connection_timeout: Duration,
current_reconnection: Arc<AtomicU32>,
}

async fn manage_connection(
impl ManagedConnection {
fn new(
address: SocketAddr,
receiver: mpsc::Receiver<FramedNymPacket>,
message_receiver: mpsc::Receiver<FramedNymPacket>,
connection_timeout: Duration,
current_reconnection: &AtomicU32,
) {
current_reconnection: Arc<AtomicU32>,
) -> Self {
ManagedConnection {
address,
message_receiver: ReceiverStream::new(message_receiver),
connection_timeout,
current_reconnection,
}
}

async fn run(self) {
let address = self.address;
let connection_fut = TcpStream::connect(address);

let conn = match tokio::time::timeout(connection_timeout, connection_fut).await {
let conn = match tokio::time::timeout(self.connection_timeout, connection_fut).await {
Ok(stream_res) => match stream_res {
Ok(stream) => {
debug!("Managed to establish connection to {}", address);
debug!("Managed to establish connection to {}", self.address);
// if we managed to connect, reset the reconnection count (whatever it might have been)
current_reconnection.store(0, Ordering::Release);
self.current_reconnection.store(0, Ordering::Release);
Framed::new(stream, NymCodec)
}
Err(err) => {
debug!(
"failed to establish connection to {} (err: {})",
address, err
);
debug!("failed to establish connection to {address} (err: {err})",);
return;
}
},
Err(_) => {
debug!(
"failed to connect to {} within {:?}",
address, connection_timeout
"failed to connect to {address} within {:?}",
self.connection_timeout
);

// we failed to connect - increase reconnection attempt
current_reconnection.fetch_add(1, Ordering::SeqCst);
self.current_reconnection.fetch_add(1, Ordering::SeqCst);
return;
}
};

// Take whatever the receiver channel produces and put it on the connection.
// We could have as well used conn.send_all(receiver.map(Ok)), but considering we don't care
// about neither receiver nor the connection, it doesn't matter which one gets consumed
if let Err(err) = receiver.map(Ok).forward(conn).await {
warn!("Failed to forward packets to {} - {err}", address);
if let Err(err) = self.message_receiver.map(Ok).forward(conn).await {
warn!("Failed to forward packets to {address}: {err}");
}

debug!(
"connection manager to {} is finished. Either the connection failed or mixnet client got dropped",
address
"connection manager to {address} is finished. Either the connection failed or mixnet client got dropped",
);
}
}

impl Client {
pub fn new(config: Config, connections_count: Arc<AtomicUsize>) -> Client {
Client {
active_connections: Default::default(),
connections_count,
config,
}
}

pub fn active_connections(&self) -> ActiveConnections {
self.active_connections.clone()
}

/// If we're trying to reconnect, determine how long we should wait.
fn determine_backoff(&self, current_attempt: u32) -> Option<Duration> {
Expand All @@ -148,23 +197,24 @@ impl Client {
}

fn make_connection(&mut self, address: NymNodeRoutingAddress, pending_packet: FramedNymPacket) {
let (mut sender, receiver) = mpsc::channel(self.config.maximum_connection_buffer_size);
let (sender, receiver) = mpsc::channel(self.config.maximum_connection_buffer_size);

// this CAN'T fail because we just created the channel which has a non-zero capacity
if self.config.maximum_connection_buffer_size > 0 {
sender.try_send(pending_packet).unwrap();
}

// if we already tried to connect to `address` before, grab the current attempt count
let current_reconnection_attempt = if let Some(existing) = self.conn_new.get_mut(&address) {
existing.channel = sender;
Arc::clone(&existing.current_reconnection_attempt)
} else {
let new_entry = ConnectionSender::new(sender);
let current_attempt = Arc::clone(&new_entry.current_reconnection_attempt);
self.conn_new.insert(address, new_entry);
current_attempt
};
let current_reconnection_attempt =
if let Some(mut existing) = self.active_connections.get_mut(&address) {
existing.channel = sender;
Arc::clone(&existing.current_reconnection_attempt)
} else {
let new_entry = ConnectionSender::new(sender);
let current_attempt = Arc::clone(&new_entry.current_reconnection_attempt);
self.active_connections.insert(address, new_entry);
current_attempt
};

// load the actual value.
let reconnection_attempt = current_reconnection_attempt.load(Ordering::Acquire);
Expand All @@ -173,20 +223,24 @@ impl Client {
// copy the value before moving into another task
let initial_connection_timeout = self.config.initial_connection_timeout;

let connections_count = self.connections_count.clone();
tokio::spawn(async move {
// before executing the manager, wait for what was specified, if anything
if let Some(backoff) = backoff {
trace!("waiting for {:?} before attempting connection", backoff);
sleep(backoff).await;
}

Self::manage_connection(
connections_count.fetch_add(1, Ordering::SeqCst);
ManagedConnection::new(
address.into(),
receiver,
initial_connection_timeout,
&current_reconnection_attempt,
current_reconnection_attempt,
)
.await
.run()
.await;
connections_count.fetch_sub(1, Ordering::SeqCst);
});
}
}
Expand All @@ -201,49 +255,47 @@ impl SendWithoutResponse for Client {
trace!("Sending packet to {address:?}");
let framed_packet = FramedNymPacket::new(packet, packet_type);

if let Some(sender) = self.conn_new.get_mut(&address) {
if let Err(err) = sender.channel.try_send(framed_packet) {
if err.is_full() {
debug!("Connection to {} seems to not be able to handle all the traffic - dropping the current packet", address);
let Some(sender) = self.active_connections.get_mut(&address) else {
// there was never a connection to begin with
debug!("establishing initial connection to {}", address);
// it's not a 'big' error, but we did not manage to send the packet, but queue the packet
// for sending for as soon as the connection is created
self.make_connection(address, framed_packet);
return Err(io::Error::new(
io::ErrorKind::NotConnected,
"connection is in progress",
));
};

let sending_res = sender.channel.try_send(framed_packet);
drop(sender);

sending_res.map_err(|err| {
match err {
TrySendError::Full(_) => {
debug!("Connection to {address} seems to not be able to handle all the traffic - dropping the current packet");
// it's not a 'big' error, but we did not manage to send the packet
// if the queue is full, we can't really do anything but to drop the packet
Err(io::Error::new(
io::Error::new(
io::ErrorKind::WouldBlock,
"connection queue is full",
))
} else if err.is_disconnected() {
)
}
TrySendError::Closed(dropped) => {
debug!(
"Connection to {} seems to be dead. attempting to re-establish it...",
address
"Connection to {address} seems to be dead. attempting to re-establish it...",
);

// it's not a 'big' error, but we did not manage to send the packet, but queue
// it up to send it as soon as the connection is re-established
self.make_connection(address, err.into_inner());
Err(io::Error::new(
self.make_connection(address, dropped);
io::Error::new(
io::ErrorKind::ConnectionAborted,
"reconnection attempt is in progress",
))
} else {
// this can't really happen, but let's safe-guard against it in case something changes in futures library
Err(io::Error::new(
io::ErrorKind::Other,
"unknown connection buffer error",
))
)
}
} else {
Ok(())
}
} else {
// there was never a connection to begin with
debug!("establishing initial connection to {}", address);
// it's not a 'big' error, but we did not manage to send the packet, but queue the packet
// for sending for as soon as the connection is created
self.make_connection(address, framed_packet);
Err(io::Error::new(
io::ErrorKind::NotConnected,
"connection is in progress",
))
}
} )
}
}

Expand All @@ -252,12 +304,15 @@ mod tests {
use super::*;

fn dummy_client() -> Client {
Client::new(Config {
initial_reconnection_backoff: Duration::from_millis(10_000),
maximum_reconnection_backoff: Duration::from_millis(300_000),
initial_connection_timeout: Duration::from_millis(1_500),
maximum_connection_buffer_size: 128,
})
Client::new(
Config {
initial_reconnection_backoff: Duration::from_millis(10_000),
maximum_reconnection_backoff: Duration::from_millis(300_000),
initial_connection_timeout: Duration::from_millis(1_500),
maximum_connection_buffer_size: 128,
},
Default::default(),
)
}

#[test]
Expand Down
8 changes: 8 additions & 0 deletions common/nonexhaustive-delayqueue/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,14 @@ impl<T> NonExhaustiveDelayQueue<T> {
pub fn remove(&mut self, key: &QueueKey) -> Expired<T> {
self.inner.remove(key)
}

pub fn len(&self) -> usize {
self.inner.len()
}

pub fn is_empty(&self) -> bool {
self.inner.is_empty()
}
}

impl<T> Default for NonExhaustiveDelayQueue<T> {
Expand Down
2 changes: 1 addition & 1 deletion common/nym-metrics/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@ license.workspace = true

[dependencies]
prometheus = { workspace = true }
log = { workspace = true }
tracing = { workspace = true }
dashmap = { workspace = true }
lazy_static = { workspace = true }
Loading

0 comments on commit 7d5e3ef

Please sign in to comment.