diff --git a/quickwit/quickwit-cluster/src/cluster.rs b/quickwit/quickwit-cluster/src/cluster.rs
index 8ee6fe30ac8..9dc17e3a61f 100644
--- a/quickwit/quickwit-cluster/src/cluster.rs
+++ b/quickwit/quickwit-cluster/src/cluster.rs
@@ -47,6 +47,7 @@ use crate::member::{
GRPC_ADVERTISE_ADDR_KEY, PIPELINE_METRICS_PREFIX, READINESS_KEY, READINESS_VALUE_NOT_READY,
READINESS_VALUE_READY,
};
+use crate::metrics::spawn_metrics_task;
use crate::ClusterNode;
const MARKED_FOR_DELETION_GRACE_PERIOD: usize = if cfg!(any(test, feature = "testsuite")) {
@@ -161,8 +162,11 @@ impl Cluster {
let chitchat = chitchat_handle.chitchat();
let live_nodes_stream = chitchat.lock().await.live_nodes_watcher();
let (ready_members_tx, ready_members_rx) = watch::channel(Vec::new());
-
spawn_ready_members_task(cluster_id.clone(), live_nodes_stream, ready_members_tx);
+
+ let weak_chitchat = Arc::downgrade(&chitchat);
+ spawn_metrics_task(weak_chitchat, self_node.chitchat_id());
+
let inner = InnerCluster {
cluster_id: cluster_id.clone(),
self_chitchat_id: self_node.chitchat_id(),
diff --git a/quickwit/quickwit-cluster/src/lib.rs b/quickwit/quickwit-cluster/src/lib.rs
index 8d6e0e2ce94..ab98c004f35 100644
--- a/quickwit/quickwit-cluster/src/lib.rs
+++ b/quickwit/quickwit-cluster/src/lib.rs
@@ -102,13 +102,17 @@ impl Transport for CountingUdpTransport {
let socket = UdpSocket::open(listen_addr).await?;
Ok(Box::new(CountingUdpSocket {
socket,
- gossip_recv: crate::metrics::CLUSTER_METRICS.gossip_recv_total.clone(),
+ gossip_recv: crate::metrics::CLUSTER_METRICS
+ .gossip_recv_messages_total
+ .clone(),
gossip_recv_bytes: crate::metrics::CLUSTER_METRICS
.gossip_recv_bytes_total
.clone(),
- gossip_send: crate::metrics::CLUSTER_METRICS.gossip_send_total.clone(),
+ gossip_send: crate::metrics::CLUSTER_METRICS
+ .gossip_sent_messages_total
+ .clone(),
gossip_send_bytes: crate::metrics::CLUSTER_METRICS
- .gossip_send_bytes_total
+ .gossip_sent_bytes_total
.clone(),
}))
}
diff --git a/quickwit/quickwit-cluster/src/member.rs b/quickwit/quickwit-cluster/src/member.rs
index 9fcaf935451..83e7f6878e8 100644
--- a/quickwit/quickwit-cluster/src/member.rs
+++ b/quickwit/quickwit-cluster/src/member.rs
@@ -18,11 +18,12 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.
use std::collections::HashSet;
+use std::mem::size_of;
use std::net::SocketAddr;
use std::str::FromStr;
use anyhow::Context;
-use chitchat::{ChitchatId, NodeState};
+use chitchat::{ChitchatId, NodeState, Version};
use quickwit_proto::indexing::{CpuCapacity, IndexingTask};
use quickwit_proto::types::NodeId;
use tracing::{error, warn};
@@ -46,6 +47,8 @@ pub(crate) trait NodeStateExt {
fn grpc_advertise_addr(&self) -> anyhow::Result<SocketAddr>;
fn is_ready(&self) -> bool;
+
+    /// Returns an estimate of the size of the node state in bytes.
+    fn size_bytes(&self) -> usize;
}
impl NodeStateExt for NodeState {
@@ -66,6 +69,16 @@ impl NodeStateExt for NodeState {
.map(|health_value| health_value == READINESS_VALUE_READY)
.unwrap_or(false)
}
+
+    /// Returns an estimate of the size of the node state in bytes.
+    ///
+    /// Each key-value pair accounts for the length of its key, the length of its value, and
+    /// a fixed overhead standing for its version and its tombstone.
+    // TODO: Expose more accurate size of the state in Chitchat.
+    fn size_bytes(&self) -> usize {
+        const SIZE_OF_VERSION: usize = size_of::<Version>();
+        // NOTE(review): `u64` is an assumed footprint for the tombstone; confirm against the
+        // definition of versioned values in the chitchat crate.
+        const SIZE_OF_TOMBSTONE: usize = size_of::<u64>();
+
+        self.key_values()
+            .map(|(key, value)| {
+                key.len() + value.value.len() + SIZE_OF_VERSION + SIZE_OF_TOMBSTONE
+            })
+            .sum()
+    }
}
/// Cluster member.
diff --git a/quickwit/quickwit-cluster/src/metrics.rs b/quickwit/quickwit-cluster/src/metrics.rs
index 2358843f779..8ec0f4b9bfd 100644
--- a/quickwit/quickwit-cluster/src/metrics.rs
+++ b/quickwit/quickwit-cluster/src/metrics.rs
@@ -17,21 +17,71 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
+use std::net::SocketAddr;
+use std::sync::Weak;
+use std::time::Duration;
+
+use chitchat::{Chitchat, ChitchatId};
use once_cell::sync::Lazy;
-use quickwit_common::metrics::{new_counter, IntCounter};
+use quickwit_common::metrics::{new_counter, new_gauge, IntCounter, IntGauge};
+use tokio::sync::Mutex;
+
+use crate::member::NodeStateExt;
pub struct ClusterMetrics {
- pub gossip_recv_total: IntCounter,
+ pub live_nodes: IntGauge, // Number of live nodes observed locally.
+ pub ready_nodes: IntGauge, // Number of nodes whose state reports ready, observed locally.
+ pub zombie_nodes: IntGauge, // Number of nodes scheduled for deletion, observed locally.
+ pub dead_nodes: IntGauge, // Number of dead nodes observed locally.
+ pub cluster_state_size_bytes: IntGauge, // Estimated size in bytes of all the node states.
+ pub node_state_size_bytes: IntGauge, // Estimated size in bytes of the local node state.
+ pub node_state_keys: IntGauge, // Number of key-value pairs in the local node state.
+ pub gossip_recv_messages_total: IntCounter, // Total number of gossip messages received.
pub gossip_recv_bytes_total: IntCounter,
- pub gossip_send_total: IntCounter,
- pub gossip_send_bytes_total: IntCounter,
+ pub gossip_sent_messages_total: IntCounter, // Total number of gossip messages sent.
+ pub gossip_sent_bytes_total: IntCounter, // Total amount of gossip data sent in bytes.
}
impl Default for ClusterMetrics {
fn default() -> Self {
ClusterMetrics {
- gossip_recv_total: new_counter(
- "gossip_recv_total",
+ live_nodes: new_gauge(
+ "live_nodes",
+ "The number of live nodes observed locally.",
+ "cluster",
+ ),
+ ready_nodes: new_gauge(
+ "ready_nodes",
+ "The number of ready nodes observed locally.",
+ "cluster",
+ ),
+ zombie_nodes: new_gauge(
+ "zombie_nodes",
+ "The number of zombie nodes observed locally.",
+ "cluster",
+ ),
+ dead_nodes: new_gauge(
+ "dead_nodes",
+ "The number of dead nodes observed locally.",
+ "cluster",
+ ),
+ cluster_state_size_bytes: new_gauge(
+ "cluster_state_size_bytes",
+ "The size of the cluster state in bytes.",
+ "cluster",
+ ),
+ node_state_keys: new_gauge(
+ "node_state_keys",
+ "The number of keys in the node state.",
+ "cluster",
+ ),
+ node_state_size_bytes: new_gauge(
+ "node_state_size_bytes",
+ "The size of the node state in bytes.",
+ "cluster",
+ ),
+ gossip_recv_messages_total: new_counter(
+ "gossip_recv_messages_total",
"Total number of gossip messages received.",
"cluster",
),
@@ -40,13 +90,13 @@ impl Default for ClusterMetrics {
"Total amount of gossip data received in bytes.",
"cluster",
),
- gossip_send_total: new_counter(
- "gossip_send_total",
+ gossip_sent_messages_total: new_counter(
+ "gossip_sent_messages_total",
"Total number of gossip messages sent.",
"cluster",
),
- gossip_send_bytes_total: new_counter(
- "gossip_send_bytes_total",
+ gossip_sent_bytes_total: new_counter(
+ "gossip_sent_bytes_total",
"Total amount of gossip data sent in bytes.",
"cluster",
),
@@ -55,3 +105,61 @@ impl Default for ClusterMetrics {
}
pub static CLUSTER_METRICS: Lazy = Lazy::new(ClusterMetrics::default);
+
+/// Spawns a background task that periodically refreshes the gauges of [`CLUSTER_METRICS`]
+/// from the local Chitchat state.
+///
+/// Every `METRICS_INTERVAL`, the task locks the Chitchat instance and publishes:
+/// - the number of live, ready, zombie (scheduled for deletion), and dead nodes;
+/// - an estimate of the size of the whole cluster state in bytes;
+/// - the number of keys and the estimated size of the state of the node identified by
+///   `self_chitchat_id`.
+///
+/// The task only owns a [`Weak`] reference to the Chitchat instance. It exits at the first
+/// tick following the drop of the last strong reference.
+pub(crate) fn spawn_metrics_task(
+    weak_chitchat: Weak<Mutex<Chitchat>>,
+    self_chitchat_id: ChitchatId,
+) {
+    const METRICS_INTERVAL: Duration = Duration::from_secs(15);
+
+    // NOTE(review): `u64` is assumed to be the type of `ChitchatId::generation_id`; confirm
+    // against the chitchat crate.
+    const SIZE_OF_GENERATION_ID: usize = std::mem::size_of::<u64>();
+    const SIZE_OF_SOCKET_ADDR: usize = std::mem::size_of::<SocketAddr>();
+
+    let future = async move {
+        let mut interval = tokio::time::interval(METRICS_INTERVAL);
+
+        loop {
+            interval.tick().await;
+
+            // Upgrade only once the tick has elapsed so that no strong reference is held
+            // while the task sleeps. Otherwise, the task itself would keep the Chitchat
+            // instance alive for up to `METRICS_INTERVAL` after its last owner dropped it.
+            let chitchat = match weak_chitchat.upgrade() {
+                Some(chitchat) => chitchat,
+                None => break,
+            };
+            let mut num_ready_nodes = 0;
+            let mut cluster_state_size_bytes = 0;
+
+            let chitchat_guard = chitchat.lock().await;
+
+            let num_live_nodes = chitchat_guard.live_nodes().count();
+            let num_zombie_nodes = chitchat_guard.scheduled_for_deletion_nodes().count();
+            let num_dead_nodes = chitchat_guard.dead_nodes().count();
+
+            // NOTE(review): if `node_states()` also yields dead nodes, their last known
+            // readiness is counted in `ready_nodes` — confirm this is intended.
+            for (chitchat_id, node_state) in chitchat_guard.node_states() {
+                if node_state.is_ready() {
+                    num_ready_nodes += 1;
+                }
+                let chitchat_id_size_bytes =
+                    chitchat_id.node_id.len() + SIZE_OF_GENERATION_ID + SIZE_OF_SOCKET_ADDR;
+                let node_state_size_bytes = node_state.size_bytes();
+
+                cluster_state_size_bytes += chitchat_id_size_bytes + node_state_size_bytes;
+
+                // The per-node gauges only describe the local node.
+                if *chitchat_id == self_chitchat_id {
+                    CLUSTER_METRICS
+                        .node_state_keys
+                        .set(node_state.num_key_values() as i64);
+                    CLUSTER_METRICS
+                        .node_state_size_bytes
+                        .set(node_state_size_bytes as i64);
+                }
+            }
+            // Release the lock before publishing the gauges.
+            drop(chitchat_guard);
+
+            CLUSTER_METRICS.live_nodes.set(num_live_nodes as i64);
+            CLUSTER_METRICS.ready_nodes.set(num_ready_nodes as i64);
+            CLUSTER_METRICS.zombie_nodes.set(num_zombie_nodes as i64);
+            CLUSTER_METRICS.dead_nodes.set(num_dead_nodes as i64);
+
+            CLUSTER_METRICS
+                .cluster_state_size_bytes
+                .set(cluster_state_size_bytes as i64);
+        }
+    };
+    tokio::spawn(future);
+}