diff --git a/Cargo.lock b/Cargo.lock index 6e8b6602d6e..df18e606368 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5655,18 +5655,20 @@ version = "0.1.0" dependencies = [ "dashmap", "lazy_static", - "log", "prometheus", + "tracing", ] [[package]] name = "nym-mixnet-client" version = "0.1.0" dependencies = [ + "dashmap", "futures", "nym-sphinx", "nym-task", "tokio", + "tokio-stream", "tokio-util", "tracing", ] diff --git a/common/client-libs/mixnet-client/Cargo.toml b/common/client-libs/mixnet-client/Cargo.toml index 25dd62f7023..b240aab5131 100644 --- a/common/client-libs/mixnet-client/Cargo.toml +++ b/common/client-libs/mixnet-client/Cargo.toml @@ -8,10 +8,12 @@ license.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +dashmap = { workspace = true } futures = { workspace = true } tracing = { workspace = true } -tokio = { workspace = true, features = ["time"] } +tokio = { workspace = true, features = ["time", "sync"] } tokio-util = { workspace = true, features = ["codec"], optional = true } +tokio-stream = { workspace = true } # internal nym-sphinx = { path = "../../nymsphinx" } diff --git a/common/client-libs/mixnet-client/src/client.rs b/common/client-libs/mixnet-client/src/client.rs index b8eebcdcc58..94e32f84041 100644 --- a/common/client-libs/mixnet-client/src/client.rs +++ b/common/client-libs/mixnet-client/src/client.rs @@ -1,21 +1,24 @@ -// Copyright 2021 - Nym Technologies SA +// Copyright 2021-2024 - Nym Technologies SA // SPDX-License-Identifier: Apache-2.0 -use futures::channel::mpsc; +use dashmap::DashMap; use futures::StreamExt; use nym_sphinx::addressing::nodes::NymNodeRoutingAddress; use nym_sphinx::framing::codec::NymCodec; use nym_sphinx::framing::packet::FramedNymPacket; use nym_sphinx::params::PacketType; use nym_sphinx::NymPacket; -use std::collections::HashMap; use std::io; use std::net::SocketAddr; -use std::sync::atomic::{AtomicU32, Ordering}; +use std::ops::Deref; +use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering}; use std::sync::Arc; use std::time::Duration; use tokio::net::TcpStream; +use tokio::sync::mpsc; +use tokio::sync::mpsc::error::TrySendError; use tokio::time::sleep; +use tokio_stream::wrappers::ReceiverStream; use tokio_util::codec::Framed; use tracing::*; @@ -55,11 +58,37 @@ pub trait SendWithoutResponse { } pub struct Client { - conn_new: HashMap, + active_connections: ActiveConnections, + connections_count: Arc, config: Config, } -struct ConnectionSender { +#[derive(Default, Clone)] +pub struct ActiveConnections { + inner: Arc>, +} + +impl ActiveConnections { + pub fn pending_packets(&self) -> usize { + self.inner + .iter() + .map(|sender| { + let max_capacity = sender.channel.max_capacity(); + let capacity = sender.channel.capacity(); + max_capacity - capacity + }) + .sum() + } +} + +impl Deref for ActiveConnections { + type Target = DashMap; + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +pub struct ConnectionSender { channel: mpsc::Sender, current_reconnection_attempt: Arc, } @@ -73,46 +102,53 @@ impl ConnectionSender { } } -impl Client { - pub fn new(config: Config) -> Client { - Client { - conn_new: HashMap::new(), - config, - } - } +struct ManagedConnection { + address: SocketAddr, + message_receiver: ReceiverStream, + connection_timeout: Duration, + current_reconnection: Arc, +} - async fn manage_connection( +impl ManagedConnection { + fn new( address: SocketAddr, - receiver: mpsc::Receiver, + message_receiver: mpsc::Receiver, connection_timeout: 
Duration, - current_reconnection: &AtomicU32, - ) { + current_reconnection: Arc, + ) -> Self { + ManagedConnection { + address, + message_receiver: ReceiverStream::new(message_receiver), + connection_timeout, + current_reconnection, + } + } + + async fn run(self) { + let address = self.address; let connection_fut = TcpStream::connect(address); - let conn = match tokio::time::timeout(connection_timeout, connection_fut).await { + let conn = match tokio::time::timeout(self.connection_timeout, connection_fut).await { Ok(stream_res) => match stream_res { Ok(stream) => { - debug!("Managed to establish connection to {}", address); + debug!("Managed to establish connection to {}", self.address); // if we managed to connect, reset the reconnection count (whatever it might have been) - current_reconnection.store(0, Ordering::Release); + self.current_reconnection.store(0, Ordering::Release); Framed::new(stream, NymCodec) } Err(err) => { - debug!( - "failed to establish connection to {} (err: {})", - address, err - ); + debug!("failed to establish connection to {address} (err: {err})",); return; } }, Err(_) => { debug!( - "failed to connect to {} within {:?}", - address, connection_timeout + "failed to connect to {address} within {:?}", + self.connection_timeout ); // we failed to connect - increase reconnection attempt - current_reconnection.fetch_add(1, Ordering::SeqCst); + self.current_reconnection.fetch_add(1, Ordering::SeqCst); return; } }; @@ -120,15 +156,28 @@ impl Client { // Take whatever the receiver channel produces and put it on the connection. // We could have as well used conn.send_all(receiver.map(Ok)), but considering we don't care // about neither receiver nor the connection, it doesn't matter which one gets consumed - if let Err(err) = receiver.map(Ok).forward(conn).await { - warn!("Failed to forward packets to {} - {err}", address); + if let Err(err) = self.message_receiver.map(Ok).forward(conn).await { + warn!("Failed to forward packets to {address}: {err}"); } debug!( - "connection manager to {} is finished. Either the connection failed or mixnet client got dropped", - address + "connection manager to {address} is finished. Either the connection failed or mixnet client got dropped", ); } +} + +impl Client { + pub fn new(config: Config, connections_count: Arc) -> Client { + Client { + active_connections: Default::default(), + connections_count, + config, + } + } + + pub fn active_connections(&self) -> ActiveConnections { + self.active_connections.clone() + } /// If we're trying to reconnect, determine how long we should wait. 
fn determine_backoff(&self, current_attempt: u32) -> Option { @@ -148,7 +197,7 @@ impl Client { } fn make_connection(&mut self, address: NymNodeRoutingAddress, pending_packet: FramedNymPacket) { - let (mut sender, receiver) = mpsc::channel(self.config.maximum_connection_buffer_size); + let (sender, receiver) = mpsc::channel(self.config.maximum_connection_buffer_size); // this CAN'T fail because we just created the channel which has a non-zero capacity if self.config.maximum_connection_buffer_size > 0 { @@ -156,15 +205,16 @@ impl Client { } // if we already tried to connect to `address` before, grab the current attempt count - let current_reconnection_attempt = if let Some(existing) = self.conn_new.get_mut(&address) { - existing.channel = sender; - Arc::clone(&existing.current_reconnection_attempt) - } else { - let new_entry = ConnectionSender::new(sender); - let current_attempt = Arc::clone(&new_entry.current_reconnection_attempt); - self.conn_new.insert(address, new_entry); - current_attempt - }; + let current_reconnection_attempt = + if let Some(mut existing) = self.active_connections.get_mut(&address) { + existing.channel = sender; + Arc::clone(&existing.current_reconnection_attempt) + } else { + let new_entry = ConnectionSender::new(sender); + let current_attempt = Arc::clone(&new_entry.current_reconnection_attempt); + self.active_connections.insert(address, new_entry); + current_attempt + }; // load the actual value. let reconnection_attempt = current_reconnection_attempt.load(Ordering::Acquire); @@ -173,6 +223,7 @@ impl Client { // copy the value before moving into another task let initial_connection_timeout = self.config.initial_connection_timeout; + let connections_count = self.connections_count.clone(); tokio::spawn(async move { // before executing the manager, wait for what was specified, if anything if let Some(backoff) = backoff { @@ -180,13 +231,16 @@ impl Client { sleep(backoff).await; } - Self::manage_connection( + connections_count.fetch_add(1, Ordering::SeqCst); + ManagedConnection::new( address.into(), receiver, initial_connection_timeout, - ¤t_reconnection_attempt, + current_reconnection_attempt, ) - .await + .run() + .await; + connections_count.fetch_sub(1, Ordering::SeqCst); }); } } @@ -201,49 +255,47 @@ impl SendWithoutResponse for Client { trace!("Sending packet to {address:?}"); let framed_packet = FramedNymPacket::new(packet, packet_type); - if let Some(sender) = self.conn_new.get_mut(&address) { - if let Err(err) = sender.channel.try_send(framed_packet) { - if err.is_full() { - debug!("Connection to {} seems to not be able to handle all the traffic - dropping the current packet", address); + let Some(sender) = self.active_connections.get_mut(&address) else { + // there was never a connection to begin with + debug!("establishing initial connection to {}", address); + // it's not a 'big' error, but we did not manage to send the packet, but queue the packet + // for sending for as soon as the connection is created + self.make_connection(address, framed_packet); + return Err(io::Error::new( + io::ErrorKind::NotConnected, + "connection is in progress", + )); + }; + + let sending_res = sender.channel.try_send(framed_packet); + drop(sender); + + sending_res.map_err(|err| { + match err { + TrySendError::Full(_) => { + debug!("Connection to {address} seems to not be able to handle all the traffic - dropping the current packet"); // it's not a 'big' error, but we did not manage to send the packet // if the queue is full, we can't really do anything but to drop the packet - 
Err(io::Error::new( + io::Error::new( io::ErrorKind::WouldBlock, "connection queue is full", - )) - } else if err.is_disconnected() { + ) + } + TrySendError::Closed(dropped) => { debug!( - "Connection to {} seems to be dead. attempting to re-establish it...", - address + "Connection to {address} seems to be dead. attempting to re-establish it...", ); + // it's not a 'big' error, but we did not manage to send the packet, but queue // it up to send it as soon as the connection is re-established - self.make_connection(address, err.into_inner()); - Err(io::Error::new( + self.make_connection(address, dropped); + io::Error::new( io::ErrorKind::ConnectionAborted, "reconnection attempt is in progress", - )) - } else { - // this can't really happen, but let's safe-guard against it in case something changes in futures library - Err(io::Error::new( - io::ErrorKind::Other, - "unknown connection buffer error", - )) + ) } - } else { - Ok(()) } - } else { - // there was never a connection to begin with - debug!("establishing initial connection to {}", address); - // it's not a 'big' error, but we did not manage to send the packet, but queue the packet - // for sending for as soon as the connection is created - self.make_connection(address, framed_packet); - Err(io::Error::new( - io::ErrorKind::NotConnected, - "connection is in progress", - )) - } + } ) } } @@ -252,12 +304,15 @@ mod tests { use super::*; fn dummy_client() -> Client { - Client::new(Config { - initial_reconnection_backoff: Duration::from_millis(10_000), - maximum_reconnection_backoff: Duration::from_millis(300_000), - initial_connection_timeout: Duration::from_millis(1_500), - maximum_connection_buffer_size: 128, - }) + Client::new( + Config { + initial_reconnection_backoff: Duration::from_millis(10_000), + maximum_reconnection_backoff: Duration::from_millis(300_000), + initial_connection_timeout: Duration::from_millis(1_500), + maximum_connection_buffer_size: 128, + }, + Default::default(), + ) } #[test] diff --git a/common/nonexhaustive-delayqueue/src/lib.rs b/common/nonexhaustive-delayqueue/src/lib.rs index ee8c9f214c8..6c4853f6758 100644 --- a/common/nonexhaustive-delayqueue/src/lib.rs +++ b/common/nonexhaustive-delayqueue/src/lib.rs @@ -65,6 +65,14 @@ impl NonExhaustiveDelayQueue { pub fn remove(&mut self, key: &QueueKey) -> Expired { self.inner.remove(key) } + + pub fn len(&self) -> usize { + self.inner.len() + } + + pub fn is_empty(&self) -> bool { + self.inner.is_empty() + } } impl Default for NonExhaustiveDelayQueue { diff --git a/common/nym-metrics/Cargo.toml b/common/nym-metrics/Cargo.toml index 4310ae28f66..1ac3bce16af 100644 --- a/common/nym-metrics/Cargo.toml +++ b/common/nym-metrics/Cargo.toml @@ -12,6 +12,6 @@ license.workspace = true [dependencies] prometheus = { workspace = true } -log = { workspace = true } +tracing = { workspace = true } dashmap = { workspace = true } lazy_static = { workspace = true } diff --git a/common/nym-metrics/src/lib.rs b/common/nym-metrics/src/lib.rs index 565d62cd5fc..7c6373ccc78 100644 --- a/common/nym-metrics/src/lib.rs +++ b/common/nym-metrics/src/lib.rs @@ -1,14 +1,18 @@ use dashmap::DashMap; -pub use log::error; -use log::{debug, warn}; use std::fmt; -pub use std::time::Instant; +use tracing::{debug, error, warn}; + +use prometheus::{ + core::Collector, Encoder as _, Gauge, Histogram, HistogramOpts, IntCounter, IntGauge, Registry, + TextEncoder, +}; -use prometheus::{core::Collector, Encoder as _, IntCounter, IntGauge, Registry, TextEncoder}; +pub use prometheus::HistogramTimer; +pub use 
std::time::Instant; #[macro_export] macro_rules! prepend_package_name { - ($name: literal) => { + ($name: tt) => { &format!( "{}_{}", std::module_path!() @@ -23,15 +27,29 @@ macro_rules! prepend_package_name { #[macro_export] macro_rules! inc_by { + ($name:literal, $x:expr, $help: expr) => { + $crate::REGISTRY.maybe_register_and_inc_by( + $crate::prepend_package_name!($name), + $x as i64, + $help, + ); + }; ($name:literal, $x:expr) => { - $crate::REGISTRY.inc_by($crate::prepend_package_name!($name), $x as i64); + $crate::REGISTRY.maybe_register_and_inc_by( + $crate::prepend_package_name!($name), + $x as i64, + None, + ); }; } #[macro_export] macro_rules! inc { + ($name:literal, $help: expr) => { + $crate::REGISTRY.maybe_register_and_inc($crate::prepend_package_name!($name), $help); + }; ($name:literal) => { - $crate::REGISTRY.inc($crate::prepend_package_name!($name)); + $crate::REGISTRY.maybe_register_and_inc($crate::prepend_package_name!($name), None); }; } @@ -42,6 +60,71 @@ macro_rules! metrics { }; } +#[macro_export] +macro_rules! set_metric { + ($name:literal, $x:expr, $help: expr) => { + $crate::REGISTRY.maybe_register_and_set( + $crate::prepend_package_name!($name), + $x as i64, + $help, + ); + }; + ($name:literal, $x:expr) => { + $crate::REGISTRY.maybe_register_and_set( + $crate::prepend_package_name!($name), + $x as i64, + None, + ); + }; +} + +#[macro_export] +macro_rules! set_metric_float { + ($name:literal, $x:expr, $help: expr) => { + $crate::REGISTRY.maybe_register_and_set_float( + $crate::prepend_package_name!($name), + $x as f64, + $help, + ); + }; + ($name:literal, $x:expr) => { + $crate::REGISTRY.maybe_register_and_set_float( + $crate::prepend_package_name!($name), + $x as f64, + None, + ); + }; +} + +#[macro_export] +macro_rules! add_histogram_obs { + ($name:expr, $x:expr, $b:expr, $help:expr) => { + $crate::REGISTRY.maybe_register_and_add_to_histogram( + $crate::prepend_package_name!($name), + $x as f64, + Some($b), + $help, + ); + }; + + ($name:expr, $x:expr, $b:expr) => { + $crate::REGISTRY.maybe_register_and_add_to_histogram( + $crate::prepend_package_name!($name), + $x as f64, + Some($b), + None, + ); + }; + ($name:expr, $x:expr) => { + $crate::REGISTRY.maybe_register_and_add_to_histogram( + $crate::prepend_package_name!($name), + $x as f64, + None, + None, + ); + }; +} + #[macro_export] macro_rules! nanos { ( $name:literal, $x:expr ) => {{ @@ -50,7 +133,7 @@ macro_rules! nanos { let r = $x; let duration = start.elapsed().as_nanos() as i64; let name = $crate::prepend_package_name!($name); - $crate::REGISTRY.inc_by(&format!("{}_nanos", $name), duration); + $crate::REGISTRY.maybe_register_and_inc_by(&format!("{}_nanos", $name), duration, None); r }}; } @@ -59,15 +142,100 @@ lazy_static::lazy_static! 
{ pub static ref REGISTRY: MetricsController = MetricsController::default(); } +pub fn metrics_registry() -> &'static MetricsController { + ®ISTRY +} + #[derive(Default)] pub struct MetricsController { registry: Registry, registry_index: DashMap, } -enum Metric { - C(Box), - G(Box), +pub enum Metric { + IntCounter(Box), + IntGauge(Box), + FloatGauge(Box), + Histogram(Box), +} + +impl Metric { + pub fn new_int_counter(name: &str, help: &str) -> Option { + match IntCounter::new(sanitize_metric_name(name), help) { + Ok(c) => Some(c.into()), + Err(err) => { + error!("Failed to create counter {name:?}: {err}"); + None + } + } + } + + pub fn new_int_gauge(name: &str, help: &str) -> Option { + match IntGauge::new(sanitize_metric_name(name), help) { + Ok(g) => Some(g.into()), + Err(err) => { + error!("Failed to create gauge {name:?}: {err}"); + None + } + } + } + + pub fn new_float_gauge(name: &str, help: &str) -> Option { + match Gauge::new(sanitize_metric_name(name), help) { + Ok(g) => Some(g.into()), + Err(err) => { + error!("Failed to create gauge {name:?}: {err}"); + None + } + } + } + + pub fn new_histogram(name: &str, help: &str, buckets: Option<&[f64]>) -> Option { + let mut opts = HistogramOpts::new(sanitize_metric_name(name), help); + if let Some(buckets) = buckets { + opts = opts.buckets(buckets.to_vec()) + } + match Histogram::with_opts(opts) { + Ok(h) => Some(Metric::Histogram(Box::new(h))), + Err(err) => { + error!("failed to create histogram {name:?}: {err}"); + None + } + } + } + + fn as_collector(&self) -> Box { + match self { + Metric::IntCounter(c) => c.clone(), + Metric::IntGauge(g) => g.clone(), + Metric::FloatGauge(g) => g.clone(), + Metric::Histogram(h) => h.clone(), + } + } +} + +impl From for Metric { + fn from(v: IntCounter) -> Self { + Metric::IntCounter(Box::new(v)) + } +} + +impl From for Metric { + fn from(v: IntGauge) -> Self { + Metric::IntGauge(Box::new(v)) + } +} + +impl From for Metric { + fn from(v: Gauge) -> Self { + Metric::FloatGauge(Box::new(v)) + } +} + +impl From for Metric { + fn from(v: Histogram) -> Self { + Metric::Histogram(Box::new(v)) + } } fn fq_name(c: &dyn Collector) -> String { @@ -81,34 +249,92 @@ impl Metric { #[inline(always)] fn fq_name(&self) -> String { match self { - Metric::C(c) => fq_name(c.as_ref()), - Metric::G(g) => fq_name(g.as_ref()), + Metric::IntCounter(c) => fq_name(c.as_ref()), + Metric::IntGauge(g) => fq_name(g.as_ref()), + Metric::FloatGauge(g) => fq_name(g.as_ref()), + Metric::Histogram(h) => fq_name(h.as_ref()), } } #[inline(always)] fn inc(&self) { match self { - Metric::C(c) => c.inc(), - Metric::G(g) => g.inc(), + Metric::IntCounter(c) => c.inc(), + Metric::IntGauge(g) => g.inc(), + Metric::FloatGauge(g) => g.inc(), + Metric::Histogram(_) => { + warn!("invalid operation: attempted to call increment on a histogram") + } } } #[inline(always)] fn inc_by(&self, value: i64) { match self { - Metric::C(c) => c.inc_by(value as u64), - Metric::G(g) => g.add(value), + Metric::IntCounter(c) => c.inc_by(value as u64), + Metric::IntGauge(g) => g.add(value), + Metric::FloatGauge(g) => { + warn!("attempted to increment a float gauge ('{}') by an integer - this is most likely a bug", self.fq_name()); + g.add(value as f64) + } + Metric::Histogram(_) => { + warn!("invalid operation: attempted to call increment on a histogram") + } } } #[inline(always)] fn set(&self, value: i64) { match self { - Metric::C(_c) => { + Metric::IntCounter(_c) => { + warn!("Cannot set value for counter {:?}", self.fq_name()); + } + Metric::IntGauge(g) => 
g.set(value), + Metric::FloatGauge(g) => { + warn!("attempted to set a float gauge ('{}') to an integer value - this is most likely a bug", self.fq_name()); + g.set(value as f64) + } + Metric::Histogram(_) => { + warn!("invalid operation: attempted to call set on a histogram") + } + } + } + + #[inline(always)] + fn set_float(&self, value: f64) { + match self { + Metric::IntCounter(_c) => { warn!("Cannot set value for counter {:?}", self.fq_name()); } - Metric::G(g) => g.set(value), + Metric::IntGauge(g) => { + warn!("attempted to set a integer gauge ('{}') to a float value - this is most likely a bug", self.fq_name()); + g.set(value as i64) + } + Metric::FloatGauge(g) => g.set(value), + Metric::Histogram(_) => { + warn!("invalid operation: attempted to call increment on a histogram") + } + } + } + + #[inline(always)] + fn add_histogram_observation(&self, value: f64) { + match self { + Metric::Histogram(h) => { + h.observe(value); + } + _ => warn!("attempted to add histogram observation on a non-histogram metric"), + } + } + + #[inline(always)] + fn start_timer(&self) -> Option { + match self { + Metric::Histogram(h) => Some(h.start_timer()), + _ => { + warn!("attempted to start histogram observation on a non-histogram metric"); + None + } } } } @@ -145,93 +371,165 @@ impl MetricsController { } } - pub fn set(&self, name: &str, value: i64) { + pub fn register_int_gauge<'a>(&self, name: &str, help: impl Into>) { + let Some(metric) = Metric::new_int_gauge(name, help.into().unwrap_or(name)) else { + return; + }; + self.register_metric(metric); + } + + pub fn register_float_gauge<'a>(&self, name: &str, help: impl Into>) { + let Some(metric) = Metric::new_float_gauge(name, help.into().unwrap_or(name)) else { + return; + }; + self.register_metric(metric); + } + + pub fn register_int_counter<'a>(&self, name: &str, help: impl Into>) { + let Some(metric) = Metric::new_int_counter(name, help.into().unwrap_or(name)) else { + return; + }; + self.register_metric(metric); + } + + pub fn register_histogram<'a>( + &self, + name: &str, + help: impl Into>, + buckets: Option<&[f64]>, + ) { + let Some(metric) = Metric::new_histogram(name, help.into().unwrap_or(name), buckets) else { + return; + }; + self.register_metric(metric); + } + + pub fn set(&self, name: &str, value: i64) -> bool { if let Some(metric) = self.registry_index.get(name) { metric.set(value); + true } else { - let gauge = match IntGauge::new(sanitize_metric_name(name), name) { - Ok(g) => g, - Err(e) => { - debug!("Failed to create gauge {:?}:\n{}", name, e); - return; - } - }; - self.register_gauge(Box::new(gauge)); - self.set(name, value) + false } } - pub fn inc(&self, name: &str) { + pub fn set_float(&self, name: &str, value: f64) -> bool { + if let Some(metric) = self.registry_index.get(name) { + metric.set_float(value); + true + } else { + false + } + } + + pub fn add_to_histogram(&self, name: &str, value: f64) -> bool { + if let Some(metric) = self.registry_index.get(name) { + metric.add_histogram_observation(value); + true + } else { + false + } + } + + pub fn start_timer(&self, name: &str) -> Option { + self.registry_index + .get(name) + .and_then(|metric| metric.start_timer()) + } + + pub fn inc(&self, name: &str) -> bool { if let Some(metric) = self.registry_index.get(name) { metric.inc(); + true } else { - let counter = match IntCounter::new(sanitize_metric_name(name), name) { - Ok(c) => c, - Err(e) => { - debug!("Failed to create counter {:?}:\n{}", name, e); - return; - } - }; - self.register_counter(Box::new(counter)); - 
self.inc(name) + false } } - pub fn inc_by(&self, name: &str, value: i64) { + pub fn inc_by(&self, name: &str, value: i64) -> bool { if let Some(metric) = self.registry_index.get(name) { metric.inc_by(value); + true } else { - let counter = match IntCounter::new(sanitize_metric_name(name), name) { - Ok(c) => c, - Err(e) => { - debug!("Failed to create counter {:?}:\n{}", name, e); - return; - } - }; - self.register_counter(Box::new(counter)); - self.inc_by(name, value) + false } } - fn register_gauge(&self, metric: Box) { - let fq_name = metric - .desc() - .first() - .map(|d| d.fq_name.clone()) - .unwrap_or_default(); + pub fn maybe_register_and_set<'a>( + &self, + name: &str, + value: i64, + help: impl Into>, + ) { + if !self.set(name, value) { + let help = help.into(); + self.register_int_gauge(name, help); + self.set(name, value); + } + } - if self.registry_index.contains_key(&fq_name) { - return; + pub fn maybe_register_and_set_float<'a>( + &self, + name: &str, + value: f64, + help: impl Into>, + ) { + if !self.set_float(name, value) { + let help = help.into(); + self.register_float_gauge(name, help); + self.set_float(name, value); } + } - match self.registry.register(metric.clone()) { - Ok(_) => { - self.registry_index - .insert(fq_name, Metric::G(metric.clone())); - } - Err(e) => { - debug!("Failed to register {:?}:\n{}", fq_name, e) - } + pub fn maybe_register_and_add_to_histogram<'a>( + &self, + name: &str, + value: f64, + buckets: Option<&[f64]>, + help: impl Into>, + ) { + if !self.add_to_histogram(name, value) { + let help = help.into(); + self.register_histogram(name, help, buckets); + self.add_to_histogram(name, value); + } + } + + pub fn maybe_register_and_inc<'a>(&self, name: &str, help: impl Into>) { + if !self.inc(name) { + let help = help.into(); + self.register_int_counter(name, help); + self.inc(name); } } - fn register_counter(&self, metric: Box) { - let fq_name = metric - .desc() - .first() - .map(|d| d.fq_name.clone()) - .unwrap_or_default(); + pub fn maybe_register_and_inc_by<'a>( + &self, + name: &str, + value: i64, + help: impl Into>, + ) { + if !self.inc_by(name, value) { + let help = help.into(); + self.register_int_counter(name, help); + self.inc_by(name, value); + } + } + + pub fn register_metric(&self, metric: impl Into) { + let m = metric.into(); + let fq_name = m.fq_name(); if self.registry_index.contains_key(&fq_name) { return; } - match self.registry.register(metric.clone()) { + + match self.registry.register(m.as_collector()) { Ok(_) => { - self.registry_index - .insert(fq_name, Metric::C(metric.clone())); + self.registry_index.insert(fq_name, m); } - Err(e) => { - debug!("Failed to register {:?}:\n{}", fq_name, e) + Err(err) => { + debug!("Failed to register '{fq_name}': {err}") } } } @@ -275,4 +573,15 @@ mod tests { "packets_sent_34_242_65_133:1789" ) } + + #[test] + fn prepend_package_name() { + let literal = prepend_package_name!("foo"); + assert_eq!(literal, "nym_metrics_foo"); + + let bar = "bar"; + let format = format!("foomp_{}", bar); + let formatted = prepend_package_name!(format); + assert_eq!(formatted, "nym_metrics_foomp_bar"); + } } diff --git a/gateway/src/node/client_handling/active_clients.rs b/gateway/src/node/client_handling/active_clients.rs index 3dacfd3f63f..4a3e55bef59 100644 --- a/gateway/src/node/client_handling/active_clients.rs +++ b/gateway/src/node/client_handling/active_clients.rs @@ -163,4 +163,11 @@ impl ActiveClientsStore { pub(crate) fn size(&self) -> usize { self.inner.len() } + + pub fn pending_packets(&self) -> usize 
{ + self.inner + .iter() + .map(|client| client.get_sender_ref().len()) + .sum() + } } diff --git a/gateway/src/node/client_handling/websocket/common_state.rs b/gateway/src/node/client_handling/websocket/common_state.rs index 19a5943c532..223c534b268 100644 --- a/gateway/src/node/client_handling/websocket/common_state.rs +++ b/gateway/src/node/client_handling/websocket/common_state.rs @@ -7,6 +7,7 @@ use nym_crypto::asymmetric::identity; use nym_gateway_storage::GatewayStorage; use nym_mixnet_client::forwarder::MixForwardingSender; use nym_node_metrics::events::MetricEventsSender; +use nym_node_metrics::NymNodeMetrics; use std::sync::Arc; // I can see this being possible expanded with say storage or client store @@ -17,6 +18,7 @@ pub(crate) struct CommonHandlerState { pub(crate) local_identity: Arc, pub(crate) only_coconut_credentials: bool, pub(crate) bandwidth_cfg: BandwidthFlushingBehaviourConfig, + pub(crate) metrics: NymNodeMetrics, pub(crate) metrics_sender: MetricEventsSender, pub(crate) outbound_mix_sender: MixForwardingSender, pub(crate) active_clients_store: ActiveClientsStore, diff --git a/gateway/src/node/client_handling/websocket/listener.rs b/gateway/src/node/client_handling/websocket/listener.rs index ed484d17b65..faa659f0faa 100644 --- a/gateway/src/node/client_handling/websocket/listener.rs +++ b/gateway/src/node/client_handling/websocket/listener.rs @@ -61,7 +61,13 @@ impl Listener { remote_addr, shutdown, ); - tokio::spawn(handle.start_handling()); + tokio::spawn(async move { + // TODO: refactor it similarly to the mixnet listener on the nym-node + let metrics_ref = handle.shared_state.metrics.clone(); + metrics_ref.network.new_ingress_websocket_client(); + handle.start_handling().await; + metrics_ref.network.disconnected_ingress_websocket_client(); + }); } Err(err) => warn!("failed to get client: {err}"), } diff --git a/gateway/src/node/mod.rs b/gateway/src/node/mod.rs index b3b41961a1a..3304f223b32 100644 --- a/gateway/src/node/mod.rs +++ b/gateway/src/node/mod.rs @@ -252,6 +252,7 @@ impl GatewayTasksBuilder { local_identity: Arc::clone(&self.identity_keypair), only_coconut_credentials: self.config.gateway.enforce_zk_nyms, bandwidth_cfg: (&self.config).into(), + metrics: self.metrics.clone(), metrics_sender: self.metrics_sender.clone(), outbound_mix_sender: self.mix_packet_sender.clone(), active_clients_store: active_clients_store.clone(), diff --git a/nym-node/nym-node-metrics/src/lib.rs b/nym-node/nym-node-metrics/src/lib.rs index 57a8c74bb3d..904cdc14072 100644 --- a/nym-node/nym-node-metrics/src/lib.rs +++ b/nym-node/nym-node-metrics/src/lib.rs @@ -1,9 +1,15 @@ // Copyright 2024 - Nym Technologies SA // SPDX-License-Identifier: Apache-2.0 +#![warn(clippy::expect_used)] +#![warn(clippy::unwrap_used)] +#![warn(clippy::todo)] +#![warn(clippy::dbg_macro)] + use crate::entry::EntryStats; use crate::mixnet::MixingStats; use crate::network::NetworkStats; +use crate::process::NodeStats; use crate::wireguard::WireguardStats; use std::ops::Deref; use std::sync::Arc; @@ -12,6 +18,8 @@ pub mod entry; pub mod events; pub mod mixnet; pub mod network; +pub mod process; +pub mod prometheus_wrapper; pub mod wireguard; #[derive(Clone, Default)] @@ -39,4 +47,5 @@ pub struct NymNodeMetricsInner { pub wireguard: WireguardStats, pub network: NetworkStats, + pub process: NodeStats, } diff --git a/nym-node/nym-node-metrics/src/mixnet.rs b/nym-node/nym-node-metrics/src/mixnet.rs index 043d2d1e7cb..32d84e509bc 100644 --- a/nym-node/nym-node-metrics/src/mixnet.rs +++ 
b/nym-node/nym-node-metrics/src/mixnet.rs @@ -131,13 +131,6 @@ impl MixingStats { .or_default() .dropped += 1; } - - pub fn egress_dropped_final_hop_packet(&self) { - todo!() - // self.egress - // .final_hop_packets_dropped - // .fetch_add(1, Ordering::Relaxed); - } } #[derive(Clone, Copy, Default, PartialEq)] @@ -148,6 +141,8 @@ pub struct EgressRecipientStats { #[derive(Default)] pub struct EgressMixingStats { + disk_persisted_packets: AtomicUsize, + // this includes ACKS! forward_hop_packets_sent: AtomicUsize, @@ -159,6 +154,14 @@ pub struct EgressMixingStats { } impl EgressMixingStats { + pub fn add_disk_persisted_packet(&self) { + self.disk_persisted_packets.fetch_add(1, Ordering::Relaxed); + } + + pub fn disk_persisted_packets(&self) -> usize { + self.disk_persisted_packets.load(Ordering::Relaxed) + } + pub fn forward_hop_packets_sent(&self) -> usize { self.forward_hop_packets_sent.load(Ordering::Relaxed) } diff --git a/nym-node/nym-node-metrics/src/network.rs b/nym-node/nym-node-metrics/src/network.rs index de00d785603..7de912fc3cf 100644 --- a/nym-node/nym-node-metrics/src/network.rs +++ b/nym-node/nym-node-metrics/src/network.rs @@ -2,11 +2,19 @@ // SPDX-License-Identifier: Apache-2.0 use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; #[derive(Default)] pub struct NetworkStats { // for now just experiment with basic data, we could always extend it active_ingress_mixnet_connections: AtomicUsize, + + active_ingress_websocket_connections: AtomicUsize, + + // the reason for additional `Arc` on this one is that the handler wasn't + // designed with metrics in mind and this single counter has been woven through + // the call stack + active_egress_mixnet_connections: Arc, } impl NetworkStats { @@ -20,8 +28,32 @@ impl NetworkStats { .fetch_sub(1, Ordering::Relaxed); } + pub fn new_ingress_websocket_client(&self) { + self.active_ingress_websocket_connections + .fetch_add(1, Ordering::Relaxed); + } + + pub fn disconnected_ingress_websocket_client(&self) { + self.active_ingress_websocket_connections + .fetch_sub(1, Ordering::Relaxed); + } + pub fn active_ingress_mixnet_connections_count(&self) -> usize { self.active_ingress_mixnet_connections .load(Ordering::Relaxed) } + + pub fn active_ingress_websocket_connections_count(&self) -> usize { + self.active_ingress_websocket_connections + .load(Ordering::Relaxed) + } + + pub fn active_egress_mixnet_connections_counter(&self) -> Arc { + self.active_egress_mixnet_connections.clone() + } + + pub fn active_egress_mixnet_connections_count(&self) -> usize { + self.active_egress_mixnet_connections + .load(Ordering::Relaxed) + } } diff --git a/nym-node/nym-node-metrics/src/process.rs b/nym-node/nym-node-metrics/src/process.rs new file mode 100644 index 00000000000..8e196882492 --- /dev/null +++ b/nym-node/nym-node-metrics/src/process.rs @@ -0,0 +1,57 @@ +// Copyright 2024 - Nym Technologies SA +// SPDX-License-Identifier: Apache-2.0 + +use std::sync::atomic::{AtomicUsize, Ordering}; + +#[derive(Default)] +pub struct NodeStats { + pub final_hop_packets_pending_delivery: AtomicUsize, + + pub forward_hop_packets_pending_delivery: AtomicUsize, + + pub forward_hop_packets_being_delayed: AtomicUsize, + + // packets that haven't yet been delayed and are waiting for their chance + pub packet_forwarder_queue_size: AtomicUsize, +} + +impl NodeStats { + pub fn update_final_hop_packets_pending_delivery(&self, current: usize) { + self.final_hop_packets_pending_delivery + .store(current, Ordering::Relaxed); + } + + pub fn 
final_hop_packets_pending_delivery_count(&self) -> usize { + self.final_hop_packets_pending_delivery + .load(Ordering::Relaxed) + } + + pub fn update_forward_hop_packets_pending_delivery(&self, current: usize) { + self.forward_hop_packets_pending_delivery + .store(current, Ordering::Relaxed); + } + + pub fn forward_hop_packets_pending_delivery_count(&self) -> usize { + self.forward_hop_packets_pending_delivery + .load(Ordering::Relaxed) + } + + pub fn update_forward_hop_packets_being_delayed(&self, current: usize) { + self.forward_hop_packets_being_delayed + .store(current, Ordering::Relaxed); + } + + pub fn forward_hop_packets_being_delayed_count(&self) -> usize { + self.forward_hop_packets_being_delayed + .load(Ordering::Relaxed) + } + + pub fn update_packet_forwarder_queue_size(&self, current: usize) { + self.packet_forwarder_queue_size + .store(current, Ordering::Relaxed); + } + + pub fn packet_forwarder_queue_size(&self) -> usize { + self.packet_forwarder_queue_size.load(Ordering::Relaxed) + } +} diff --git a/nym-node/nym-node-metrics/src/prometheus_wrapper.rs b/nym-node/nym-node-metrics/src/prometheus_wrapper.rs new file mode 100644 index 00000000000..2d61ee55c4e --- /dev/null +++ b/nym-node/nym-node-metrics/src/prometheus_wrapper.rs @@ -0,0 +1,390 @@ +// Copyright 2024 - Nym Technologies SA +// SPDX-License-Identifier: GPL-3.0-only + +use nym_metrics::{metrics_registry, HistogramTimer, Metric}; +use std::sync::LazyLock; +use strum::{Display, EnumCount, EnumIter, EnumProperty, IntoEnumIterator}; + +pub static PROMETHEUS_METRICS: LazyLock = + LazyLock::new(NymNodePrometheusMetrics::initialise); + +const CLIENT_SESSION_DURATION_BUCKETS: &[f64] = &[ + // sub 3s (implicitly) + 3., // 3s - 15s + 15., // 15s - 70s + 70., // 70s - 2min + 120., // 2 min - 5 min + 300., // 5min - 15min + 900., // 15min - 1h + 3600., // 1h - 12h + 43200., // 12h - 23.5h + 88200., // 23.5h - 24.5h + 86400., // 24.5h - 72h + 259200., // 72h+ (implicitly) +]; + +#[derive(Clone, Debug, EnumIter, Display, EnumProperty, EnumCount, Eq, Hash, PartialEq)] +#[strum(serialize_all = "snake_case", prefix = "nym_node_")] +pub enum PrometheusMetric { + // # MIXNET + // ## INGRESS + #[strum(props(help = "The number of ingress forward hop sphinx packets received"))] + MixnetIngressForwardPacketsReceived, + + #[strum(props(help = "The number of ingress final hop sphinx packets received"))] + MixnetIngressFinalHopPacketsReceived, + + #[strum(props(help = "The number of ingress malformed sphinx packets received"))] + MixnetIngressMalformedPacketsReceived, + + #[strum(props( + help = "The number of ingress forward sphinx packets that specified excessive delay received" + ))] + MixnetIngressExcessiveDelayPacketsReceived, + + #[strum(props(help = "The number of ingress forward hop sphinx packets dropped"))] + MixnetIngressForwardPacketsDropped, + + #[strum(props(help = "The number of ingress final hop sphinx packets dropped"))] + MixnetIngressFinalHopPacketsDropped, + + #[strum(props(help = "The current rate of receiving ingress forward hop sphinx packets"))] + MixnetIngressForwardPacketsReceivedRate, + + #[strum(props(help = "The current rate of receiving ingress final hop sphinx packets"))] + MixnetIngressFinalHopPacketsReceivedRate, + + #[strum(props(help = "The current rate of receiving ingress malformed sphinx packets"))] + MixnetIngressMalformedPacketsReceivedRate, + + #[strum(props( + help = "The current rate of receiving ingress sphinx packets that specified excessive delay" + ))] + 
MixnetIngressExcessiveDelayPacketsReceivedRate, + + #[strum(props(help = "The current rate of dropping ingress forward hop sphinx packets"))] + MixnetIngressForwardPacketsDroppedRate, + + #[strum(props(help = "The current rate of dropping ingress final hop sphinx packets"))] + MixnetIngressFinalHopPacketsDroppedRate, + + // ## EGRESS + #[strum(props( + help = "The number of unwrapped final hop packets stored on disk for offline clients" + ))] + MixnetEgressStoredOnDiskFinalHopPackets, + + #[strum(props(help = "The number of egress forward hop sphinx packets sent/forwarded"))] + MixnetEgressForwardPacketsSent, + + #[strum(props( + help = "The number of egress forward hop sphinx packets sent/forwarded (acks only)" + ))] + MixnetEgressAckSent, + + #[strum(props(help = "The number of egress forward hop sphinx packets dropped"))] + MixnetEgressForwardPacketsDropped, + + #[strum(props( + help = "The current rate of sending/forwarding egress forward hop sphinx packets" + ))] + MixnetEgressForwardPacketsSendRate, + + #[strum(props( + help = "The current rate of sending/forwarding egress forward hop sphinx packets (acks only)" + ))] + MixnetEgressAckSendRate, + + #[strum(props(help = "The current rate of dropping egress forward hop sphinx packets"))] + MixnetEgressForwardPacketsDroppedRate, + + // # ENTRY + #[strum(props(help = "The number of unique users"))] + EntryClientUniqueUsers, + + #[strum(props(help = "The number of client sessions started"))] + EntryClientSessionsStarted, + + #[strum(props(help = "The number of client sessions finished"))] + EntryClientSessionsFinished, + + #[strum(to_string = "entry_client_sessions_durations_{typ}")] + #[strum(props(help = "The distribution of client sessions duration of the specified type"))] + EntryClientSessionsDurations { typ: String }, + + // # WIREGUARD + #[strum(props(help = "The amount of bytes transmitted via wireguard"))] + WireguardBytesTx, + + #[strum(props(help = "The amount of bytes received via wireguard"))] + WireguardBytesRx, + + #[strum(props(help = "The current number of all registered wireguard peers"))] + WireguardTotalPeers, + + #[strum(props(help = "The current number of active wireguard peers"))] + WireguardActivePeers, + + #[strum(props(help = "The current sending rate of wireguard"))] + WireguardBytesTxRate, + + #[strum(props(help = "The current receiving rate of wireguard"))] + WireguardBytesRxRate, + + // # NETWORK + #[strum(props(help = "The number of active ingress mixnet connections"))] + NetworkActiveIngressMixnetConnections, + + #[strum(props(help = "The number of active ingress websocket connections"))] + NetworkActiveIngressWebSocketConnections, + + #[strum(props(help = "The number of active egress mixnet connections"))] + NetworkActiveEgressMixnetConnections, + + // # PROCESS + #[strum(props(help = "The current number of packets being delayed"))] + ProcessForwardHopPacketsBeingDelayed, + + #[strum(props( + help = "The current number of packets waiting in the queue to get delayed and sent into the mixnet" + ))] + ProcessPacketForwarderQueueSize, + + #[strum(props( + help = "The latency distribution of attempting to retrieve network topology (from nym-api)" + ))] + ProcessTopologyQueryResolutionLatency, + + #[strum(props( + help = "The current number of final hop packets stuck in channels waiting to get delivered to appropriate websocket connections" + ))] + ProcessFinalHopPacketsPendingDelivery, + + #[strum(props( + help = "The current number of forward hop packets stuck in channels waiting to get delivered to 
appropriate TCP connections" + ))] + ProcessForwardHopPacketsPendingDelivery, +} + +impl PrometheusMetric { + fn name(&self) -> String { + self.to_string() + } + + fn help(&self) -> &'static str { + // SAFETY: every variant has a `help` prop defined (and there's a unit test is checking for that) + #[allow(clippy::unwrap_used)] + self.get_str("help").unwrap() + } + + fn is_complex(&self) -> bool { + matches!(self, PrometheusMetric::EntryClientSessionsDurations { .. }) + // match self { + // PrometheusMetric::EntryClientSessionsDurations { .. } => true, + // _ => false, + // } + } + + fn to_registrable_metric(&self) -> Option { + let name = self.name(); + let help = self.help(); + + match self { + PrometheusMetric::MixnetIngressForwardPacketsReceived => { + Metric::new_int_gauge(&name, help) + } + PrometheusMetric::MixnetIngressFinalHopPacketsReceived => { + Metric::new_int_gauge(&name, help) + } + PrometheusMetric::MixnetIngressMalformedPacketsReceived => { + Metric::new_int_gauge(&name, help) + } + PrometheusMetric::MixnetIngressExcessiveDelayPacketsReceived => { + Metric::new_int_gauge(&name, help) + } + PrometheusMetric::MixnetIngressForwardPacketsDropped => { + Metric::new_int_gauge(&name, help) + } + PrometheusMetric::MixnetIngressFinalHopPacketsDropped => { + Metric::new_int_gauge(&name, help) + } + PrometheusMetric::MixnetIngressForwardPacketsReceivedRate => { + Metric::new_float_gauge(&name, help) + } + PrometheusMetric::MixnetIngressFinalHopPacketsReceivedRate => { + Metric::new_float_gauge(&name, help) + } + PrometheusMetric::MixnetIngressMalformedPacketsReceivedRate => { + Metric::new_float_gauge(&name, help) + } + PrometheusMetric::MixnetIngressExcessiveDelayPacketsReceivedRate => { + Metric::new_float_gauge(&name, help) + } + PrometheusMetric::MixnetIngressForwardPacketsDroppedRate => { + Metric::new_float_gauge(&name, help) + } + PrometheusMetric::MixnetIngressFinalHopPacketsDroppedRate => { + Metric::new_float_gauge(&name, help) + } + PrometheusMetric::MixnetEgressStoredOnDiskFinalHopPackets => { + Metric::new_int_gauge(&name, help) + } + PrometheusMetric::MixnetEgressForwardPacketsSent => Metric::new_int_gauge(&name, help), + PrometheusMetric::MixnetEgressAckSent => Metric::new_int_gauge(&name, help), + PrometheusMetric::MixnetEgressForwardPacketsDropped => { + Metric::new_int_gauge(&name, help) + } + PrometheusMetric::MixnetEgressForwardPacketsSendRate => { + Metric::new_float_gauge(&name, help) + } + PrometheusMetric::MixnetEgressAckSendRate => Metric::new_float_gauge(&name, help), + PrometheusMetric::MixnetEgressForwardPacketsDroppedRate => { + Metric::new_float_gauge(&name, help) + } + PrometheusMetric::EntryClientUniqueUsers => Metric::new_int_gauge(&name, help), + PrometheusMetric::EntryClientSessionsStarted => Metric::new_int_gauge(&name, help), + PrometheusMetric::EntryClientSessionsFinished => Metric::new_int_gauge(&name, help), + PrometheusMetric::EntryClientSessionsDurations { .. 
} => { + Metric::new_histogram(&name, help, Some(CLIENT_SESSION_DURATION_BUCKETS)) + } + PrometheusMetric::WireguardBytesTx => Metric::new_int_gauge(&name, help), + PrometheusMetric::WireguardBytesRx => Metric::new_int_gauge(&name, help), + PrometheusMetric::WireguardTotalPeers => Metric::new_int_gauge(&name, help), + PrometheusMetric::WireguardActivePeers => Metric::new_int_gauge(&name, help), + PrometheusMetric::WireguardBytesTxRate => Metric::new_float_gauge(&name, help), + PrometheusMetric::WireguardBytesRxRate => Metric::new_float_gauge(&name, help), + PrometheusMetric::NetworkActiveIngressMixnetConnections => { + Metric::new_int_gauge(&name, help) + } + PrometheusMetric::NetworkActiveIngressWebSocketConnections => { + Metric::new_int_gauge(&name, help) + } + PrometheusMetric::NetworkActiveEgressMixnetConnections => { + Metric::new_int_gauge(&name, help) + } + PrometheusMetric::ProcessForwardHopPacketsBeingDelayed => { + Metric::new_int_gauge(&name, help) + } + PrometheusMetric::ProcessPacketForwarderQueueSize => Metric::new_int_gauge(&name, help), + PrometheusMetric::ProcessTopologyQueryResolutionLatency => { + Metric::new_histogram(&name, help, None) + } + PrometheusMetric::ProcessFinalHopPacketsPendingDelivery => { + Metric::new_int_gauge(&name, help) + } + PrometheusMetric::ProcessForwardHopPacketsPendingDelivery => { + Metric::new_int_gauge(&name, help) + } + } + } + + fn set(&self, value: i64) { + metrics_registry().set(&self.name(), value); + } + + fn set_float(&self, value: f64) { + metrics_registry().set_float(&self.name(), value); + } + + fn inc(&self) { + metrics_registry().inc(&self.name()); + } + + fn inc_by(&self, value: i64) { + metrics_registry().inc_by(&self.name(), value); + } + + fn observe_histogram(&self, value: f64) { + let reg = metrics_registry(); + if !reg.add_to_histogram(&self.name(), value) { + if let Some(registrable) = self.to_registrable_metric() { + reg.register_metric(registrable); + reg.add_to_histogram(&self.name(), value); + } + } + } + + fn start_timer(&self) -> Option { + metrics_registry().start_timer(&self.name()) + } +} + +#[non_exhaustive] +pub struct NymNodePrometheusMetrics {} + +impl NymNodePrometheusMetrics { + // initialise all fields on startup with default values so that they'd be immediately available for query + pub(crate) fn initialise() -> Self { + let registry = metrics_registry(); + + // we can't initialise complex metrics as their names will only be fully known at runtime + for kind in PrometheusMetric::iter() { + if !kind.is_complex() { + if let Some(metric) = kind.to_registrable_metric() { + registry.register_metric(metric); + } + } + } + + NymNodePrometheusMetrics {} + } + + pub fn set(&self, metric: PrometheusMetric, value: i64) { + metric.set(value) + } + + pub fn set_float(&self, metric: PrometheusMetric, value: f64) { + metric.set_float(value) + } + + pub fn inc(&self, metric: PrometheusMetric) { + metric.inc() + } + + pub fn inc_by(&self, metric: PrometheusMetric, value: i64) { + metric.inc_by(value) + } + + pub fn observe_histogram(&self, metric: PrometheusMetric, value: f64) { + metric.observe_histogram(value) + } + + pub fn start_timer(&self, metric: PrometheusMetric) -> Option { + metric.start_timer() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use strum::IntoEnumIterator; + + #[test] + fn prometheus_metrics() { + // a sanity check for anyone adding new metrics. 
if this test fails, + // make sure any methods on `PrometheusMetric` enum don't need updating + // or require custom Display impl + assert_eq!(37, PrometheusMetric::COUNT) + } + + #[test] + fn every_variant_has_help_property() { + for variant in PrometheusMetric::iter() { + assert!(variant.get_str("help").is_some()) + } + } + + #[test] + fn prometheus_metrics_names() { + // make sure nothing changed in our serialisation + let simple = PrometheusMetric::MixnetIngressForwardPacketsReceived.to_string(); + assert_eq!("nym_node_mixnet_ingress_forward_packets_received", simple); + + let parameterised = + PrometheusMetric::EntryClientSessionsDurations { typ: "vpn".into() }.to_string(); + assert_eq!( + "nym_node_entry_client_sessions_durations_vpn", + parameterised + ) + } +} diff --git a/nym-node/src/config/metrics.rs b/nym-node/src/config/metrics.rs index 80f9dcea6db..5bdec5bdbfd 100644 --- a/nym-node/src/config/metrics.rs +++ b/nym-node/src/config/metrics.rs @@ -25,6 +25,14 @@ pub struct Debug { #[serde(with = "humantime_serde")] pub stale_mixnet_metrics_cleaner_rate: Duration, + /// Specify the target rate of updating global prometheus counters. + #[serde(with = "humantime_serde")] + pub global_prometheus_counters_update_rate: Duration, + + /// Specify the target rate of updating egress packets pending delivery counter. + #[serde(with = "humantime_serde")] + pub pending_egress_packets_update_rate: Duration, + /// Specify the rate of updating clients sessions #[serde(with = "humantime_serde")] pub clients_sessions_update_rate: Duration, @@ -42,8 +50,10 @@ impl Debug { const DEFAULT_CONSOLE_LOGGING_INTERVAL: Duration = Duration::from_millis(60_000); const DEFAULT_LEGACY_MIXING_UPDATE_RATE: Duration = Duration::from_millis(30_000); const DEFAULT_AGGREGATOR_UPDATE_RATE: Duration = Duration::from_secs(5); - const DEFAULT_STALE_MIXNET_ETRICS_UPDATE_RATE: Duration = Duration::from_secs(3600); + const DEFAULT_STALE_MIXNET_METRICS_UPDATE_RATE: Duration = Duration::from_secs(3600); const DEFAULT_CLIENT_SESSIONS_UPDATE_RATE: Duration = Duration::from_secs(3600); + const GLOBAL_PROMETHEUS_COUNTERS_UPDATE_INTERVAL: Duration = Duration::from_secs(30); + const DEFAULT_PENDING_EGRESS_PACKETS_UPDATE_RATE: Duration = Duration::from_secs(30); } impl Default for Debug { @@ -53,7 +63,10 @@ impl Default for Debug { console_logging_update_interval: Self::DEFAULT_CONSOLE_LOGGING_INTERVAL, legacy_mixing_metrics_update_rate: Self::DEFAULT_LEGACY_MIXING_UPDATE_RATE, aggregator_update_rate: Self::DEFAULT_AGGREGATOR_UPDATE_RATE, - stale_mixnet_metrics_cleaner_rate: Self::DEFAULT_STALE_MIXNET_ETRICS_UPDATE_RATE, + stale_mixnet_metrics_cleaner_rate: Self::DEFAULT_STALE_MIXNET_METRICS_UPDATE_RATE, + global_prometheus_counters_update_rate: + Self::GLOBAL_PROMETHEUS_COUNTERS_UPDATE_INTERVAL, + pending_egress_packets_update_rate: Self::DEFAULT_PENDING_EGRESS_PACKETS_UPDATE_RATE, clients_sessions_update_rate: Self::DEFAULT_CLIENT_SESSIONS_UPDATE_RATE, } } diff --git a/nym-node/src/config/mod.rs b/nym-node/src/config/mod.rs index 6b996f24be0..8f0cb9f220e 100644 --- a/nym-node/src/config/mod.rs +++ b/nym-node/src/config/mod.rs @@ -444,6 +444,7 @@ pub struct Http { /// An optional bearer token for accessing certain http endpoints. /// Currently only used for obtaining mixnode's stats. #[serde(default)] + #[serde(deserialize_with = "de_maybe_stringified")] pub access_token: Option, /// Specify whether basic system information should be exposed. 
diff --git a/nym-node/src/node/http/error.rs b/nym-node/src/node/http/error.rs index 8019c5fd588..251705f5206 100644 --- a/nym-node/src/node/http/error.rs +++ b/nym-node/src/node/http/error.rs @@ -1,5 +1,5 @@ // Copyright 2024 - Nym Technologies SA -// SPDX-License-Identifier: Apache-2.0 +// SPDX-License-Identifier: GPL-3.0-only use std::io; use std::net::SocketAddr; diff --git a/nym-node/src/node/http/router/api/v1/metrics/mod.rs b/nym-node/src/node/http/router/api/v1/metrics/mod.rs index 71a9760273e..51af714434d 100644 --- a/nym-node/src/node/http/router/api/v1/metrics/mod.rs +++ b/nym-node/src/node/http/router/api/v1/metrics/mod.rs @@ -10,7 +10,12 @@ use crate::node::http::state::metrics::MetricsAppState; use axum::extract::FromRef; use axum::routing::get; use axum::Router; +use nym_http_api_common::middleware::bearer_auth::AuthLayer; use nym_node_requests::routes::api::v1::metrics; +use nym_node_requests::routes::api::v1::metrics::prometheus_absolute; +use std::sync::Arc; +use tracing::info; +use zeroize::Zeroizing; pub mod legacy_mixing; pub mod packets_stats; @@ -21,16 +26,23 @@ pub mod wireguard; #[derive(Debug, Clone, Default)] pub struct Config { - // + pub bearer_token: Option>>, } #[allow(deprecated)] -pub(super) fn routes(_config: Config) -> Router +pub(super) fn routes(config: Config) -> Router where S: Send + Sync + 'static + Clone, MetricsAppState: FromRef, { - Router::new() + if config.bearer_token.is_none() { + info!( + "bearer token hasn't been set. '{}' route will not be exposed", + prometheus_absolute() + ) + } + + let router = Router::new() .route( metrics::LEGACY_MIXING, get(legacy_mixing::legacy_mixing_stats), @@ -38,6 +50,17 @@ where .route(metrics::PACKETS_STATS, get(packets_stats)) .route(metrics::WIREGUARD_STATS, get(wireguard_stats)) .route(metrics::SESSIONS, get(sessions_stats)) - .route(metrics::VERLOC, get(verloc_stats)) - .route(metrics::PROMETHEUS, get(prometheus_metrics)) + .route(metrics::VERLOC, get(verloc_stats)); + + let auth_middleware = config.bearer_token.map(AuthLayer::new); + + // don't expose prometheus route without bearer token set + if let Some(auth_middleware) = auth_middleware { + router.route( + metrics::PROMETHEUS, + get(prometheus_metrics).route_layer(auth_middleware), + ) + } else { + router + } } diff --git a/nym-node/src/node/http/router/api/v1/metrics/prometheus.rs b/nym-node/src/node/http/router/api/v1/metrics/prometheus.rs index bdb0050d795..2bd8a3356b5 100644 --- a/nym-node/src/node/http/router/api/v1/metrics/prometheus.rs +++ b/nym-node/src/node/http/router/api/v1/metrics/prometheus.rs @@ -1,12 +1,6 @@ // Copyright 2024 - Nym Technologies SA // SPDX-License-Identifier: GPL-3.0-only -use crate::node::http::state::metrics::MetricsAppState; -use axum::extract::State; -use axum::http::StatusCode; -use axum_extra::TypedHeader; -use headers::authorization::Bearer; -use headers::Authorization; use nym_metrics::metrics; /// Returns `prometheus` compatible metrics @@ -25,21 +19,7 @@ use nym_metrics::metrics; ("prometheus_token" = []) ) )] -pub(crate) async fn prometheus_metrics( - TypedHeader(authorization): TypedHeader>, - State(state): State, -) -> Result { - if authorization.token().is_empty() { - return Err(StatusCode::UNAUTHORIZED); - } - // TODO: is 500 the correct error code here? 
- let Some(metrics_key) = state.prometheus_access_token else { - return Err(StatusCode::INTERNAL_SERVER_ERROR); - }; - - if metrics_key != authorization.token() { - return Err(StatusCode::UNAUTHORIZED); - } - - Ok(metrics!()) +// the AuthLayer is protecting access to this endpoint +pub(crate) async fn prometheus_metrics() -> String { + metrics!() } diff --git a/nym-node/src/node/http/router/mod.rs b/nym-node/src/node/http/router/mod.rs index 449638c8d6c..3d40e9a67ef 100644 --- a/nym-node/src/node/http/router/mod.rs +++ b/nym-node/src/node/http/router/mod.rs @@ -20,6 +20,8 @@ use nym_node_requests::api::SignedHostInformation; use nym_node_requests::routes; use std::net::SocketAddr; use std::path::Path; +use std::sync::Arc; +use zeroize::Zeroizing; pub mod api; pub mod landing_page; @@ -115,6 +117,11 @@ impl HttpServerConfig { self.api.v1_config.authenticator.details = Some(authenticator); self } + + pub fn with_prometheus_bearer_token(mut self, bearer_token: Option) -> Self { + self.api.v1_config.metrics.bearer_token = bearer_token.map(|b| Arc::new(Zeroizing::new(b))); + self + } } pub struct NymNodeRouter { diff --git a/nym-node/src/node/http/state/metrics.rs b/nym-node/src/node/http/state/metrics.rs index c9156c021e4..8b10fb0a30a 100644 --- a/nym-node/src/node/http/state/metrics.rs +++ b/nym-node/src/node/http/state/metrics.rs @@ -9,8 +9,6 @@ pub use nym_verloc::measurements::metrics::SharedVerlocStats; #[derive(Clone)] pub struct MetricsAppState { - pub(crate) prometheus_access_token: Option, - pub(crate) metrics: NymNodeMetrics, pub(crate) verloc: SharedVerlocStats, diff --git a/nym-node/src/node/http/state/mod.rs b/nym-node/src/node/http/state/mod.rs index b9b58c0cce0..0e2fa98786c 100644 --- a/nym-node/src/node/http/state/mod.rs +++ b/nym-node/src/node/http/state/mod.rs @@ -24,17 +24,7 @@ impl AppState { // does it have to be? // also no. 
startup_time: Instant::now(), - metrics: MetricsAppState { - prometheus_access_token: None, - metrics, - verloc, - }, + metrics: MetricsAppState { metrics, verloc }, } } - - #[must_use] - pub fn with_metrics_key(mut self, bearer_token: impl Into>) -> Self { - self.metrics.prometheus_access_token = bearer_token.into(); - self - } } diff --git a/nym-node/src/node/metrics/aggregator.rs b/nym-node/src/node/metrics/aggregator.rs index c166eb0d3cb..8f76b14edb6 100644 --- a/nym-node/src/node/metrics/aggregator.rs +++ b/nym-node/src/node/metrics/aggregator.rs @@ -44,7 +44,7 @@ impl MetricsAggregator { self.event_sender.clone() } - pub fn register_handler(&mut self, handler: H, update_interval: Duration) + pub fn register_handler(&mut self, handler: H, update_interval: impl Into>) where H: MetricsHandler, { diff --git a/nym-node/src/node/metrics/events_listener.rs b/nym-node/src/node/metrics/events_listener.rs index 755fb6cc8b8..939f19b3a90 100644 --- a/nym-node/src/node/metrics/events_listener.rs +++ b/nym-node/src/node/metrics/events_listener.rs @@ -1,2 +1,2 @@ // Copyright 2024 - Nym Technologies SA -// SPDX-License-Identifier: Apache-2.0 +// SPDX-License-Identifier: GPL-3.0-only diff --git a/nym-node/src/node/metrics/handler/client_sessions.rs b/nym-node/src/node/metrics/handler/client_sessions.rs index 8394eb1d894..2f2326c541b 100644 --- a/nym-node/src/node/metrics/handler/client_sessions.rs +++ b/nym-node/src/node/metrics/handler/client_sessions.rs @@ -1,5 +1,5 @@ // Copyright 2024 - Nym Technologies SA -// SPDX-License-Identifier: Apache-2.0 +// SPDX-License-Identifier: GPL-3.0-only use crate::node::metrics::handler::{ MetricsHandler, OnStartMetricsHandler, OnUpdateMetricsHandler, @@ -10,6 +10,8 @@ use nym_gateway_stats_storage::error::StatsStorageError; use nym_gateway_stats_storage::models::{TicketType, ToSessionType}; use nym_node_metrics::entry::{ActiveSession, ClientSessions, FinishedSession}; use nym_node_metrics::events::GatewaySessionEvent; +use nym_node_metrics::prometheus_wrapper::PrometheusMetric::EntryClientSessionsDurations; +use nym_node_metrics::prometheus_wrapper::PROMETHEUS_METRICS; use nym_node_metrics::NymNodeMetrics; use nym_sphinx_types::DestinationAddressBytes; use time::{Date, Duration, OffsetDateTime}; @@ -53,6 +55,13 @@ impl GatewaySessionStatsHandler { ) -> Result<(), StatsStorageError> { if let Some(session) = self.storage.get_active_session(client).await? 
{ if let Some(finished_session) = session.end_at(stop_time) { + PROMETHEUS_METRICS.observe_histogram( + EntryClientSessionsDurations { + typ: finished_session.typ.to_string(), + }, + finished_session.duration.as_secs_f64(), + ); + self.storage .insert_finished_session(self.current_day, finished_session) .await?; diff --git a/nym-node/src/node/metrics/handler/global_prometheus_updater/at_last_update.rs b/nym-node/src/node/metrics/handler/global_prometheus_updater/at_last_update.rs new file mode 100644 index 00000000000..af16be7ecbb --- /dev/null +++ b/nym-node/src/node/metrics/handler/global_prometheus_updater/at_last_update.rs @@ -0,0 +1,219 @@ +// Copyright 2024 - Nym Technologies SA +// SPDX-License-Identifier: GPL-3.0-only + +use nym_node_metrics::mixnet::{EgressMixingStats, IngressMixingStats, MixingStats}; +use nym_node_metrics::wireguard::WireguardStats; +use nym_node_metrics::NymNodeMetrics; +use time::OffsetDateTime; + +// used to calculate traffic rates +#[derive(Debug)] +pub(crate) struct AtLastUpdate { + time: OffsetDateTime, + + mixnet: LastMixnet, + wireguard: LastWireguard, +} + +impl AtLastUpdate { + pub(crate) fn is_initial(&self) -> bool { + self.time == OffsetDateTime::UNIX_EPOCH + } + + pub(crate) fn rates(&self, previous: &Self) -> RateSinceUpdate { + let delta_secs = (self.time - previous.time).as_seconds_f64(); + + RateSinceUpdate { + mixnet: self.mixnet.rates(&previous.mixnet, delta_secs), + wireguard: self.wireguard.rates(&previous.wireguard, delta_secs), + } + } +} + +impl Default for AtLastUpdate { + fn default() -> Self { + AtLastUpdate { + time: OffsetDateTime::now_utc(), + mixnet: Default::default(), + wireguard: Default::default(), + } + } +} + +impl From<&NymNodeMetrics> for AtLastUpdate { + fn from(metrics: &NymNodeMetrics) -> Self { + AtLastUpdate { + time: OffsetDateTime::now_utc(), + mixnet: (&metrics.mixnet).into(), + wireguard: (&metrics.wireguard).into(), + } + } +} + +#[derive(Debug, Default)] +struct LastMixnet { + ingres: LastMixnetIngress, + egress: LastMixnetEgress, +} + +impl LastMixnet { + fn rates(&self, previous: &Self, time_delta_secs: f64) -> MixnetRateSinceUpdate { + MixnetRateSinceUpdate { + ingress: self.ingres.rates(&previous.ingres, time_delta_secs), + egress: self.egress.rates(&previous.egress, time_delta_secs), + } + } +} + +impl From<&MixingStats> for LastMixnet { + fn from(value: &MixingStats) -> Self { + LastMixnet { + ingres: (&value.ingress).into(), + egress: (&value.egress).into(), + } + } +} + +#[derive(Debug, Default)] +struct LastMixnetIngress { + forward_hop_packets_received: usize, + final_hop_packets_received: usize, + malformed_packets_received: usize, + excessive_delay_packets: usize, + forward_hop_packets_dropped: usize, + final_hop_packets_dropped: usize, +} + +impl LastMixnetIngress { + fn rates(&self, previous: &Self, time_delta_secs: f64) -> MixnetIngressRateSinceUpdate { + let forward_hop_packets_received_delta = + self.forward_hop_packets_received - previous.forward_hop_packets_received; + let final_hop_packets_received_delta = + self.final_hop_packets_received - previous.final_hop_packets_received; + let malformed_packets_received_delta = + self.malformed_packets_received - previous.malformed_packets_received; + let excessive_delay_packets_delta = + self.excessive_delay_packets - previous.excessive_delay_packets; + let forward_hop_packets_dropped_delta = + self.forward_hop_packets_dropped - previous.forward_hop_packets_dropped; + let final_hop_packets_dropped_delta = + self.final_hop_packets_dropped - 
previous.final_hop_packets_dropped; + + MixnetIngressRateSinceUpdate { + forward_hop_packets_received_sec: forward_hop_packets_received_delta as f64 + / time_delta_secs, + final_hop_packets_received_sec: final_hop_packets_received_delta as f64 + / time_delta_secs, + malformed_packets_received_sec: malformed_packets_received_delta as f64 + / time_delta_secs, + excessive_delay_packets_sec: excessive_delay_packets_delta as f64 / time_delta_secs, + forward_hop_packets_dropped_sec: forward_hop_packets_dropped_delta as f64 + / time_delta_secs, + final_hop_packets_dropped_sec: final_hop_packets_dropped_delta as f64 / time_delta_secs, + } + } +} + +impl From<&IngressMixingStats> for LastMixnetIngress { + fn from(value: &IngressMixingStats) -> Self { + LastMixnetIngress { + forward_hop_packets_received: value.forward_hop_packets_received(), + final_hop_packets_received: value.final_hop_packets_received(), + malformed_packets_received: value.malformed_packets_received(), + excessive_delay_packets: value.excessive_delay_packets(), + forward_hop_packets_dropped: value.forward_hop_packets_dropped(), + final_hop_packets_dropped: value.final_hop_packets_dropped(), + } + } +} + +#[derive(Debug, Default)] +struct LastMixnetEgress { + forward_hop_packets_sent: usize, + ack_packets_sent: usize, + forward_hop_packets_dropped: usize, +} + +impl LastMixnetEgress { + fn rates(&self, previous: &Self, time_delta_secs: f64) -> MixnetEgressRateSinceUpdate { + let forward_hop_packets_sent_delta = + self.forward_hop_packets_sent - previous.forward_hop_packets_sent; + let ack_packets_sent_delta = self.ack_packets_sent - previous.ack_packets_sent; + let forward_hop_packets_dropped_delta = + self.forward_hop_packets_dropped - previous.forward_hop_packets_dropped; + + MixnetEgressRateSinceUpdate { + forward_hop_packets_sent_sec: forward_hop_packets_sent_delta as f64 / time_delta_secs, + ack_packets_sent_sec: ack_packets_sent_delta as f64 / time_delta_secs, + forward_hop_packets_dropped_sec: forward_hop_packets_dropped_delta as f64 + / time_delta_secs, + } + } +} + +impl From<&EgressMixingStats> for LastMixnetEgress { + fn from(value: &EgressMixingStats) -> Self { + LastMixnetEgress { + forward_hop_packets_sent: value.forward_hop_packets_sent(), + ack_packets_sent: value.ack_packets_sent(), + forward_hop_packets_dropped: value.forward_hop_packets_dropped(), + } + } +} + +#[derive(Debug, Default)] +struct LastWireguard { + bytes_tx: usize, + bytes_rx: usize, +} + +impl LastWireguard { + fn rates(&self, previous: &Self, time_delta_secs: f64) -> WireguardRateSinceUpdate { + let bytes_tx_delta = self.bytes_tx - previous.bytes_tx; + let bytes_rx_delta = self.bytes_rx - previous.bytes_rx; + + WireguardRateSinceUpdate { + bytes_tx_sec: bytes_tx_delta as f64 / time_delta_secs, + bytes_rx_sec: bytes_rx_delta as f64 / time_delta_secs, + } + } +} + +impl From<&WireguardStats> for LastWireguard { + fn from(value: &WireguardStats) -> Self { + LastWireguard { + bytes_tx: value.bytes_tx(), + bytes_rx: value.bytes_rx(), + } + } +} + +pub(crate) struct RateSinceUpdate { + pub(crate) mixnet: MixnetRateSinceUpdate, + pub(crate) wireguard: WireguardRateSinceUpdate, +} + +pub(crate) struct MixnetRateSinceUpdate { + pub(crate) ingress: MixnetIngressRateSinceUpdate, + pub(crate) egress: MixnetEgressRateSinceUpdate, +} + +pub(crate) struct MixnetIngressRateSinceUpdate { + pub(crate) forward_hop_packets_received_sec: f64, + pub(crate) final_hop_packets_received_sec: f64, + pub(crate) malformed_packets_received_sec: f64, + pub(crate) 
excessive_delay_packets_sec: f64, + pub(crate) forward_hop_packets_dropped_sec: f64, + pub(crate) final_hop_packets_dropped_sec: f64, +} + +pub(crate) struct MixnetEgressRateSinceUpdate { + pub(crate) forward_hop_packets_sent_sec: f64, + pub(crate) ack_packets_sent_sec: f64, + pub(crate) forward_hop_packets_dropped_sec: f64, +} + +pub(crate) struct WireguardRateSinceUpdate { + pub(crate) bytes_tx_sec: f64, + pub(crate) bytes_rx_sec: f64, +} diff --git a/nym-node/src/node/metrics/handler/global_prometheus_updater/mod.rs b/nym-node/src/node/metrics/handler/global_prometheus_updater/mod.rs new file mode 100644 index 00000000000..81c61d9db8b --- /dev/null +++ b/nym-node/src/node/metrics/handler/global_prometheus_updater/mod.rs @@ -0,0 +1,223 @@ +// Copyright 2024 - Nym Technologies SA +// SPDX-License-Identifier: GPL-3.0-only + +use crate::node::metrics::handler::global_prometheus_updater::at_last_update::AtLastUpdate; +use crate::node::metrics::handler::{ + MetricsHandler, OnStartMetricsHandler, OnUpdateMetricsHandler, +}; +use async_trait::async_trait; +use nym_node_metrics::prometheus_wrapper::{ + NymNodePrometheusMetrics, PrometheusMetric, PROMETHEUS_METRICS, +}; +use nym_node_metrics::NymNodeMetrics; + +mod at_last_update; + +// it can be anything, we just need a unique type_id to register our handler +pub struct GlobalPrometheusData; + +pub struct PrometheusGlobalNodeMetricsRegistryUpdater { + metrics: NymNodeMetrics, + prometheus_wrapper: &'static NymNodePrometheusMetrics, + at_last_update: AtLastUpdate, +} + +impl PrometheusGlobalNodeMetricsRegistryUpdater { + pub(crate) fn new(metrics: NymNodeMetrics) -> Self { + Self { + metrics, + prometheus_wrapper: &PROMETHEUS_METRICS, + at_last_update: Default::default(), + } + } +} + +#[async_trait] +impl OnStartMetricsHandler for PrometheusGlobalNodeMetricsRegistryUpdater {} + +#[async_trait] +impl OnUpdateMetricsHandler for PrometheusGlobalNodeMetricsRegistryUpdater { + async fn on_update(&mut self) { + let entry_guard = self.metrics.entry.client_sessions().await; + use PrometheusMetric::*; + + // # MIXNET + // ## INGRESS + self.prometheus_wrapper.set( + MixnetIngressForwardPacketsReceived, + self.metrics.mixnet.ingress.forward_hop_packets_received() as i64, + ); + self.prometheus_wrapper.set( + MixnetIngressFinalHopPacketsReceived, + self.metrics.mixnet.ingress.final_hop_packets_received() as i64, + ); + self.prometheus_wrapper.set( + MixnetIngressMalformedPacketsReceived, + self.metrics.mixnet.ingress.malformed_packets_received() as i64, + ); + self.prometheus_wrapper.set( + MixnetIngressExcessiveDelayPacketsReceived, + self.metrics.mixnet.ingress.excessive_delay_packets() as i64, + ); + self.prometheus_wrapper.set( + MixnetEgressForwardPacketsDropped, + self.metrics.mixnet.ingress.forward_hop_packets_dropped() as i64, + ); + self.prometheus_wrapper.set( + MixnetIngressFinalHopPacketsDropped, + self.metrics.mixnet.ingress.final_hop_packets_dropped() as i64, + ); + + // ## EGRESS + self.prometheus_wrapper.set( + MixnetEgressStoredOnDiskFinalHopPackets, + self.metrics.mixnet.egress.disk_persisted_packets() as i64, + ); + self.prometheus_wrapper.set( + MixnetEgressForwardPacketsSent, + self.metrics.mixnet.egress.forward_hop_packets_sent() as i64, + ); + self.prometheus_wrapper.set( + MixnetEgressAckSent, + self.metrics.mixnet.egress.ack_packets_sent() as i64, + ); + self.prometheus_wrapper.set( + MixnetEgressForwardPacketsDropped, + self.metrics.mixnet.egress.forward_hop_packets_dropped() as i64, + ); + + // # ENTRY + 
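`AtLastUpdate` above turns monotonically increasing packet counters into per-second rates by diffing two snapshots and dividing by the elapsed wall-clock seconds. Reduced to a single counter, the arithmetic looks roughly like this (a sketch, not the nym-node types; a saturating subtraction additionally guards against a counter ever moving backwards between snapshots):

```rust
use std::thread::sleep;
use std::time::{Duration, Instant};

struct Snapshot {
    at: Instant,
    packets_received: usize,
}

fn rate_per_second(current: &Snapshot, previous: &Snapshot) -> f64 {
    // counters are assumed monotonic, so the delta is non-negative
    let delta = current.packets_received.saturating_sub(previous.packets_received);
    let elapsed = current.at.duration_since(previous.at).as_secs_f64();
    if elapsed == 0.0 {
        return 0.0;
    }
    delta as f64 / elapsed
}

fn main() {
    let earlier = Snapshot { at: Instant::now(), packets_received: 1_000 };
    sleep(Duration::from_millis(200)); // time passes, more packets arrive
    let later = Snapshot { at: Instant::now(), packets_received: 1_420 };
    println!("{:.2} packets/s", rate_per_second(&later, &earlier));
}
```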
self.prometheus_wrapper.set( + EntryClientUniqueUsers, + entry_guard.unique_users.len() as i64, + ); + self.prometheus_wrapper.set( + EntryClientSessionsStarted, + entry_guard.sessions_started as i64, + ); + self.prometheus_wrapper.set( + EntryClientSessionsFinished, + entry_guard.finished_sessions.len() as i64, + ); + + // # WIREGUARD + self.prometheus_wrapper + .set(WireguardBytesRx, self.metrics.wireguard.bytes_rx() as i64); + self.prometheus_wrapper + .set(WireguardBytesTx, self.metrics.wireguard.bytes_tx() as i64); + self.prometheus_wrapper.set( + WireguardTotalPeers, + self.metrics.wireguard.total_peers() as i64, + ); + self.prometheus_wrapper.set( + WireguardActivePeers, + self.metrics.wireguard.active_peers() as i64, + ); + + // # NETWORK + self.prometheus_wrapper.set( + NetworkActiveIngressMixnetConnections, + self.metrics + .network + .active_ingress_mixnet_connections_count() as i64, + ); + self.prometheus_wrapper.set( + NetworkActiveIngressWebSocketConnections, + self.metrics + .network + .active_ingress_websocket_connections_count() as i64, + ); + self.prometheus_wrapper.set( + NetworkActiveIngressWebSocketConnections, + self.metrics + .network + .active_egress_mixnet_connections_count() as i64, + ); + + // # PROCESS + self.prometheus_wrapper.set( + ProcessForwardHopPacketsBeingDelayed, + self.metrics + .process + .forward_hop_packets_being_delayed_count() as i64, + ); + self.prometheus_wrapper.set( + ProcessPacketForwarderQueueSize, + self.metrics.process.packet_forwarder_queue_size() as i64, + ); + self.prometheus_wrapper.set( + ProcessFinalHopPacketsPendingDelivery, + self.metrics + .process + .final_hop_packets_pending_delivery_count() as i64, + ); + self.prometheus_wrapper.set( + ProcessForwardHopPacketsPendingDelivery, + self.metrics + .process + .forward_hop_packets_pending_delivery_count() as i64, + ); + + let updated = AtLastUpdate::from(&self.metrics); + + // # RATES + if !self.at_last_update.is_initial() { + let diff = updated.rates(&self.at_last_update); + + self.prometheus_wrapper.set_float( + MixnetIngressForwardPacketsReceivedRate, + diff.mixnet.ingress.forward_hop_packets_received_sec, + ); + self.prometheus_wrapper.set_float( + MixnetIngressFinalHopPacketsReceivedRate, + diff.mixnet.ingress.final_hop_packets_received_sec, + ); + self.prometheus_wrapper.set_float( + MixnetIngressMalformedPacketsReceivedRate, + diff.mixnet.ingress.malformed_packets_received_sec, + ); + self.prometheus_wrapper.set_float( + MixnetIngressExcessiveDelayPacketsReceivedRate, + diff.mixnet.ingress.excessive_delay_packets_sec, + ); + self.prometheus_wrapper.set_float( + MixnetIngressForwardPacketsDroppedRate, + diff.mixnet.ingress.forward_hop_packets_dropped_sec, + ); + self.prometheus_wrapper.set_float( + MixnetIngressFinalHopPacketsDroppedRate, + diff.mixnet.ingress.final_hop_packets_dropped_sec, + ); + + // ## EGRESS + self.prometheus_wrapper.set_float( + MixnetEgressForwardPacketsSendRate, + diff.mixnet.egress.forward_hop_packets_sent_sec, + ); + self.prometheus_wrapper.set_float( + MixnetEgressAckSendRate, + diff.mixnet.egress.ack_packets_sent_sec, + ); + self.prometheus_wrapper.set_float( + MixnetEgressForwardPacketsDroppedRate, + diff.mixnet.egress.forward_hop_packets_dropped_sec, + ); + + // # WIREGUARD + self.prometheus_wrapper + .set_float(WireguardBytesRxRate, diff.wireguard.bytes_rx_sec); + self.prometheus_wrapper + .set_float(WireguardBytesTxRate, diff.wireguard.bytes_tx_sec); + } + self.at_last_update = updated; + } +} + +#[async_trait] +impl MetricsHandler for 
PrometheusGlobalNodeMetricsRegistryUpdater { + type Events = GlobalPrometheusData; + + async fn handle_event(&mut self, _event: Self::Events) { + panic!("this should have never been called! MetricsHandler has been incorrectly called on PrometheusNodeMetricsRegistryUpdater") + } +} diff --git a/nym-node/src/node/metrics/handler/legacy_packet_data.rs b/nym-node/src/node/metrics/handler/legacy_packet_data.rs index 4b649e831a1..6240993797b 100644 --- a/nym-node/src/node/metrics/handler/legacy_packet_data.rs +++ b/nym-node/src/node/metrics/handler/legacy_packet_data.rs @@ -1,5 +1,5 @@ // Copyright 2024 - Nym Technologies SA -// SPDX-License-Identifier: Apache-2.0 +// SPDX-License-Identifier: GPL-3.0-only use crate::node::metrics::handler::{ MetricsHandler, OnStartMetricsHandler, OnUpdateMetricsHandler, diff --git a/nym-node/src/node/metrics/handler/mod.rs b/nym-node/src/node/metrics/handler/mod.rs index 025b1419b7f..764acc0ff98 100644 --- a/nym-node/src/node/metrics/handler/mod.rs +++ b/nym-node/src/node/metrics/handler/mod.rs @@ -1,5 +1,5 @@ // Copyright 2024 - Nym Technologies SA -// SPDX-License-Identifier: Apache-2.0 +// SPDX-License-Identifier: GPL-3.0-only use async_trait::async_trait; use std::any; @@ -9,8 +9,11 @@ use tokio::time::Instant; use tracing::trace; pub(crate) mod client_sessions; +pub(crate) mod global_prometheus_updater; pub(crate) mod legacy_packet_data; pub(crate) mod mixnet_data_cleaner; +pub(crate) mod pending_egress_packets_updater; +pub(crate) mod prometheus_events_handler; pub(crate) trait RegistrableHandler: Downcast + OnStartMetricsHandler + OnUpdateMetricsHandler + Send + Sync + 'static @@ -63,23 +66,23 @@ pub(crate) trait OnStartMetricsHandler { #[async_trait] pub(crate) trait OnUpdateMetricsHandler { - async fn on_update(&mut self); + async fn on_update(&mut self) {} } pub(crate) struct HandlerWrapper { handler: Box>, - update_interval: Duration, + update_interval: Option, last_updated: Instant, } impl HandlerWrapper { - pub fn new(update_interval: Duration, handler: U) -> Self + pub fn new(update_interval: impl Into>, handler: U) -> Self where U: MetricsHandler, { HandlerWrapper { handler: Box::new(handler), - update_interval, + update_interval: update_interval.into(), last_updated: Instant::now(), } } @@ -107,11 +110,15 @@ impl OnStartMetricsHandler for HandlerWrapper { #[async_trait] impl OnUpdateMetricsHandler for HandlerWrapper { async fn on_update(&mut self) { + let Some(update_interval) = self.update_interval else { + return; + }; + let name = any::type_name::(); trace!("on update for handler for events of type {name}"); let elapsed = self.last_updated.elapsed(); - if elapsed < self.update_interval { + if elapsed < update_interval { trace!("too soon for updates"); return; } diff --git a/nym-node/src/node/metrics/handler/pending_egress_packets_updater.rs b/nym-node/src/node/metrics/handler/pending_egress_packets_updater.rs new file mode 100644 index 00000000000..d1fde40f70e --- /dev/null +++ b/nym-node/src/node/metrics/handler/pending_egress_packets_updater.rs @@ -0,0 +1,60 @@ +// Copyright 2024 - Nym Technologies SA +// SPDX-License-Identifier: GPL-3.0-only + +use crate::node::metrics::handler::{ + MetricsHandler, OnStartMetricsHandler, OnUpdateMetricsHandler, +}; +use async_trait::async_trait; +use nym_gateway::node::ActiveClientsStore; +use nym_mixnet_client::client::ActiveConnections; +use nym_node_metrics::NymNodeMetrics; + +// it can be anything, we just need a unique type_id to register our handler +pub struct PendingEgressPackets; + +pub struct 
PendingEgressPacketsUpdater { + metrics: NymNodeMetrics, + active_websocket_clients: ActiveClientsStore, + active_mixnet_connections: ActiveConnections, +} + +impl PendingEgressPacketsUpdater { + pub(crate) fn new( + metrics: NymNodeMetrics, + active_clients: ActiveClientsStore, + active_mixnet_connections: ActiveConnections, + ) -> Self { + PendingEgressPacketsUpdater { + metrics, + active_websocket_clients: active_clients, + active_mixnet_connections, + } + } +} + +#[async_trait] +impl OnStartMetricsHandler for PendingEgressPacketsUpdater {} + +#[async_trait] +impl OnUpdateMetricsHandler for PendingEgressPacketsUpdater { + async fn on_update(&mut self) { + let pending_final = self.active_websocket_clients.pending_packets(); + self.metrics + .process + .update_final_hop_packets_pending_delivery(pending_final); + + let pending_forward = self.active_mixnet_connections.pending_packets(); + self.metrics + .process + .update_forward_hop_packets_pending_delivery(pending_forward) + } +} + +#[async_trait] +impl MetricsHandler for PendingEgressPacketsUpdater { + type Events = PendingEgressPackets; + + async fn handle_event(&mut self, _event: Self::Events) { + panic!("this should have never been called! MetricsHandler has been incorrectly called on PendingEgressPacketsUpdater") + } +} diff --git a/nym-node/src/node/metrics/handler/prometheus_events_handler.rs b/nym-node/src/node/metrics/handler/prometheus_events_handler.rs new file mode 100644 index 00000000000..28a16962787 --- /dev/null +++ b/nym-node/src/node/metrics/handler/prometheus_events_handler.rs @@ -0,0 +1,6 @@ +// Copyright 2024 - Nym Technologies SA +// SPDX-License-Identifier: GPL-3.0-only + +// pub struct PrometheusEventsHandler { +// // +// } diff --git a/nym-node/src/node/mixnet/handler.rs b/nym-node/src/node/mixnet/handler.rs index 6cfb7d50d11..e645636c963 100644 --- a/nym-node/src/node/mixnet/handler.rs +++ b/nym-node/src/node/mixnet/handler.rs @@ -112,7 +112,14 @@ impl ConnectionHandler { .await { Err(err) => error!("Failed to store client data - {err}"), - Ok(_) => trace!("Stored packet for {client}"), + Ok(_) => { + self.shared + .metrics + .mixnet + .egress + .add_disk_persisted_packet(); + trace!("Stored packet for {client}") + } } } Ok(_) => trace!("Pushed received packet to {client}"), diff --git a/nym-node/src/node/mixnet/packet_forwarding.rs b/nym-node/src/node/mixnet/packet_forwarding.rs index bcd51a52a9e..5e70841c866 100644 --- a/nym-node/src/node/mixnet/packet_forwarding.rs +++ b/nym-node/src/node/mixnet/packet_forwarding.rs @@ -1,5 +1,5 @@ // Copyright 2024 - Nym Technologies SA -// SPDX-License-Identifier: Apache-2.0 +// SPDX-License-Identifier: GPL-3.0-only use futures::StreamExt; use nym_mixnet_client::forwarder::{ @@ -80,7 +80,7 @@ impl PacketForwarder { C: SendWithoutResponse, { let delayed_packet = packet.into_inner(); - self.forward_packet(delayed_packet) + self.forward_packet(delayed_packet); } fn handle_new_packet(&mut self, new_packet: PacketToForward) @@ -102,6 +102,18 @@ impl PacketForwarder { } } + fn update_queue_len_metric(&self) { + self.metrics + .process + .update_forward_hop_packets_being_delayed(self.delay_queue.len()); + } + + fn update_channel_size_metric(&self, channel_size: usize) { + self.metrics + .process + .update_packet_forwarder_queue_size(channel_size) + } + pub async fn run(&mut self) where C: SendWithoutResponse, @@ -125,18 +137,21 @@ impl PacketForwarder { // and hence it can't happen that ALL senders are dropped #[allow(clippy::unwrap_used)] 
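The updater above periodically samples how many packets are still queued for egress delivery. For a bounded tokio mpsc channel, that backlog can be derived from the sender alone, since the number of in-flight items is the difference between the configured capacity and the capacity currently available; a self-contained sketch of that measurement (a generic illustration, not the nym-node `ActiveConnections` type):

```rust
use tokio::sync::mpsc;

fn pending_packets<T>(sender: &mpsc::Sender<T>) -> usize {
    // slots handed out but not yet consumed by the receiving task
    sender.max_capacity() - sender.capacity()
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<u32>(8);
    tx.send(1).await.unwrap();
    tx.send(2).await.unwrap();
    assert_eq!(pending_packets(&tx), 2);

    let _ = rx.recv().await;
    assert_eq!(pending_packets(&tx), 1);
}
```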
self.handle_new_packet(new_packet.unwrap()); + let channel_len = self.packet_sender.len(); if processed % 1000 == 0 { - let queue_len = self.packet_sender.len(); - match queue_len { + match channel_len { n if n > 200 => error!("there are currently {n} mix packets waiting to get forwarded!"), n if n > 50 => warn!("there are currently {n} mix packets waiting to get forwarded"), n => trace!("there are currently {n} mix packets waiting to get forwarded"), } } - + self.update_channel_size_metric(channel_len); processed += 1; } } + + // update the metrics on either new packet being inserted or packet being removed + self.update_queue_len_metric(); } trace!("PacketForwarder: Exiting"); } diff --git a/nym-node/src/node/mod.rs b/nym-node/src/node/mod.rs index 0f2ac89fe0a..f91fce211b7 100644 --- a/nym-node/src/node/mod.rs +++ b/nym-node/src/node/mod.rs @@ -21,8 +21,10 @@ use crate::node::http::{HttpServerConfig, NymNodeHttpServer, NymNodeRouter}; use crate::node::metrics::aggregator::MetricsAggregator; use crate::node::metrics::console_logger::ConsoleLogger; use crate::node::metrics::handler::client_sessions::GatewaySessionStatsHandler; +use crate::node::metrics::handler::global_prometheus_updater::PrometheusGlobalNodeMetricsRegistryUpdater; use crate::node::metrics::handler::legacy_packet_data::LegacyMixingStatsUpdater; use crate::node::metrics::handler::mixnet_data_cleaner::MixnetMetricsCleaner; +use crate::node::metrics::handler::pending_egress_packets_updater::PendingEgressPacketsUpdater; use crate::node::mixnet::packet_forwarding::PacketForwarder; use crate::node::mixnet::shared::ProcessingConfig; use crate::node::mixnet::SharedFinalHopData; @@ -30,6 +32,7 @@ use crate::node::shared_topology::NymNodeTopologyProvider; use nym_bin_common::bin_info; use nym_crypto::asymmetric::{ed25519, x25519}; use nym_gateway::node::{ActiveClientsStore, GatewayTasksBuilder}; +use nym_mixnet_client::client::ActiveConnections; use nym_mixnet_client::forwarder::MixForwardingSender; use nym_network_requester::{ set_active_gateway, setup_fs_gateways_storage, store_gateway_details, CustomGatewayDetails, @@ -751,7 +754,8 @@ impl NymNode { .with_authenticator_details(auth_details) .with_used_exit_policy(exit_policy_details) .with_description(self.description.clone()) - .with_auxiliary_details(auxiliary_details); + .with_auxiliary_details(auxiliary_details) + .with_prometheus_bearer_token(self.config.http.access_token.clone()); if self.config.http.expose_system_info { config = config.with_system_info(get_system_info( @@ -772,8 +776,7 @@ impl NymNode { config.api.v1_config.node.roles.ip_packet_router_enabled = true; } - let app_state = AppState::new(self.metrics.clone(), self.verloc_stats.clone()) - .with_metrics_key(self.config.http.access_token.clone()); + let app_state = AppState::new(self.metrics.clone(), self.verloc_stats.clone()); Ok(NymNodeRouter::new(config, app_state) .build_server(&self.config.http.bind_address) @@ -844,7 +847,12 @@ impl NymNode { tokio::spawn(async move { verloc_measurer.run().await }); } - pub(crate) fn setup_metrics_backend(&self, shutdown: TaskClient) -> MetricEventsSender { + pub(crate) fn setup_metrics_backend( + &self, + active_clients_store: ActiveClientsStore, + active_egress_mixnet_connections: ActiveConnections, + shutdown: TaskClient, + ) -> MetricEventsSender { info!("setting up node metrics..."); // aggregator (to listen for any metrics events) @@ -870,12 +878,35 @@ impl NymNode { self.config.metrics.debug.clients_sessions_update_rate, ); - // handler for periodically cleaning up 
stale recipient/sender darta + // handler for periodically cleaning up stale recipient/sender data metrics_aggregator.register_handler( MixnetMetricsCleaner::new(self.metrics.clone()), self.config.metrics.debug.stale_mixnet_metrics_cleaner_rate, ); + // handler for updating the value of forward/final hop packets pending delivery + metrics_aggregator.register_handler( + PendingEgressPacketsUpdater::new( + self.metrics.clone(), + active_clients_store, + active_egress_mixnet_connections, + ), + self.config.metrics.debug.pending_egress_packets_update_rate, + ); + + // handler for updating the prometheus registry from the global atomic metrics counters + // such as number of packets received + metrics_aggregator.register_handler( + PrometheusGlobalNodeMetricsRegistryUpdater::new(self.metrics.clone()), + self.config + .metrics + .debug + .global_prometheus_counters_update_rate, + ); + + // handler for handling prometheus metrics events + // metrics_aggregator.register_handler(PrometheusEventsHandler{}, None); + // note: we're still measuring things such as number of mixed packets, // but since they're stored as atomic integers, they are incremented directly at source // rather than going through event pipeline @@ -909,7 +940,7 @@ impl NymNode { &self, active_clients_store: &ActiveClientsStore, shutdown: TaskClient, - ) -> MixForwardingSender { + ) -> (MixForwardingSender, ActiveConnections) { let processing_config = ProcessingConfig::new(&self.config); // we're ALWAYS listening for mixnet packets, either for forward or final hops (or both) @@ -926,7 +957,13 @@ impl NymNode { self.config.mixnet.debug.initial_connection_timeout, self.config.mixnet.debug.maximum_connection_buffer_size, ); - let mixnet_client = nym_mixnet_client::Client::new(mixnet_client_config); + let mixnet_client = nym_mixnet_client::Client::new( + mixnet_client_config, + self.metrics + .network + .active_egress_mixnet_connections_counter(), + ); + let active_connections = mixnet_client.active_connections(); let mut packet_forwarder = PacketForwarder::new( mixnet_client, @@ -951,7 +988,7 @@ impl NymNode { ); mixnet::Listener::new(self.config.mixnet.bind_address, shared).start(); - mix_packet_sender + (mix_packet_sender, active_connections) } pub(crate) async fn run(mut self) -> Result<(), NymNodeError> { @@ -981,14 +1018,19 @@ impl NymNode { self.start_verloc_measurements(task_manager.subscribe_named("verloc-measurements")); - let metrics_sender = self.setup_metrics_backend(task_manager.subscribe_named("metrics")); let active_clients_store = ActiveClientsStore::new(); - let mix_packet_sender = self.start_mixnet_listener( + let (mix_packet_sender, active_egress_mixnet_connections) = self.start_mixnet_listener( &active_clients_store, task_manager.subscribe_named("mixnet-traffic"), ); + let metrics_sender = self.setup_metrics_backend( + active_clients_store.clone(), + active_egress_mixnet_connections, + task_manager.subscribe_named("metrics"), + ); + self.start_gateway_tasks( metrics_sender, active_clients_store, diff --git a/nym-node/src/node/shared_topology.rs b/nym-node/src/node/shared_topology.rs index b65f1e1a5bb..f3ba72fd009 100644 --- a/nym-node/src/node/shared_topology.rs +++ b/nym-node/src/node/shared_topology.rs @@ -3,6 +3,7 @@ use async_trait::async_trait; use nym_gateway::node::{NymApiTopologyProvider, NymApiTopologyProviderConfig, UserAgent}; +use nym_node_metrics::prometheus_wrapper::{PrometheusMetric, PROMETHEUS_METRICS}; use nym_topology::node::RoutingNode; use nym_topology::{NymTopology, Role, TopologyProvider}; 
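With this wiring, the mixnet listener now hands live handles to the metrics backend instead of the backend polling globals: the mixnet client is constructed with the network counter for active egress connections, and the `ActiveConnections` handle returned by `start_mixnet_listener` is forwarded into `setup_metrics_backend`. The underlying sharing pattern is a cloned atomic counter, roughly as follows (a generic sketch; the counter type and orderings in nym-node may differ):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

struct EgressConnections {
    counter: Arc<AtomicUsize>,
}

impl EgressConnections {
    fn opened(&self) {
        self.counter.fetch_add(1, Ordering::Relaxed);
    }
    fn closed(&self) {
        self.counter.fetch_sub(1, Ordering::Relaxed);
    }
}

fn main() {
    let shared = Arc::new(AtomicUsize::new(0));

    // one clone lives inside the component managing connections...
    let client_side = EgressConnections { counter: Arc::clone(&shared) };
    client_side.opened();
    client_side.opened();
    client_side.closed();

    // ...while the metrics side only ever reads it
    println!("active egress connections: {}", shared.load(Ordering::Relaxed));
}
```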
use std::sync::Arc; @@ -11,7 +12,6 @@ use time::OffsetDateTime; use tokio::sync::Mutex; use tracing::debug; use url::Url; - // I wouldn't be surprised if this became the start of the node topology cache #[derive(Clone)] @@ -97,6 +97,10 @@ impl TopologyProvider for NymNodeTopologyProvider { if let Some(cached) = guard.cached_topology() { return Some(cached); } + + // the observation will be included on drop + let _timer = + PROMETHEUS_METRICS.start_timer(PrometheusMetric::ProcessTopologyQueryResolutionLatency); guard.update_cache().await } } diff --git a/service-providers/ip-packet-router/src/error.rs b/service-providers/ip-packet-router/src/error.rs index 28e21e6edd1..7c8a4590d5d 100644 --- a/service-providers/ip-packet-router/src/error.rs +++ b/service-providers/ip-packet-router/src/error.rs @@ -29,7 +29,7 @@ pub enum IpPacketRouterError { #[error("failed to connect to mixnet: {source}")] FailedToConnectToMixnet { source: nym_sdk::Error }, - #[error("the entity wrapping the network requester has disconnected")] + #[error("the entity wrapping the ip packet router has disconnected")] DisconnectedParent, #[error("received packet has an invalid version: {0}")] diff --git a/service-providers/ip-packet-router/src/request_filter/mod.rs b/service-providers/ip-packet-router/src/request_filter/mod.rs index c11fb6c9d6b..599a00a1d20 100644 --- a/service-providers/ip-packet-router/src/request_filter/mod.rs +++ b/service-providers/ip-packet-router/src/request_filter/mod.rs @@ -56,13 +56,10 @@ impl RequestFilter { pub(crate) async fn check_address(&self, address: &SocketAddr) -> bool { match &*self.inner { RequestFilterInner::ExitPolicy { policy_filter } => { - match policy_filter.check(address).await { - Err(err) => { - warn!("failed to validate '{address}' against the exit policy: {err}"); - false - } - Ok(res) => res, - } + policy_filter.check(address).await.unwrap_or_else(|err| { + warn!("failed to validate '{address}' against the exit policy: {err}"); + false + }) } } }
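The topology-provider change relies on the scope-guard style of Prometheus timers: the value returned by `start_timer` records the elapsed time into the histogram when it is dropped, so the query latency is observed even on early returns. Using the `prometheus` crate directly (the `PROMETHEUS_METRICS` wrapper is assumed to behave similarly; the metric name below is a placeholder), the mechanism looks like this:

```rust
use prometheus::{Histogram, HistogramOpts, Registry};

fn main() -> Result<(), prometheus::Error> {
    let registry = Registry::new();
    let latency = Histogram::with_opts(HistogramOpts::new(
        "topology_query_resolution_latency_seconds", // hypothetical name
        "time spent refreshing the cached topology",
    ))?;
    registry.register(Box::new(latency.clone()))?;

    {
        // the observation happens when `_timer` goes out of scope
        let _timer = latency.start_timer();
        std::thread::sleep(std::time::Duration::from_millis(25));
    }

    assert_eq!(latency.get_sample_count(), 1);
    Ok(())
}
```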