From f9d46f95e71f42bae8618a7b747a147a9d06951d Mon Sep 17 00:00:00 2001 From: Tiffany Pei Date: Tue, 28 May 2024 21:43:10 +0000 Subject: [PATCH] [metric] #4 Initialize count metrics on reconciler start The Count views won't show up in query results until data is sent, which happens when an error condition occurs. This change initializes these metrics and adjusts the e2e test to check for Rate aggregation. The public documentation should also be revised to recommend the Rate aggregation for this type of metric. --- e2e/nomostest/prometheus_metrics.go | 74 ++++++++++++++--------------- pkg/metrics/record.go | 25 +++++++--- pkg/parse/run.go | 6 +++ 3 files changed, 61 insertions(+), 44 deletions(-) diff --git a/e2e/nomostest/prometheus_metrics.go b/e2e/nomostest/prometheus_metrics.go index 36bf9b938f..c74c72b201 100644 --- a/e2e/nomostest/prometheus_metrics.go +++ b/e2e/nomostest/prometheus_metrics.go @@ -273,9 +273,9 @@ func ReconcilerErrorMetrics(nt *NT, syncLabels prometheusmodel.LabelSet, commitH var predicates []MetricsPredicate // Metrics aggregated by total count - predicates = append(predicates, metricResourceFightsHasValueAtLeast(nt, syncLabels, summary.Fights)) - predicates = append(predicates, metricResourceConflictsHasValueAtLeast(nt, syncLabels, commitHash, summary.Conflicts)) - predicates = append(predicates, metricInternalErrorsHasValueAtLeast(nt, syncLabels, summary.Internal)) + predicates = append(predicates, metricResourceFightsHasValueAtLeast(nt, syncLabels, summary.Fights, 1*time.Minute)) + predicates = append(predicates, metricResourceConflictsHasValueAtLeast(nt, syncLabels, commitHash, summary.Conflicts, 1*time.Minute)) + predicates = append(predicates, metricInternalErrorsHasValueAtLeast(nt, syncLabels, summary.Internal, 1*time.Minute)) // Metrics aggregated by last value predicates = append(predicates, metricReconcilerErrorsHasValue(nt, syncLabels, componentRendering, summary.Rendering)) predicates = append(predicates, metricReconcilerErrorsHasValue(nt, 
syncLabels, componentSource, summary.Source)) @@ -320,7 +320,7 @@ func metricReconcilerErrorsHasValue(nt *NT, syncLabels prometheusmodel.LabelSet, metricName := ocmetrics.ReconcilerErrorsView.Name metricName = fmt.Sprintf("%s%s", prometheusConfigSyncMetricPrefix, metricName) labels := prometheusmodel.LabelSet{ - prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): prometheusmodel.LabelValue(ocmetrics.OtelCollectorName), + prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): ocmetrics.OtelCollectorName, prometheusmodel.LabelName(ocmetrics.KeyExportedComponent.Name()): prometheusmodel.LabelValue(componentName), }.Merge(syncLabels) // ReconcilerErrorsView only keeps the LastValue, so we don't need to aggregate @@ -337,63 +337,61 @@ func metricReconcilerErrorsHasValue(nt *NT, syncLabels prometheusmodel.LabelSet, // metricResourceFightsHasValueAtLeast returns a MetricsPredicate that validates // that ResourceFights has at least the expected value. // If the expected value is zero, the metric must be zero or not found. -func metricResourceFightsHasValueAtLeast(nt *NT, syncLabels prometheusmodel.LabelSet, value int) MetricsPredicate { +func metricResourceFightsHasValueAtLeast(nt *NT, syncLabels prometheusmodel.LabelSet, value int, duration time.Duration) MetricsPredicate { return func(ctx context.Context, v1api prometheusv1.API) error { metricName := ocmetrics.ResourceFightsView.Name metricName = fmt.Sprintf("%s%s", prometheusConfigSyncMetricPrefix, metricName) labels := prometheusmodel.LabelSet{ - prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): prometheusmodel.LabelValue(ocmetrics.OtelCollectorName), + prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): ocmetrics.OtelCollectorName, }.Merge(syncLabels) - // ResourceFightsView counts the total number of ResourceFights, so we don't need to aggregate - query := fmt.Sprintf("%s%s", metricName, labels) if value == 0 { - // Tolerate missing metrics when expecting a zero value. 
- // Don't allow any value other than zero. - return metricExistsWithValueOrDoesNotExist(ctx, nt, v1api, query, float64(value)) + query := fmt.Sprintf("rate(%s%s[%s])", metricName, labels, duration.String()) + return metricExistsWithValue(ctx, nt, v1api, query, 0) } + query := fmt.Sprintf("%s%s", metricName, labels) return metricExistsWithValueAtLeast(ctx, nt, v1api, query, float64(value)) } } // metricResourceConflictsHasValueAtLeast returns a MetricsPredicate that -// validates that ResourceConflicts has at least the expected value. -// If the expected value is zero, the metric must be zero or not found. -func metricResourceConflictsHasValueAtLeast(nt *NT, syncLabels prometheusmodel.LabelSet, commitHash string, value int) MetricsPredicate { +// validates whether ResourceConflicts has been recorded recently +// If the expected value is zero, the rate at which ResourceConflicts occur is also zero. +func metricResourceConflictsHasValueAtLeast(nt *NT, syncLabels prometheusmodel.LabelSet, commitHash string, value int, duration time.Duration) MetricsPredicate { return func(ctx context.Context, v1api prometheusv1.API) error { metricName := ocmetrics.ResourceConflictsView.Name metricName = fmt.Sprintf("%s%s", prometheusConfigSyncMetricPrefix, metricName) + if value == 0 { + labels := prometheusmodel.LabelSet{ + prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): ocmetrics.OtelCollectorName, + prometheusmodel.LabelName(ocmetrics.KeyCommit.Name()): ocmetrics.Initialization, + }.Merge(syncLabels) + query := fmt.Sprintf("rate(%s%s[%s])", metricName, labels, duration.String()) + return metricExistsWithValue(ctx, nt, v1api, query, 0) + } labels := prometheusmodel.LabelSet{ - prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): prometheusmodel.LabelValue(ocmetrics.OtelCollectorName), + prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): ocmetrics.OtelCollectorName, prometheusmodel.LabelName(ocmetrics.KeyCommit.Name()): prometheusmodel.LabelValue(commitHash), 
}.Merge(syncLabels) - // ResourceConflictsView counts the total number of ResourceConflicts, so we don't need to aggregate query := fmt.Sprintf("%s%s", metricName, labels) - if value == 0 { - // Tolerate missing metrics when expecting a zero value. - // Don't allow any value other than zero. - return metricExistsWithValueOrDoesNotExist(ctx, nt, v1api, query, float64(value)) - } return metricExistsWithValueAtLeast(ctx, nt, v1api, query, float64(value)) } } // metricInternalErrorsHasValueAtLeast returns a MetricsPredicate that validates -// that InternalErrors has at least the expected value. -// If the expected value is zero, the metric must be zero or not found. -func metricInternalErrorsHasValueAtLeast(nt *NT, syncLabels prometheusmodel.LabelSet, value int) MetricsPredicate { +// whether InternalErrors has been recorded recently. +// If the expected value is zero, the rate at which InternalErrors occur is also zero. +func metricInternalErrorsHasValueAtLeast(nt *NT, syncLabels prometheusmodel.LabelSet, value int, duration time.Duration) MetricsPredicate { return func(ctx context.Context, v1api prometheusv1.API) error { metricName := ocmetrics.InternalErrorsView.Name metricName = fmt.Sprintf("%s%s", prometheusConfigSyncMetricPrefix, metricName) labels := prometheusmodel.LabelSet{ - prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): prometheusmodel.LabelValue(ocmetrics.OtelCollectorName), + prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): ocmetrics.OtelCollectorName, }.Merge(syncLabels) - // InternalErrorsView counts the total number of InternalErrors, so we don't need to aggregate - query := fmt.Sprintf("%s%s", metricName, labels) if value == 0 { - // Tolerate missing metrics when expecting a zero value. - // Don't allow any value other than zero. 
- return metricExistsWithValueOrDoesNotExist(ctx, nt, v1api, query, float64(value)) + query := fmt.Sprintf("rate(%s%s[%s])", metricName, labels, duration.String()) + return metricExistsWithValue(ctx, nt, v1api, query, 0) } + query := fmt.Sprintf("%s%s", metricName, labels) return metricExistsWithValueAtLeast(ctx, nt, v1api, query, float64(value)) } } @@ -402,7 +400,7 @@ func metricLastSyncTimestampHasStatus(ctx context.Context, nt *NT, v1api prometh metricName := ocmetrics.LastSyncTimestampView.Name metricName = fmt.Sprintf("%s%s", prometheusConfigSyncMetricPrefix, metricName) labels := prometheusmodel.LabelSet{ - prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): prometheusmodel.LabelValue(ocmetrics.OtelCollectorName), + prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): ocmetrics.OtelCollectorName, prometheusmodel.LabelName(ocmetrics.KeyCommit.Name()): prometheusmodel.LabelValue(commitHash), prometheusmodel.LabelName(ocmetrics.KeyStatus.Name()): prometheusmodel.LabelValue(status), }.Merge(syncLabels) @@ -415,7 +413,7 @@ func metricLastApplyTimestampHasStatus(ctx context.Context, nt *NT, v1api promet metricName := ocmetrics.LastApplyTimestampView.Name metricName = fmt.Sprintf("%s%s", prometheusConfigSyncMetricPrefix, metricName) labels := prometheusmodel.LabelSet{ - prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): prometheusmodel.LabelValue(ocmetrics.OtelCollectorName), + prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): ocmetrics.OtelCollectorName, prometheusmodel.LabelName(ocmetrics.KeyCommit.Name()): prometheusmodel.LabelValue(commitHash), prometheusmodel.LabelName(ocmetrics.KeyStatus.Name()): prometheusmodel.LabelValue(status), }.Merge(syncLabels) @@ -429,7 +427,7 @@ func metricApplyDurationViewHasStatus(ctx context.Context, nt *NT, v1api prometh // ApplyDurationView is a distribution. Query count to aggregate. 
metricName = fmt.Sprintf("%s%s%s", prometheusConfigSyncMetricPrefix, metricName, prometheusDistributionCountSuffix) labels := prometheusmodel.LabelSet{ - prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): prometheusmodel.LabelValue(ocmetrics.OtelCollectorName), + prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): ocmetrics.OtelCollectorName, prometheusmodel.LabelName(ocmetrics.KeyCommit.Name()): prometheusmodel.LabelValue(commitHash), prometheusmodel.LabelName(ocmetrics.KeyStatus.Name()): prometheusmodel.LabelValue(status), }.Merge(syncLabels) @@ -441,7 +439,7 @@ func metricDeclaredResourcesViewHasValue(ctx context.Context, nt *NT, v1api prom metricName := ocmetrics.DeclaredResourcesView.Name metricName = fmt.Sprintf("%s%s", prometheusConfigSyncMetricPrefix, metricName) labels := prometheusmodel.LabelSet{ - prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): prometheusmodel.LabelValue(ocmetrics.OtelCollectorName), + prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): ocmetrics.OtelCollectorName, prometheusmodel.LabelName(ocmetrics.KeyCommit.Name()): prometheusmodel.LabelValue(commitHash), }.Merge(syncLabels) // DeclaredResourcesView only keeps the LastValue, so we don't need to aggregate @@ -454,7 +452,7 @@ func metricAPICallDurationViewOperationHasStatus(ctx context.Context, nt *NT, v1 // APICallDurationView is a distribution. Query count to aggregate. 
metricName = fmt.Sprintf("%s%s%s", prometheusConfigSyncMetricPrefix, metricName, prometheusDistributionCountSuffix) labels := prometheusmodel.LabelSet{ - prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): prometheusmodel.LabelValue(ocmetrics.OtelCollectorName), + prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): ocmetrics.OtelCollectorName, prometheusmodel.LabelName(ocmetrics.KeyOperation.Name()): prometheusmodel.LabelValue(operation), prometheusmodel.LabelName(ocmetrics.KeyStatus.Name()): prometheusmodel.LabelValue(status), }.Merge(syncLabels) @@ -466,8 +464,8 @@ func metricApplyOperationsViewHasValueAtLeast(ctx context.Context, nt *NT, v1api metricName := ocmetrics.ApplyOperationsView.Name metricName = fmt.Sprintf("%s%s", prometheusConfigSyncMetricPrefix, metricName) labels := prometheusmodel.LabelSet{ - prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): prometheusmodel.LabelValue(ocmetrics.OtelCollectorName), - prometheusmodel.LabelName(ocmetrics.KeyController.Name()): prometheusmodel.LabelValue(ocmetrics.ApplierController), + prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): ocmetrics.OtelCollectorName, + prometheusmodel.LabelName(ocmetrics.KeyController.Name()): ocmetrics.ApplierController, prometheusmodel.LabelName(ocmetrics.KeyOperation.Name()): prometheusmodel.LabelValue(operation), prometheusmodel.LabelName(ocmetrics.KeyStatus.Name()): prometheusmodel.LabelValue(status), }.Merge(syncLabels) @@ -481,7 +479,7 @@ func metricRemediateDurationViewHasStatus(ctx context.Context, nt *NT, v1api pro // RemediateDurationView is a distribution. Query count to aggregate. 
metricName = fmt.Sprintf("%s%s%s", prometheusConfigSyncMetricPrefix, metricName, prometheusDistributionCountSuffix) labels := prometheusmodel.LabelSet{ - prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): prometheusmodel.LabelValue(ocmetrics.OtelCollectorName), + prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): ocmetrics.OtelCollectorName, prometheusmodel.LabelName(ocmetrics.KeyStatus.Name()): prometheusmodel.LabelValue(status), }.Merge(syncLabels) query := fmt.Sprintf("%s%s", metricName, labels) diff --git a/pkg/metrics/record.go b/pkg/metrics/record.go index ed8b9fe0af..fcd9914cc6 100644 --- a/pkg/metrics/record.go +++ b/pkg/metrics/record.go @@ -27,6 +27,9 @@ import ( "kpt.dev/configsync/pkg/status" ) +// Initialization is the tag value to initiate a Count +const Initialization = "cs_counter_initialization" + func record(ctx context.Context, ms ...stats.Measurement) { stats.Record(ctx, ms...) if klog.V(5).Enabled() { @@ -136,11 +139,12 @@ func RecordApplyDuration(ctx context.Context, status, commit string, startTime t } // RecordResourceFight produces measurements for the ResourceFights view. -func RecordResourceFight(ctx context.Context, _ string) { - //tagCtx, _ := tag.New(ctx, - //tag.Upsert(KeyName, GetResourceLabels()), - //tag.Upsert(KeyOperation, operation), - //) +func RecordResourceFight(ctx context.Context, tag string) { + if tag == Initialization { + measurement := ResourceFights.M(0) + record(ctx, measurement) + return + } measurement := ResourceFights.M(1) record(ctx, measurement) } @@ -157,9 +161,13 @@ func RecordRemediateDuration(ctx context.Context, status string, startTime time. // RecordResourceConflict produces measurements for the ResourceConflicts view. 
func RecordResourceConflict(ctx context.Context, commit string) { tagCtx, _ := tag.New(ctx, - // tag.Upsert(KeyName, GetResourceLabels()), tag.Upsert(KeyCommit, commit), ) + if commit == Initialization { + measurement := ResourceConflicts.M(0) + record(tagCtx, measurement) + return + } measurement := ResourceConflicts.M(1) record(tagCtx, measurement) } @@ -167,6 +175,11 @@ func RecordResourceConflict(ctx context.Context, commit string) { // RecordInternalError produces measurements for the InternalErrors view. func RecordInternalError(ctx context.Context, source string) { tagCtx, _ := tag.New(ctx, tag.Upsert(KeyInternalErrorSource, source)) + if source == Initialization { + measurement := InternalErrors.M(0) + record(tagCtx, measurement) + return + } measurement := InternalErrors.M(1) record(tagCtx, measurement) } diff --git a/pkg/parse/run.go b/pkg/parse/run.go index 1e46c064ca..888c1ce5e3 100644 --- a/pkg/parse/run.go +++ b/pkg/parse/run.go @@ -79,6 +79,12 @@ type RunFunc func(ctx context.Context, p Parser, trigger string, state *reconcil // Run keeps checking whether a parse-apply-watch loop is necessary and starts a loop if needed. func Run(ctx context.Context, p Parser, nsControllerState *namespacecontroller.State, runOpts RunOpts) { + // Pre-initialize count metrics to ensure visibility, even if errors haven't occurred. + // Special label indicates initial state. Metrics are cumulative and used for rate aggregation + // to detect ongoing errors. Zero value means no recent errors. + metrics.RecordResourceFight(ctx, metrics.Initialization) + metrics.RecordInternalError(ctx, metrics.Initialization) + metrics.RecordResourceConflict(ctx, metrics.Initialization) opts := p.options() // Use timers, not tickers. // Tickers can cause memory leaks and continuous execution, when execution