diff --git a/e2e/nomostest/prometheus_metrics.go b/e2e/nomostest/prometheus_metrics.go index 36bf9b938f..c74c72b201 100644 --- a/e2e/nomostest/prometheus_metrics.go +++ b/e2e/nomostest/prometheus_metrics.go @@ -273,9 +273,9 @@ func ReconcilerErrorMetrics(nt *NT, syncLabels prometheusmodel.LabelSet, commitH var predicates []MetricsPredicate // Metrics aggregated by total count - predicates = append(predicates, metricResourceFightsHasValueAtLeast(nt, syncLabels, summary.Fights)) - predicates = append(predicates, metricResourceConflictsHasValueAtLeast(nt, syncLabels, commitHash, summary.Conflicts)) - predicates = append(predicates, metricInternalErrorsHasValueAtLeast(nt, syncLabels, summary.Internal)) + predicates = append(predicates, metricResourceFightsHasValueAtLeast(nt, syncLabels, summary.Fights, 1*time.Minute)) + predicates = append(predicates, metricResourceConflictsHasValueAtLeast(nt, syncLabels, commitHash, summary.Conflicts, 1*time.Minute)) + predicates = append(predicates, metricInternalErrorsHasValueAtLeast(nt, syncLabels, summary.Internal, 1*time.Minute)) // Metrics aggregated by last value predicates = append(predicates, metricReconcilerErrorsHasValue(nt, syncLabels, componentRendering, summary.Rendering)) predicates = append(predicates, metricReconcilerErrorsHasValue(nt, syncLabels, componentSource, summary.Source)) @@ -320,7 +320,7 @@ func metricReconcilerErrorsHasValue(nt *NT, syncLabels prometheusmodel.LabelSet, metricName := ocmetrics.ReconcilerErrorsView.Name metricName = fmt.Sprintf("%s%s", prometheusConfigSyncMetricPrefix, metricName) labels := prometheusmodel.LabelSet{ - prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): prometheusmodel.LabelValue(ocmetrics.OtelCollectorName), + prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): ocmetrics.OtelCollectorName, prometheusmodel.LabelName(ocmetrics.KeyExportedComponent.Name()): prometheusmodel.LabelValue(componentName), }.Merge(syncLabels) // ReconcilerErrorsView only keeps the LastValue, so we don't need to aggregate @@ -337,63 +337,61 @@ func metricReconcilerErrorsHasValue(nt *NT, syncLabels prometheusmodel.LabelSet, // metricResourceFightsHasValueAtLeast returns a MetricsPredicate that validates // that ResourceFights has at least the expected value. // If the expected value is zero, the metric must be zero or not found. -func metricResourceFightsHasValueAtLeast(nt *NT, syncLabels prometheusmodel.LabelSet, value int) MetricsPredicate { +func metricResourceFightsHasValueAtLeast(nt *NT, syncLabels prometheusmodel.LabelSet, value int, duration time.Duration) MetricsPredicate { return func(ctx context.Context, v1api prometheusv1.API) error { metricName := ocmetrics.ResourceFightsView.Name metricName = fmt.Sprintf("%s%s", prometheusConfigSyncMetricPrefix, metricName) labels := prometheusmodel.LabelSet{ - prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): prometheusmodel.LabelValue(ocmetrics.OtelCollectorName), + prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): ocmetrics.OtelCollectorName, }.Merge(syncLabels) - // ResourceFightsView counts the total number of ResourceFights, so we don't need to aggregate - query := fmt.Sprintf("%s%s", metricName, labels) if value == 0 { - // Tolerate missing metrics when expecting a zero value. - // Don't allow any value other than zero. - return metricExistsWithValueOrDoesNotExist(ctx, nt, v1api, query, float64(value)) + query := fmt.Sprintf("rate(%s%s[%s])", metricName, labels, duration.String()) + return metricExistsWithValue(ctx, nt, v1api, query, 0) } + query := fmt.Sprintf("%s%s", metricName, labels) return metricExistsWithValueAtLeast(ctx, nt, v1api, query, float64(value)) } } // metricResourceConflictsHasValueAtLeast returns a MetricsPredicate that -// validates that ResourceConflicts has at least the expected value. -// If the expected value is zero, the metric must be zero or not found. -func metricResourceConflictsHasValueAtLeast(nt *NT, syncLabels prometheusmodel.LabelSet, commitHash string, value int) MetricsPredicate { +// validates whether ResourceConflicts has been recorded recently +// If the expected value is zero, the rate at which ResourceConflicts occur is also zero. +func metricResourceConflictsHasValueAtLeast(nt *NT, syncLabels prometheusmodel.LabelSet, commitHash string, value int, duration time.Duration) MetricsPredicate { return func(ctx context.Context, v1api prometheusv1.API) error { metricName := ocmetrics.ResourceConflictsView.Name metricName = fmt.Sprintf("%s%s", prometheusConfigSyncMetricPrefix, metricName) + if value == 0 { + labels := prometheusmodel.LabelSet{ + prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): ocmetrics.OtelCollectorName, + prometheusmodel.LabelName(ocmetrics.KeyCommit.Name()): ocmetrics.Initialization, + }.Merge(syncLabels) + query := fmt.Sprintf("rate(%s%s[%s])", metricName, labels, duration.String()) + return metricExistsWithValue(ctx, nt, v1api, query, 0) + } labels := prometheusmodel.LabelSet{ - prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): prometheusmodel.LabelValue(ocmetrics.OtelCollectorName), + prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): ocmetrics.OtelCollectorName, prometheusmodel.LabelName(ocmetrics.KeyCommit.Name()): prometheusmodel.LabelValue(commitHash), }.Merge(syncLabels) - // ResourceConflictsView counts the total number of ResourceConflicts, so we don't need to aggregate query := fmt.Sprintf("%s%s", metricName, labels) - if value == 0 { - // Tolerate missing metrics when expecting a zero value. - // Don't allow any value other than zero. - return metricExistsWithValueOrDoesNotExist(ctx, nt, v1api, query, float64(value)) - } return metricExistsWithValueAtLeast(ctx, nt, v1api, query, float64(value)) } } // metricInternalErrorsHasValueAtLeast returns a MetricsPredicate that validates -// that InternalErrors has at least the expected value. -// If the expected value is zero, the metric must be zero or not found. -func metricInternalErrorsHasValueAtLeast(nt *NT, syncLabels prometheusmodel.LabelSet, value int) MetricsPredicate { +// whether InternalErrors has been recorded recently. +// If the expected value is zero, the rate at which InternalErrors occur is also zero. +func metricInternalErrorsHasValueAtLeast(nt *NT, syncLabels prometheusmodel.LabelSet, value int, duration time.Duration) MetricsPredicate { return func(ctx context.Context, v1api prometheusv1.API) error { metricName := ocmetrics.InternalErrorsView.Name metricName = fmt.Sprintf("%s%s", prometheusConfigSyncMetricPrefix, metricName) labels := prometheusmodel.LabelSet{ - prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): prometheusmodel.LabelValue(ocmetrics.OtelCollectorName), + prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): ocmetrics.OtelCollectorName, }.Merge(syncLabels) - // InternalErrorsView counts the total number of InternalErrors, so we don't need to aggregate - query := fmt.Sprintf("%s%s", metricName, labels) if value == 0 { - // Tolerate missing metrics when expecting a zero value. - // Don't allow any value other than zero. - return metricExistsWithValueOrDoesNotExist(ctx, nt, v1api, query, float64(value)) + query := fmt.Sprintf("rate(%s%s[%s])", metricName, labels, duration.String()) + return metricExistsWithValue(ctx, nt, v1api, query, 0) } + query := fmt.Sprintf("%s%s", metricName, labels) return metricExistsWithValueAtLeast(ctx, nt, v1api, query, float64(value)) } } @@ -402,7 +400,7 @@ func metricLastSyncTimestampHasStatus(ctx context.Context, nt *NT, v1api prometh metricName := ocmetrics.LastSyncTimestampView.Name metricName = fmt.Sprintf("%s%s", prometheusConfigSyncMetricPrefix, metricName) labels := prometheusmodel.LabelSet{ - prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): prometheusmodel.LabelValue(ocmetrics.OtelCollectorName), + prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): ocmetrics.OtelCollectorName, prometheusmodel.LabelName(ocmetrics.KeyCommit.Name()): prometheusmodel.LabelValue(commitHash), prometheusmodel.LabelName(ocmetrics.KeyStatus.Name()): prometheusmodel.LabelValue(status), }.Merge(syncLabels) @@ -415,7 +413,7 @@ func metricLastApplyTimestampHasStatus(ctx context.Context, nt *NT, v1api promet metricName := ocmetrics.LastApplyTimestampView.Name metricName = fmt.Sprintf("%s%s", prometheusConfigSyncMetricPrefix, metricName) labels := prometheusmodel.LabelSet{ - prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): prometheusmodel.LabelValue(ocmetrics.OtelCollectorName), + prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): ocmetrics.OtelCollectorName, prometheusmodel.LabelName(ocmetrics.KeyCommit.Name()): prometheusmodel.LabelValue(commitHash), prometheusmodel.LabelName(ocmetrics.KeyStatus.Name()): prometheusmodel.LabelValue(status), }.Merge(syncLabels) @@ -429,7 +427,7 @@ func metricApplyDurationViewHasStatus(ctx context.Context, nt *NT, v1api prometh // ApplyDurationView is a distribution. Query count to aggregate. metricName = fmt.Sprintf("%s%s%s", prometheusConfigSyncMetricPrefix, metricName, prometheusDistributionCountSuffix) labels := prometheusmodel.LabelSet{ - prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): prometheusmodel.LabelValue(ocmetrics.OtelCollectorName), + prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): ocmetrics.OtelCollectorName, prometheusmodel.LabelName(ocmetrics.KeyCommit.Name()): prometheusmodel.LabelValue(commitHash), prometheusmodel.LabelName(ocmetrics.KeyStatus.Name()): prometheusmodel.LabelValue(status), }.Merge(syncLabels) @@ -441,7 +439,7 @@ func metricDeclaredResourcesViewHasValue(ctx context.Context, nt *NT, v1api prom metricName := ocmetrics.DeclaredResourcesView.Name metricName = fmt.Sprintf("%s%s", prometheusConfigSyncMetricPrefix, metricName) labels := prometheusmodel.LabelSet{ - prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): prometheusmodel.LabelValue(ocmetrics.OtelCollectorName), + prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): ocmetrics.OtelCollectorName, prometheusmodel.LabelName(ocmetrics.KeyCommit.Name()): prometheusmodel.LabelValue(commitHash), }.Merge(syncLabels) // DeclaredResourcesView only keeps the LastValue, so we don't need to aggregate @@ -454,7 +452,7 @@ func metricAPICallDurationViewOperationHasStatus(ctx context.Context, nt *NT, v1 // APICallDurationView is a distribution. Query count to aggregate. metricName = fmt.Sprintf("%s%s%s", prometheusConfigSyncMetricPrefix, metricName, prometheusDistributionCountSuffix) labels := prometheusmodel.LabelSet{ - prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): prometheusmodel.LabelValue(ocmetrics.OtelCollectorName), + prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): ocmetrics.OtelCollectorName, prometheusmodel.LabelName(ocmetrics.KeyOperation.Name()): prometheusmodel.LabelValue(operation), prometheusmodel.LabelName(ocmetrics.KeyStatus.Name()): prometheusmodel.LabelValue(status), }.Merge(syncLabels) @@ -466,8 +464,8 @@ func metricApplyOperationsViewHasValueAtLeast(ctx context.Context, nt *NT, v1api metricName := ocmetrics.ApplyOperationsView.Name metricName = fmt.Sprintf("%s%s", prometheusConfigSyncMetricPrefix, metricName) labels := prometheusmodel.LabelSet{ - prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): prometheusmodel.LabelValue(ocmetrics.OtelCollectorName), - prometheusmodel.LabelName(ocmetrics.KeyController.Name()): prometheusmodel.LabelValue(ocmetrics.ApplierController), + prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): ocmetrics.OtelCollectorName, + prometheusmodel.LabelName(ocmetrics.KeyController.Name()): ocmetrics.ApplierController, prometheusmodel.LabelName(ocmetrics.KeyOperation.Name()): prometheusmodel.LabelValue(operation), prometheusmodel.LabelName(ocmetrics.KeyStatus.Name()): prometheusmodel.LabelValue(status), }.Merge(syncLabels) @@ -481,7 +479,7 @@ func metricRemediateDurationViewHasStatus(ctx context.Context, nt *NT, v1api pro // RemediateDurationView is a distribution. Query count to aggregate. metricName = fmt.Sprintf("%s%s%s", prometheusConfigSyncMetricPrefix, metricName, prometheusDistributionCountSuffix) labels := prometheusmodel.LabelSet{ - prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): prometheusmodel.LabelValue(ocmetrics.OtelCollectorName), + prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): ocmetrics.OtelCollectorName, prometheusmodel.LabelName(ocmetrics.KeyStatus.Name()): prometheusmodel.LabelValue(status), }.Merge(syncLabels) query := fmt.Sprintf("%s%s", metricName, labels) diff --git a/pkg/metrics/record.go b/pkg/metrics/record.go index ed8b9fe0af..fcd9914cc6 100644 --- a/pkg/metrics/record.go +++ b/pkg/metrics/record.go @@ -27,6 +27,9 @@ import ( "kpt.dev/configsync/pkg/status" ) +// Initialization is the tag value to initiate a Count +const Initialization = "cs_counter_initialization" + func record(ctx context.Context, ms ...stats.Measurement) { stats.Record(ctx, ms...) if klog.V(5).Enabled() { @@ -136,11 +139,12 @@ func RecordApplyDuration(ctx context.Context, status, commit string, startTime t } // RecordResourceFight produces measurements for the ResourceFights view. -func RecordResourceFight(ctx context.Context, _ string) { - //tagCtx, _ := tag.New(ctx, - //tag.Upsert(KeyName, GetResourceLabels()), - //tag.Upsert(KeyOperation, operation), - //) +func RecordResourceFight(ctx context.Context, tag string) { + if tag == Initialization { + measurement := ResourceFights.M(0) + record(ctx, measurement) + return + } measurement := ResourceFights.M(1) record(ctx, measurement) } @@ -157,9 +161,13 @@ func RecordRemediateDuration(ctx context.Context, status string, startTime time. // RecordResourceConflict produces measurements for the ResourceConflicts view. func RecordResourceConflict(ctx context.Context, commit string) { tagCtx, _ := tag.New(ctx, - // tag.Upsert(KeyName, GetResourceLabels()), tag.Upsert(KeyCommit, commit), ) + if commit == Initialization { + measurement := ResourceConflicts.M(0) + record(tagCtx, measurement) + return + } measurement := ResourceConflicts.M(1) record(tagCtx, measurement) } @@ -167,6 +175,11 @@ func RecordResourceConflict(ctx context.Context, commit string) { // RecordInternalError produces measurements for the InternalErrors view. func RecordInternalError(ctx context.Context, source string) { tagCtx, _ := tag.New(ctx, tag.Upsert(KeyInternalErrorSource, source)) + if source == Initialization { + measurement := InternalErrors.M(0) + record(tagCtx, measurement) + return + } measurement := InternalErrors.M(1) record(tagCtx, measurement) } diff --git a/pkg/parse/run.go b/pkg/parse/run.go index 1e46c064ca..888c1ce5e3 100644 --- a/pkg/parse/run.go +++ b/pkg/parse/run.go @@ -79,6 +79,12 @@ type RunFunc func(ctx context.Context, p Parser, trigger string, state *reconcil // Run keeps checking whether a parse-apply-watch loop is necessary and starts a loop if needed. func Run(ctx context.Context, p Parser, nsControllerState *namespacecontroller.State, runOpts RunOpts) { + // Pre-initialize count metrics to ensure visibility, even if errors haven't occurred. + // Special label indicates initial state. Metrics are cumulative and used for rate aggregation + // to detect ongoing errors. Zero value means no recent errors. + metrics.RecordResourceFight(ctx, metrics.Initialization) + metrics.RecordInternalError(ctx, metrics.Initialization) + metrics.RecordResourceConflict(ctx, metrics.Initialization) opts := p.options() // Use timers, not tickers. // Tickers can cause memory leaks and continuous execution, when execution