Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gather more cluster metrics #4597

Merged
merged 1 commit into from
Feb 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion quickwit/quickwit-cluster/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ use crate::member::{
GRPC_ADVERTISE_ADDR_KEY, PIPELINE_METRICS_PREFIX, READINESS_KEY, READINESS_VALUE_NOT_READY,
READINESS_VALUE_READY,
};
use crate::metrics::spawn_metrics_task;
use crate::ClusterNode;

const MARKED_FOR_DELETION_GRACE_PERIOD: usize = if cfg!(any(test, feature = "testsuite")) {
Expand Down Expand Up @@ -161,8 +162,11 @@ impl Cluster {
let chitchat = chitchat_handle.chitchat();
let live_nodes_stream = chitchat.lock().await.live_nodes_watcher();
let (ready_members_tx, ready_members_rx) = watch::channel(Vec::new());

spawn_ready_members_task(cluster_id.clone(), live_nodes_stream, ready_members_tx);

let weak_chitchat = Arc::downgrade(&chitchat);
spawn_metrics_task(weak_chitchat, self_node.chitchat_id());

let inner = InnerCluster {
cluster_id: cluster_id.clone(),
self_chitchat_id: self_node.chitchat_id(),
Expand Down
10 changes: 7 additions & 3 deletions quickwit/quickwit-cluster/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,17 @@ impl Transport for CountingUdpTransport {
let socket = UdpSocket::open(listen_addr).await?;
Ok(Box::new(CountingUdpSocket {
socket,
gossip_recv: crate::metrics::CLUSTER_METRICS.gossip_recv_total.clone(),
gossip_recv: crate::metrics::CLUSTER_METRICS
.gossip_recv_messages_total
.clone(),
gossip_recv_bytes: crate::metrics::CLUSTER_METRICS
.gossip_recv_bytes_total
.clone(),
gossip_send: crate::metrics::CLUSTER_METRICS.gossip_send_total.clone(),
gossip_send: crate::metrics::CLUSTER_METRICS
.gossip_sent_messages_total
.clone(),
gossip_send_bytes: crate::metrics::CLUSTER_METRICS
.gossip_send_bytes_total
.gossip_sent_bytes_total
.clone(),
}))
}
Expand Down
15 changes: 14 additions & 1 deletion quickwit/quickwit-cluster/src/member.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::collections::HashSet;
use std::mem::size_of;
use std::net::SocketAddr;
use std::str::FromStr;

use anyhow::Context;
use chitchat::{ChitchatId, NodeState};
use chitchat::{ChitchatId, NodeState, Version};
use quickwit_proto::indexing::{CpuCapacity, IndexingTask};
use quickwit_proto::types::NodeId;
use tracing::{error, warn};
Expand All @@ -46,6 +47,8 @@ pub(crate) trait NodeStateExt {
fn grpc_advertise_addr(&self) -> anyhow::Result<SocketAddr>;

fn is_ready(&self) -> bool;

fn size_bytes(&self) -> usize;
}

impl NodeStateExt for NodeState {
Expand All @@ -66,6 +69,16 @@ impl NodeStateExt for NodeState {
.map(|health_value| health_value == READINESS_VALUE_READY)
.unwrap_or(false)
}

// TODO: Expose more accurate size of the state in Chitchat.
fn size_bytes(&self) -> usize {
const SIZE_OF_VERSION: usize = size_of::<Version>();
const SIZE_OF_TOMBSTONE: usize = size_of::<u64>();

self.key_values()
.map(|(key, value)| key.len() + value.value.len() + SIZE_OF_VERSION + SIZE_OF_TOMBSTONE)
.sum()
}
}

/// Cluster member.
Expand Down
128 changes: 118 additions & 10 deletions quickwit/quickwit-cluster/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,71 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::net::SocketAddr;
use std::sync::Weak;
use std::time::Duration;

use chitchat::{Chitchat, ChitchatId};
use once_cell::sync::Lazy;
use quickwit_common::metrics::{new_counter, IntCounter};
use quickwit_common::metrics::{new_counter, new_gauge, IntCounter, IntGauge};
use tokio::sync::Mutex;

use crate::member::NodeStateExt;

pub struct ClusterMetrics {
pub gossip_recv_total: IntCounter,
pub live_nodes: IntGauge,
pub ready_nodes: IntGauge,
pub zombie_nodes: IntGauge,
pub dead_nodes: IntGauge,
pub cluster_state_size_bytes: IntGauge,
pub node_state_size_bytes: IntGauge,
pub node_state_keys: IntGauge,
pub gossip_recv_messages_total: IntCounter,
pub gossip_recv_bytes_total: IntCounter,
pub gossip_send_total: IntCounter,
pub gossip_send_bytes_total: IntCounter,
pub gossip_sent_messages_total: IntCounter,
pub gossip_sent_bytes_total: IntCounter,
}

impl Default for ClusterMetrics {
fn default() -> Self {
ClusterMetrics {
gossip_recv_total: new_counter(
"gossip_recv_total",
live_nodes: new_gauge(
"live_nodes",
"The number of live nodes observed locally.",
"cluster",
),
ready_nodes: new_gauge(
"ready_nodes",
"The number of ready nodes observed locally.",
"cluster",
),
zombie_nodes: new_gauge(
"zombie_nodes",
"The number of zombie nodes observed locally.",
"cluster",
),
dead_nodes: new_gauge(
"dead_nodes",
"The number of dead nodes observed locally.",
"cluster",
),
cluster_state_size_bytes: new_gauge(
"cluster_state_size_bytes",
"The size of the cluster state in bytes.",
"cluster",
),
node_state_keys: new_gauge(
"node_state_keys",
"The number of keys in the node state.",
"cluster",
),
node_state_size_bytes: new_gauge(
"node_state_size_bytes",
"The size of the node state in bytes.",
"cluster",
),
gossip_recv_messages_total: new_counter(
"gossip_recv_messages_total",
"Total number of gossip messages received.",
"cluster",
),
Expand All @@ -40,13 +90,13 @@ impl Default for ClusterMetrics {
"Total amount of gossip data received in bytes.",
"cluster",
),
gossip_send_total: new_counter(
"gossip_send_total",
gossip_sent_messages_total: new_counter(
"gossip_sent_messages_total",
"Total number of gossip messages sent.",
"cluster",
),
gossip_send_bytes_total: new_counter(
"gossip_send_bytes_total",
gossip_sent_bytes_total: new_counter(
"gossip_sent_bytes_total",
"Total amount of gossip data sent in bytes.",
"cluster",
),
Expand All @@ -55,3 +105,61 @@ impl Default for ClusterMetrics {
}

pub static CLUSTER_METRICS: Lazy<ClusterMetrics> = Lazy::new(ClusterMetrics::default);

pub(crate) fn spawn_metrics_task(
weak_chitchat: Weak<Mutex<Chitchat>>,
self_chitchat_id: ChitchatId,
) {
const METRICS_INTERVAL: Duration = Duration::from_secs(15);

const SIZE_OF_GENERATION_ID: usize = std::mem::size_of::<u64>();
const SIZE_OF_SOCKET_ADDR: usize = std::mem::size_of::<SocketAddr>();

let future = async move {
let mut interval = tokio::time::interval(METRICS_INTERVAL);

while let Some(chitchat) = weak_chitchat.upgrade() {
interval.tick().await;

let mut num_ready_nodes = 0;
let mut cluster_state_size_bytes = 0;

let chitchat_guard = chitchat.lock().await;

let num_live_nodes = chitchat_guard.live_nodes().count();
let num_zombie_nodes = chitchat_guard.scheduled_for_deletion_nodes().count();
let num_dead_nodes = chitchat_guard.dead_nodes().count();

for (chitchat_id, node_state) in chitchat_guard.node_states() {
if node_state.is_ready() {
num_ready_nodes += 1;
}
let chitchat_id_size_bytes =
chitchat_id.node_id.len() + SIZE_OF_GENERATION_ID + SIZE_OF_SOCKET_ADDR;
let node_state_size_bytes = node_state.size_bytes();

cluster_state_size_bytes += chitchat_id_size_bytes + node_state_size_bytes;

if *chitchat_id == self_chitchat_id {
CLUSTER_METRICS
.node_state_keys
.set(node_state.num_key_values() as i64);
CLUSTER_METRICS
.node_state_size_bytes
.set(node_state_size_bytes as i64);
}
}
drop(chitchat_guard);

CLUSTER_METRICS.live_nodes.set(num_live_nodes as i64);
CLUSTER_METRICS.ready_nodes.set(num_ready_nodes as i64);
CLUSTER_METRICS.zombie_nodes.set(num_zombie_nodes as i64);
CLUSTER_METRICS.dead_nodes.set(num_dead_nodes as i64);

CLUSTER_METRICS
.cluster_state_size_bytes
.set(cluster_state_size_bytes as i64);
}
};
tokio::spawn(future);
}
Loading