Skip to content

Commit

Permalink
Merge pull request MaterializeInc#30568 from teskje/storage-wallclock…
Browse files Browse the repository at this point in the history
…-metrics

controller: export wallclock lag metrics also for storage collections
  • Loading branch information
teskje authored Dec 3, 2024
2 parents 44b073b + 7220e11 commit 6c0ebf5
Show file tree
Hide file tree
Showing 10 changed files with 270 additions and 128 deletions.
1 change: 1 addition & 0 deletions src/cluster-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use anyhow::bail;
use serde::{Deserialize, Serialize};

pub mod client;
pub mod metrics;

/// A function that computes the lag between the given time and wallclock time.
pub type WallclockLagFn<T> = Arc<dyn Fn(&T) -> Duration + Send + Sync>;
Expand Down
134 changes: 134 additions & 0 deletions src/cluster-client/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Metrics shared by both compute and storage.
use std::time::Duration;

use mz_ore::metric;
use mz_ore::metrics::{
CounterVec, DeleteOnDropCounter, DeleteOnDropGauge, GaugeVec, IntCounterVec, MetricsRegistry,
};
use mz_ore::stats::SlidingMinMax;
use prometheus::core::{AtomicF64, AtomicU64};

/// Controller metrics.
#[derive(Debug, Clone)]
pub struct ControllerMetrics {
dataflow_wallclock_lag_seconds: GaugeVec,
dataflow_wallclock_lag_seconds_sum: CounterVec,
dataflow_wallclock_lag_seconds_count: IntCounterVec,
}

impl ControllerMetrics {
/// Create a metrics instance registered into the given registry.
pub fn new(metrics_registry: &MetricsRegistry) -> Self {
Self {
// The next three metrics immitate a summary metric type. The `prometheus` crate lacks
// support for summaries, so we roll our own. Note that we also only expose the 0- and
// the 1-quantile, i.e., minimum and maximum lag values.
dataflow_wallclock_lag_seconds: metrics_registry.register(metric!(
name: "mz_dataflow_wallclock_lag_seconds",
help: "A summary of the second-by-second lag of the dataflow frontier relative \
to wallclock time, aggregated over the last minute.",
var_labels: ["instance_id", "replica_id", "collection_id", "quantile"],
)),
dataflow_wallclock_lag_seconds_sum: metrics_registry.register(metric!(
name: "mz_dataflow_wallclock_lag_seconds_sum",
help: "The total sum of dataflow wallclock lag measurements.",
var_labels: ["instance_id", "replica_id", "collection_id"],
)),
dataflow_wallclock_lag_seconds_count: metrics_registry.register(metric!(
name: "mz_dataflow_wallclock_lag_seconds_count",
help: "The total count of dataflow wallclock lag measurements.",
var_labels: ["instance_id", "replica_id", "collection_id"],
)),
}
}

/// Return an object that tracks wallclock lag metrics for the given collection on the given
/// cluster and replica.
pub fn wallclock_lag_metrics(
&self,
collection_id: String,
instance_id: Option<String>,
replica_id: Option<String>,
) -> WallclockLagMetrics {
let labels = vec![
instance_id.unwrap_or_default(),
replica_id.unwrap_or_default(),
collection_id,
];

let labels_with_quantile = |quantile: &str| {
labels
.iter()
.cloned()
.chain([quantile.to_string()])
.collect()
};

let wallclock_lag_seconds_min = self
.dataflow_wallclock_lag_seconds
.get_delete_on_drop_metric(labels_with_quantile("0"));
let wallclock_lag_seconds_max = self
.dataflow_wallclock_lag_seconds
.get_delete_on_drop_metric(labels_with_quantile("1"));
let wallclock_lag_seconds_sum = self
.dataflow_wallclock_lag_seconds_sum
.get_delete_on_drop_metric(labels.clone());
let wallclock_lag_seconds_count = self
.dataflow_wallclock_lag_seconds_count
.get_delete_on_drop_metric(labels);
let wallclock_lag_minmax = SlidingMinMax::new(60);

WallclockLagMetrics {
wallclock_lag_seconds_min,
wallclock_lag_seconds_max,
wallclock_lag_seconds_sum,
wallclock_lag_seconds_count,
wallclock_lag_minmax,
}
}
}

/// Metrics tracking frontier wallclock lag for a collection.
#[derive(Debug)]
pub struct WallclockLagMetrics {
/// Gauge tracking minimum dataflow wallclock lag.
wallclock_lag_seconds_min: DeleteOnDropGauge<'static, AtomicF64, Vec<String>>,
/// Gauge tracking maximum dataflow wallclock lag.
wallclock_lag_seconds_max: DeleteOnDropGauge<'static, AtomicF64, Vec<String>>,
/// Counter tracking the total sum of dataflow wallclock lag.
wallclock_lag_seconds_sum: DeleteOnDropCounter<'static, AtomicF64, Vec<String>>,
/// Counter tracking the total count of dataflow wallclock lag measurements.
wallclock_lag_seconds_count: DeleteOnDropCounter<'static, AtomicU64, Vec<String>>,

/// State maintaining minimum and maximum wallclock lag.
wallclock_lag_minmax: SlidingMinMax<f32>,
}

impl WallclockLagMetrics {
/// Observe a new wallclock lag measurement.
pub fn observe(&mut self, lag: Duration) {
let lag_secs = lag.as_secs_f32();

self.wallclock_lag_minmax.add_sample(lag_secs);

let (&min, &max) = self
.wallclock_lag_minmax
.get()
.expect("just added a sample");

self.wallclock_lag_seconds_min.set(min.into());
self.wallclock_lag_seconds_max.set(max.into());
self.wallclock_lag_seconds_sum.inc_by(lag_secs.into());
self.wallclock_lag_seconds_count.inc();
}
}
8 changes: 6 additions & 2 deletions src/compute-client/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt};
use mz_build_info::BuildInfo;
use mz_cluster_client::client::ClusterReplicaLocation;
use mz_cluster_client::metrics::ControllerMetrics;
use mz_cluster_client::{ReplicaId, WallclockLagFn};
use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::dyncfgs::COMPUTE_REPLICA_EXPIRATION_OFFSET;
Expand Down Expand Up @@ -239,7 +240,8 @@ impl<T: ComputeControllerTimestamp> ComputeController<T> {
storage_collections: StorageCollections<T>,
envd_epoch: NonZeroI64,
read_only: bool,
metrics_registry: MetricsRegistry,
metrics_registry: &MetricsRegistry,
controller_metrics: ControllerMetrics,
now: NowFn,
wallclock_lag: WallclockLagFn<T>,
) -> Self {
Expand Down Expand Up @@ -291,6 +293,8 @@ impl<T: ComputeControllerTimestamp> ComputeController<T> {
}
});

let metrics = ComputeControllerMetrics::new(metrics_registry, controller_metrics);

Self {
instances: BTreeMap::new(),
instance_workload_classes,
Expand All @@ -302,7 +306,7 @@ impl<T: ComputeControllerTimestamp> ComputeController<T> {
arrangement_exert_proportionality: 16,
stashed_response: None,
envd_epoch,
metrics: ComputeControllerMetrics::new(metrics_registry),
metrics,
now,
wallclock_lag,
dyncfg: Arc::new(mz_dyncfgs::all_dyncfgs()),
Expand Down
2 changes: 1 addition & 1 deletion src/compute-client/src/controller/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ impl<T: ComputeControllerTimestamp> Instance<T> {
}

if let Some(metrics) = &mut collection.metrics {
metrics.observe_wallclock_lag(lag);
metrics.wallclock_lag.observe(lag);
};
}
}
Expand Down
103 changes: 17 additions & 86 deletions src/compute-client/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,24 @@ use std::borrow::Borrow;
use std::sync::Arc;
use std::time::Duration;

use mz_cluster_client::metrics::{ControllerMetrics, WallclockLagMetrics};
use mz_cluster_client::ReplicaId;
use mz_compute_types::ComputeInstanceId;
use mz_ore::cast::CastFrom;
use mz_ore::metric;
use mz_ore::metrics::raw::UIntGaugeVec;
use mz_ore::metrics::{
CounterVec, DeleteOnDropCounter, DeleteOnDropGauge, DeleteOnDropHistogram, GaugeVec,
HistogramVec, IntCounterVec, MetricVecExt, MetricsRegistry,
DeleteOnDropCounter, DeleteOnDropGauge, DeleteOnDropHistogram, GaugeVec, HistogramVec,
IntCounterVec, MetricVecExt, MetricsRegistry,
};
use mz_ore::stats::{histogram_seconds_buckets, SlidingMinMax};
use mz_ore::stats::histogram_seconds_buckets;
use mz_repr::GlobalId;
use mz_service::codec::StatsCollector;
use prometheus::core::{AtomicF64, AtomicU64};

use crate::protocol::command::{ComputeCommand, ProtoComputeCommand};
use crate::protocol::response::{PeekResponse, ProtoComputeResponse};

type Counter = DeleteOnDropCounter<'static, AtomicF64, Vec<String>>;
pub(crate) type IntCounter = DeleteOnDropCounter<'static, AtomicU64, Vec<String>>;
type Gauge = DeleteOnDropGauge<'static, AtomicF64, Vec<String>>;
/// TODO(database-issues#7533): Add documentation.
Expand Down Expand Up @@ -68,14 +68,14 @@ pub struct ComputeControllerMetrics {

// dataflows
dataflow_initial_output_duration_seconds: GaugeVec,
dataflow_wallclock_lag_seconds: GaugeVec,
dataflow_wallclock_lag_seconds_sum: CounterVec,
dataflow_wallclock_lag_seconds_count: IntCounterVec,

/// Metrics shared with the storage controller.
shared: ControllerMetrics,
}

impl ComputeControllerMetrics {
/// Create a metrics instance registered into the given registry.
pub fn new(metrics_registry: MetricsRegistry) -> Self {
pub fn new(metrics_registry: &MetricsRegistry, shared: ControllerMetrics) -> Self {
ComputeControllerMetrics {
commands_total: metrics_registry.register(metric!(
name: "mz_compute_commands_total",
Expand Down Expand Up @@ -174,25 +174,7 @@ impl ComputeControllerMetrics {
var_labels: ["instance_id", "replica_id", "collection_id"],
)),

// The next three metrics immitate a summary metric type. The `prometheus` crate lacks
// support for summaries, so we roll our own. Note that we also only expose the 0- and
// the 1-quantile, i.e., minimum and maximum lag values.
dataflow_wallclock_lag_seconds: metrics_registry.register(metric!(
name: "mz_dataflow_wallclock_lag_seconds",
help: "A summary of the second-by-second lag of the dataflow frontier relative \
to wallclock time, aggregated over the last minute.",
var_labels: ["instance_id", "replica_id", "collection_id", "quantile"],
)),
dataflow_wallclock_lag_seconds_sum: metrics_registry.register(metric!(
name: "mz_dataflow_wallclock_lag_seconds_sum",
help: "The total sum of dataflow wallclock lag measurements.",
var_labels: ["instance_id", "replica_id", "collection_id"],
)),
dataflow_wallclock_lag_seconds_count: metrics_registry.register(metric!(
name: "mz_dataflow_wallclock_lag_seconds_count",
help: "The total count of dataflow wallclock lag measurements.",
var_labels: ["instance_id", "replica_id", "collection_id"],
)),
shared,
}
}

Expand Down Expand Up @@ -418,44 +400,20 @@ impl ReplicaMetrics {
collection_id.to_string(),
];

let labels_with_quantile = |quantile: &str| {
labels
.iter()
.cloned()
.chain([quantile.to_string()])
.collect()
};

let initial_output_duration_seconds = self
.metrics
.dataflow_initial_output_duration_seconds
.get_delete_on_drop_metric(labels.clone());

let wallclock_lag_seconds_min = self
.metrics
.dataflow_wallclock_lag_seconds
.get_delete_on_drop_metric(labels_with_quantile("0"));
let wallclock_lag_seconds_max = self
.metrics
.dataflow_wallclock_lag_seconds
.get_delete_on_drop_metric(labels_with_quantile("1"));
let wallclock_lag_seconds_sum = self
.metrics
.dataflow_wallclock_lag_seconds_sum
.get_delete_on_drop_metric(labels.clone());
let wallclock_lag_seconds_count = self
.metrics
.dataflow_wallclock_lag_seconds_count
.get_delete_on_drop_metric(labels);
let wallclock_lag_minmax = SlidingMinMax::new(60);
let wallclock_lag = self.metrics.shared.wallclock_lag_metrics(
collection_id.to_string(),
Some(self.instance_id.to_string()),
Some(self.replica_id.to_string()),
);

Some(ReplicaCollectionMetrics {
initial_output_duration_seconds,
wallclock_lag_seconds_min,
wallclock_lag_seconds_max,
wallclock_lag_seconds_sum,
wallclock_lag_seconds_count,
wallclock_lag_minmax,
wallclock_lag,
})
}
}
Expand Down Expand Up @@ -484,35 +442,8 @@ impl StatsCollector<ProtoComputeCommand, ProtoComputeResponse> for ReplicaMetric
pub(crate) struct ReplicaCollectionMetrics {
/// Gauge tracking dataflow hydration time.
pub initial_output_duration_seconds: Gauge,
/// Gauge tracking minimum dataflow wallclock lag.
wallclock_lag_seconds_min: Gauge,
/// Gauge tracking maximum dataflow wallclock lag.
wallclock_lag_seconds_max: Gauge,
/// Counter tracking the total sum of dataflow wallclock lag.
wallclock_lag_seconds_sum: Counter,
/// Counter tracking the total count of dataflow wallclock lag measurements.
wallclock_lag_seconds_count: IntCounter,

/// State maintaining minimum and maximum wallclock lag.
wallclock_lag_minmax: SlidingMinMax<f32>,
}

impl ReplicaCollectionMetrics {
pub fn observe_wallclock_lag(&mut self, lag: Duration) {
let lag_secs = lag.as_secs_f32();

self.wallclock_lag_minmax.add_sample(lag_secs);

let (&min, &max) = self
.wallclock_lag_minmax
.get()
.expect("just added a sample");

self.wallclock_lag_seconds_min.set(min.into());
self.wallclock_lag_seconds_max.set(max.into());
self.wallclock_lag_seconds_sum.inc_by(lag_secs.into());
self.wallclock_lag_seconds_count.inc();
}
/// Metrics tracking dataflow wallclock lag.
pub wallclock_lag: WallclockLagMetrics,
}

/// Metrics keyed by `ComputeCommand` type.
Expand Down
9 changes: 7 additions & 2 deletions src/controller/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use std::time::Duration;

use futures::future::BoxFuture;
use mz_build_info::BuildInfo;
use mz_cluster_client::metrics::ControllerMetrics;
use mz_cluster_client::{ReplicaId, WallclockLagFn};
use mz_compute_client::controller::{
ComputeController, ComputeControllerResponse, ComputeControllerTimestamp, PeekNotification,
Expand Down Expand Up @@ -649,6 +650,8 @@ where
Duration::from(lag_ts)
});

let controller_metrics = ControllerMetrics::new(&config.metrics_registry);

let txns_metrics = Arc::new(TxnMetrics::new(&config.metrics_registry));
let collections_ctl = storage_collections::StorageCollectionsImpl::new(
config.persist_location.clone(),
Expand All @@ -675,7 +678,8 @@ where
Arc::clone(&txns_metrics),
envd_epoch,
read_only,
config.metrics_registry.clone(),
&config.metrics_registry,
controller_metrics.clone(),
config.connection_context,
storage_txn,
Arc::clone(&collections_ctl),
Expand All @@ -688,7 +692,8 @@ where
storage_collections,
envd_epoch,
read_only,
config.metrics_registry.clone(),
&config.metrics_registry,
controller_metrics,
config.now.clone(),
wallclock_lag,
);
Expand Down
Loading

0 comments on commit 6c0ebf5

Please sign in to comment.