diff --git a/lib/autoupdate/rollout/controller.go b/lib/autoupdate/rollout/controller.go index 7ac3861e2bce0..8f05e2c45ccca 100644 --- a/lib/autoupdate/rollout/controller.go +++ b/lib/autoupdate/rollout/controller.go @@ -25,6 +25,7 @@ import ( "github.com/gravitational/trace" "github.com/jonboulle/clockwork" + "github.com/prometheus/client_golang/prometheus" "github.com/gravitational/teleport" "github.com/gravitational/teleport/api/utils/retryutils" @@ -45,13 +46,14 @@ type Controller struct { clock clockwork.Clock log *slog.Logger period time.Duration + metrics *metrics } // NewController creates a new Controller for the autoupdate_agent_rollout kind. // The period can be specified to control the sync frequency. This is mainly // used to speed up tests or for demo purposes. When empty, the controller picks // a sane default value. -func NewController(client Client, log *slog.Logger, clock clockwork.Clock, period time.Duration) (*Controller, error) { +func NewController(client Client, log *slog.Logger, clock clockwork.Clock, period time.Duration, reg prometheus.Registerer) (*Controller, error) { if client == nil { return nil, trace.BadParameter("missing client") } @@ -61,6 +63,9 @@ func NewController(client Client, log *slog.Logger, clock clockwork.Clock, perio if clock == nil { return nil, trace.BadParameter("missing clock") } + if reg == nil { + return nil, trace.BadParameter("missing prometheus.Registerer") + } if period <= 0 { period = defaultReconcilerPeriod @@ -77,13 +82,17 @@ func NewController(client Client, log *slog.Logger, clock clockwork.Clock, perio return nil, trace.Wrap(err, "failed to initialize time-based strategy") } + m := newMetrics(reg) + return &Controller{ - clock: clock, - log: log, + metrics: m, + clock: clock, + log: log, reconciler: reconciler{ - clt: client, - log: log, - clock: clock, + clt: client, + log: log, + clock: clock, + metrics: m, rolloutStrategies: []rolloutStrategy{ timeBased, haltOnError, @@ -122,13 +131,22 @@ func (c *Controller) Run(ctx context.Context) error { // tryAndCatch tries to run the controller reconciliation logic and recovers from potential panic by converting them // into errors. This ensures that a critical bug in the reconciler cannot bring down the whole Teleport cluster. func (c *Controller) tryAndCatch(ctx context.Context) (err error) { + startTime := c.clock.Now() // If something terribly bad happens during the reconciliation, we recover and return an error defer func() { if r := recover(); r != nil { c.log.ErrorContext(ctx, "Recovered from panic in the autoupdate_agent_rollout controller", "panic", r) err = trace.NewAggregate(err, trace.Errorf("Panic recovered during reconciliation: %v", r)) + c.metrics.observeReconciliation(metricsReconciliationResultLabelValuePanic, c.clock.Now().Sub(startTime)) } }() + err = trace.Wrap(c.reconciler.reconcile(ctx)) + endTime := c.clock.Now() + result := metricsReconciliationResultLabelValueSuccess + if err != nil { + result = metricsReconciliationResultLabelValueFail + } + c.metrics.observeReconciliation(result, endTime.Sub(startTime)) return } diff --git a/lib/autoupdate/rollout/metrics.go b/lib/autoupdate/rollout/metrics.go new file mode 100644 index 0000000000000..b18e6fc0bb8cc --- /dev/null +++ b/lib/autoupdate/rollout/metrics.go @@ -0,0 +1,332 @@ +/* + * Teleport + * Copyright (C) 2025 Gravitational, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package rollout + +import ( + "fmt" + "maps" + "slices" + "strconv" + "strings" + "sync" + "time" + + "github.com/prometheus/client_golang/prometheus" + "golang.org/x/exp/constraints" + + "github.com/gravitational/teleport" + autoupdatepb "github.com/gravitational/teleport/api/gen/proto/go/teleport/autoupdate/v1" +) + +const ( + metricsSubsystem = "agent_autoupdates" + metricVersionLabelRetention = 24 * time.Hour +) + +type metrics struct { + // lock protects previousVersions and groupCount. + // This should only be acquired by setVersionMetric. + lock sync.Mutex + + // previousVersions is a list of the version we exported metrics for. + // We track those to zero every old version if metrics labels contain the version. + previousVersions map[string]time.Time + groupCount int + + // controller metrics + reconciliations *prometheus.CounterVec + reconciliationDuration *prometheus.HistogramVec + reconciliationTries *prometheus.CounterVec + reconciliationTryDuration *prometheus.HistogramVec + + // resource spec metrics + versionPresent prometheus.Gauge + versionStart *prometheus.GaugeVec + versionTarget *prometheus.GaugeVec + versionMode prometheus.Gauge + + configPresent prometheus.Gauge + configMode prometheus.Gauge + + rolloutPresent prometheus.Gauge + rolloutStart *prometheus.GaugeVec + rolloutTarget *prometheus.GaugeVec + rolloutMode prometheus.Gauge + + // rollout status metrics + rolloutTimeOverride prometheus.Gauge + rolloutState prometheus.Gauge + rolloutGroupState *prometheus.GaugeVec +} + +const ( + metricsReconciliationResultLabelName = "result" + metricsReconciliationResultLabelValueFail = "fail" + metricsReconciliationResultLabelValuePanic = "panic" + metricsReconciliationResultLabelValueRetry = "retry" + metricsReconciliationResultLabelValueSuccess = "success" + + metricsGroupNumberLabelName = "group_number" + metricsVersionLabelName = "version" +) + +func newMetrics(reg prometheus.Registerer) *metrics { + m := metrics{ + previousVersions: make(map[string]time.Time), + reconciliations: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: teleport.MetricNamespace, + Subsystem: metricsSubsystem, + Name: "reconciliations_total", + Help: "Count the rollout reconciliations triggered by the controller, and their result (success, failure, panic). One reconciliation might imply several tries in case of conflict.", + }, []string{metricsReconciliationResultLabelName}), + reconciliationDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: teleport.MetricNamespace, + Subsystem: metricsSubsystem, + Name: "reconciliation_duration_seconds", + Help: "Time spent reconciling the autoupdate_agent_rollout resource. One reconciliation might imply several tries in case of conflict.", + }, []string{metricsReconciliationResultLabelName}), + reconciliationTries: prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: teleport.MetricNamespace, + Subsystem: metricsSubsystem, + Name: "reconciliation_tries_total", + Help: "Count the rollout reconciliations tried by the controller, and their result (success, failure, conflict).", + }, []string{metricsReconciliationResultLabelName}), + reconciliationTryDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: teleport.MetricNamespace, + Subsystem: metricsSubsystem, + Name: "reconciliation_try_duration_seconds", + Help: "Time spent trying to reconcile the autoupdate_agent_rollout resource.", + }, []string{metricsReconciliationResultLabelName}), + + versionPresent: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: teleport.MetricNamespace, + Subsystem: metricsSubsystem, + Name: "agent_version_present", + Help: "Boolean describing if an autoupdate_version resource exists in Teleport and its 'spec.agents' field is not nil.", + }), + versionTarget: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: teleport.MetricNamespace, + Subsystem: metricsSubsystem, + Name: "agent_version_target", + Help: "Metric describing the agent target version from the autoupdate_version resource.", + }, []string{metricsVersionLabelName}), + versionStart: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: teleport.MetricNamespace, + Subsystem: metricsSubsystem, + Name: "agent_version_start", + Help: "Metric describing the agent start version from the autoupdate_version resource.", + }, []string{"version"}), + versionMode: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: teleport.MetricNamespace, + Subsystem: metricsSubsystem, + Name: "agent_version_mode", + Help: fmt.Sprintf("Metric describing the agent update mode from the autoupdate_version resource. %s", valuesHelpString(codeToAgentMode)), + }), + + configPresent: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: teleport.MetricNamespace, + Subsystem: metricsSubsystem, + Name: "agent_config_present", + Help: "Boolean describing if an autoupdate_config resource exists in Teleport and its 'spec.agents' field is not nil.", + }), + configMode: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: teleport.MetricNamespace, + Subsystem: metricsSubsystem, + Name: "agent_config_mode", + Help: fmt.Sprintf("Metric describing the agent update mode from the autoupdate_agent_config resource. %s", valuesHelpString(codeToAgentMode)), + }), + + rolloutPresent: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: teleport.MetricNamespace, + Subsystem: metricsSubsystem, + Name: "agent_rollout_present", + Help: "Boolean describing if an autoupdate_agent_rollout resource exists in Teleport.", + }), + rolloutTarget: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: teleport.MetricNamespace, + Subsystem: metricsSubsystem, + Name: "agent_rollout_target", + Help: "Metric describing the agent target version from the autoupdate_gent_rollout resource.", + }, []string{metricsVersionLabelName}), + rolloutStart: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: teleport.MetricNamespace, + Subsystem: metricsSubsystem, + Name: "agent_rollout_start", + Help: "Metric describing the agent start version from the autoupdate_agent_rollout resource.", + }, []string{metricsVersionLabelName}), + rolloutMode: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: teleport.MetricNamespace, + Subsystem: metricsSubsystem, + Name: "agent_rollout_mode", + Help: fmt.Sprintf("Metric describing the agent update mode from the autoupdate_agent_rollout resource. %s", valuesHelpString(codeToAgentMode)), + }), + rolloutTimeOverride: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: teleport.MetricNamespace, + Subsystem: metricsSubsystem, + Name: "agent_rollout_time_override_timestamp_seconds", + Help: "Describes the autoupdate_agent_rollout time override if set in (seconds since epoch). Zero means no time override.", + }), + rolloutState: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: teleport.MetricNamespace, + Subsystem: metricsSubsystem, + Name: "agent_rollout_state", + Help: fmt.Sprintf("Describes the autoupdate_agent_rollout state. %s", valuesHelpString(autoupdatepb.AutoUpdateAgentRolloutState_name)), + }), + rolloutGroupState: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: teleport.MetricNamespace, + Subsystem: metricsSubsystem, + Name: "agent_rollout_group_state", + Help: fmt.Sprintf("Describes the autoupdate_agent_rollout state for each group. Groups are identified by their position in the schedule. %s", valuesHelpString(autoupdatepb.AutoUpdateAgentGroupState_name)), + }, []string{metricsGroupNumberLabelName}), + } + + reg.MustRegister(m.reconciliations) + reg.MustRegister(m.reconciliationDuration) + reg.MustRegister(m.reconciliationTries) + reg.MustRegister(m.reconciliationTryDuration) + + reg.MustRegister(m.versionPresent) + reg.MustRegister(m.versionTarget) + reg.MustRegister(m.versionStart) + reg.MustRegister(m.versionMode) + reg.MustRegister(m.configPresent) + reg.MustRegister(m.configMode) + reg.MustRegister(m.rolloutPresent) + reg.MustRegister(m.rolloutTarget) + reg.MustRegister(m.rolloutStart) + reg.MustRegister(m.rolloutMode) + + reg.MustRegister(m.rolloutTimeOverride) + reg.MustRegister(m.rolloutState) + reg.MustRegister(m.rolloutGroupState) + + return &m +} + +func valuesHelpString[K constraints.Integer](possibleValues map[K]string) string { + sb := strings.Builder{} + sb.WriteString("Possible values are") + + // maps are nor ordered, so we must sort keys to consistently generate the help message. + keys := maps.Keys(possibleValues) + for _, k := range slices.Sorted(keys) { + sb.WriteString(fmt.Sprintf(" %d:%s", k, possibleValues[k])) + } + + sb.WriteRune('.') + return sb.String() +} + +func (m *metrics) setVersionMetric(version string, metric *prometheus.GaugeVec, now time.Time) { + m.lock.Lock() + defer m.lock.Unlock() + + // for every version we've seen + for v, ts := range m.previousVersions { + labels := prometheus.Labels{metricsVersionLabelName: v} + // if the version is too old, we forget about it to limit cardinality + if now.After(ts.Add(metricVersionLabelRetention)) { + metric.Delete(labels) + delete(m.previousVersions, v) + } else { + // Else we just mark the version as not set anymore + metric.With(labels).Set(0) + } + } + // We set the new version + metric.With(prometheus.Labels{metricsVersionLabelName: version}).Set(1) + m.previousVersions[version] = now +} + +func (m *metrics) observeReconciliation(result string, duration time.Duration) { + m.reconciliations.With(prometheus.Labels{metricsReconciliationResultLabelName: result}) + m.reconciliationDuration.With(prometheus.Labels{metricsReconciliationResultLabelName: result}).Observe(duration.Seconds()) +} + +func (m *metrics) observeReconciliationTry(result string, duration time.Duration) { + m.reconciliationTries.With(prometheus.Labels{metricsReconciliationResultLabelName: result}) + m.reconciliationTryDuration.With(prometheus.Labels{metricsReconciliationResultLabelName: result}).Observe(duration.Seconds()) +} + +func (m *metrics) observeConfig(config *autoupdatepb.AutoUpdateConfig) { + if config.GetSpec().GetAgents() == nil { + m.configPresent.Set(0) + m.configMode.Set(float64(agentModeCode[defaultConfigMode])) + return + } + m.configPresent.Set(1) + m.configMode.Set(float64(agentModeCode[config.GetSpec().GetAgents().GetMode()])) +} + +func (m *metrics) observeVersion(version *autoupdatepb.AutoUpdateVersion, now time.Time) { + if version.GetSpec().GetAgents() == nil { + m.versionPresent.Set(0) + m.versionMode.Set(float64(agentModeCode[defaultConfigMode])) + return + } + m.versionPresent.Set(1) + m.versionMode.Set(float64(agentModeCode[version.GetSpec().GetAgents().GetMode()])) + m.setVersionMetric(version.GetSpec().GetAgents().GetStartVersion(), m.versionStart, now) + m.setVersionMetric(version.GetSpec().GetAgents().GetTargetVersion(), m.versionTarget, now) +} + +func (m *metrics) setGroupStates(groups []*autoupdatepb.AutoUpdateAgentRolloutStatusGroup) { + m.lock.Lock() + defer m.lock.Unlock() + + // Set the state for the groups specified in the rollout. + for i, group := range groups { + labels := prometheus.Labels{metricsGroupNumberLabelName: strconv.Itoa(i)} + m.rolloutGroupState.With(labels).Set(float64(group.State)) + } + + // If we have as many or more groups than before, no cleanup to do. + if len(groups) >= m.groupCount { + m.groupCount = len(groups) + return + } + + // If we have less groups than before, we must unset the metrics for higher group numbers. + for i := len(groups); i < m.groupCount; i++ { + labels := prometheus.Labels{metricsGroupNumberLabelName: strconv.Itoa(i)} + m.rolloutGroupState.With(labels).Set(float64(0)) + } + m.groupCount = len(groups) +} + +func (m *metrics) observeRollout(rollout *autoupdatepb.AutoUpdateAgentRollout, now time.Time) { + if rollout.GetSpec() == nil { + m.rolloutPresent.Set(0) + m.rolloutMode.Set(0) + } else { + m.rolloutPresent.Set(1) + m.rolloutMode.Set(float64(agentModeCode[rollout.GetSpec().GetAutoupdateMode()])) + m.setVersionMetric(rollout.GetSpec().GetStartVersion(), m.rolloutStart, now) + m.setVersionMetric(rollout.GetSpec().GetTargetVersion(), m.rolloutTarget, now) + } + + if to := rollout.GetStatus().GetTimeOverride().AsTime(); !(to.IsZero() || to.Unix() == 0) { + m.rolloutTimeOverride.Set(float64(to.Second())) + } else { + m.rolloutTimeOverride.Set(0) + } + + m.rolloutState.Set(float64(rollout.GetStatus().GetState())) + m.setGroupStates(rollout.GetStatus().GetGroups()) +} diff --git a/lib/autoupdate/rollout/metrics_test.go b/lib/autoupdate/rollout/metrics_test.go new file mode 100644 index 0000000000000..acb5596706cec --- /dev/null +++ b/lib/autoupdate/rollout/metrics_test.go @@ -0,0 +1,289 @@ +/* + * Teleport + * Copyright (C) 2025 Gravitational, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package rollout + +import ( + "testing" + "time" + + "github.com/jonboulle/clockwork" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/stretchr/testify/require" + + autoupdatepb "github.com/gravitational/teleport/api/gen/proto/go/teleport/autoupdate/v1" +) + +func newMetricsForTest(t *testing.T) *metrics { + reg := prometheus.NewRegistry() + return newMetrics(reg) +} + +func Test_setVersionMetric(t *testing.T) { + now := clockwork.NewFakeClock().Now() + aMinuteAgo := now.Add(-time.Minute) + aWeekAgo := now.Add(-time.Hour * 24 * 7) + testVersion := "1.2.3-alpha.1" + previousVersion := "1.2.1" + testMetricLabels := []string{metricsVersionLabelName} + tests := []struct { + name string + previousVersions map[string]time.Time + previousMetrics map[string]float64 + expectedVersions map[string]time.Time + expectedMetrics map[string]float64 + }{ + { + name: "no versions", + previousVersions: map[string]time.Time{}, + previousMetrics: map[string]float64{}, + expectedVersions: map[string]time.Time{ + testVersion: now, + }, + expectedMetrics: map[string]float64{ + testVersion: 1, + }, + }, + { + name: "same version, not expired", + previousVersions: map[string]time.Time{ + testVersion: aMinuteAgo, + }, + previousMetrics: map[string]float64{ + testVersion: 1, + }, + expectedVersions: map[string]time.Time{ + testVersion: now, + }, + expectedMetrics: map[string]float64{ + testVersion: 1, + }, + }, + { + name: "same version, expired", + previousVersions: map[string]time.Time{ + testVersion: aWeekAgo, + }, + previousMetrics: map[string]float64{ + testVersion: 1, + }, + expectedVersions: map[string]time.Time{ + testVersion: now, + }, + expectedMetrics: map[string]float64{ + testVersion: 1, + }, + }, + { + name: "old non-expired versions", + previousVersions: map[string]time.Time{ + previousVersion: aMinuteAgo, + }, + previousMetrics: map[string]float64{ + previousVersion: 1, + }, + expectedVersions: map[string]time.Time{ + previousVersion: aMinuteAgo, + testVersion: now, + }, + expectedMetrics: map[string]float64{ + previousVersion: 0, + testVersion: 1, + }, + }, + { + name: "old expired versions", + previousVersions: map[string]time.Time{ + previousVersion: aWeekAgo, + }, + previousMetrics: map[string]float64{ + previousVersion: 1, + }, + expectedVersions: map[string]time.Time{ + testVersion: now, + }, + expectedMetrics: map[string]float64{ + testVersion: 1, + }, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + t.Parallel() + // Test setup: create metrics and load previous metrics. + m := metrics{ + previousVersions: test.previousVersions, + } + + testGauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{}, testMetricLabels) + for k, v := range test.previousMetrics { + testGauge.With(prometheus.Labels{testMetricLabels[0]: k}).Set(v) + } + + // Test execution: set the version metric. + m.setVersionMetric(testVersion, testGauge, now) + + // Test validation: collect the metrics and check that the state match what we expect. + require.Equal(t, test.expectedVersions, m.previousVersions) + metricsChan := make(chan prometheus.Metric, 100) + testGauge.Collect(metricsChan) + close(metricsChan) + metricsResult := collectMetricsByLabel(t, metricsChan, testMetricLabels[0]) + require.Equal(t, test.expectedMetrics, metricsResult) + }) + } +} + +func Test_setGroupStates(t *testing.T) { + testMetricLabels := []string{metricsGroupNumberLabelName} + testGroups := []*autoupdatepb.AutoUpdateAgentRolloutStatusGroup{ + {State: autoupdatepb.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_DONE}, + {State: autoupdatepb.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_ACTIVE}, + {State: autoupdatepb.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_UNSTARTED}, + } + tests := []struct { + name string + previousGroupCount int + previousMetrics map[string]float64 + expectedGroupCount int + expectedMetrics map[string]float64 + }{ + { + name: "no groups", + previousGroupCount: 0, + previousMetrics: map[string]float64{}, + expectedGroupCount: len(testGroups), + expectedMetrics: map[string]float64{ + "0": float64(autoupdatepb.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_DONE), + "1": float64(autoupdatepb.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_ACTIVE), + "2": float64(autoupdatepb.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_UNSTARTED), + }, + }, + { + name: "same groups, same states", + previousGroupCount: len(testGroups), + previousMetrics: map[string]float64{ + "0": float64(autoupdatepb.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_DONE), + "1": float64(autoupdatepb.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_ACTIVE), + "2": float64(autoupdatepb.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_UNSTARTED), + }, + expectedGroupCount: len(testGroups), + expectedMetrics: map[string]float64{ + "0": float64(autoupdatepb.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_DONE), + "1": float64(autoupdatepb.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_ACTIVE), + "2": float64(autoupdatepb.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_UNSTARTED), + }, + }, + { + name: "same groups, different states", + previousGroupCount: len(testGroups), + previousMetrics: map[string]float64{ + "0": float64(autoupdatepb.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_ACTIVE), + "1": float64(autoupdatepb.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_UNSTARTED), + "2": float64(autoupdatepb.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_UNSTARTED), + }, + expectedGroupCount: len(testGroups), + expectedMetrics: map[string]float64{ + "0": float64(autoupdatepb.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_DONE), + "1": float64(autoupdatepb.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_ACTIVE), + "2": float64(autoupdatepb.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_UNSTARTED), + }, + }, + { + name: "less groups", + previousGroupCount: 1, + previousMetrics: map[string]float64{ + "0": float64(autoupdatepb.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_DONE), + }, + expectedGroupCount: len(testGroups), + expectedMetrics: map[string]float64{ + "0": float64(autoupdatepb.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_DONE), + "1": float64(autoupdatepb.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_ACTIVE), + "2": float64(autoupdatepb.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_UNSTARTED), + }, + }, + { + name: "more groups", + previousGroupCount: 5, + previousMetrics: map[string]float64{ + "0": float64(autoupdatepb.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_ACTIVE), + "1": float64(autoupdatepb.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_UNSTARTED), + "2": float64(autoupdatepb.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_UNSTARTED), + "3": float64(autoupdatepb.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_UNSTARTED), + "4": float64(autoupdatepb.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_UNSTARTED), + }, + expectedGroupCount: len(testGroups), + expectedMetrics: map[string]float64{ + "0": float64(autoupdatepb.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_DONE), + "1": float64(autoupdatepb.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_ACTIVE), + "2": float64(autoupdatepb.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_UNSTARTED), + "3": float64(autoupdatepb.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_UNSPECIFIED), + "4": float64(autoupdatepb.AutoUpdateAgentGroupState_AUTO_UPDATE_AGENT_GROUP_STATE_UNSPECIFIED), + }, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + t.Parallel() + + testGauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{}, testMetricLabels) + for k, v := range test.previousMetrics { + testGauge.With(prometheus.Labels{testMetricLabels[0]: k}).Set(v) + } + + // Test setup: create metrics and load previous metrics. + m := metrics{ + groupCount: test.previousGroupCount, + rolloutGroupState: testGauge, + } + + // Test execution: set the version metric. + m.setGroupStates(testGroups) + + // Test validation: collect the metrics and check that the state match what we expect. + require.Equal(t, test.expectedGroupCount, m.groupCount) + metricsChan := make(chan prometheus.Metric, 100) + m.rolloutGroupState.Collect(metricsChan) + close(metricsChan) + metricsResult := collectMetricsByLabel(t, metricsChan, testMetricLabels[0]) + require.Equal(t, test.expectedMetrics, metricsResult) + + }) + } +} + +func collectMetricsByLabel(t *testing.T, ch <-chan prometheus.Metric, labelName string) map[string]float64 { + t.Helper() + result := make(map[string]float64) + + var protoMetric dto.Metric + for { + m, ok := <-ch + if !ok { + return result + } + require.NoError(t, m.Write(&protoMetric)) + ll := protoMetric.GetLabel() + require.Len(t, ll, 1) + require.Equal(t, labelName, ll[0].GetName()) + gg := protoMetric.GetGauge() + require.NotNil(t, gg) + result[ll[0].GetValue()] = gg.GetValue() + } +} diff --git a/lib/autoupdate/rollout/reconciler.go b/lib/autoupdate/rollout/reconciler.go index 2a3282fa4670b..96ebf8791f257 100644 --- a/lib/autoupdate/rollout/reconciler.go +++ b/lib/autoupdate/rollout/reconciler.go @@ -56,9 +56,10 @@ var ( // - we reconcile 2 resources with one // - both input and output are singletons, we don't need the multi resource logic nor stream/paginated APIs type reconciler struct { - clt Client - log *slog.Logger - clock clockwork.Clock + clt Client + log *slog.Logger + clock clockwork.Clock + metrics *metrics rolloutStrategies []rolloutStrategy @@ -74,6 +75,8 @@ func (r *reconciler) reconcile(ctx context.Context) error { ctx, cancel := context.WithTimeout(ctx, reconciliationTimeout) defer cancel() + + var startTime time.Time tries := 0 var err error for tries < maxConflictRetry { @@ -82,17 +85,22 @@ func (r *reconciler) reconcile(ctx context.Context) error { case <-ctx.Done(): return ctx.Err() default: + startTime = r.clock.Now() err = r.tryReconcile(ctx) + duration := r.clock.Since(startTime) switch { case err == nil: + r.metrics.observeReconciliationTry(metricsReconciliationResultLabelValueSuccess, duration) return nil case trace.IsCompareFailed(err), trace.IsNotFound(err): // The resource changed since we last saw it // We must have raced against another auth // Let's retry the reconciliation r.log.DebugContext(ctx, "retrying reconciliation", "error", err) + r.metrics.observeReconciliationTry(metricsReconciliationResultLabelValueRetry, duration) default: // error is non-nil and non-retryable + r.metrics.observeReconciliationTry(metricsReconciliationResultLabelValueFail, duration) return trace.Wrap(err, "failed to reconcile rollout") } } @@ -105,7 +113,7 @@ func (r *reconciler) reconcile(ctx context.Context) error { // The creation/update/deletion can fail with a trace.CompareFailedError or trace.NotFoundError // if the resource change while we were computing it. // The caller must handle those error and retry the reconciliation. -func (r *reconciler) tryReconcile(ctx context.Context) error { +func (r *reconciler) tryReconcile(ctx context.Context) (err error) { // get autoupdate_config var config *autoupdate.AutoUpdateConfig if c, err := r.clt.GetAutoUpdateConfig(ctx); err == nil { @@ -113,6 +121,7 @@ func (r *reconciler) tryReconcile(ctx context.Context) error { } else if !trace.IsNotFound(err) { return trace.Wrap(err, "getting autoupdate_config") } + r.metrics.observeConfig(config) // get autoupdate_version var version *autoupdate.AutoUpdateVersion @@ -121,10 +130,11 @@ func (r *reconciler) tryReconcile(ctx context.Context) error { } else if !trace.IsNotFound(err) { return trace.Wrap(err, "getting autoupdate version") } + r.metrics.observeVersion(version, r.clock.Now()) // get autoupdate_agent_rollout rolloutExists := true - existingRollout, err := r.clt.GetAutoUpdateAgentRollout(ctx) + rollout, err := r.clt.GetAutoUpdateAgentRollout(ctx) if err != nil && !trace.IsNotFound(err) { return trace.Wrap(err, "getting autoupdate_agent_rollout") } @@ -133,13 +143,24 @@ func (r *reconciler) tryReconcile(ctx context.Context) error { rolloutExists = false } + // We observe the current rollout. + r.metrics.observeRollout(rollout, r.clock.Now()) + // If the reconciliation succeeded, we observe the rollout again to reflect its new values. + defer func() { + if err != nil { + return + } + r.metrics.observeRollout(rollout, r.clock.Now()) + }() + // if autoupdate_version does not exist or does not contain spec.agents, we should not configure a rollout if version.GetSpec().GetAgents() == nil { if !rolloutExists { // the rollout doesn't exist, nothing to do return nil } - // the rollout exists, we must delete it + // the rollout exists, we must delete it. We also clear the rollout object for metrics purposes. + rollout = nil return r.clt.DeleteAutoUpdateAgentRollout(ctx) } @@ -148,14 +169,14 @@ func (r *reconciler) tryReconcile(ctx context.Context) error { if err != nil { return trace.Wrap(err, "mutating rollout") } - newStatus, err := r.computeStatus(ctx, existingRollout, newSpec, config.GetSpec().GetAgents().GetSchedules()) + newStatus, err := r.computeStatus(ctx, rollout, newSpec, config.GetSpec().GetAgents().GetSchedules()) if err != nil { return trace.Wrap(err, "computing rollout status") } // We compute if something changed. - specChanged := !proto.Equal(existingRollout.GetSpec(), newSpec) - statusChanged := !proto.Equal(existingRollout.GetStatus(), newStatus) + specChanged := !proto.Equal(rollout.GetSpec(), newSpec) + statusChanged := !proto.Equal(rollout.GetStatus(), newStatus) rolloutChanged := specChanged || statusChanged // if nothing changed, no need to update the resource @@ -167,25 +188,25 @@ func (r *reconciler) tryReconcile(ctx context.Context) error { // if there are no existing rollout, we create a new one and set the status if !rolloutExists { r.log.DebugContext(ctx, "creating rollout") - rollout, err := update.NewAutoUpdateAgentRollout(newSpec) + rollout, err = update.NewAutoUpdateAgentRollout(newSpec) rollout.Status = newStatus if err != nil { return trace.Wrap(err, "validating new rollout") } - _, err = r.clt.CreateAutoUpdateAgentRollout(ctx, rollout) + rollout, err = r.clt.CreateAutoUpdateAgentRollout(ctx, rollout) return trace.Wrap(err, "creating rollout") } r.log.DebugContext(ctx, "updating rollout") // If there was a previous rollout, we update its spec and status and do an update. // We don't create a new resource to keep the metadata containing the revision ID. - existingRollout.Spec = newSpec - existingRollout.Status = newStatus - err = update.ValidateAutoUpdateAgentRollout(existingRollout) + rollout.Spec = newSpec + rollout.Status = newStatus + err = update.ValidateAutoUpdateAgentRollout(rollout) if err != nil { return trace.Wrap(err, "validating mutated rollout") } - _, err = r.clt.UpdateAutoUpdateAgentRollout(ctx, existingRollout) + rollout, err = r.clt.UpdateAutoUpdateAgentRollout(ctx, rollout) return trace.Wrap(err, "updating rollout") } @@ -216,14 +237,14 @@ func (r *reconciler) buildRolloutSpec(config *autoupdate.AutoUpdateConfigSpecAge // When config and version modes don't match, the lowest integer takes precedence. var ( agentModeCode = map[string]int{ - update.AgentsUpdateModeDisabled: 0, - update.AgentsUpdateModeSuspended: 1, - update.AgentsUpdateModeEnabled: 2, + update.AgentsUpdateModeDisabled: 1, + update.AgentsUpdateModeSuspended: 2, + update.AgentsUpdateModeEnabled: 3, } codeToAgentMode = map[int]string{ - 0: update.AgentsUpdateModeDisabled, - 1: update.AgentsUpdateModeSuspended, - 2: update.AgentsUpdateModeEnabled, + 1: update.AgentsUpdateModeDisabled, + 2: update.AgentsUpdateModeSuspended, + 3: update.AgentsUpdateModeEnabled, } ) diff --git a/lib/autoupdate/rollout/reconciler_test.go b/lib/autoupdate/rollout/reconciler_test.go index af14136a3d156..ea867fb01a742 100644 --- a/lib/autoupdate/rollout/reconciler_test.go +++ b/lib/autoupdate/rollout/reconciler_test.go @@ -20,7 +20,6 @@ package rollout import ( "context" - "sync" "testing" "time" @@ -317,9 +316,10 @@ func TestTryReconcile(t *testing.T) { // Test execution: Running the reconciliation reconciler := &reconciler{ - clt: client, - log: log, - clock: clock, + clt: client, + log: log, + clock: clock, + metrics: newMetricsForTest(t), } require.NoError(t, reconciler.tryReconcile(ctx)) @@ -389,8 +389,10 @@ func TestReconciler_Reconcile(t *testing.T) { client := newMockClient(t, stubs) reconciler := &reconciler{ - clt: client, - log: log, + clt: client, + log: log, + clock: clock, + metrics: newMetricsForTest(t), } // Test execution: run the reconciliation loop @@ -411,9 +413,10 @@ func TestReconciler_Reconcile(t *testing.T) { client := newMockClient(t, stubs) reconciler := &reconciler{ - clt: client, - log: log, - clock: clock, + clt: client, + log: log, + clock: clock, + metrics: newMetricsForTest(t), } // Test execution: run the reconciliation loop @@ -436,9 +439,10 @@ func TestReconciler_Reconcile(t *testing.T) { client := newMockClient(t, stubs) reconciler := &reconciler{ - clt: client, - log: log, - clock: clock, + clt: client, + log: log, + clock: clock, + metrics: newMetricsForTest(t), } // Test execution: run the reconciliation loop @@ -477,9 +481,10 @@ func TestReconciler_Reconcile(t *testing.T) { client := newMockClient(t, stubs) reconciler := &reconciler{ - clt: client, - log: log, - clock: clock, + clt: client, + log: log, + clock: clock, + metrics: newMetricsForTest(t), } // Test execution: run the reconciliation loop @@ -516,9 +521,10 @@ func TestReconciler_Reconcile(t *testing.T) { client := newMockClient(t, stubs) reconciler := &reconciler{ - clt: client, - log: log, - clock: clock, + clt: client, + log: log, + clock: clock, + metrics: newMetricsForTest(t), } // Test execution: run the reconciliation loop @@ -541,9 +547,10 @@ func TestReconciler_Reconcile(t *testing.T) { client := newMockClient(t, stubs) reconciler := &reconciler{ - clt: client, - log: log, - clock: clock, + clt: client, + log: log, + clock: clock, + metrics: newMetricsForTest(t), } // Test execution: run the reconciliation loop @@ -572,9 +579,10 @@ func TestReconciler_Reconcile(t *testing.T) { client := newMockClient(t, stubs) reconciler := &reconciler{ - clt: client, - log: log, - clock: clock, + clt: client, + log: log, + clock: clock, + metrics: newMetricsForTest(t), } // Test execution: run the reconciliation loop @@ -874,7 +882,7 @@ func Test_reconciler_computeStatus(t *testing.T) { log: log, clock: clock, rolloutStrategies: []rolloutStrategy{strategy}, - mutex: sync.Mutex{}, + metrics: newMetricsForTest(t), } result, err := r.computeStatus(ctx, tt.existingRollout, tt.newSpec, schedules) require.NoError(t, err) diff --git a/lib/service/service.go b/lib/service/service.go index 5bf46b842b8ea..106fc17dcae05 100644 --- a/lib/service/service.go +++ b/lib/service/service.go @@ -2465,7 +2465,7 @@ func (process *TeleportProcess) initAuthService() error { return trace.Wrap(spiffeFedSyncer.Run(process.GracefulExitContext()), "running SPIFFEFederation Syncer") }) - agentRolloutController, err := rollout.NewController(authServer, logger, process.Clock, cfg.Auth.AgentRolloutControllerSyncPeriod) + agentRolloutController, err := rollout.NewController(authServer, logger, process.Clock, cfg.Auth.AgentRolloutControllerSyncPeriod, process.metricsRegistry) if err != nil { return trace.Wrap(err, "creating the rollout controller") }