Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Revert "Merge pull request #30568 from teskje/storage-wallclock-metrics" #30854

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/cluster-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use anyhow::bail;
use serde::{Deserialize, Serialize};

pub mod client;
pub mod metrics;

/// A function that computes the lag between the given time and wallclock time.
pub type WallclockLagFn<T> = Arc<dyn Fn(&T) -> Duration + Send + Sync>;
Expand Down
134 changes: 0 additions & 134 deletions src/cluster-client/src/metrics.rs

This file was deleted.

8 changes: 2 additions & 6 deletions src/compute-client/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt};
use mz_build_info::BuildInfo;
use mz_cluster_client::client::ClusterReplicaLocation;
use mz_cluster_client::metrics::ControllerMetrics;
use mz_cluster_client::{ReplicaId, WallclockLagFn};
use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::dyncfgs::COMPUTE_REPLICA_EXPIRATION_OFFSET;
Expand Down Expand Up @@ -240,8 +239,7 @@ impl<T: ComputeControllerTimestamp> ComputeController<T> {
storage_collections: StorageCollections<T>,
envd_epoch: NonZeroI64,
read_only: bool,
metrics_registry: &MetricsRegistry,
controller_metrics: ControllerMetrics,
metrics_registry: MetricsRegistry,
now: NowFn,
wallclock_lag: WallclockLagFn<T>,
) -> Self {
Expand Down Expand Up @@ -293,8 +291,6 @@ impl<T: ComputeControllerTimestamp> ComputeController<T> {
}
});

let metrics = ComputeControllerMetrics::new(metrics_registry, controller_metrics);

Self {
instances: BTreeMap::new(),
instance_workload_classes,
Expand All @@ -306,7 +302,7 @@ impl<T: ComputeControllerTimestamp> ComputeController<T> {
arrangement_exert_proportionality: 16,
stashed_response: None,
envd_epoch,
metrics,
metrics: ComputeControllerMetrics::new(metrics_registry),
now,
wallclock_lag,
dyncfg: Arc::new(mz_dyncfgs::all_dyncfgs()),
Expand Down
2 changes: 1 addition & 1 deletion src/compute-client/src/controller/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ impl<T: ComputeControllerTimestamp> Instance<T> {
}

if let Some(metrics) = &mut collection.metrics {
metrics.wallclock_lag.observe(lag);
metrics.observe_wallclock_lag(lag);
};
}
}
Expand Down
103 changes: 86 additions & 17 deletions src/compute-client/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,24 @@ use std::borrow::Borrow;
use std::sync::Arc;
use std::time::Duration;

use mz_cluster_client::metrics::{ControllerMetrics, WallclockLagMetrics};
use mz_cluster_client::ReplicaId;
use mz_compute_types::ComputeInstanceId;
use mz_ore::cast::CastFrom;
use mz_ore::metric;
use mz_ore::metrics::raw::UIntGaugeVec;
use mz_ore::metrics::{
DeleteOnDropCounter, DeleteOnDropGauge, DeleteOnDropHistogram, GaugeVec, HistogramVec,
IntCounterVec, MetricVecExt, MetricsRegistry,
CounterVec, DeleteOnDropCounter, DeleteOnDropGauge, DeleteOnDropHistogram, GaugeVec,
HistogramVec, IntCounterVec, MetricVecExt, MetricsRegistry,
};
use mz_ore::stats::histogram_seconds_buckets;
use mz_ore::stats::{histogram_seconds_buckets, SlidingMinMax};
use mz_repr::GlobalId;
use mz_service::codec::StatsCollector;
use prometheus::core::{AtomicF64, AtomicU64};

use crate::protocol::command::{ComputeCommand, ProtoComputeCommand};
use crate::protocol::response::{PeekResponse, ProtoComputeResponse};

type Counter = DeleteOnDropCounter<'static, AtomicF64, Vec<String>>;
pub(crate) type IntCounter = DeleteOnDropCounter<'static, AtomicU64, Vec<String>>;
type Gauge = DeleteOnDropGauge<'static, AtomicF64, Vec<String>>;
/// TODO(database-issues#7533): Add documentation.
Expand Down Expand Up @@ -68,14 +68,14 @@ pub struct ComputeControllerMetrics {

// dataflows
dataflow_initial_output_duration_seconds: GaugeVec,

/// Metrics shared with the storage controller.
shared: ControllerMetrics,
dataflow_wallclock_lag_seconds: GaugeVec,
dataflow_wallclock_lag_seconds_sum: CounterVec,
dataflow_wallclock_lag_seconds_count: IntCounterVec,
}

impl ComputeControllerMetrics {
/// Create a metrics instance registered into the given registry.
pub fn new(metrics_registry: &MetricsRegistry, shared: ControllerMetrics) -> Self {
pub fn new(metrics_registry: MetricsRegistry) -> Self {
ComputeControllerMetrics {
commands_total: metrics_registry.register(metric!(
name: "mz_compute_commands_total",
Expand Down Expand Up @@ -174,7 +174,25 @@ impl ComputeControllerMetrics {
var_labels: ["instance_id", "replica_id", "collection_id"],
)),

shared,
// The next three metrics imitate a summary metric type. The `prometheus` crate lacks
// support for summaries, so we roll our own. Note that we also only expose the 0- and
// the 1-quantile, i.e., minimum and maximum lag values.
dataflow_wallclock_lag_seconds: metrics_registry.register(metric!(
name: "mz_dataflow_wallclock_lag_seconds",
help: "A summary of the second-by-second lag of the dataflow frontier relative \
to wallclock time, aggregated over the last minute.",
var_labels: ["instance_id", "replica_id", "collection_id", "quantile"],
)),
dataflow_wallclock_lag_seconds_sum: metrics_registry.register(metric!(
name: "mz_dataflow_wallclock_lag_seconds_sum",
help: "The total sum of dataflow wallclock lag measurements.",
var_labels: ["instance_id", "replica_id", "collection_id"],
)),
dataflow_wallclock_lag_seconds_count: metrics_registry.register(metric!(
name: "mz_dataflow_wallclock_lag_seconds_count",
help: "The total count of dataflow wallclock lag measurements.",
var_labels: ["instance_id", "replica_id", "collection_id"],
)),
}
}

Expand Down Expand Up @@ -400,20 +418,44 @@ impl ReplicaMetrics {
collection_id.to_string(),
];

let labels_with_quantile = |quantile: &str| {
labels
.iter()
.cloned()
.chain([quantile.to_string()])
.collect()
};

let initial_output_duration_seconds = self
.metrics
.dataflow_initial_output_duration_seconds
.get_delete_on_drop_metric(labels.clone());

let wallclock_lag = self.metrics.shared.wallclock_lag_metrics(
collection_id.to_string(),
Some(self.instance_id.to_string()),
Some(self.replica_id.to_string()),
);
let wallclock_lag_seconds_min = self
.metrics
.dataflow_wallclock_lag_seconds
.get_delete_on_drop_metric(labels_with_quantile("0"));
let wallclock_lag_seconds_max = self
.metrics
.dataflow_wallclock_lag_seconds
.get_delete_on_drop_metric(labels_with_quantile("1"));
let wallclock_lag_seconds_sum = self
.metrics
.dataflow_wallclock_lag_seconds_sum
.get_delete_on_drop_metric(labels.clone());
let wallclock_lag_seconds_count = self
.metrics
.dataflow_wallclock_lag_seconds_count
.get_delete_on_drop_metric(labels);
let wallclock_lag_minmax = SlidingMinMax::new(60);

Some(ReplicaCollectionMetrics {
initial_output_duration_seconds,
wallclock_lag,
wallclock_lag_seconds_min,
wallclock_lag_seconds_max,
wallclock_lag_seconds_sum,
wallclock_lag_seconds_count,
wallclock_lag_minmax,
})
}
}
Expand Down Expand Up @@ -442,8 +484,35 @@ impl StatsCollector<ProtoComputeCommand, ProtoComputeResponse> for ReplicaMetric
pub(crate) struct ReplicaCollectionMetrics {
    /// Gauge tracking dataflow hydration time.
    pub initial_output_duration_seconds: Gauge,
    /// Metrics tracking dataflow wallclock lag.
    pub wallclock_lag: WallclockLagMetrics,
    /// Gauge tracking minimum dataflow wallclock lag, in seconds, over the sliding window.
    wallclock_lag_seconds_min: Gauge,
    /// Gauge tracking maximum dataflow wallclock lag, in seconds, over the sliding window.
    wallclock_lag_seconds_max: Gauge,
    /// Counter tracking the total sum of dataflow wallclock lag, in seconds.
    ///
    /// Together with `wallclock_lag_seconds_count`, imitates the `_sum`/`_count` series of a
    /// Prometheus summary metric.
    wallclock_lag_seconds_sum: Counter,
    /// Counter tracking the total count of dataflow wallclock lag measurements.
    wallclock_lag_seconds_count: IntCounter,

    /// State maintaining minimum and maximum wallclock lag seen recently.
    ///
    /// Backs the min/max gauges above; updated on every `observe_wallclock_lag` call.
    wallclock_lag_minmax: SlidingMinMax<f32>,
}

impl ReplicaCollectionMetrics {
    /// Record a single wallclock-lag measurement for this collection.
    ///
    /// The sample is fed into the sliding min/max window, after which the exported
    /// min/max gauges are refreshed and the summary-style sum/count series are bumped.
    pub fn observe_wallclock_lag(&mut self, lag: Duration) {
        let secs = lag.as_secs_f32();

        self.wallclock_lag_minmax.add_sample(secs);

        // We inserted a sample above, so the window cannot be empty here.
        let (min, max) = self
            .wallclock_lag_minmax
            .get()
            .map(|(min, max)| (*min, *max))
            .expect("just added a sample");

        self.wallclock_lag_seconds_min.set(f64::from(min));
        self.wallclock_lag_seconds_max.set(f64::from(max));
        self.wallclock_lag_seconds_sum.inc_by(f64::from(secs));
        self.wallclock_lag_seconds_count.inc();
    }
}

/// Metrics keyed by `ComputeCommand` type.
Expand Down
9 changes: 2 additions & 7 deletions src/controller/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ use std::time::Duration;

use futures::future::BoxFuture;
use mz_build_info::BuildInfo;
use mz_cluster_client::metrics::ControllerMetrics;
use mz_cluster_client::{ReplicaId, WallclockLagFn};
use mz_compute_client::controller::{
ComputeController, ComputeControllerResponse, ComputeControllerTimestamp, PeekNotification,
Expand Down Expand Up @@ -650,8 +649,6 @@ where
Duration::from(lag_ts)
});

let controller_metrics = ControllerMetrics::new(&config.metrics_registry);

let txns_metrics = Arc::new(TxnMetrics::new(&config.metrics_registry));
let collections_ctl = storage_collections::StorageCollectionsImpl::new(
config.persist_location.clone(),
Expand All @@ -678,8 +675,7 @@ where
Arc::clone(&txns_metrics),
envd_epoch,
read_only,
&config.metrics_registry,
controller_metrics.clone(),
config.metrics_registry.clone(),
config.connection_context,
storage_txn,
Arc::clone(&collections_ctl),
Expand All @@ -692,8 +688,7 @@ where
storage_collections,
envd_epoch,
read_only,
&config.metrics_registry,
controller_metrics,
config.metrics_registry.clone(),
config.now.clone(),
wallclock_lag,
);
Expand Down
Loading
Loading