Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[metric] #4 Initialize count metrics on reconciler start #1237

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 36 additions & 38 deletions e2e/nomostest/prometheus_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,9 +273,9 @@ func ReconcilerErrorMetrics(nt *NT, syncLabels prometheusmodel.LabelSet, commitH

var predicates []MetricsPredicate
// Metrics aggregated by total count
predicates = append(predicates, metricResourceFightsHasValueAtLeast(nt, syncLabels, summary.Fights))
predicates = append(predicates, metricResourceConflictsHasValueAtLeast(nt, syncLabels, commitHash, summary.Conflicts))
predicates = append(predicates, metricInternalErrorsHasValueAtLeast(nt, syncLabels, summary.Internal))
predicates = append(predicates, metricResourceFightsHasValueAtLeast(nt, syncLabels, summary.Fights, 1*time.Minute))
predicates = append(predicates, metricResourceConflictsHasValueAtLeast(nt, syncLabels, commitHash, summary.Conflicts, 1*time.Minute))
predicates = append(predicates, metricInternalErrorsHasValueAtLeast(nt, syncLabels, summary.Internal, 1*time.Minute))
// Metrics aggregated by last value
predicates = append(predicates, metricReconcilerErrorsHasValue(nt, syncLabels, componentRendering, summary.Rendering))
predicates = append(predicates, metricReconcilerErrorsHasValue(nt, syncLabels, componentSource, summary.Source))
Expand Down Expand Up @@ -320,7 +320,7 @@ func metricReconcilerErrorsHasValue(nt *NT, syncLabels prometheusmodel.LabelSet,
metricName := ocmetrics.ReconcilerErrorsView.Name
metricName = fmt.Sprintf("%s%s", prometheusConfigSyncMetricPrefix, metricName)
labels := prometheusmodel.LabelSet{
prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): prometheusmodel.LabelValue(ocmetrics.OtelCollectorName),
prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): ocmetrics.OtelCollectorName,
prometheusmodel.LabelName(ocmetrics.KeyExportedComponent.Name()): prometheusmodel.LabelValue(componentName),
}.Merge(syncLabels)
// ReconcilerErrorsView only keeps the LastValue, so we don't need to aggregate
Expand All @@ -337,63 +337,61 @@ func metricReconcilerErrorsHasValue(nt *NT, syncLabels prometheusmodel.LabelSet,
// metricResourceFightsHasValueAtLeast returns a MetricsPredicate that validates
// that ResourceFights has at least the expected value.
// If the expected value is zero, the metric must be zero or not found.
func metricResourceFightsHasValueAtLeast(nt *NT, syncLabels prometheusmodel.LabelSet, value int) MetricsPredicate {
func metricResourceFightsHasValueAtLeast(nt *NT, syncLabels prometheusmodel.LabelSet, value int, duration time.Duration) MetricsPredicate {
return func(ctx context.Context, v1api prometheusv1.API) error {
metricName := ocmetrics.ResourceFightsView.Name
metricName = fmt.Sprintf("%s%s", prometheusConfigSyncMetricPrefix, metricName)
labels := prometheusmodel.LabelSet{
prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): prometheusmodel.LabelValue(ocmetrics.OtelCollectorName),
prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): ocmetrics.OtelCollectorName,
}.Merge(syncLabels)
// ResourceFightsView counts the total number of ResourceFights, so we don't need to aggregate
query := fmt.Sprintf("%s%s", metricName, labels)
if value == 0 {
// Tolerate missing metrics when expecting a zero value.
// Don't allow any value other than zero.
return metricExistsWithValueOrDoesNotExist(ctx, nt, v1api, query, float64(value))
query := fmt.Sprintf("rate(%s%s[%s])", metricName, labels, duration.String())
return metricExistsWithValue(ctx, nt, v1api, query, 0)
}
query := fmt.Sprintf("%s%s", metricName, labels)
return metricExistsWithValueAtLeast(ctx, nt, v1api, query, float64(value))
}
}

// metricResourceConflictsHasValueAtLeast returns a MetricsPredicate that
// validates that ResourceConflicts has at least the expected value.
// If the expected value is zero, the metric must be zero or not found.
func metricResourceConflictsHasValueAtLeast(nt *NT, syncLabels prometheusmodel.LabelSet, commitHash string, value int) MetricsPredicate {
// validates whether ResourceConflicts has been recorded recently
// If the expected value is zero, the rate at which ResourceConflicts occur is also zero.
func metricResourceConflictsHasValueAtLeast(nt *NT, syncLabels prometheusmodel.LabelSet, commitHash string, value int, duration time.Duration) MetricsPredicate {
return func(ctx context.Context, v1api prometheusv1.API) error {
metricName := ocmetrics.ResourceConflictsView.Name
metricName = fmt.Sprintf("%s%s", prometheusConfigSyncMetricPrefix, metricName)
if value == 0 {
labels := prometheusmodel.LabelSet{
prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): ocmetrics.OtelCollectorName,
prometheusmodel.LabelName(ocmetrics.KeyCommit.Name()): ocmetrics.Initialization,
}.Merge(syncLabels)
query := fmt.Sprintf("rate(%s%s[%s])", metricName, labels, duration.String())
return metricExistsWithValue(ctx, nt, v1api, query, 0)
}
labels := prometheusmodel.LabelSet{
prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): prometheusmodel.LabelValue(ocmetrics.OtelCollectorName),
prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): ocmetrics.OtelCollectorName,
prometheusmodel.LabelName(ocmetrics.KeyCommit.Name()): prometheusmodel.LabelValue(commitHash),
}.Merge(syncLabels)
// ResourceConflictsView counts the total number of ResourceConflicts, so we don't need to aggregate
query := fmt.Sprintf("%s%s", metricName, labels)
if value == 0 {
// Tolerate missing metrics when expecting a zero value.
// Don't allow any value other than zero.
return metricExistsWithValueOrDoesNotExist(ctx, nt, v1api, query, float64(value))
}
return metricExistsWithValueAtLeast(ctx, nt, v1api, query, float64(value))
}
}

// metricInternalErrorsHasValueAtLeast returns a MetricsPredicate that validates
// that InternalErrors has at least the expected value.
// If the expected value is zero, the metric must be zero or not found.
func metricInternalErrorsHasValueAtLeast(nt *NT, syncLabels prometheusmodel.LabelSet, value int) MetricsPredicate {
// whether InternalErrors has been recorded recently.
// If the expected value is zero, the rate at which InternalErrors occur is also zero.
func metricInternalErrorsHasValueAtLeast(nt *NT, syncLabels prometheusmodel.LabelSet, value int, duration time.Duration) MetricsPredicate {
return func(ctx context.Context, v1api prometheusv1.API) error {
metricName := ocmetrics.InternalErrorsView.Name
metricName = fmt.Sprintf("%s%s", prometheusConfigSyncMetricPrefix, metricName)
labels := prometheusmodel.LabelSet{
prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): prometheusmodel.LabelValue(ocmetrics.OtelCollectorName),
prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): ocmetrics.OtelCollectorName,
}.Merge(syncLabels)
// InternalErrorsView counts the total number of InternalErrors, so we don't need to aggregate
query := fmt.Sprintf("%s%s", metricName, labels)
if value == 0 {
// Tolerate missing metrics when expecting a zero value.
// Don't allow any value other than zero.
return metricExistsWithValueOrDoesNotExist(ctx, nt, v1api, query, float64(value))
query := fmt.Sprintf("rate(%s%s[%s])", metricName, labels, duration.String())
return metricExistsWithValue(ctx, nt, v1api, query, 0)
}
query := fmt.Sprintf("%s%s", metricName, labels)
return metricExistsWithValueAtLeast(ctx, nt, v1api, query, float64(value))
}
}
Expand All @@ -402,7 +400,7 @@ func metricLastSyncTimestampHasStatus(ctx context.Context, nt *NT, v1api prometh
metricName := ocmetrics.LastSyncTimestampView.Name
metricName = fmt.Sprintf("%s%s", prometheusConfigSyncMetricPrefix, metricName)
labels := prometheusmodel.LabelSet{
prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): prometheusmodel.LabelValue(ocmetrics.OtelCollectorName),
prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): ocmetrics.OtelCollectorName,
prometheusmodel.LabelName(ocmetrics.KeyCommit.Name()): prometheusmodel.LabelValue(commitHash),
prometheusmodel.LabelName(ocmetrics.KeyStatus.Name()): prometheusmodel.LabelValue(status),
}.Merge(syncLabels)
Expand All @@ -415,7 +413,7 @@ func metricLastApplyTimestampHasStatus(ctx context.Context, nt *NT, v1api promet
metricName := ocmetrics.LastApplyTimestampView.Name
metricName = fmt.Sprintf("%s%s", prometheusConfigSyncMetricPrefix, metricName)
labels := prometheusmodel.LabelSet{
prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): prometheusmodel.LabelValue(ocmetrics.OtelCollectorName),
prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): ocmetrics.OtelCollectorName,
prometheusmodel.LabelName(ocmetrics.KeyCommit.Name()): prometheusmodel.LabelValue(commitHash),
prometheusmodel.LabelName(ocmetrics.KeyStatus.Name()): prometheusmodel.LabelValue(status),
}.Merge(syncLabels)
Expand All @@ -429,7 +427,7 @@ func metricApplyDurationViewHasStatus(ctx context.Context, nt *NT, v1api prometh
// ApplyDurationView is a distribution. Query count to aggregate.
metricName = fmt.Sprintf("%s%s%s", prometheusConfigSyncMetricPrefix, metricName, prometheusDistributionCountSuffix)
labels := prometheusmodel.LabelSet{
prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): prometheusmodel.LabelValue(ocmetrics.OtelCollectorName),
prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): ocmetrics.OtelCollectorName,
prometheusmodel.LabelName(ocmetrics.KeyCommit.Name()): prometheusmodel.LabelValue(commitHash),
prometheusmodel.LabelName(ocmetrics.KeyStatus.Name()): prometheusmodel.LabelValue(status),
}.Merge(syncLabels)
Expand All @@ -441,7 +439,7 @@ func metricDeclaredResourcesViewHasValue(ctx context.Context, nt *NT, v1api prom
metricName := ocmetrics.DeclaredResourcesView.Name
metricName = fmt.Sprintf("%s%s", prometheusConfigSyncMetricPrefix, metricName)
labels := prometheusmodel.LabelSet{
prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): prometheusmodel.LabelValue(ocmetrics.OtelCollectorName),
prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): ocmetrics.OtelCollectorName,
prometheusmodel.LabelName(ocmetrics.KeyCommit.Name()): prometheusmodel.LabelValue(commitHash),
}.Merge(syncLabels)
// DeclaredResourcesView only keeps the LastValue, so we don't need to aggregate
Expand All @@ -454,7 +452,7 @@ func metricAPICallDurationViewOperationHasStatus(ctx context.Context, nt *NT, v1
// APICallDurationView is a distribution. Query count to aggregate.
metricName = fmt.Sprintf("%s%s%s", prometheusConfigSyncMetricPrefix, metricName, prometheusDistributionCountSuffix)
labels := prometheusmodel.LabelSet{
prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): prometheusmodel.LabelValue(ocmetrics.OtelCollectorName),
prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): ocmetrics.OtelCollectorName,
prometheusmodel.LabelName(ocmetrics.KeyOperation.Name()): prometheusmodel.LabelValue(operation),
prometheusmodel.LabelName(ocmetrics.KeyStatus.Name()): prometheusmodel.LabelValue(status),
}.Merge(syncLabels)
Expand All @@ -466,8 +464,8 @@ func metricApplyOperationsViewHasValueAtLeast(ctx context.Context, nt *NT, v1api
metricName := ocmetrics.ApplyOperationsView.Name
metricName = fmt.Sprintf("%s%s", prometheusConfigSyncMetricPrefix, metricName)
labels := prometheusmodel.LabelSet{
prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): prometheusmodel.LabelValue(ocmetrics.OtelCollectorName),
prometheusmodel.LabelName(ocmetrics.KeyController.Name()): prometheusmodel.LabelValue(ocmetrics.ApplierController),
prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): ocmetrics.OtelCollectorName,
prometheusmodel.LabelName(ocmetrics.KeyController.Name()): ocmetrics.ApplierController,
prometheusmodel.LabelName(ocmetrics.KeyOperation.Name()): prometheusmodel.LabelValue(operation),
prometheusmodel.LabelName(ocmetrics.KeyStatus.Name()): prometheusmodel.LabelValue(status),
}.Merge(syncLabels)
Expand All @@ -481,7 +479,7 @@ func metricRemediateDurationViewHasStatus(ctx context.Context, nt *NT, v1api pro
// RemediateDurationView is a distribution. Query count to aggregate.
metricName = fmt.Sprintf("%s%s%s", prometheusConfigSyncMetricPrefix, metricName, prometheusDistributionCountSuffix)
labels := prometheusmodel.LabelSet{
prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): prometheusmodel.LabelValue(ocmetrics.OtelCollectorName),
prometheusmodel.LabelName(ocmetrics.KeyComponent.Name()): ocmetrics.OtelCollectorName,
prometheusmodel.LabelName(ocmetrics.KeyStatus.Name()): prometheusmodel.LabelValue(status),
}.Merge(syncLabels)
query := fmt.Sprintf("%s%s", metricName, labels)
Expand Down
25 changes: 19 additions & 6 deletions pkg/metrics/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ import (
"kpt.dev/configsync/pkg/status"
)

// Initialization is the tag value to initiate a Count
const Initialization = "cs_counter_initialization"

func record(ctx context.Context, ms ...stats.Measurement) {
stats.Record(ctx, ms...)
if klog.V(5).Enabled() {
Expand Down Expand Up @@ -136,11 +139,12 @@ func RecordApplyDuration(ctx context.Context, status, commit string, startTime t
}

// RecordResourceFight produces measurements for the ResourceFights view.
func RecordResourceFight(ctx context.Context, _ string) {
//tagCtx, _ := tag.New(ctx,
//tag.Upsert(KeyName, GetResourceLabels()),
//tag.Upsert(KeyOperation, operation),
//)
func RecordResourceFight(ctx context.Context, tag string) {
if tag == Initialization {
measurement := ResourceFights.M(0)
record(ctx, measurement)
return
}
measurement := ResourceFights.M(1)
record(ctx, measurement)
}
Expand All @@ -157,16 +161,25 @@ func RecordRemediateDuration(ctx context.Context, status string, startTime time.
// RecordResourceConflict produces measurements for the ResourceConflicts view.
func RecordResourceConflict(ctx context.Context, commit string) {
tagCtx, _ := tag.New(ctx,
// tag.Upsert(KeyName, GetResourceLabels()),
tag.Upsert(KeyCommit, commit),
)
if commit == Initialization {
measurement := ResourceConflicts.M(0)
record(tagCtx, measurement)
return
}
measurement := ResourceConflicts.M(1)
record(tagCtx, measurement)
}

// RecordInternalError produces measurements for the InternalErrors view.
func RecordInternalError(ctx context.Context, source string) {
tagCtx, _ := tag.New(ctx, tag.Upsert(KeyInternalErrorSource, source))
if source == Initialization {
measurement := InternalErrors.M(0)
record(tagCtx, measurement)
return
}
measurement := InternalErrors.M(1)
record(tagCtx, measurement)
}
6 changes: 6 additions & 0 deletions pkg/parse/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@ type RunFunc func(ctx context.Context, p Parser, trigger string, state *reconcil

// Run keeps checking whether a parse-apply-watch loop is necessary and starts a loop if needed.
func Run(ctx context.Context, p Parser, nsControllerState *namespacecontroller.State, runOpts RunOpts) {
// Pre-initialize count metrics to ensure visibility, even if errors haven't occurred.
// Special label indicates initial state. Metrics are cumulative and used for rate aggregation
// to detect ongoing errors. Zero value means no recent errors.
metrics.RecordResourceFight(ctx, metrics.Initialization)
metrics.RecordInternalError(ctx, metrics.Initialization)
metrics.RecordResourceConflict(ctx, metrics.Initialization)
opts := p.options()
// Use timers, not tickers.
// Tickers can cause memory leaks and continuous execution, when execution
Expand Down